activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r656378 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: kaha/ kaha/impl/container/ kaha/impl/index/hash/ store/amq/ store/kahadaptor/
Date Wed, 14 May 2008 19:36:12 GMT
Author: rajdavies
Date: Wed May 14 12:36:11 2008
New Revision: 656378

URL: http://svn.apache.org/viewvc?rev=656378&view=rev
Log:
Grow the HashIndex bins as required

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java?rev=656378&r1=656377&r2=656378&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
Wed May 14 12:36:11 2008
@@ -260,6 +260,17 @@
     int getIndexPageSize();
     
     /**
+     * Set the maximum bin size
+     */
+    void setMaxBinSize(int size);
+    
+    /**
+     * Get the maximum bin size
+     * @return the maximum bin size
+     */
+    int getMaxBinSize();
+    
+    /**
      * @return the Index MBean
      */
     IndexMBean getIndexMBean();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?rev=656378&r1=656377&r2=656378&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
Wed May 14 12:36:11 2008
@@ -58,6 +58,7 @@
     private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
     private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
     private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
+    private int maxBinSize = HashIndex.MAXIMUM_CAPACITY;
 
     public MapContainerImpl(File directory, ContainerId id, IndexItem root, IndexManager
indexManager,
                             DataManager dataManager, boolean persistentIndex) {
@@ -76,6 +77,7 @@
                     hashIndex.setNumberOfBins(getIndexBinSize());
                     hashIndex.setKeySize(getIndexKeySize());
                     hashIndex.setPageSize(getIndexPageSize());
+                    hashIndex.setMaximumCapacity(getMaxBinSize());
                     this.index = hashIndex;
                 } catch (IOException e) {
                     LOG.error("Failed to create HashIndex", e);
@@ -566,6 +568,15 @@
     public IndexMBean getIndexMBean() {
       return (IndexMBean) index;
     }
+  
+    public int getMaxBinSize() {
+        return maxBinSize;
+    }
+
+    public void setMaxBinSize(int maxBinSize) {
+        this.maxBinSize = maxBinSize;
+    }
+   
 
    
     public String toString() {
@@ -588,7 +599,5 @@
         }
         buf.append("}");
         return buf.toString();
-    }
-   
-    
+    }    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java?rev=656378&r1=656377&r2=656378&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
Wed May 14 12:36:11 2008
@@ -42,6 +42,11 @@
     public static final int DEFAULT_PAGE_SIZE;
     public static final int DEFAULT_KEY_SIZE;
     public static final int DEFAULT_BIN_SIZE;
+    public static final int MAXIMUM_CAPACITY = 16384;
+    /**
+     * The load factor used when none specified in constructor.
+     **/
+    static final float DEFAULT_LOAD_FACTOR;
     private static final String NAME_PREFIX = "hash-index-";
     private static final Log LOG = LogFactory.getLog(HashIndex.class);
     private final String name;
@@ -66,6 +71,9 @@
     private int pageCacheSize = 10;
     private int size;
     private int activeBins;
+    private int threshold;
+    private int maximumCapacity=MAXIMUM_CAPACITY;
+    private float loadFactor=0.75f;
     
     
     /**
@@ -178,6 +186,48 @@
         return false;
     }
     
+    /**
+     * @return the threshold
+     */
+    public int getThreshold() {
+        return threshold;
+    }
+
+    /**
+     * @param threshold the threshold to set
+     */
+    public void setThreshold(int threshold) {
+        this.threshold = threshold;
+    }
+
+    /**
+     * @return the loadFactor
+     */
+    public float getLoadFactor() {
+        return loadFactor;
+    }
+
+    /**
+     * @param loadFactor the loadFactor to set
+     */
+    public void setLoadFactor(float loadFactor) {
+        this.loadFactor = loadFactor;
+    }
+    
+    /**
+     * @return the maximumCapacity
+     */
+    public int getMaximumCapacity() {
+        return maximumCapacity;
+    }
+
+    /**
+     * @param maximumCapacity the maximumCapacity to set
+     */
+    public void setMaximumCapacity(int maximumCapacity) {
+        this.maximumCapacity = maximumCapacity;
+    }
+    
     public synchronized int getSize() {
         return size;
     }
@@ -193,6 +243,7 @@
                 capacity <<= 1;
             }
             this.bins = new HashBin[capacity];
+            threshold = calculateThreashold();
             keysPerPage = pageSize / keySize;
             dataIn = new DataByteArrayInputStream();
             dataOut = new DataByteArrayOutputStream(pageSize);
@@ -229,6 +280,9 @@
         if (!getBin(key).put(entry)) {
             size++;
         }
+        if (size >= threshold) {
+            resize(2*bins.length);
+        }
     }
 
     public synchronized StoreEntry get(Object key) throws IOException {
@@ -361,11 +415,18 @@
     }
 
     void addToBin(HashPage page) throws IOException {
-        HashBin bin = getBin(page.getBinId());
+        int index = page.getBinId();
+        if (index >= numberOfBins) {
+            HashBin[] newBins = new HashBin[index+1];
+            System.arraycopy(this.bins, 0, newBins, 0, this.bins.length);
+            this.bins=newBins;
+        }
+        HashBin bin = getBin(index);
         bin.addHashPageInfo(page.getId(), page.getPersistedSize());
     }
 
     private HashBin getBin(int index) {
+        
         HashBin result = bins[index];
         if (result == null) {
             result = new HashBin(this, index, pageSize / keySize);
@@ -464,6 +525,49 @@
         doLoad();
     }
     
+    private void resize(int newCapacity) throws IOException {
+        if (bins.length == getMaximumCapacity()) {
+            threshold = Integer.MAX_VALUE;
+            return;
+        }
+        String backFileName = name + "-REISZE";
+        HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
+        backIndex.setKeyMarshaller(keyMarshaller);
+        backIndex.setKeySize(getKeySize());
+        backIndex.setNumberOfBins(newCapacity);
+        backIndex.setPageSize(getPageSize());
+        backIndex.load();
+        File backFile = backIndex.file;
+        long offset = 0;
+        while ((offset + pageSize) <= indexFile.length()) {
+            indexFile.seek(offset);
+            HashPage page = getFullPage(offset);
+            if (page.isActive()) {
+                for (HashEntry entry : page.getEntries()) {
+                    backIndex.getBin(entry.getKey()).put(entry);
+                    backIndex.size++;
+                }
+            }
+            page=null;
+            offset += pageSize;
+        }
+        backIndex.unload();
+      
+        unload();
+        IOHelper.deleteFile(file);
+        IOHelper.copyFile(backFile, file);
+        IOHelper.deleteFile(backFile);
+        setNumberOfBins(newCapacity);
+        bins = new HashBin[newCapacity];
+        threshold = calculateThreashold();
+        openIndexFile();
+        doLoad();
+    }
+    
+    private int calculateThreashold() {
+        return (int)(bins.length * 100 * loadFactor);
+    }
+    
     
     public String toString() {
         String str = "HashIndex"+System.identityHashCode(this)+": "+file.getName();
@@ -488,5 +592,6 @@
         DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "16384"));
         DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96"));
         DEFAULT_BIN_SIZE= Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
+        DEFAULT_LOAD_FACTOR=Float.parseFloat(System.getProperty("defaultLoadFactor","1.5f"));
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=656378&r1=656377&r2=656378&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
Wed May 14 12:36:11 2008
@@ -118,6 +118,7 @@
     private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
     private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
     private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
+    private int maxBinSize = HashIndex.MAXIMUM_CAPACITY;
     private int maxReferenceFileLength=AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
     private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Set<Integer>>
();
     private String directoryPath = "";
@@ -685,6 +686,7 @@
         adaptor.setIndexBinSize(getIndexBinSize());
         adaptor.setIndexKeySize(getIndexKeySize());
         adaptor.setIndexPageSize(getIndexPageSize());
+        adaptor.setMaxBinSize(getMaxBinSize());
         return adaptor;
     }
 
@@ -833,6 +835,14 @@
     public int getIndexPageSize() {
         return indexPageSize;
     }
+    
+    public int getMaxBinSize() {
+        return maxBinSize;
+    }
+
+    public void setMaxBinSize(int maxBinSize) {
+        this.maxBinSize = maxBinSize;
+    }
 
     /**
      * When set using XBean, you can use values such as: "20

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=656378&r1=656377&r2=656378&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
Wed May 14 12:36:11 2008
@@ -72,6 +72,7 @@
     private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
     private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
     private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
+    private int maxBinSize = HashIndex.MAXIMUM_CAPACITY;
    
 
     public KahaReferenceStoreAdapter(AtomicLong size){
@@ -203,6 +204,7 @@
         container.setIndexBinSize(getIndexBinSize());
         container.setIndexKeySize(getIndexKeySize());
         container.setIndexPageSize(getIndexPageSize());
+        container.setMaxBinSize(getIndexBinSize());
         container.setKeyMarshaller(new MessageIdMarshaller());
         container.setValueMarshaller(new ReferenceRecordMarshaller());
         container.load();
@@ -361,4 +363,12 @@
     public void setIndexPageSize(int indexPageSize) {
         this.indexPageSize = indexPageSize;
     }
+    
+    public int getMaxBinSize() {
+        return maxBinSize;
+    }
+
+    public void setMaxBinSize(int maxBinSize) {
+        this.maxBinSize = maxBinSize;
+    }
 }



Mime
View raw message