activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r669879 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: kaha/impl/index/ kaha/impl/index/hash/ store/amq/
Date Fri, 20 Jun 2008 12:52:41 GMT
Author: rajdavies
Date: Fri Jun 20 05:52:41 2008
New Revision: 669879

URL: http://svn.apache.org/viewvc?rev=669879&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1814

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexItem.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexItem.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexItem.java?rev=669879&r1=669878&r2=669879&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexItem.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexItem.java
Fri Jun 20 05:52:41 2008
@@ -300,6 +300,13 @@
     public void setValueSize(int valueSize) {
         this.valueSize = valueSize;
     }
+    
+    void copyIndex(IndexItem other) {
+        this.offset=other.offset;
+        this.active=other.active;
+        this.previousItem=other.previousItem;
+        this.nextItem=other.nextItem;
+    }
 
     /**
      * @return print of 'this'

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java?rev=669879&r1=669878&r2=669879&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
Fri Jun 20 05:52:41 2008
@@ -49,6 +49,7 @@
     private IndexItem lastFree;
     private boolean dirty;
     private final AtomicLong storeSize;
+    private int freeSize = 0;
 
     public IndexManager(File directory, String name, String mode, DataManager redoLog, AtomicLong
storeSize) throws IOException {
         this.directory = directory;
@@ -64,7 +65,11 @@
     }
 
     public synchronized IndexItem getIndex(long offset) throws IOException {
-        return reader.readItem(offset);
+        IndexItem result = null;
+        if (offset >= 0) {
+            result = reader.readItem(offset);
+        }
+        return result;
     }
 
     public synchronized IndexItem refreshIndex(IndexItem item) throws IOException {
@@ -80,8 +85,16 @@
             lastFree = item;
         } else {
             lastFree.setNextItem(item.getOffset());
+            if (lastFree.equals(firstFree)) {
+                firstFree=new IndexItem();
+                firstFree.copyIndex(lastFree);
+                writer.updateIndexes(firstFree);
+            }
+            writer.updateIndexes(lastFree);
+            lastFree=item;
         }
         writer.updateIndexes(item);
+        freeSize++;
         dirty = true;
     }
 
@@ -155,6 +168,8 @@
                 }
             }
             result.reset();
+            writer.updateIndexes(result);
+            freeSize--;
         }
         return result;
     }
@@ -200,6 +215,7 @@
                     lastFree = index;
                     firstFree = index;
                 }
+               freeSize++;
             }
             offset += IndexItem.INDEX_SIZE;
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java?rev=669879&r1=669878&r2=669879&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
Fri Jun 20 05:52:41 2008
@@ -44,6 +44,7 @@
     public static final int DEFAULT_BIN_SIZE;
     public static final int MAXIMUM_CAPACITY;
     public static final int DEFAULT_LOAD_FACTOR;
+    private static final int LOW_WATER_MARK=1024*16;
     private static final String NAME_PREFIX = "hash-index-";
     private static final Log LOG = LogFactory.getLog(HashIndex.class);
     private final String name;
@@ -67,6 +68,7 @@
     private boolean enablePageCaching=false;//this is off by default - see AMQ-1667
     private int pageCacheSize = 10;
     private int size;
+    private int highestSize=0;
     private int activeBins;
     private int threshold;
     private int maximumCapacity=MAXIMUM_CAPACITY;
@@ -275,11 +277,14 @@
         entry.setKey((Comparable)key);
         entry.setIndexOffset(value.getOffset());
         if (!getBin(key).put(entry)) {
-            size++;
+            this.size++;
         }
-        if (size >= threshold) {
+        if (this.size >= this.threshold) {
             resize(2*bins.length);
         }
+        if(this.size > this.highestSize) {
+            this.highestSize=this.size;
+        }
     }
 
     public synchronized StoreEntry get(Object key) throws IOException {
@@ -292,14 +297,22 @@
 
     public synchronized StoreEntry remove(Object key) throws IOException {
         load();
+        StoreEntry result = null;
         HashEntry entry = new HashEntry();
         entry.setKey((Comparable)key);
-        HashEntry result = getBin(key).remove(entry);
-        if (result != null) {
-            size--;
-            return indexManager.getIndex(result.getIndexOffset());
+        HashEntry he = getBin(key).remove(entry);
+        if (he != null) {
+            this.size--;
+            result = this.indexManager.getIndex(he.getIndexOffset());
+        }
+        if (this.highestSize > LOW_WATER_MARK &&  this.highestSize > (this.size
*2)) {
+            int newSize = this.size/this.keysPerPage;
+            newSize = Math.max(128, newSize);
+            this.highestSize=0;
+            resize(newSize);
+            
         }
-        return null;
+        return result;
     }
 
     public synchronized boolean containsKey(Object key) throws IOException {
@@ -523,42 +536,53 @@
     }
     
     private void resize(int newCapacity) throws IOException {
-        if (bins.length == getMaximumCapacity()) {
-            threshold = Integer.MAX_VALUE;
-            return;
-        }
-        String backFileName = name + "-REISZE";
-        HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
-        backIndex.setKeyMarshaller(keyMarshaller);
-        backIndex.setKeySize(getKeySize());
-        backIndex.setNumberOfBins(newCapacity);
-        backIndex.setPageSize(getPageSize());
-        backIndex.load();
-        File backFile = backIndex.file;
-        long offset = 0;
-        while ((offset + pageSize) <= indexFile.length()) {
-            indexFile.seek(offset);
-            HashPage page = getFullPage(offset);
-            if (page.isActive()) {
-                for (HashEntry entry : page.getEntries()) {
-                    backIndex.getBin(entry.getKey()).put(entry);
-                    backIndex.size++;
+        if (bins.length < getMaximumCapacity()) {
+            if (newCapacity != numberOfBins) {
+                int capacity = 1;
+                while (capacity < newCapacity) {
+                    capacity <<= 1;
+                }
+                if (newCapacity != numberOfBins) {
+                    LOG.info("Resize hash bins " + this.name + " from " + numberOfBins +
" to " + newCapacity);
+                    
+                    String backFileName = name + "-REISZE";
+                    HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
+                    backIndex.setKeyMarshaller(keyMarshaller);
+                    backIndex.setKeySize(getKeySize());
+                    backIndex.setNumberOfBins(newCapacity);
+                    backIndex.setPageSize(getPageSize());
+                    backIndex.load();
+                    File backFile = backIndex.file;
+                    long offset = 0;
+                    while ((offset + pageSize) <= indexFile.length()) {
+                        indexFile.seek(offset);
+                        HashPage page = getFullPage(offset);
+                        if (page.isActive()) {
+                            for (HashEntry entry : page.getEntries()) {
+                                backIndex.getBin(entry.getKey()).put(entry);
+                                backIndex.size++;
+                            }
+                        }
+                        page=null;
+                        offset += pageSize;
+                    }
+                    backIndex.unload();
+                  
+                    unload();
+                    IOHelper.deleteFile(file);
+                    IOHelper.copyFile(backFile, file);
+                    IOHelper.deleteFile(backFile);
+                    setNumberOfBins(newCapacity);
+                    bins = new HashBin[newCapacity];
+                    threshold = calculateThreashold();
+                    openIndexFile();
+                    doLoad();
                 }
             }
-            page=null;
-            offset += pageSize;
+        }else {
+            threshold = Integer.MAX_VALUE;
+            return;
         }
-        backIndex.unload();
-      
-        unload();
-        IOHelper.deleteFile(file);
-        IOHelper.copyFile(backFile, file);
-        IOHelper.deleteFile(backFile);
-        setNumberOfBins(newCapacity);
-        bins = new HashBin[newCapacity];
-        threshold = calculateThreashold();
-        openIndexFile();
-        doLoad();
     }
     
     private int calculateThreashold() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=669879&r1=669878&r2=669879&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
Fri Jun 20 05:52:41 2008
@@ -121,7 +121,7 @@
     private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
     private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
     private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
-    private int maxReferenceFileLength=AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
+    private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
     private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Set<Integer>>
();
     private String directoryPath = "";
     private RandomAccessFile lockFile;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?rev=669879&r1=669878&r2=669879&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
Fri Jun 20 05:52:41 2008
@@ -19,6 +19,7 @@
 import java.io.File;
 
 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+import org.apache.activemq.kaha.impl.index.hash.HashIndex;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.PersistenceAdapterFactory;
 import org.apache.activemq.store.ReferenceStoreAdapter;
@@ -33,7 +34,7 @@
  * @version $Revision: 1.17 $
  */
 public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
-
+    static final int DEFAULT_MAX_REFERNCE_FILE_LENGTH=2*1024*1024;
     private TaskRunnerFactory taskRunnerFactory;
     private File dataDirectory;
     private int journalThreadPriority = Thread.MAX_PRIORITY;
@@ -45,6 +46,12 @@
     private boolean useNio = true;
     private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
     private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL;
+    private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
+    private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
+    private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
+    private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
+    private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
+    private int maxReferenceFileLength=DEFAULT_MAX_REFERNCE_FILE_LENGTH;
 
 
     /**
@@ -62,6 +69,12 @@
         result.setUseNio(isUseNio());
         result.setMaxFileLength(getMaxFileLength());
         result.setCleanupInterval(getCleanupInterval());
+        result.setIndexBinSize(getIndexBinSize());
+        result.setIndexKeySize(getIndexKeySize());
+        result.setIndexPageSize(getIndexPageSize());
+        result.setIndexMaxBinSize(getIndexMaxBinSize());
+        result.setIndexLoadFactor(getIndexLoadFactor());
+        result.setMaxReferenceFileLength(getMaxReferenceFileLength());
         return result;
     }
 
@@ -189,4 +202,88 @@
 	public void setMaxFileLength(int maxFileLength) {
 		this.maxFileLength = maxFileLength;
 	}
+
+    /**
+     * @return the indexBinSize
+     */
+    public int getIndexBinSize() {
+        return indexBinSize;
+    }
+
+    /**
+     * @param indexBinSize the indexBinSize to set
+     */
+    public void setIndexBinSize(int indexBinSize) {
+        this.indexBinSize = indexBinSize;
+    }
+
+    /**
+     * @return the indexKeySize
+     */
+    public int getIndexKeySize() {
+        return indexKeySize;
+    }
+
+    /**
+     * @param indexKeySize the indexKeySize to set
+     */
+    public void setIndexKeySize(int indexKeySize) {
+        this.indexKeySize = indexKeySize;
+    }
+
+    /**
+     * @return the indexPageSize
+     */
+    public int getIndexPageSize() {
+        return indexPageSize;
+    }
+
+    /**
+     * @param indexPageSize the indexPageSize to set
+     */
+    public void setIndexPageSize(int indexPageSize) {
+        this.indexPageSize = indexPageSize;
+    }
+
+    /**
+     * @return the indexMaxBinSize
+     */
+    public int getIndexMaxBinSize() {
+        return indexMaxBinSize;
+    }
+
+    /**
+     * @param indexMaxBinSize the indexMaxBinSize to set
+     */
+    public void setIndexMaxBinSize(int indexMaxBinSize) {
+        this.indexMaxBinSize = indexMaxBinSize;
+    }
+
+    /**
+     * @return the indexLoadFactor
+     */
+    public int getIndexLoadFactor() {
+        return indexLoadFactor;
+    }
+
+    /**
+     * @param indexLoadFactor the indexLoadFactor to set
+     */
+    public void setIndexLoadFactor(int indexLoadFactor) {
+        this.indexLoadFactor = indexLoadFactor;
+    }
+
+    /**
+     * @return the maxReferenceFileLength
+     */
+    public int getMaxReferenceFileLength() {
+        return maxReferenceFileLength;
+    }
+
+    /**
+     * @param maxReferenceFileLength the maxReferenceFileLength to set
+     */
+    public void setMaxReferenceFileLength(int maxReferenceFileLength) {
+        this.maxReferenceFileLength = maxReferenceFileLength;
+    }
 }



Mime
View raw message