asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [3/3] asterixdb git commit: Small Cleanup Towards Fixing LifeCycle Issues
Date Mon, 12 Sep 2016 09:13:35 GMT
Small Cleanup Towards Fixing LifeCycle Issues

Before this change, dataset lifecycle manager was providing a set
of functionalities that are loosly related to management of datasets
and indexes. However, it was not clear what the possible states of
a dataset or an index and what is the responsibility of each object.

This change takes the first step towards fixing this area. Indexes
of a dataset are now grouped together under a single lifecycle class

A resource aka dataset must be created outside the lifecycle manager
and registered with it before it can be assigned resources (memory)
and before it can be used by any operation. This is still not the
case.

Change-Id: I84005a33837725f41ae63297a3711215dccce1d8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1148
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/11a02942
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/11a02942
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/11a02942

Branch: refs/heads/master
Commit: 11a02942f73fb982762ac2be3bc5642bda1eafdc
Parents: dd36605
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Mon Sep 12 10:17:03 2016 +0300
Committer: abdullah alamoudi <bamousaa@gmail.com>
Committed: Mon Sep 12 02:12:40 2016 -0700

----------------------------------------------------------------------
 .../active/message/ActiveManagerMessage.java    |   8 +-
 .../active/message/ActivePartitionMessage.java  |   8 +-
 ...rixAppRuntimeContextProviderForRecovery.java |   6 -
 .../app/nc/AsterixNCAppRuntimeContext.java      |  14 +-
 .../asterix/messaging/CCMessageBroker.java      |   5 +-
 .../asterix/messaging/NCMessageBroker.java      |  55 +-
 .../common/api/IAsterixAppRuntimeContext.java   |   2 -
 .../common/api/IDatasetLifecycleManager.java    |  18 +-
 .../common/context/BaseOperationTracker.java    |   1 -
 .../context/CorrelatedPrefixMergePolicy.java    |   4 +-
 .../CorrelatedPrefixMergePolicyFactory.java     |   4 +-
 .../asterix/common/context/DatasetInfo.java     | 187 +++++++
 .../common/context/DatasetLifecycleManager.java | 532 ++++++-------------
 .../asterix/common/context/DatasetResource.java | 142 +++++
 .../context/DatasetVirtualBufferCaches.java     |  83 +++
 .../asterix/common/context/IndexInfo.java       |  45 ++
 .../org/apache/asterix/common/context/Info.java |  53 ++
 .../context/PrimaryIndexOperationTracker.java   |   1 -
 .../messaging/AbstractApplicationMessage.java   |  36 --
 .../messaging/api/IApplicationMessage.java      |  24 +-
 .../api/IApplicationMessageCallback.java        |  30 --
 .../common/messaging/api/ICCMessageBroker.java  |   1 -
 .../common/messaging/api/INCMessageBroker.java  |   4 +-
 .../IAsterixAppRuntimeContextProvider.java      |   3 -
 .../apache/asterix/metadata/MetadataNode.java   |  14 +-
 .../metadata/bootstrap/MetadataBootstrap.java   |  10 +-
 .../management/ReplicationChannel.java          |   4 +-
 .../message/AbstractFailbackPlanMessage.java    |   4 +-
 .../message/CompleteFailbackRequestMessage.java |  10 +-
 .../CompleteFailbackResponseMessage.java        |   8 +-
 ...PreparePartitionsFailbackRequestMessage.java |  10 +-
 ...reparePartitionsFailbackResponseMessage.java |   4 +-
 .../runtime/message/ReplicaEventMessage.java    |   8 +-
 .../message/ReportMaxResourceIdMessage.java     |  10 +-
 .../ReportMaxResourceIdRequestMessage.java      |   8 +-
 .../message/ResourceIdRequestMessage.java       |  10 +-
 .../ResourceIdRequestResponseMessage.java       |  19 +-
 .../TakeoverMetadataNodeRequestMessage.java     |  10 +-
 .../TakeoverMetadataNodeResponseMessage.java    |   8 +-
 .../TakeoverPartitionsRequestMessage.java       |  14 +-
 .../TakeoverPartitionsResponseMessage.java      |   8 +-
 .../transaction/GlobalResourceIdFactory.java    |  21 +-
 .../runtime/util/AsterixClusterProperties.java  |   2 +-
 ...dexModificationOperationCallbackFactory.java |   6 +-
 ...dexModificationOperationCallbackFactory.java |   6 +-
 ...dexModificationOperationCallbackFactory.java |   6 +-
 ...dexModificationOperationCallbackFactory.java |   6 +-
 .../UpsertOperationCallbackFactory.java         |   6 +-
 .../service/recovery/CheckpointThread.java      |  28 +-
 .../service/recovery/RecoveryManager.java       |   4 +-
 .../transaction/TransactionSubsystem.java       |   9 +-
 .../locking/TestRuntimeContextProvider.java     |  10 +-
 .../apache/hyracks/api/messages/IMessage.java   |   3 -
 .../hyracks/api/messages/IMessageBroker.java    |   3 -
 .../helper/IndexLifecycleManagerProvider.java   |   4 +-
 .../examples/btree/helper/RuntimeContext.java   |   6 +-
 .../hyracks/storage/am/common/api/IIndex.java   |   2 +-
 .../am/common/api/IIndexLifecycleManager.java   |  38 --
 .../api/IIndexLifecycleManagerProvider.java     |   2 +-
 .../common/api/IResourceLifecycleManager.java   |  83 +++
 .../am/common/dataflow/IndexDataflowHelper.java |  11 +-
 .../common/dataflow/IndexLifecycleManager.java  |  10 +-
 .../dataflow/ExternalBTreeDataflowHelper.java   |   2 +-
 .../ExternalBTreeWithBuddyDataflowHelper.java   |   2 +-
 .../lsm/common/api/ILSMMergePolicyFactory.java  |   4 +-
 .../impls/ConstantMergePolicyFactory.java       |   4 +-
 .../lsm/common/impls/NoMergePolicyFactory.java  |   4 +-
 .../common/impls/PrefixMergePolicyFactory.java  |   4 +-
 .../dataflow/ExternalRTreeDataflowHelper.java   |   2 +-
 .../TestIndexLifecycleManagerProvider.java      |   7 +-
 .../TestStorageManagerComponentHolder.java      |  11 +-
 71 files changed, 962 insertions(+), 769 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 50fa257..392bec8 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -22,12 +22,12 @@ import java.io.Serializable;
 
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class ActiveManagerMessage extends AbstractApplicationMessage {
+public class ActiveManagerMessage implements IApplicationMessage {
     public static final byte STOP_ACTIVITY = 0x00;
 
     private static final long serialVersionUID = 1L;
@@ -62,7 +62,7 @@ public class ActiveManagerMessage extends AbstractApplicationMessage {
     }
 
     @Override
-    public String type() {
-        return "ACTIVE_MANAGER_MESSAGE";
+    public String toString() {
+        return ActiveManagerMessage.class.getSimpleName();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index 02affc4..fc67d3c 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -22,12 +22,12 @@ import java.io.Serializable;
 
 import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.service.IControllerService;
 
-public class ActivePartitionMessage extends AbstractApplicationMessage {
+public class ActivePartitionMessage implements IApplicationMessage {
 
     public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00;
     public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01;
@@ -70,7 +70,7 @@ public class ActivePartitionMessage extends AbstractApplicationMessage {
     }
 
     @Override
-    public String type() {
-        return "ACTIVE_ENTITY_TO_CC_MESSAGE";
+    public String toString() {
+        return ActivePartitionMessage.class.getSimpleName();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
index 265025f..ec0aaa8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
@@ -30,7 +30,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public class AsterixAppRuntimeContextProviderForRecovery implements IAsterixAppRuntimeContextProvider {
 
@@ -76,11 +75,6 @@ public class AsterixAppRuntimeContextProviderForRecovery implements IAsterixAppR
     }
 
     @Override
-    public IResourceIdFactory getResourceIdFactory() {
-        return asterixAppRuntimeContext.getResourceIdFactory();
-    }
-
-    @Override
     public IIOManager getIOManager() {
         return asterixAppRuntimeContext.getIOManager();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
index 9127ee5..ce20c9c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
@@ -60,7 +60,6 @@ import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataNode;
@@ -73,6 +72,7 @@ import org.apache.asterix.replication.recovery.RemoteRecoveryManager;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider;
 import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.hyracks.api.application.IApplicationConfig;
@@ -105,6 +105,7 @@ public class AsterixNCAppRuntimeContext implements IAsterixAppRuntimeContext, IA
 
     private ILSMMergePolicyFactory metadataMergePolicyFactory;
     private final INCApplicationContext ncApplicationContext;
+    private final IResourceIdFactory resourceIdFactory;
 
     private AsterixCompilerProperties compilerProperties;
     private AsterixExternalProperties externalProperties;
@@ -124,7 +125,6 @@ public class AsterixNCAppRuntimeContext implements IAsterixAppRuntimeContext, IA
 
     private ILSMIOOperationScheduler lsmIOScheduler;
     private PersistentLocalResourceRepository localResourceRepository;
-    private IResourceIdFactory resourceIdFactory;
     private IIOManager ioManager;
     private boolean isShuttingdown;
 
@@ -171,6 +171,7 @@ public class AsterixNCAppRuntimeContext implements IAsterixAppRuntimeContext, IA
         }
         allExtensions.addAll(new AsterixExtensionProperties(propertiesAccessor).getExtensions());
         ncExtensionManager = new NCExtensionManager(allExtensions);
+        resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
     }
 
     @Override
@@ -193,7 +194,7 @@ public class AsterixNCAppRuntimeContext implements IAsterixAppRuntimeContext, IA
 
         ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory =
                 new PersistentLocalResourceRepositoryFactory(
-                ioManager, ncApplicationContext.getNodeId(), metadataProperties);
+                        ioManager, ncApplicationContext.getNodeId(), metadataProperties);
 
         localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
                 .createRepository();
@@ -209,7 +210,6 @@ public class AsterixNCAppRuntimeContext implements IAsterixAppRuntimeContext, IA
             //delete any storage data before the resource factory is initialized
             localResourceRepository.deleteStorageData(true);
         }
-        initializeResourceIdFactory();
 
         datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
                 MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager(),
@@ -290,6 +290,7 @@ public class AsterixNCAppRuntimeContext implements IAsterixAppRuntimeContext, IA
         lccm.register((ILifeCycleComponent) datasetLifecycleManager);
         lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
         lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
+
     }
 
     @Override
@@ -442,11 +443,6 @@ public class AsterixNCAppRuntimeContext implements IAsterixAppRuntimeContext, IA
     }
 
     @Override
-    public void initializeResourceIdFactory() throws HyracksDataException {
-        resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
-    }
-
-    @Override
     public void initializeMetadata(boolean newUniverse) throws Exception {
         IAsterixStateProxy proxy;
         if (LOGGER.isLoggable(Level.INFO)) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 1345aa7..d1d7ff7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -22,7 +22,6 @@ import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.hyracks.api.messages.IMessage;
@@ -41,9 +40,9 @@ public class CCMessageBroker implements ICCMessageBroker {
 
     @Override
     public void receivedMessage(IMessage message, String nodeId) throws Exception {
-        AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
+        IApplicationMessage absMessage = (IApplicationMessage) message;
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received message: " + absMessage.type());
+            LOGGER.info("Received message: " + absMessage);
         }
         absMessage.handle(ccs);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 9851b61..1beff82 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -20,19 +20,14 @@ package org.apache.asterix.messaging;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.memory.ConcurrentFramePool;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -44,8 +39,6 @@ public class NCMessageBroker implements INCMessageBroker {
     private static final Logger LOGGER = Logger.getLogger(NCMessageBroker.class.getName());
 
     private final NodeControllerService ncs;
-    private final AtomicLong messageId = new AtomicLong(0);
-    private final Map<Long, IApplicationMessageCallback> callbacks;
     private final IAsterixAppRuntimeContext appContext;
     private final LinkedBlockingQueue<IApplicationMessage> receivedMsgsQ;
     private final ConcurrentFramePool messagingFramePool;
@@ -54,7 +47,6 @@ public class NCMessageBroker implements INCMessageBroker {
     public NCMessageBroker(NodeControllerService ncs, MessagingProperties messagingProperties) {
         this.ncs = ncs;
         appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
-        callbacks = new ConcurrentHashMap<>();
         maxMsgSize = messagingProperties.getFrameSize();
         int messagingMemoryBudget = messagingProperties.getFrameSize() * messagingProperties.getFrameCount();
         messagingFramePool = new ConcurrentFramePool(ncs.getId(), messagingMemoryBudget,
@@ -65,27 +57,15 @@ public class NCMessageBroker implements INCMessageBroker {
     }
 
     @Override
-    public void sendMessageToCC(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception {
-        registerMsgCallback(message, callback);
-        try {
-            ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
-        } catch (Exception e) {
-            handleMsgDeliveryFailure(message);
-            throw e;
-        }
+    public void sendMessageToCC(IApplicationMessage message) throws Exception {
+        ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
     }
 
     @Override
-    public void sendMessageToNC(String nodeId, IApplicationMessage message, IApplicationMessageCallback callback)
+    public void sendMessageToNC(String nodeId, IApplicationMessage message)
             throws Exception {
-        registerMsgCallback(message, callback);
-        try {
-            IChannelControlBlock messagingChannel = ncs.getMessagingNetworkManager().getMessagingChannel(nodeId);
-            sendMessageToChannel(messagingChannel, message);
-        } catch (Exception e) {
-            handleMsgDeliveryFailure(message);
-            throw e;
-        }
+        IChannelControlBlock messagingChannel = ncs.getMessagingNetworkManager().getMessagingChannel(nodeId);
+        sendMessageToChannel(messagingChannel, message);
     }
 
     @Override
@@ -95,14 +75,9 @@ public class NCMessageBroker implements INCMessageBroker {
 
     @Override
     public void receivedMessage(IMessage message, String nodeId) throws Exception {
-        AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
+        IApplicationMessage absMessage = (IApplicationMessage) message;
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received message: " + absMessage.type());
-        }
-        //if the received message is a response to a sent message, deliver it to the sender
-        IApplicationMessageCallback callback = callbacks.remove(absMessage.getId());
-        if (callback != null) {
-            callback.deliverMessageResponse(absMessage);
+            LOGGER.info("Received message: " + absMessage);
         }
         absMessage.handle(ncs);
     }
@@ -111,18 +86,6 @@ public class NCMessageBroker implements INCMessageBroker {
         return messagingFramePool;
     }
 
-    private void registerMsgCallback(IApplicationMessage message, IApplicationMessageCallback callback) {
-        if (callback != null) {
-            long uniqueMessageId = messageId.incrementAndGet();
-            message.setId(uniqueMessageId);
-            callbacks.put(uniqueMessageId, callback);
-        }
-    }
-
-    private void handleMsgDeliveryFailure(IApplicationMessage message) {
-        callbacks.remove(message.getId());
-    }
-
     private void sendMessageToChannel(IChannelControlBlock ccb, IApplicationMessage msg) throws IOException {
         byte[] serializedMsg = JavaSerializationUtils.serialize(msg);
         if (serializedMsg.length > maxMsgSize) {
@@ -161,8 +124,8 @@ public class NCMessageBroker implements INCMessageBroker {
                     Thread.currentThread().interrupt();
                 } catch (Exception e) {
                     if (LOGGER.isLoggable(Level.WARNING) && msg != null) {
-                        LOGGER.log(Level.WARNING, "Could not process message with id: " + msg.getId() + " and type: "
-                                + msg.type(), e);
+                        LOGGER.log(Level.WARNING, "Could not process message : "
+                                + msg, e);
                     } else {
                         if (LOGGER.isLoggable(Level.WARNING)) {
                             LOGGER.log(Level.WARNING, "Could not process message", e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 046d5c1..c009152 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -86,8 +86,6 @@ public interface IAsterixAppRuntimeContext {
 
     public ILibraryManager getLibraryManager();
 
-    public void initializeResourceIdFactory() throws HyracksDataException;
-
     /**
      * Exports the metadata node to the metadata RMI port.
      *

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 552ce22..04e30ff 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -20,22 +20,22 @@ package org.apache.asterix.common.api;
 
 import java.util.List;
 
-import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
-import org.apache.asterix.common.context.DatasetLifecycleManager.IndexInfo;
+import org.apache.asterix.common.context.DatasetInfo;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IIndex;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 
-public interface IDatasetLifecycleManager extends IIndexLifecycleManager {
+public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IIndex> {
     /**
-     * @param datasetID
-     * @param resourceID
+     * @param datasetId
+     * @param indexId
      * @return The corresponding index, or null if it is not found in the registered indexes.
      * @throws HyracksDataException
      */
-    IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException;
+    IIndex getIndex(int datasetId, long indexId) throws HyracksDataException;
 
     /**
      * Flushes all open datasets synchronously.
@@ -75,7 +75,7 @@ public interface IDatasetLifecycleManager extends IIndexLifecycleManager {
      * @param datasetID
      * @return
      */
-    ILSMOperationTracker getOperationTracker(int datasetID);
+    PrimaryIndexOperationTracker getOperationTracker(int datasetID);
 
     /**
      * creates (if necessary) and returns the dataset virtual buffer caches.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 21500b7..5c1d094 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.common.context;
 
-import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index 3d112ef..70339f3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -27,7 +27,7 @@ import java.util.Set;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -45,7 +45,7 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
     private final IDatasetLifecycleManager datasetLifecycleManager;
     private final int datasetID;
 
-    public CorrelatedPrefixMergePolicy(IIndexLifecycleManager datasetLifecycleManager, int datasetID) {
+    public CorrelatedPrefixMergePolicy(IResourceLifecycleManager datasetLifecycleManager, int datasetID) {
         this.datasetLifecycleManager = (DatasetLifecycleManager) datasetLifecycleManager;
         this.datasetID = datasetID;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
index ce405fc..3b65123 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
@@ -27,7 +27,7 @@ import java.util.Set;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
@@ -61,7 +61,7 @@ public class CorrelatedPrefixMergePolicyFactory implements ILSMMergePolicyFactor
     }
 
     @Override
-    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IIndexLifecycleManager ilcm) {
+    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IResourceLifecycleManager ilcm) {
         ILSMMergePolicy policy = new CorrelatedPrefixMergePolicy(ilcm, datasetID);
         policy.configure(properties);
         return policy;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
new file mode 100644
index 0000000..6a2cc56
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
+    private final Map<Long, IndexInfo> indexes;
+    private final int datasetID;
+    private long lastAccess;
+    private int numActiveIOOps;
+    private boolean isExternal;
+    private boolean isRegistered;
+    private boolean memoryAllocated;
+    private boolean durable;
+
+    public DatasetInfo(int datasetID) {
+        this.indexes = new HashMap<>();
+        this.setLastAccess(-1);
+        this.datasetID = datasetID;
+        this.setRegistered(false);
+        this.setMemoryAllocated(false);
+    }
+
+    @Override
+    public void touch() {
+        super.touch();
+        setLastAccess(System.currentTimeMillis());
+    }
+
+    @Override
+    public void untouch() {
+        super.untouch();
+        setLastAccess(System.currentTimeMillis());
+    }
+
+    public synchronized void declareActiveIOOperation() {
+        setNumActiveIOOps(getNumActiveIOOps() + 1);
+    }
+
+    public synchronized void undeclareActiveIOOperation() {
+        setNumActiveIOOps(getNumActiveIOOps() - 1);
+        //notify threads waiting on this dataset info
+        notifyAll();
+    }
+
+    public synchronized Set<ILSMIndex> getDatasetIndexes() {
+        Set<ILSMIndex> datasetIndexes = new HashSet<>();
+        for (IndexInfo iInfo : getIndexes().values()) {
+            if (iInfo.isOpen()) {
+                datasetIndexes.add(iInfo.getIndex());
+            }
+        }
+
+        return datasetIndexes;
+    }
+
+    @Override
+    public int compareTo(DatasetInfo i) {
+        // sort by (isOpen, referenceCount, lastAccess) ascending, where true < false
+        //
+        // Example sort order:
+        // -------------------
+        // (F, 0, 70)       <-- largest
+        // (F, 0, 60)
+        // (T, 10, 80)
+        // (T, 10, 70)
+        // (T, 9, 90)
+        // (T, 0, 100)      <-- smallest
+        if (isOpen() && !i.isOpen()) {
+            return -1;
+        } else if (!isOpen() && i.isOpen()) {
+            return 1;
+        } else {
+            if (getReferenceCount() < i.getReferenceCount()) {
+                return -1;
+            } else if (getReferenceCount() > i.getReferenceCount()) {
+                return 1;
+            } else {
+                if (getLastAccess() < i.getLastAccess()) {
+                    return -1;
+                } else if (getLastAccess() > i.getLastAccess()) {
+                    return 1;
+                } else {
+                    return 0;
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof DatasetInfo) {
+            return datasetID == ((DatasetInfo) obj).datasetID;
+        }
+        return false;
+    };
+
+    @Override
+    public int hashCode() {
+        return datasetID;
+    }
+
+    @Override
+    public String toString() {
+        return "DatasetID: " + getDatasetID() + ", isOpen: " + isOpen() + ", refCount: " + getReferenceCount()
+                + ", lastAccess: " + getLastAccess() + ", isRegistered: " + isRegistered() + ", memoryAllocated: "
+                + isMemoryAllocated() + ", isDurable: " + isDurable();
+    }
+
+    public boolean isDurable() {
+        return durable;
+    }
+
+    public int getNumActiveIOOps() {
+        return numActiveIOOps;
+    }
+
+    public void setNumActiveIOOps(int numActiveIOOps) {
+        this.numActiveIOOps = numActiveIOOps;
+    }
+
+    public boolean isExternal() {
+        return isExternal;
+    }
+
+    public void setExternal(boolean isExternal) {
+        this.isExternal = isExternal;
+    }
+
+    public Map<Long, IndexInfo> getIndexes() {
+        return indexes;
+    }
+
+    public boolean isRegistered() {
+        return isRegistered;
+    }
+
+    public void setRegistered(boolean isRegistered) {
+        this.isRegistered = isRegistered;
+    }
+
+    public void setDurable(boolean durable) {
+        this.durable = durable;
+    }
+
+    public int getDatasetID() {
+        return datasetID;
+    }
+
+    public boolean isMemoryAllocated() {
+        return memoryAllocated;
+    }
+
+    public void setMemoryAllocated(boolean memoryAllocated) {
+        this.memoryAllocated = memoryAllocated;
+    }
+
+    public long getLastAccess() {
+        return lastAccess;
+    }
+
+    public void setLastAccess(long lastAccess) {
+        this.lastAccess = lastAccess;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index f4eec05..698dfb4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -22,11 +22,9 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
@@ -40,22 +38,16 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
-import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
-import org.apache.hyracks.storage.common.buffercache.ResourceHeapBufferAllocator;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.file.LocalResource;
 
 public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent {
+    private final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>();
     private final AsterixStorageProperties storageProperties;
-    private final Map<Integer, DatasetVirtualBufferCaches> datasetVirtualBufferCachesMap;
-    private final Map<Integer, ILSMOperationTracker> datasetOpTrackers;
-    private final Map<Integer, DatasetInfo> datasetInfos;
     private final ILocalResourceRepository resourceRepository;
     private final int firstAvilableUserDatasetID;
     private final long capacity;
@@ -63,7 +55,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     private final ILogManager logManager;
     private final LogRecord logRecord;
     private final int numPartitions;
-    private boolean stopped = false;
+    private volatile boolean stopped = false;
 
     public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
             ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID, ILogManager logManager,
@@ -73,16 +65,13 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         this.resourceRepository = resourceRepository;
         this.firstAvilableUserDatasetID = firstAvilableUserDatasetID;
         this.numPartitions = numPartitions;
-        datasetVirtualBufferCachesMap = new HashMap<>();
-        datasetOpTrackers = new HashMap<Integer, ILSMOperationTracker>();
-        datasetInfos = new HashMap<Integer, DatasetInfo>();
         capacity = storageProperties.getMemoryComponentGlobalBudget();
         used = 0;
         logRecord = new LogRecord();
     }
 
     @Override
-    public synchronized IIndex getIndex(String resourcePath) throws HyracksDataException {
+    public synchronized IIndex get(String resourcePath) throws HyracksDataException {
         validateDatasetLifecycleManagerState();
         int datasetID = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
@@ -92,15 +81,11 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     @Override
     public synchronized IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
         validateDatasetLifecycleManagerState();
-        DatasetInfo dsInfo = datasetInfos.get(datasetID);
-        if (dsInfo == null) {
-            return null;
-        }
-        IndexInfo iInfo = dsInfo.indexes.get(resourceID);
-        if (iInfo == null) {
+        DatasetResource datasetResource = datasets.get(datasetID);
+        if (datasetResource == null) {
             return null;
         }
-        return iInfo.index;
+        return datasetResource.getIndex(resourceID);
     }
 
     @Override
@@ -108,20 +93,11 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         validateDatasetLifecycleManagerState();
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
-        DatasetInfo dsInfo = datasetInfos.get(did);
-        if (dsInfo == null) {
-            dsInfo = getDatasetInfo(did);
-        }
-        if (!dsInfo.isRegistered) {
-            dsInfo.isExternal = !index.hasMemoryComponents();
-            dsInfo.isRegistered = true;
-            dsInfo.durable = ((ILSMIndex) index).isDurable();
+        DatasetResource datasetResource = datasets.get(did);
+        if (datasetResource == null) {
+            datasetResource = getDatasetLifecycle(did);
         }
-
-        if (dsInfo.indexes.containsKey(resourceID)) {
-            throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
-        }
-        dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index, dsInfo.datasetID, resourceID));
+        datasetResource.register(resourceID, index);
     }
 
     public int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
@@ -146,24 +122,25 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
 
-        DatasetInfo dsInfo = datasetInfos.get(did);
-        IndexInfo iInfo = dsInfo == null ? null : dsInfo.indexes.get(resourceID);
+        DatasetResource dsr = datasets.get(did);
+        IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
 
-        if (dsInfo == null || iInfo == null) {
+        if (dsr == null || iInfo == null) {
             throw new HyracksDataException("Index with resource ID " + resourceID + " does not exist.");
         }
 
-        PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) datasetOpTrackers.get(dsInfo.datasetID);
-        if (iInfo.referenceCount != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+        PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
+        if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
             throw new HyracksDataException("Cannot remove index while it is open. (Dataset reference count = "
-                    + iInfo.referenceCount + ", Operation tracker number of active operations = "
+                    + iInfo.getReferenceCount() + ", Operation tracker number of active operations = "
                     + opTracker.getNumActiveOperations() + ")");
         }
 
         // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
         // First wait for any ongoing IO operations
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
         synchronized (dsInfo) {
-            while (dsInfo.numActiveIOOps > 0) {
+            while (dsInfo.getNumActiveIOOps() > 0) {
                 try {
                     //notification will come from DatasetInfo class (undeclareActiveIOOperation)
                     dsInfo.wait();
@@ -173,16 +150,17 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
             }
         }
 
-        if (iInfo.isOpen) {
-            ILSMOperationTracker indexOpTracker = iInfo.index.getOperationTracker();
+        if (iInfo.isOpen()) {
+            ILSMOperationTracker indexOpTracker = iInfo.getIndex().getOperationTracker();
             synchronized (indexOpTracker) {
-                iInfo.index.deactivate(false);
+                iInfo.getIndex().deactivate(false);
             }
         }
 
-        dsInfo.indexes.remove(resourceID);
-        if (dsInfo.referenceCount == 0 && dsInfo.isOpen && dsInfo.indexes.isEmpty() && !dsInfo.isExternal) {
-            removeDatasetFromCache(dsInfo.datasetID);
+        dsInfo.getIndexes().remove(resourceID);
+        if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
+                && !dsInfo.isExternal()) {
+            removeDatasetFromCache(dsInfo.getDatasetID());
         }
     }
 
@@ -192,29 +170,28 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
 
-        DatasetInfo dsInfo = datasetInfos.get(did);
-        if (dsInfo == null || !dsInfo.isRegistered) {
+        DatasetResource dsr = datasets.get(did);
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
+        if (dsInfo == null || !dsInfo.isRegistered()) {
             throw new HyracksDataException(
                     "Failed to open index with resource ID " + resourceID + " since it does not exist.");
         }
 
-        IndexInfo iInfo = dsInfo.indexes.get(resourceID);
+        IndexInfo iInfo = dsInfo.getIndexes().get(resourceID);
         if (iInfo == null) {
             throw new HyracksDataException(
                     "Failed to open index with resource ID " + resourceID + " since it does not exist.");
         }
-        if (!dsInfo.isOpen && !dsInfo.isExternal) {
-            initializeDatasetVirtualBufferCache(did);
-        }
 
-        dsInfo.isOpen = true;
-        dsInfo.touch();
-        if (!iInfo.isOpen) {
-            ILSMOperationTracker opTracker = iInfo.index.getOperationTracker();
+        dsr.open(true);
+        dsr.touch();
+
+        if (!iInfo.isOpen()) {
+            ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
             synchronized (opTracker) {
-                iInfo.index.activate();
+                iInfo.getIndex().activate();
             }
-            iInfo.isOpen = true;
+            iInfo.setOpen(true);
         }
         iInfo.touch();
     }
@@ -225,14 +202,15 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
          * that is not being used (refcount == 0) and has been least recently used, excluding metadata datasets.
          * The sort order defined for DatasetInfo maintains this. See DatasetInfo.compareTo().
          */
-        List<DatasetInfo> datasetInfosList = new ArrayList<DatasetInfo>(datasetInfos.values());
-        Collections.sort(datasetInfosList);
-        for (DatasetInfo dsInfo : datasetInfosList) {
-            PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) datasetOpTrackers
-                    .get(dsInfo.datasetID);
-            if (opTracker != null && opTracker.getNumActiveOperations() == 0 && dsInfo.referenceCount == 0
-                    && dsInfo.isOpen && dsInfo.datasetID >= firstAvilableUserDatasetID) {
-                closeDataset(dsInfo);
+        List<DatasetResource> datasetsResources = new ArrayList<>(datasets.values());
+        Collections.sort(datasetsResources);
+        for (DatasetResource dsr : datasetsResources) {
+            PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
+            if (opTracker != null && opTracker.getNumActiveOperations() == 0
+                    && dsr.getDatasetInfo().getReferenceCount() == 0
+                    && dsr.getDatasetInfo().isOpen()
+                    && dsr.getDatasetInfo().getDatasetID() >= getFirstAvilableUserDatasetID()) {
+                closeDataset(dsr.getDatasetInfo());
                 return true;
             }
         }
@@ -240,15 +218,15 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     private static void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException {
-        if (iInfo.isOpen) {
-            ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
+        if (iInfo.isOpen()) {
+            ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
-            accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
+            accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
         }
 
         // Wait for the above flush op.
         synchronized (dsInfo) {
-            while (dsInfo.numActiveIOOps > 0) {
+            while (dsInfo.getNumActiveIOOps() > 0) {
                 try {
                     //notification will come from DatasetInfo class (undeclareActiveIOOperation)
                     dsInfo.wait();
@@ -259,52 +237,64 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         }
     }
 
-    @Override
-    public DatasetInfo getDatasetInfo(int datasetID) {
-        synchronized (datasetInfos) {
-            DatasetInfo dsInfo = datasetInfos.get(datasetID);
-            if (dsInfo == null) {
-                dsInfo = new DatasetInfo(datasetID);
-                datasetInfos.put(datasetID, dsInfo);
+    public DatasetResource getDatasetLifecycle(int did) {
+        DatasetResource dsr = datasets.get(did);
+        if (dsr != null) {
+            return dsr;
+        }
+        synchronized (datasets) {
+            dsr = datasets.get(did);
+            if (dsr == null) {
+                DatasetInfo dsInfo = new DatasetInfo(did);
+                PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(did, logManager, dsInfo);
+                DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties,
+                        getFirstAvilableUserDatasetID(),
+                        getNumPartitions());
+                dsr = new DatasetResource(dsInfo, opTracker, vbcs);
+                datasets.put(did, dsr);
             }
-            return dsInfo;
+            return dsr;
         }
     }
 
     @Override
+    public DatasetInfo getDatasetInfo(int datasetID) {
+        return getDatasetLifecycle(datasetID).getDatasetInfo();
+    }
+
+    @Override
     public synchronized void close(String resourcePath) throws HyracksDataException {
         validateDatasetLifecycleManagerState();
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
-
-        DatasetInfo dsInfo = datasetInfos.get(did);
-        if (dsInfo == null) {
+        DatasetResource dsr = datasets.get(did);
+        if (dsr == null) {
             throw new HyracksDataException("No index found with resourceID " + resourceID);
         }
-        IndexInfo iInfo = dsInfo.indexes.get(resourceID);
+        IndexInfo iInfo = dsr.getIndexInfo(resourceID);
         if (iInfo == null) {
             throw new HyracksDataException("No index found with resourceID " + resourceID);
         }
         iInfo.untouch();
-        dsInfo.untouch();
+        dsr.untouch();
     }
 
     @Override
-    public synchronized List<IIndex> getOpenIndexes() {
+    public synchronized List<IIndex> getOpenResources() {
         List<IndexInfo> openIndexesInfo = getOpenIndexesInfo();
         List<IIndex> openIndexes = new ArrayList<IIndex>();
         for (IndexInfo iInfo : openIndexesInfo) {
-            openIndexes.add(iInfo.index);
+            openIndexes.add(iInfo.getIndex());
         }
         return openIndexes;
     }
 
     @Override
     public synchronized List<IndexInfo> getOpenIndexesInfo() {
-        List<IndexInfo> openIndexesInfo = new ArrayList<IndexInfo>();
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
-            for (IndexInfo iInfo : dsInfo.indexes.values()) {
-                if (iInfo.isOpen) {
+        List<IndexInfo> openIndexesInfo = new ArrayList<>();
+        for (DatasetResource dsr : datasets.values()) {
+            for (IndexInfo iInfo : dsr.getIndexes().values()) {
+                if (iInfo.isOpen()) {
                     openIndexesInfo.add(iInfo);
                 }
             }
@@ -313,46 +303,23 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     private DatasetVirtualBufferCaches getVirtualBufferCaches(int datasetID) {
-        synchronized (datasetVirtualBufferCachesMap) {
-            DatasetVirtualBufferCaches vbcs = datasetVirtualBufferCachesMap.get(datasetID);
-            if (vbcs == null) {
-                vbcs = initializeDatasetVirtualBufferCache(datasetID);
-            }
-            return vbcs;
-        }
+        return getDatasetLifecycle(datasetID).getVirtualBufferCaches();
     }
 
     @Override
     public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int ioDeviceNum) {
         DatasetVirtualBufferCaches dvbcs = getVirtualBufferCaches(datasetID);
-        return dvbcs.getVirtualBufferCaches(ioDeviceNum);
+        return dvbcs.getVirtualBufferCaches(this, ioDeviceNum);
     }
 
     private void removeDatasetFromCache(int datasetID) throws HyracksDataException {
         deallocateDatasetMemory(datasetID);
-        datasetInfos.remove(datasetID);
-        datasetVirtualBufferCachesMap.remove(datasetID);
-        datasetOpTrackers.remove(datasetID);
-    }
-
-    private DatasetVirtualBufferCaches initializeDatasetVirtualBufferCache(int datasetID) {
-        synchronized (datasetVirtualBufferCachesMap) {
-            DatasetVirtualBufferCaches dvbcs = new DatasetVirtualBufferCaches(datasetID);
-            datasetVirtualBufferCachesMap.put(datasetID, dvbcs);
-            return dvbcs;
-        }
+        datasets.remove(datasetID);
     }
 
     @Override
-    public ILSMOperationTracker getOperationTracker(int datasetID) {
-        synchronized (datasetOpTrackers) {
-            ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
-            if (opTracker == null) {
-                opTracker = new PrimaryIndexOperationTracker(datasetID, logManager, getDatasetInfo(datasetID));
-                datasetOpTrackers.put(datasetID, opTracker);
-            }
-            return opTracker;
-        }
+    public PrimaryIndexOperationTracker getOperationTracker(int datasetID) {
+        return datasets.get(datasetID).getOpTracker();
     }
 
     private void validateDatasetLifecycleManagerState() throws HyracksDataException {
@@ -361,145 +328,6 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         }
     }
 
-    private static abstract class Info {
-        protected int referenceCount;
-        protected boolean isOpen;
-
-        public Info() {
-            referenceCount = 0;
-            isOpen = false;
-        }
-
-        public void touch() {
-            ++referenceCount;
-        }
-
-        public void untouch() {
-            --referenceCount;
-        }
-    }
-
-    public static class IndexInfo extends Info {
-        private final ILSMIndex index;
-        private final long resourceId;
-        private final int datasetId;
-
-        public IndexInfo(ILSMIndex index, int datasetId, long resourceId) {
-            this.index = index;
-            this.datasetId = datasetId;
-            this.resourceId = resourceId;
-        }
-
-        public ILSMIndex getIndex() {
-            return index;
-        }
-
-        public long getResourceId() {
-            return resourceId;
-        }
-
-        public int getDatasetId() {
-            return datasetId;
-        }
-    }
-
-    public static class DatasetInfo extends Info implements Comparable<DatasetInfo> {
-        private final Map<Long, IndexInfo> indexes;
-        private final int datasetID;
-        private long lastAccess;
-        private int numActiveIOOps;
-        private boolean isExternal;
-        private boolean isRegistered;
-        private boolean memoryAllocated;
-        private boolean durable;
-
-        public DatasetInfo(int datasetID) {
-            this.indexes = new HashMap<Long, IndexInfo>();
-            this.lastAccess = -1;
-            this.datasetID = datasetID;
-            this.isRegistered = false;
-            this.memoryAllocated = false;
-        }
-
-        @Override
-        public void touch() {
-            super.touch();
-            lastAccess = System.currentTimeMillis();
-        }
-
-        @Override
-        public void untouch() {
-            super.untouch();
-            lastAccess = System.currentTimeMillis();
-        }
-
-        public synchronized void declareActiveIOOperation() {
-            numActiveIOOps++;
-        }
-
-        public synchronized void undeclareActiveIOOperation() {
-            numActiveIOOps--;
-            //notify threads waiting on this dataset info
-            notifyAll();
-        }
-
-        public synchronized Set<ILSMIndex> getDatasetIndexes() {
-            Set<ILSMIndex> datasetIndexes = new HashSet<ILSMIndex>();
-            for (IndexInfo iInfo : indexes.values()) {
-                if (iInfo.isOpen) {
-                    datasetIndexes.add(iInfo.index);
-                }
-            }
-
-            return datasetIndexes;
-        }
-
-        @Override
-        public int compareTo(DatasetInfo i) {
-            // sort by (isOpen, referenceCount, lastAccess) ascending, where true < false
-            //
-            // Example sort order:
-            // -------------------
-            // (F, 0, 70)       <-- largest
-            // (F, 0, 60)
-            // (T, 10, 80)
-            // (T, 10, 70)
-            // (T, 9, 90)
-            // (T, 0, 100)      <-- smallest
-            if (isOpen && !i.isOpen) {
-                return -1;
-            } else if (!isOpen && i.isOpen) {
-                return 1;
-            } else {
-                if (referenceCount < i.referenceCount) {
-                    return -1;
-                } else if (referenceCount > i.referenceCount) {
-                    return 1;
-                } else {
-                    if (lastAccess < i.lastAccess) {
-                        return -1;
-                    } else if (lastAccess > i.lastAccess) {
-                        return 1;
-                    } else {
-                        return 0;
-                    }
-                }
-            }
-
-        }
-
-        @Override
-        public String toString() {
-            return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ", refCount: " + referenceCount
-                    + ", lastAccess: " + lastAccess + ", isRegistered: " + isRegistered + ", memoryAllocated: "
-                    + memoryAllocated + ", isDurable: " + durable;
-        }
-
-        public boolean isDurable() {
-            return durable;
-        }
-    }
-
     @Override
     public synchronized void start() {
         used = 0;
@@ -507,30 +335,29 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
 
     @Override
     public synchronized void flushAllDatasets() throws HyracksDataException {
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
-            flushDatasetOpenIndexes(dsInfo, false);
+        for (DatasetResource dsr : datasets.values()) {
+            flushDatasetOpenIndexes(dsr.getDatasetInfo(), false);
         }
     }
 
     @Override
     public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
-        DatasetInfo datasetInfo = datasetInfos.get(datasetId);
-        if (datasetInfo != null) {
-            flushDatasetOpenIndexes(datasetInfo, asyncFlush);
+        DatasetResource dsr = datasets.get(datasetId);
+        if (dsr != null) {
+            flushDatasetOpenIndexes(dsr.getDatasetInfo(), asyncFlush);
         }
     }
 
     @Override
     public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException {
         //schedule flush for datasets with min LSN (Log Serial Number) < targetLSN
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
-            PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) getOperationTracker(
-                    dsInfo.datasetID);
+        for (DatasetResource dsr : datasets.values()) {
+            PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
             synchronized (opTracker) {
-                for (IndexInfo iInfo : dsInfo.indexes.values()) {
-                    AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.index
+                for (IndexInfo iInfo : dsr.getIndexes().values()) {
+                    AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
                             .getIOOperationCallback();
-                    if (!(((AbstractLSMIndex) iInfo.index).isCurrentMutableComponentEmpty()
+                    if (!(((AbstractLSMIndex) iInfo.getIndex()).isCurrentMutableComponentEmpty()
                             || ioCallback.hasPendingFlush() || opTracker.isFlushLogCreated()
                             || opTracker.isFlushOnExit())) {
                         long firstLSN = ioCallback.getFirstLSN();
@@ -552,10 +379,10 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
      * This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled
      */
     private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException {
-        if (!dsInfo.isExternal && dsInfo.durable) {
+        if (!dsInfo.isExternal() && dsInfo.isDurable()) {
             synchronized (logRecord) {
-                TransactionUtil.formFlushLogRecord(logRecord, dsInfo.datasetID, null, logManager.getNodeId(),
-                        dsInfo.indexes.size());
+                TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null, logManager.getNodeId(),
+                        dsInfo.getIndexes().size());
                 try {
                     logManager.log(logRecord);
                 } catch (ACIDException e) {
@@ -569,22 +396,22 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
                     throw new HyracksDataException(e);
                 }
             }
-            for (IndexInfo iInfo : dsInfo.indexes.values()) {
+            for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
                 //update resource lsn
-                AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) iInfo.index
+                AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
                         .getIOOperationCallback();
                 ioOpCallback.updateLastLSN(logRecord.getLSN());
             }
         }
 
         if (asyncFlush) {
-            for (IndexInfo iInfo : dsInfo.indexes.values()) {
-                ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
+            for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
+                ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE,
                         NoOpOperationCallback.INSTANCE);
-                accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
+                accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
             }
         } else {
-            for (IndexInfo iInfo : dsInfo.indexes.values()) {
+            for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
                 // TODO: This is not efficient since we flush the indexes sequentially.
                 // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
                 // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
@@ -596,7 +423,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     private void closeDataset(DatasetInfo dsInfo) throws HyracksDataException {
         // First wait for any ongoing IO operations
         synchronized (dsInfo) {
-            while (dsInfo.numActiveIOOps > 0) {
+            while (dsInfo.getNumActiveIOOps() > 0) {
                 try {
                     dsInfo.wait();
                 } catch (InterruptedException e) {
@@ -609,34 +436,33 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
-        for (IndexInfo iInfo : dsInfo.indexes.values()) {
-            if (iInfo.isOpen) {
-                ILSMOperationTracker opTracker = iInfo.index.getOperationTracker();
+        for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
+            if (iInfo.isOpen()) {
+                ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
                 synchronized (opTracker) {
-                    iInfo.index.deactivate(false);
+                    iInfo.getIndex().deactivate(false);
                 }
-                iInfo.isOpen = false;
+                iInfo.setOpen(false);
             }
-            assert iInfo.referenceCount == 0;
         }
-        removeDatasetFromCache(dsInfo.datasetID);
-        dsInfo.isOpen = false;
+        removeDatasetFromCache(dsInfo.getDatasetID());
+        dsInfo.setOpen(false);
     }
 
     @Override
     public synchronized void closeAllDatasets() throws HyracksDataException {
-        List<DatasetInfo> openDatasets = new ArrayList<>(datasetInfos.values());
-        for (DatasetInfo dsInfo : openDatasets) {
-            closeDataset(dsInfo);
+        ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
+        for (DatasetResource dsr : openDatasets) {
+            closeDataset(dsr.getDatasetInfo());
         }
     }
 
     @Override
     public synchronized void closeUserDatasets() throws HyracksDataException {
-        List<DatasetInfo> openDatasets = new ArrayList<>(datasetInfos.values());
-        for (DatasetInfo dsInfo : openDatasets) {
-            if (dsInfo.datasetID >= firstAvilableUserDatasetID) {
-                closeDataset(dsInfo);
+        ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
+        for (DatasetResource dsr : openDatasets) {
+            if (dsr.getDatasetID() >= getFirstAvilableUserDatasetID()) {
+                closeDataset(dsr.getDatasetInfo());
             }
         }
     }
@@ -652,9 +478,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
 
         closeAllDatasets();
 
-        datasetVirtualBufferCachesMap.clear();
-        datasetOpTrackers.clear();
-        datasetInfos.clear();
+        datasets.clear();
         stopped = true;
     }
 
@@ -673,57 +497,43 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
 
         sb.append("[Datasets]\n");
         sb.append(String.format(dsHeaderFormat, "DatasetID", "Open", "Reference Count", "Last Access"));
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
+        for (DatasetResource dsr : datasets.values()) {
+            DatasetInfo dsInfo = dsr.getDatasetInfo();
             sb.append(
-                    String.format(dsFormat, dsInfo.datasetID, dsInfo.isOpen, dsInfo.referenceCount, dsInfo.lastAccess));
+                    String.format(dsFormat, dsInfo.getDatasetID(), dsInfo.isOpen(), dsInfo.getReferenceCount(),
+                            dsInfo.getLastAccess()));
         }
         sb.append("\n");
 
         sb.append("[Indexes]\n");
         sb.append(String.format(idxHeaderFormat, "DatasetID", "ResourceID", "Open", "Reference Count", "Index"));
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
-            for (Map.Entry<Long, IndexInfo> entry : dsInfo.indexes.entrySet()) {
+        for (DatasetResource dsr : datasets.values()) {
+            DatasetInfo dsInfo = dsr.getDatasetInfo();
+            for (Map.Entry<Long, IndexInfo> entry : dsInfo.getIndexes().entrySet()) {
                 IndexInfo iInfo = entry.getValue();
-                sb.append(String.format(idxFormat, dsInfo.datasetID, entry.getKey(), iInfo.isOpen, iInfo.referenceCount,
-                        iInfo.index));
+                sb.append(String.format(idxFormat, dsInfo.getDatasetID(), entry.getKey(), iInfo.isOpen(),
+                        iInfo.getReferenceCount(),
+                        iInfo.getIndex()));
             }
         }
-
         outputStream.write(sb.toString().getBytes());
     }
 
-    private synchronized void allocateDatasetMemory(int datasetId) throws HyracksDataException {
-        DatasetInfo dsInfo = datasetInfos.get(datasetId);
-        if (dsInfo == null) {
+    private synchronized void deallocateDatasetMemory(int datasetId) throws HyracksDataException {
+        DatasetResource dsr = datasets.get(datasetId);
+        if (dsr == null) {
             throw new HyracksDataException(
                     "Failed to allocate memory for dataset with ID " + datasetId + " since it is not open.");
         }
-        synchronized (dsInfo) {
-            // This is not needed for external datasets' indexes since they never use the virtual buffer cache.
-            if (!dsInfo.memoryAllocated && !dsInfo.isExternal) {
-                long additionalSize = getVirtualBufferCaches(dsInfo.datasetID).getTotalSize();
-                while (used + additionalSize > capacity) {
-                    if (!evictCandidateDataset()) {
-                        throw new HyracksDataException("Cannot allocate dataset " + dsInfo.datasetID
-                                + " memory since memory budget would be exceeded.");
-                    }
-                }
-                used += additionalSize;
-                dsInfo.memoryAllocated = true;
-            }
-        }
-    }
-
-    private synchronized void deallocateDatasetMemory(int datasetId) throws HyracksDataException {
-        DatasetInfo dsInfo = datasetInfos.get(datasetId);
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
         if (dsInfo == null) {
             throw new HyracksDataException(
                     "Failed to deallocate memory for dataset with ID " + datasetId + " since it is not open.");
         }
         synchronized (dsInfo) {
-            if (dsInfo.isOpen && dsInfo.memoryAllocated) {
-                used -= getVirtualBufferCaches(dsInfo.datasetID).getTotalSize();
-                dsInfo.memoryAllocated = false;
+            if (dsInfo.isOpen() && dsInfo.isMemoryAllocated()) {
+                used -= getVirtualBufferCaches(dsInfo.getDatasetID()).getTotalSize();
+                dsInfo.setMemoryAllocated(false);
             }
         }
     }
@@ -731,54 +541,34 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     @Override
     public synchronized void allocateMemory(String resourcePath) throws HyracksDataException {
         //a resource name in the case of DatasetLifecycleManager is a dataset id which is passed to the ResourceHeapBufferAllocator.
-        int did = Integer.parseInt(resourcePath);
-        allocateDatasetMemory(did);
-    }
-
-    private class DatasetVirtualBufferCaches {
-        private final int datasetID;
-        private final Map<Integer, List<IVirtualBufferCache>> ioDeviceVirtualBufferCaches = new HashMap<>();
-
-        public DatasetVirtualBufferCaches(int datasetID) {
-            this.datasetID = datasetID;
-        }
-
-        private List<IVirtualBufferCache> initializeVirtualBufferCaches(int ioDeviceNum) {
-            assert ioDeviceVirtualBufferCaches.size() < numPartitions;
-            int numPages = datasetID < firstAvilableUserDatasetID
-                    ? storageProperties.getMetadataMemoryComponentNumPages()
-                    : storageProperties.getMemoryComponentNumPages();
-            List<IVirtualBufferCache> vbcs = new ArrayList<>();
-            for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
-                MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
-                        new VirtualBufferCache(
-                                new ResourceHeapBufferAllocator(DatasetLifecycleManager.this,
-                                        Integer.toString(datasetID)),
-                                storageProperties.getMemoryComponentPageSize(),
-                                numPages / storageProperties.getMemoryComponentsNum() / numPartitions));
-                vbcs.add(vbc);
-            }
-            ioDeviceVirtualBufferCaches.put(ioDeviceNum, vbcs);
-            return vbcs;
+        int datasetId = Integer.parseInt(resourcePath);
+        DatasetResource dsr = datasets.get(datasetId);
+        if (dsr == null) {
+            throw new HyracksDataException(
+                    "Failed to allocate memory for dataset with ID " + datasetId + " since it is not open.");
         }
-
-        public List<IVirtualBufferCache> getVirtualBufferCaches(int ioDeviceNum) {
-            synchronized (ioDeviceVirtualBufferCaches) {
-                List<IVirtualBufferCache> vbcs = ioDeviceVirtualBufferCaches.get(ioDeviceNum);
-                if (vbcs == null) {
-                    vbcs = initializeVirtualBufferCaches(ioDeviceNum);
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
+        synchronized (dsInfo) {
+            // This is not needed for external datasets' indexes since they never use the virtual buffer cache.
+            if (!dsInfo.isMemoryAllocated() && !dsInfo.isExternal()) {
+                long additionalSize = getVirtualBufferCaches(dsInfo.getDatasetID()).getTotalSize();
+                while (used + additionalSize > capacity) {
+                    if (!evictCandidateDataset()) {
+                        throw new HyracksDataException("Cannot allocate dataset " + dsInfo.getDatasetID()
+                                + " memory since memory budget would be exceeded.");
+                    }
                 }
-                return vbcs;
+                used += additionalSize;
+                dsInfo.setMemoryAllocated(true);
             }
         }
+    }
 
-        public long getTotalSize() {
-            int numPages = datasetID < firstAvilableUserDatasetID
-                    ? storageProperties.getMetadataMemoryComponentNumPages()
-                    : storageProperties.getMemoryComponentNumPages();
-
-            return storageProperties.getMemoryComponentPageSize() * numPages;
-        }
+    public int getFirstAvilableUserDatasetID() {
+        return firstAvilableUserDatasetID;
     }
 
+    public int getNumPartitions() {
+        return numPartitions;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
new file mode 100644
index 0000000..01f2e2b
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+/**
+ * A dataset can be in one of two states { EVICTED , LOADED }.
+ * When a dataset is created, it is in the EVICTED state. In the EVICTED state, the allowed operations are:
+ * 1. DELETE: delete the dataset completely.
+ * 2. LOAD: move the dataset to the LOADED state.
+ * When a dataset is in the LOADED state, the allowed operations are:
+ * 1. OPEN: increment the open counter.
+ * 2. ALLOCATE RESOURCES (memory)
+ * 3. DEALLOCATE RESOURCES (memory)
+ * 4. CLOSE: decrement the open counter.
+ * 5. EVICT: deallocate resources and unload the dataset moving it to the EVICTED state
+ */
+public class DatasetResource implements Comparable<DatasetResource> {
+    private final DatasetInfo datasetInfo;
+    private final PrimaryIndexOperationTracker datasetPrimaryOpTracker;
+    private final DatasetVirtualBufferCaches datasetVirtualBufferCaches;
+
+    public DatasetResource(DatasetInfo datasetInfo,
+            PrimaryIndexOperationTracker datasetPrimaryOpTracker,
+            DatasetVirtualBufferCaches datasetVirtualBufferCaches) {
+        this.datasetInfo = datasetInfo;
+        this.datasetPrimaryOpTracker = datasetPrimaryOpTracker;
+        this.datasetVirtualBufferCaches = datasetVirtualBufferCaches;
+    }
+
+    public boolean isRegistered() {
+        return datasetInfo.isRegistered();
+    }
+
+    public IndexInfo getIndexInfo(long resourceID) {
+        return datasetInfo.getIndexes().get(resourceID);
+    }
+
+    public boolean isOpen() {
+        return datasetInfo.isOpen();
+    }
+
+    public boolean isExternal() {
+        return datasetInfo.isExternal();
+    }
+
+    public void open(boolean open) {
+        datasetInfo.setOpen(open);
+    }
+
+    public void touch() {
+        datasetInfo.touch();
+    }
+
+    public void untouch() {
+        datasetInfo.untouch();
+    }
+
+    public DatasetVirtualBufferCaches getVirtualBufferCaches() {
+        return datasetVirtualBufferCaches;
+    }
+
+    public IIndex getIndex(long resourceID) {
+        IndexInfo iInfo = getIndexInfo(resourceID);
+        if (iInfo == null) {
+            return null;
+        }
+        return iInfo.getIndex();
+    }
+
+    public void register(long resourceID, IIndex index) throws HyracksDataException {
+        if (!datasetInfo.isRegistered()) {
+            synchronized (datasetInfo) {
+                if (!datasetInfo.isRegistered()) {
+                    datasetInfo.setExternal(!index.hasMemoryComponents());
+                    datasetInfo.setRegistered(true);
+                    datasetInfo.setDurable(((ILSMIndex) index).isDurable());
+                }
+            }
+        }
+        if (datasetInfo.getIndexes().containsKey(resourceID)) {
+            throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
+        }
+        datasetInfo.getIndexes().put(resourceID,
+                new IndexInfo((ILSMIndex) index, datasetInfo.getDatasetID(), resourceID));
+    }
+
+    public DatasetInfo getDatasetInfo() {
+        return datasetInfo;
+    }
+
+    public PrimaryIndexOperationTracker getOpTracker() {
+        return datasetPrimaryOpTracker;
+    }
+
+    @Override
+    public int compareTo(DatasetResource o) {
+        return datasetInfo.compareTo(o.datasetInfo);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof DatasetResource) {
+            return datasetInfo.equals(((DatasetResource) obj).datasetInfo);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return datasetInfo.hashCode();
+    }
+
+    public Map<Long, IndexInfo> getIndexes() {
+        return datasetInfo.getIndexes();
+    }
+
+    public int getDatasetID() {
+        return datasetInfo.getDatasetID();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11a02942/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
new file mode 100644
index 0000000..df85509
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.AsterixStorageProperties;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
+import org.apache.hyracks.storage.common.IResourceMemoryManager;
+import org.apache.hyracks.storage.common.buffercache.ResourceHeapBufferAllocator;
+
+public class DatasetVirtualBufferCaches {
+    private final int datasetID;
+    private final AsterixStorageProperties storageProperties;
+    private final int firstAvilableUserDatasetID;
+    private final int numPartitions;
+    private final Map<Integer, List<IVirtualBufferCache>> ioDeviceVirtualBufferCaches = new HashMap<>();
+
+    public DatasetVirtualBufferCaches(int datasetID, AsterixStorageProperties storageProperties,
+            int firstAvilableUserDatasetID, int numPartitions) {
+        this.datasetID = datasetID;
+        this.storageProperties = storageProperties;
+        this.firstAvilableUserDatasetID = firstAvilableUserDatasetID;
+        this.numPartitions = numPartitions;
+    }
+
+    public List<IVirtualBufferCache> initializeVirtualBufferCaches(IResourceMemoryManager memoryManager,
+            int ioDeviceNum) {
+        int numPages = datasetID < firstAvilableUserDatasetID
+                ? storageProperties.getMetadataMemoryComponentNumPages()
+                : storageProperties.getMemoryComponentNumPages();
+        List<IVirtualBufferCache> vbcs = new ArrayList<>();
+        for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
+            MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
+                    new VirtualBufferCache(
+                            new ResourceHeapBufferAllocator(memoryManager,
+                                    Integer.toString(datasetID)),
+                            storageProperties.getMemoryComponentPageSize(),
+                            numPages / storageProperties.getMemoryComponentsNum() / numPartitions));
+            vbcs.add(vbc);
+        }
+        ioDeviceVirtualBufferCaches.put(ioDeviceNum, vbcs);
+        return vbcs;
+    }
+
+    public List<IVirtualBufferCache> getVirtualBufferCaches(IResourceMemoryManager memoryManager, int ioDeviceNum) {
+        synchronized (ioDeviceVirtualBufferCaches) {
+            List<IVirtualBufferCache> vbcs = ioDeviceVirtualBufferCaches.get(ioDeviceNum);
+            if (vbcs == null) {
+                vbcs = initializeVirtualBufferCaches(memoryManager, ioDeviceNum);
+            }
+            return vbcs;
+        }
+    }
+
+    public long getTotalSize() {
+        int numPages = datasetID < firstAvilableUserDatasetID
+                ? storageProperties.getMetadataMemoryComponentNumPages()
+                : storageProperties.getMemoryComponentNumPages();
+        return storageProperties.getMemoryComponentPageSize() * ((long) numPages);
+    }
+}


Mime
View raw message