asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Murtadha Hubail (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: ASTERIXDB-1058: use ResourceHeapBufferAllocator for dataset ...
Date Wed, 11 Nov 2015 08:12:09 GMT
Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/481

Change subject: ASTERIXDB-1058: use ResourceHeapBufferAllocator for dataset memory components.
......................................................................

ASTERIXDB-1058: use ResourceHeapBufferAllocator for dataset memory components.

Change-Id: Ifd90fabc79e61f84370d415c38917b998db41466
---
M asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
M asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
3 files changed, 32 insertions(+), 33 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/81/481/1

diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 86e6db5..e1e6d96 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -37,13 +37,6 @@
     IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException;
 
     /**
-     * Allocates the memory budget of the dataset in the virtual buffer cache.
-     * @param datasetID
-     * @throws HyracksDataException
-     */
-    void allocateDatasetMemory(int datasetID) throws HyracksDataException;
-
-    /**
      * Flushes all open datasets synchronously.
      * @throws HyracksDataException
      */
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index b2e7b69..78ec686 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -46,7 +46,7 @@
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
-import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator;
+import org.apache.hyracks.storage.common.buffercache.ResourceHeapBufferAllocator;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.file.LocalResource;
 
@@ -185,14 +185,14 @@
 
         DatasetInfo dsInfo = datasetInfos.get(did);
         if (dsInfo == null || !dsInfo.isRegistered) {
-            throw new HyracksDataException("Failed to open index with resource ID " + resourceID
-                    + " since it does not exist.");
+            throw new HyracksDataException(
+                    "Failed to open index with resource ID " + resourceID + " since it does
not exist.");
         }
 
         IndexInfo iInfo = dsInfo.indexes.get(resourceID);
         if (iInfo == null) {
-            throw new HyracksDataException("Failed to open index with resource ID " + resourceID
-                    + " since it does not exist.");
+            throw new HyracksDataException(
+                    "Failed to open index with resource ID " + resourceID + " since it does
not exist.");
         }
         if (!dsInfo.isOpen && !dsInfo.isExternal) {
             initializeDatasetVirtualBufferCache(did);
@@ -314,12 +314,14 @@
     private void initializeDatasetVirtualBufferCache(int datasetID) {
         List<IVirtualBufferCache> vbcs = new ArrayList<IVirtualBufferCache>();
         synchronized (datasetVirtualBufferCaches) {
-            int numPages = datasetID < firstAvilableUserDatasetID ? storageProperties
-                    .getMetadataMemoryComponentNumPages() : storageProperties.getMemoryComponentNumPages();
+            int numPages = datasetID < firstAvilableUserDatasetID
+                    ? storageProperties.getMetadataMemoryComponentNumPages()
+                    : storageProperties.getMemoryComponentNumPages();
             for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
-                MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(new
VirtualBufferCache(
-                        new HeapBufferAllocator(), storageProperties.getMemoryComponentPageSize(),
numPages
-                                / storageProperties.getMemoryComponentsNum()));
+                MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
+                        new VirtualBufferCache(new ResourceHeapBufferAllocator(this, Integer.toString(datasetID)),
+                                storageProperties.getMemoryComponentPageSize(),
+                                numPages / storageProperties.getMemoryComponentsNum()));
                 vbcs.add(vbc);
             }
             datasetVirtualBufferCaches.put(datasetID, vbcs);
@@ -488,14 +490,15 @@
     public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws
HyracksDataException {
         //schedule flush for datasets with min LSN (Log Serial Number) < targetLSN
         for (DatasetInfo dsInfo : datasetInfos.values()) {
-            PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) getOperationTracker(dsInfo.datasetID);
+            PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) getOperationTracker(
+                    dsInfo.datasetID);
             synchronized (opTracker) {
                 for (IndexInfo iInfo : dsInfo.indexes.values()) {
                     AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback)
iInfo.index
                             .getIOOperationCallback();
                     if (!(((AbstractLSMIndex) iInfo.index).isCurrentMutableComponentEmpty()
-                            || ioCallback.hasPendingFlush() || opTracker.isFlushLogCreated()
|| opTracker
-                                .isFlushOnExit())) {
+                            || ioCallback.hasPendingFlush() || opTracker.isFlushLogCreated()
+                            || opTracker.isFlushOnExit())) {
                         long firstLSN = ioCallback.getFirstLSN();
                         if (firstLSN < targetLSN) {
                             opTracker.setFlushOnExit(true);
@@ -616,8 +619,8 @@
         sb.append("[Datasets]\n");
         sb.append(String.format(dsHeaderFormat, "DatasetID", "Open", "Reference Count", "Last
Access"));
         for (DatasetInfo dsInfo : datasetInfos.values()) {
-            sb.append(String
-                    .format(dsFormat, dsInfo.datasetID, dsInfo.isOpen, dsInfo.referenceCount,
dsInfo.lastAccess));
+            sb.append(
+                    String.format(dsFormat, dsInfo.datasetID, dsInfo.isOpen, dsInfo.referenceCount,
dsInfo.lastAccess));
         }
         sb.append("\n");
 
@@ -626,20 +629,19 @@
         for (DatasetInfo dsInfo : datasetInfos.values()) {
             for (Map.Entry<Long, IndexInfo> entry : dsInfo.indexes.entrySet()) {
                 IndexInfo iInfo = entry.getValue();
-                sb.append(String.format(idxFormat, dsInfo.datasetID, entry.getKey(), iInfo.isOpen,
-                        iInfo.referenceCount, iInfo.index));
+                sb.append(String.format(idxFormat, dsInfo.datasetID, entry.getKey(), iInfo.isOpen,
iInfo.referenceCount,
+                        iInfo.index));
             }
         }
 
         outputStream.write(sb.toString().getBytes());
     }
 
-    @Override
-    public synchronized void allocateDatasetMemory(int datasetId) throws HyracksDataException
{
+    private synchronized void allocateDatasetMemory(int datasetId) throws HyracksDataException
{
         DatasetInfo dsInfo = datasetInfos.get(datasetId);
         if (dsInfo == null) {
-            throw new HyracksDataException("Failed to allocate memory for dataset with ID
" + datasetId
-                    + " since it is not open.");
+            throw new HyracksDataException(
+                    "Failed to allocate memory for dataset with ID " + datasetId + " since
it is not open.");
         }
         synchronized (dsInfo) {
             // This is not needed for external datasets' indexes since they never use the
virtual buffer cache.
@@ -664,8 +666,8 @@
     private synchronized void deallocateDatasetMemory(int datasetId) throws HyracksDataException
{
         DatasetInfo dsInfo = datasetInfos.get(datasetId);
         if (dsInfo == null) {
-            throw new HyracksDataException("Failed to deallocate memory for dataset with
ID " + datasetId
-                    + " since it is not open.");
+            throw new HyracksDataException(
+                    "Failed to deallocate memory for dataset with ID " + datasetId + " since
it is not open.");
         }
         synchronized (dsInfo) {
             if (dsInfo.isOpen && dsInfo.memoryAllocated) {
@@ -677,4 +679,11 @@
             }
         }
     }
+
+    @Override
+    public synchronized void allocateMemory(String resourceName) throws HyracksDataException
{
+        //a resource name in the case of DatasetLifecycleManager is a dataset id which is
passed to the ResourceHeapBufferAllocator.
+        int did = Integer.parseInt(resourceName);
+        allocateDatasetMemory(did);
+    }
 }
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index c3d75b1..7f62b3f 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -60,9 +60,6 @@
     public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback
searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException
{
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION)
{
-            if (!dsInfo.isMemoryAllocated()) {
-                datasetLifecycleManager.allocateDatasetMemory(dsInfo.getDatasetID());
-            }
             incrementNumActiveOperations(modificationCallback);
         } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE)
{
             dsInfo.declareActiveIOOperation();

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/481
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ifd90fabc79e61f84370d415c38917b998db41466
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <hubailmor@gmail.com>

Mime
View raw message