activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r651242 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/cursors/ kaha/impl/index/hash/ store/kahadaptor/ util/
Date Thu, 24 Apr 2008 12:23:25 GMT
Author: rajdavies
Date: Thu Apr 24 05:23:17 2008
New Revision: 651242

URL: http://svn.apache.org/viewvc?rev=651242&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1683

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=651242&r1=651241&r2=651242&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Thu Apr 24 05:23:17 2008
@@ -23,13 +23,11 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.broker.region.QueueMessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java?rev=651242&r1=651241&r2=651242&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
Thu Apr 24 05:23:17 2008
@@ -17,10 +17,6 @@
 package org.apache.activemq.kaha.impl.index.hash;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * Bin in a HashIndex
@@ -28,12 +24,13 @@
  * @version $Revision: 1.1.1.1 $
  */
 class HashBin {
-    private static final transient Log LOG = LogFactory.getLog(HashBin.class);
     private HashIndex hashIndex;
     private int id;
     private int maximumEntries;
     private int size;
-    private List<HashPageInfo> hashPages = new ArrayList<HashPageInfo>();
+    private int numberOfPages =0;
+    private HashPageInfo root = null;
+    private HashPageInfo tail = null;
 
     /**
      * Constructor
@@ -62,43 +59,49 @@
     }
 
     public int hashCode() {
-        return (int)id;
+        return (int)getId();
     }
 
-    int getId() {
+     int  getId() {
         return id;
     }
 
-    void setId(int id) {
+     void setId(int id) {
         this.id = id;
     }
 
-    boolean isEmpty() {
+     boolean isEmpty() {
         return true;
     }
 
-    int getMaximumEntries() {
+     int getMaximumEntries() {
         return this.maximumEntries;
     }
 
-    void setMaximumEntries(int maximumEntries) {
+     void setMaximumEntries(int maximumEntries) {
         this.maximumEntries = maximumEntries;
     }
 
-    int size() {
+     int size() {
         return size;
     }
 
-    HashPageInfo addHashPageInfo(long id, int size) throws IOException {
+     HashPageInfo addHashPageInfo(long id, int size) throws IOException {
         HashPageInfo info = new HashPageInfo(hashIndex);
         info.setId(id);
         info.setSize(size);
-        hashPages.add(info);
+        if (root == null) {
+            root=info;
+        }else {
+            tail.linkAfter(info);
+        }
+        tail=info;
+        this.numberOfPages++;
         this.size += size;
         return info;
     }
 
-    public HashEntry find(HashEntry key) throws IOException {
+     public HashEntry find(HashEntry key) throws IOException {
         HashEntry result = null;
         try {
             int low = 0;
@@ -122,7 +125,7 @@
         return result;
     }
 
-    boolean put(HashEntry newEntry) throws IOException {
+     boolean put(HashEntry newEntry) throws IOException {
         boolean replace = false;
         try {
             int low = 0;
@@ -151,7 +154,7 @@
         return replace;
     }
 
-    HashEntry remove(HashEntry entry) throws IOException {
+     HashEntry remove(HashEntry entry) throws IOException {
         HashEntry result = null;
         try {
             int low = 0;
@@ -191,8 +194,10 @@
             int count = 0;
             int countSoFar=0;
             int pageNo = 0;
-            for (HashPageInfo page : hashPages) {
+            HashPageInfo page = root;
+            while (page != null) {
                 count += page.size();
+                pageToUse=page;
                 if (index < count ) {
                     offset = index - countSoFar;
                     break;
@@ -203,13 +208,12 @@
                 }
                 countSoFar += page.size();
                 pageNo++;
+                page = (HashPageInfo) page.getNext();
             }
-            while(pageNo >= hashPages.size()) {
-                HashPage hp = hashIndex.createPage(id);
-                addHashPageInfo(hp.getId(), 0);               
-            }
-            pageToUse = hashPages.get(pageNo);
-           
+            while(pageNo >= this.numberOfPages) {
+                HashPage hp  = hashIndex.createPage(id);
+                pageToUse = addHashPageInfo(hp.getId(), 0);               
+            }            
         }
         pageToUse.begin();  
         pageToUse.addHashEntry(offset, entry);
@@ -222,7 +226,14 @@
         HashEntry result = page.removeHashEntry(offset);
         
         if (page.isEmpty()) {
-            hashPages.remove(page);
+            if (root.equals(page)) {
+                root=(HashPageInfo) root.getNext();
+            }
+            if (tail.equals(page)) {
+                tail=(HashPageInfo) page.getPrevious();
+            }
+            page.unlink();
+            this.numberOfPages--;
             hashIndex.releasePage(page.getPage());
         }
         doUnderFlow(index);
@@ -239,21 +250,22 @@
     
 
     private int getMaximumBinSize() {
-        return maximumEntries * hashPages.size();
+        return maximumEntries * this.numberOfPages;
     }
 
     private HashPageInfo getRetrievePage(int index) throws IOException {
         HashPageInfo result = null;
         int count = 0;
-        int pageNo = 0;
-        for (HashPageInfo page : hashPages) {
+        HashPageInfo page = root;
+        while (page != null) {
             count += page.size();
+            result = page;
             if (index < count) {
                 break;
             }
-            pageNo++;
+            page = (HashPageInfo) page.getNext();
         }
-        result = hashPages.get(pageNo);
+        
         result.begin();
         return result;
     }
@@ -261,12 +273,14 @@
     private int getRetrieveOffset(int index) throws IOException {
         int result = 0;
         int count = 0;
-        for (HashPageInfo page : hashPages) {
+        HashPageInfo page = root;
+        while (page != null) {
             if ((index + 1) <= (count + page.size())) {
                 result = index - count;
                 break;
             }
             count += page.size();
+            page = (HashPageInfo) page.getNext();
         }
         return result;
     }
@@ -277,43 +291,51 @@
             // overflowed
             info.begin();
             HashEntry entry = info.removeHashEntry(info.size() - 1);
-            doOverFlow(hashPages.indexOf(info)+1, entry);
+            doOverFlow(getNextPage(info), entry);
         }
     }
 
-    private void doOverFlow(int pageNo, HashEntry entry) throws IOException {
+    private void doOverFlow(HashPageInfo next, HashEntry entry) throws IOException {
         HashPageInfo info = null;
-        if (pageNo >= hashPages.size()) {
+        if (next == null) {
             HashPage page = hashIndex.createPage(id);
             info = addHashPageInfo(page.getId(), 0);
             info.setPage(page);
         } else {
-            info = hashPages.get(pageNo);
+            info = next;
         }
         info.begin();
         info.addHashEntry(0, entry);
         if (info.size() > maximumEntries) {
             // overflowed
             HashEntry overflowed = info.removeHashEntry(info.size() - 1);
-            doOverFlow(pageNo+1, overflowed);
+            doOverFlow(getNextPage(info), overflowed);
         }
     }
+    
+    private HashPageInfo getNextPage(HashPageInfo start) {
+        return (HashPageInfo) start.getNext();
+    }
 
     private void doUnderFlow(int index) {
     }
 
     String dump() throws IOException {
-        String str = "[" + hashPages.size()+"]";
-        for (HashPageInfo page : hashPages) {
+        String str = "[" + this.numberOfPages+"]";
+        HashPageInfo page = root;
+        while (page != null) {
             page.begin();
             str +=page.dump();
             page.end();
+            page = (HashPageInfo) page.getNext();
         }
         return str;
     }
     private void end() throws IOException {
-        for (HashPageInfo info : hashPages) {
-            info.end();
+        HashPageInfo page = root;
+        while (page != null) {
+            page.end();
+            page = (HashPageInfo) page.getNext();
         }
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java?rev=651242&r1=651241&r2=651242&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
Thu Apr 24 05:23:17 2008
@@ -463,6 +463,12 @@
         openIndexFile();
         doLoad();
     }
+    
+    
+    public String toString() {
+        String str = "HashIndex"+System.identityHashCode(this)+": "+file.getName();
+        return str;
+    }
       
 
     static int hash(Object x) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java?rev=651242&r1=651241&r2=651242&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java
Thu Apr 24 05:23:17 2008
@@ -17,13 +17,14 @@
 package org.apache.activemq.kaha.impl.index.hash;
 
 import java.io.IOException;
+import org.apache.activemq.util.LinkedNode;
 
 /**
  * A Page within a HashPageInfo
  * 
  * @version $Revision: 1.1.1.1 $
  */
-class HashPageInfo {
+class HashPageInfo extends LinkedNode{
 
     private HashIndex hashIndex;
     private long id;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=651242&r1=651241&r2=651242&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
Thu Apr 24 05:23:17 2008
@@ -59,7 +59,7 @@
     private static final Log LOG = LogFactory.getLog(KahaReferenceStoreAdapter.class);
     private static final String STORE_STATE = "store-state";
     private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
-    private static final Integer INDEX_VERSION = new Integer(5);
+    private static final Integer INDEX_VERSION = new Integer(6);
     private static final String RECORD_REFERENCES = "record-references";
     private static final String TRANSACTIONS = "transactions-state";
     private MapContainer stateMap;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=651242&r1=651241&r2=651242&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
Thu Apr 24 05:23:17 2008
@@ -352,7 +352,9 @@
     }
     
     private String getSubscriptionContainerName(String subscriptionKey) {
-        StringBuffer buffer = new StringBuffer(subscriptionKey);
-        return buffer.append(":").append(destination.getQualifiedName()).append(TOPIC_SUB_NAME).toString();
+        StringBuffer result = new StringBuffer(TOPIC_SUB_NAME);
+        result.append(destination.getQualifiedName());
+        result.append(subscriptionKey);
+        return result.toString();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java?rev=651242&r1=651241&r2=651242&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java Thu
Apr 24 05:23:17 2008
@@ -133,6 +133,7 @@
     public void unlink() {
         // If we are allready unlinked...
         if (prev == this) {
+            reset();
             return;
         }
 
@@ -145,6 +146,10 @@
         prev.next = next;
 
         // Update our links..
+        reset();
+    }
+    
+    public void reset() {
         next = this;
         prev = this;
         tail = true;



Mime
View raw message