activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r591442 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/kaha/impl/index/hash/ main/java/org/apache/activemq/store/kahadaptor/ test/java/org/apache/activemq/kaha/impl/index/hash/
Date Fri, 02 Nov 2007 20:09:58 GMT
Author: rajdavies
Date: Fri Nov  2 13:09:57 2007
New Revision: 591442

URL: http://svn.apache.org/viewvc?rev=591442&view=rev
Log:
Further enhancement to https://issues.apache.org/activemq/browse/AMQ-1246

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/hash/HashTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java?rev=591442&r1=591441&r2=591442&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java Fri Nov  2 13:09:57 2007
@@ -181,9 +181,36 @@
     }
 
     private void addHashEntry(int index, HashEntry entry) throws IOException {
-        HashPageInfo page = getInsertPage(index);
-        int offset = index % maximumEntries;
-        page.addHashEntry(offset, entry);
+        HashPageInfo pageToUse = null;
+        int offset = 0;
+        if (index >= maximumBinSize()) {
+            HashPage hp = hashIndex.createPage(id);
+            pageToUse = addHashPageInfo(hp.getId(), 0);
+            pageToUse.setPage(hp);
+            offset = 0;
+        } else {
+            
+            int count = 0;
+            int countSoFar=0;
+            int pageNo = 0;
+            for (HashPageInfo page : hashPages) {
+                count += page.size();
+                if (index < count ) {
+                    offset = index - countSoFar;
+                    break;
+                }
+                if (index == count && page.size()+1 <= maximumEntries) {
+                    offset = page.size();
+                    break;
+                }
+                countSoFar += page.size();
+                pageNo++;
+            }
+            pageToUse = hashPages.get(pageNo);
+        }
+        pageToUse.begin();
+        
+        pageToUse.addHashEntry(offset, entry);
         doOverFlow(index);
     }
 
@@ -202,25 +229,12 @@
         HashEntry result = page.getHashEntry(offset);
         return result;
     }
+    
 
     private int maximumBinSize() {
         return maximumEntries * hashPages.size();
     }
 
-    private HashPageInfo getInsertPage(int index) throws IOException {
-        HashPageInfo result = null;
-        if (index >= maximumBinSize()) {
-            HashPage page = hashIndex.createPage(id);
-            result = addHashPageInfo(page.getId(), 0);
-            result.setPage(page);
-        } else {
-            int offset = index / maximumEntries;
-            result = hashPages.get(offset);
-        }
-        result.begin();
-        return result;
-    }
-
     private HashPageInfo getRetrievePage(int index) throws IOException {
         HashPageInfo result = null;
         int count = 0;
@@ -250,16 +264,6 @@
         }
         return result;
     }
-
-//    private int getInsertPageNo(int index) {
-//        int result = index / maximumEntries;
-//        return result;
-//    }
-//
-//    private int getOffset(int index) {
-//        int result = index % maximumEntries;
-//        return result;
-//    }
 
     private void doOverFlow(int index) throws IOException {
         int pageNo = index / maximumEntries;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java?rev=591442&r1=591441&r2=591442&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java Fri Nov  2 13:09:57 2007
@@ -40,7 +40,6 @@
     private int binId;
     private int persistedSize;
     private List<HashEntry> hashIndexEntries;
-    private static final HashEntry nullEntry = new HashEntry();
     /*
      * for persistence only
      */
@@ -193,11 +192,6 @@
     void addHashEntry(int index, HashEntry entry) throws IOException {
         // index = index >= 0 ? index : 0;
         // index = (index == 0 || index< size()) ? index : size()-1;
-        if (index > hashIndexEntries.size()) {
-            for (int i = hashIndexEntries.size(); i < (index+1);i++) {
-                hashIndexEntries.add(nullEntry);
-            }
-        }
         hashIndexEntries.add(index, entry);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=591442&r1=591441&r2=591442&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Fri Nov  2 13:09:57 2007
@@ -55,6 +55,8 @@
 
     private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
     private static final String STORE_STATE = "store-state";
+    private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
+    private static final Integer INDEX_VERSION = new Integer(2);
     private static final String RECORD_REFERENCES = "record-references";
     private static final String TRANSACTIONS = "transactions-state";
     private MapContainer stateMap;
@@ -67,6 +69,7 @@
     private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
     private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
     private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
+   
 
     public KahaReferenceStoreAdapter(AtomicLong size){
         super(size);
@@ -94,12 +97,21 @@
                 storeValid = status.get();
             }
             if (storeValid) {
+                //check what version the indexes are at
+                Integer indexVersion = (Integer) stateMap.get(INDEX_VERSION_NAME);
+                if (indexVersion==null || indexVersion.intValue() < INDEX_VERSION.intValue()) {
+                    storeValid = false;
+                    LOG.warn("Indexes at an older version - need to regenerate");
+                }
+            }
+            if (storeValid) {
                 if (stateMap.containsKey(RECORD_REFERENCES)) {
                     recordReferences = (Map<Integer, AtomicInteger>)stateMap.get(RECORD_REFERENCES);
                 }
             }
         }
         stateMap.put(STORE_STATE, new AtomicBoolean());
+        stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION);
         durableSubscribers = store.getListContainer("durableSubscribers");
         durableSubscribers.setMarshaller(new CommandMarshaller());
         preparedTransactions = store.getMapContainer("transactions", TRANSACTIONS, false);
@@ -112,6 +124,7 @@
     public synchronized void stop() throws Exception {
         stateMap.put(RECORD_REFERENCES, recordReferences);
         stateMap.put(STORE_STATE, new AtomicBoolean(true));
+        stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION);
         if (this.stateStore != null) {
             this.stateStore.close();
             this.stateStore = null;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/hash/HashTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/hash/HashTest.java?rev=591442&r1=591441&r2=591442&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/hash/HashTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/hash/HashTest.java Fri Nov  2 13:09:57 2007
@@ -25,15 +25,17 @@
 import org.apache.activemq.kaha.impl.index.IndexManager;
 import org.apache.activemq.util.IOHelper;
 
-
 /**
  * Test a HashIndex
  */
 public class HashTest extends TestCase {
 
-    private static final int COUNT = 1000;
+    private static final int COUNT = 10000;
+
     private HashIndex hashIndex;
+
     private File directory;
+
     private IndexManager indexManager;
 
     /**
@@ -44,8 +46,12 @@
         super.setUp();
         directory = new File(IOHelper.getDefaultDataDirectory());
         directory.mkdirs();
-        indexManager = new IndexManager(directory, "im-hash-test", "rw", null, new AtomicLong());
+        IOHelper.deleteChildren(directory);
+        indexManager = new IndexManager(directory, "im-hash-test", "rw", null,
+                new AtomicLong());
         this.hashIndex = new HashIndex(directory, "testHash", indexManager);
+        this.hashIndex.setNumberOfBins(12);
+        this.hashIndex.setPageSize(32 * 1024);
         this.hashIndex.setKeyMarshaller(Store.STRING_MARSHALLER);
     }
 
@@ -56,7 +62,7 @@
         doTest(600);
         hashIndex.clear();
         hashIndex.unload();
-        doTest(1024 * 4);
+        doTest(128);
     }
 
     public void doTest(int pageSize) throws Exception {
@@ -66,8 +72,11 @@
         doInsert(keyRoot);
         checkRetrieve(keyRoot);
         doRemove(keyRoot);
+
         doInsert(keyRoot);
-        doRemoveBackwards(keyRoot);
+        doRemoveHalf(keyRoot);
+        doInsertHalf(keyRoot);
+        checkRetrieve(keyRoot);
     }
 
     void doInsert(String keyRoot) throws Exception {
@@ -75,23 +84,41 @@
             IndexItem value = indexManager.createNewIndex();
             indexManager.storeIndex(value);
             hashIndex.store(keyRoot + i, value);
-
         }
     }
 
     void checkRetrieve(String keyRoot) throws IOException {
         for (int i = 0; i < COUNT; i++) {
-            IndexItem item = (IndexItem)hashIndex.get(keyRoot + i);
+            IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
             assertNotNull(item);
         }
     }
 
+    void doRemoveHalf(String keyRoot) throws Exception {
+        for (int i = 0; i < COUNT; i++) {
+            if (i % 2 == 0) {
+                hashIndex.remove(keyRoot + i);
+            }
+
+        }
+    }
+
+    void doInsertHalf(String keyRoot) throws Exception {
+        for (int i = 0; i < COUNT; i++) {
+            if (i % 2 == 0) {
+                IndexItem value = indexManager.createNewIndex();
+                indexManager.storeIndex(value);
+                hashIndex.store(keyRoot + i, value);
+            }
+        }
+    }
+
     void doRemove(String keyRoot) throws Exception {
         for (int i = 0; i < COUNT; i++) {
             hashIndex.remove(keyRoot + i);
         }
         for (int i = 0; i < COUNT; i++) {
-            IndexItem item = (IndexItem)hashIndex.get(keyRoot + i);
+            IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
             assertNull(item);
         }
     }
@@ -101,7 +128,7 @@
             hashIndex.remove(keyRoot + i);
         }
         for (int i = 0; i < COUNT; i++) {
-            IndexItem item = (IndexItem)hashIndex.get(keyRoot + i);
+            IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
             assertNull(item);
         }
     }



Mime
View raw message