asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Murtadha Hubail (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: ASTERIXDB-1058: make Asterix compatible with lazy LSM memory...
Date Fri, 25 Sep 2015 19:57:40 GMT
Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/408

Change subject: ASTERIXDB-1058: make Asterix compatible with lazy LSM memory allocation
......................................................................

ASTERIXDB-1058: make Asterix compatible with lazy LSM memory allocation

Change-Id: I4ea1eb129fe3043d43b077473dc29d17a97dfcc2
---
M asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
M pom.xml
4 files changed, 113 insertions(+), 45 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/08/408/1

diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 27fb2c6..f6a796c 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -27,6 +27,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.ILocalResourceMetadata;
 import org.apache.asterix.common.config.AsterixStorageProperties;
@@ -51,6 +53,7 @@
 import org.apache.hyracks.storage.common.file.LocalResource;
 
 public class DatasetLifecycleManager implements IIndexLifecycleManager, ILifeCycleComponent
{
+    private static final Logger LOGGER = Logger.getLogger(DatasetLifecycleManager.class.getName());
     private final AsterixStorageProperties storageProperties;
     private final Map<Integer, List<IVirtualBufferCache>> datasetVirtualBufferCaches;
     private final Map<Integer, ILSMOperationTracker> datasetOpTrackers;
@@ -157,11 +160,7 @@
 
         dsInfo.indexes.remove(resourceID);
         if (dsInfo.referenceCount == 0 && dsInfo.isOpen && dsInfo.indexes.isEmpty()
&& !dsInfo.isExternal) {
-            List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
-            assert vbcs != null;
-            for (IVirtualBufferCache vbc : vbcs) {
-                used -= (vbc.getNumPages() * vbc.getPageSize());
-            }
+            deallocateDatasetMemory(did);
             datasetInfos.remove(did);
             datasetVirtualBufferCaches.remove(did);
             datasetOpTrackers.remove(did);
@@ -182,22 +181,7 @@
             throw new HyracksDataException("Failed to open index with resource ID " + resourceID
                     + " since it does not exist.");
         }
-
-        // This is not needed for external datasets' indexes since they never use the virtual
buffer cache.
-        if (!dsInfo.isOpen && !dsInfo.isExternal) {
-            List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
-            assert vbcs != null;
-            long additionalSize = 0;
-            for (IVirtualBufferCache vbc : vbcs) {
-                additionalSize += vbc.getNumPages() * vbc.getPageSize();
-            }
-            while (used + additionalSize > capacity) {
-                if (!evictCandidateDataset()) {
-                    throw new HyracksDataException("Cannot activate index since memory budget
would be exceeded.");
-                }
-            }
-            used += additionalSize;
-        }
+        initializeDatasetVirtualBufferCache(did);
 
         dsInfo.isOpen = true;
         dsInfo.touch();
@@ -209,6 +193,65 @@
             iInfo.isOpen = true;
         }
         iInfo.touch();
+    }
+
+    @Override
+    public synchronized void allocateMemory(long resourceID) throws HyracksDataException
{
+        int did = getDIDfromRID(resourceID);
+        allocateDatasetMemory(did);
+    }
+
+    @Override
+    public synchronized void deallocateMemory(long resourceID) throws HyracksDataException
{
+        int did = getDIDfromRID(resourceID);
+        deallocateDatasetMemory(did);
+    }
+
+    public synchronized void allocateDatasetMemory(int datasetId) throws HyracksDataException
{
+        DatasetInfo dsInfo = datasetInfos.get(datasetId);
+        if (dsInfo == null) {
+            throw new HyracksDataException("Failed to allocate memory for dataset with ID " + datasetId
+                    + " since it is not open.");
+        }
+        synchronized (dsInfo) {
+            // This is not needed for external datasets' indexes since they never use the virtual buffer cache.
+            if (!dsInfo.memoryAllocated && !dsInfo.isExternal) {
+                List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
+                long additionalSize = 0;
+                for (IVirtualBufferCache vbc : vbcs) {
+                    additionalSize += vbc.getNumPages() * vbc.getPageSize();
+                }
+                while (used + additionalSize > capacity) {
+                    if (!evictCandidateDataset()) {
+                        throw new HyracksDataException("Cannot allocate dataset " + dsInfo.datasetID
+                                + " memory since memory budget would be exceeded.");
+                    }
+                }
+                used += additionalSize;
+                dsInfo.memoryAllocated = true;
+                LOGGER.log(Level.INFO, "Allocated memory (" + additionalSize + ") bytes for dataset( "
+                        + dsInfo.datasetID + "). Total Used (" + used + ") bytes.");
+            }
+        }
+    }
+
+    public synchronized void deallocateDatasetMemory(int datasetId) throws HyracksDataException
{
+        DatasetInfo dsInfo = datasetInfos.get(datasetId);
+        if (dsInfo == null) {
+            throw new HyracksDataException("Failed to deallocate memory for dataset with ID " + datasetId
+                    + " since it is not open.");
+        }
+        synchronized (dsInfo) {
+            if (dsInfo.isOpen && dsInfo.memoryAllocated) {
+                List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
+                for (IVirtualBufferCache vbc : vbcs) {
+                    used -= (vbc.getNumPages() * vbc.getPageSize());
+                }
+                dsInfo.memoryAllocated = false;
+                LOGGER.log(Level.INFO, "Deallocated memory for dataset( " + dsInfo.datasetID
+ "). Total Used (" + used
+                        + ") bytes.");
+            }
+        }
     }
 
     private boolean evictCandidateDataset() throws HyracksDataException {
@@ -289,18 +332,26 @@
         synchronized (datasetVirtualBufferCaches) {
             List<IVirtualBufferCache> vbcs = datasetVirtualBufferCaches.get(datasetID);
             if (vbcs == null) {
-                vbcs = new ArrayList<IVirtualBufferCache>();
-                int numPages = datasetID < firstAvilableUserDatasetID ? storageProperties
-                        .getMetadataMemoryComponentNumPages() : storageProperties.getMemoryComponentNumPages();
-                for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
-                    MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(new
VirtualBufferCache(
-                            new HeapBufferAllocator(), storageProperties.getMemoryComponentPageSize(),
numPages
-                                    / storageProperties.getMemoryComponentsNum()));
-                    vbcs.add(vbc);
-                }
-                datasetVirtualBufferCaches.put(datasetID, vbcs);
+                initializeDatasetVirtualBufferCache(datasetID);
+                vbcs = datasetVirtualBufferCaches.get(datasetID);
+                assert vbcs != null;
             }
             return vbcs;
+        }
+    }
+
+    private void initializeDatasetVirtualBufferCache(int datasetID) {
+        List<IVirtualBufferCache> vbcs = new ArrayList<IVirtualBufferCache>();
+        synchronized (datasetVirtualBufferCaches) {
+            int numPages = datasetID < firstAvilableUserDatasetID ? storageProperties
+                    .getMetadataMemoryComponentNumPages() : storageProperties.getMemoryComponentNumPages();
+            for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
+                MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(new
VirtualBufferCache(
+                        new HeapBufferAllocator(), storageProperties.getMemoryComponentPageSize(),
numPages
+                                / storageProperties.getMemoryComponentsNum()));
+                vbcs.add(vbc);
+            }
+            datasetVirtualBufferCaches.put(datasetID, vbcs);
         }
     }
 
@@ -347,12 +398,14 @@
         private long lastAccess;
         private int numActiveIOOps;
         private final boolean isExternal;
+        private boolean memoryAllocated;
 
         public DatasetInfo(int datasetID, boolean isExternal) {
             this.indexes = new HashMap<Long, IndexInfo>();
             this.lastAccess = -1;
             this.datasetID = datasetID;
             this.isExternal = isExternal;
+            this.memoryAllocated = false;
         }
 
         @Override
@@ -426,6 +479,14 @@
         public String toString() {
             return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ", refCount: " + referenceCount
                     + ", lastAccess: " + lastAccess + "}";
+        }
+
+        public boolean isMemoryAllocated() {
+            return memoryAllocated;
+        }
+
+        public int getDatasetID() {
+            return datasetID;
         }
     }
 
@@ -537,7 +598,6 @@
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
-
         for (IndexInfo iInfo : dsInfo.indexes.values()) {
             if (iInfo.isOpen) {
                 ILSMOperationTracker opTracker = iInfo.index.getOperationTracker();
@@ -548,12 +608,8 @@
             }
             assert iInfo.referenceCount == 0;
         }
+        deallocateDatasetMemory(dsInfo.datasetID);
         dsInfo.isOpen = false;
-
-        List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
-        for (IVirtualBufferCache vbc : vbcs) {
-            used -= vbc.getNumPages() * vbc.getPageSize();
-        }
     }
 
     @Override
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 2603de2..ac33115 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -59,6 +59,9 @@
     public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback
searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException
{
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION)
{
+            if (!dsInfo.isMemoryAllocated()) {
+                datasetLifecycleManager.allocateDatasetMemory(dsInfo.getDatasetID());
+            }
             incrementNumActiveOperations(modificationCallback);
         } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE)
{
             dsInfo.declareActiveIOOperation();
@@ -77,7 +80,7 @@
     @Override
     public synchronized void completeOperation(ILSMIndex index, LSMOperationType opType,
             ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION)
{
             decrementNumActiveOperations(modificationCallback);
             if (numActiveOperations.get() == 0) {
@@ -108,7 +111,7 @@
         }
 
         if (needsFlush || flushOnExit) {
-            //Make the current mutable components READABLE_UNWRITABLE to stop coming modify
operations from entering them until the current flush is schedule.
+            //Make the current mutable components READABLE_UNWRITABLE to stop incoming modify operations from entering them until the current flush is scheduled.
             for (ILSMIndex lsmIndex : indexes) {
                 AbstractLSMIndex abstractLSMIndex = ((AbstractLSMIndex) lsmIndex);
                 ILSMOperationTracker opTracker = abstractLSMIndex.getOperationTracker();
@@ -133,7 +136,7 @@
         }
     }
 
-    //Since this method is called sequentially by LogPage.notifyFlushTerminator in the sequence
flush were scheduled.
+    //This method is called sequentially by LogPage.notifyFlushTerminator, in the order in which the flushes were scheduled.
     public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException
{
         for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
 
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
index 730116a..0df5761 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -39,9 +39,9 @@
 
 class TestRuntimeContextProvider implements IAsterixAppRuntimeContextProvider {
 
-    AsterixThreadExecutor ate = new AsterixThreadExecutor(Executors.defaultThreadFactory());
   
+    AsterixThreadExecutor ate = new AsterixThreadExecutor(Executors.defaultThreadFactory());
     IIndexLifecycleManager ilm = new IndexLifecycleManager();
-    
+
     @Override
     public AsterixThreadExecutor getThreadExecutor() {
         return ate;
@@ -138,6 +138,15 @@
         public List<IIndex> getOpenIndexes() {
             throw new UnsupportedOperationException();
         }
-        
+
+        @Override
+        public void allocateMemory(long resourceID) throws HyracksDataException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void deallocateMemory(long resourceID) throws HyracksDataException {
+            throw new UnsupportedOperationException();
+        }
     }
 }
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 94960e9..1245271 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,8 +56,8 @@
         <global.test.includes>**/*TestSuite.java,**/*Test.java,${execution.tests}</global.test.includes>
         <global.test.excludes>${optimizer.tests},${metadata.tests},${invalid.tests},${repeated.tests}</global.test.excludes>
     <!-- Versions under dependencymanagement or used in many projects via properties -->
-        <algebricks.version>0.2.16-incubating</algebricks.version>
-        <hyracks.version>0.2.16-incubating</hyracks.version>
+        <algebricks.version>0.2.17-SNAPSHOT</algebricks.version>
+        <hyracks.version>0.2.17-SNAPSHOT</hyracks.version>
         <hadoop.version>2.2.0</hadoop.version>
         <junit.version>4.11</junit.version>
         <commons.io.version>2.4</commons.io.version>

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/408
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4ea1eb129fe3043d43b077473dc29d17a97dfcc2
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <hubailmor@gmail.com>

Mime
View raw message