activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r513455 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/kaha/impl/ main/java/org/apache/activemq/kaha/impl/container/ main/java/org/apache/activemq/kaha/impl/data/ main/java/org/apac...
Date Thu, 01 Mar 2007 19:24:20 GMT
Author: rajdavies
Date: Thu Mar  1 11:24:17 2007
New Revision: 513455

URL: http://svn.apache.org/viewvc?view=rev&rev=513455
Log:
fix for jira issue: http://issues.apache.org/activemq/browse/AMQ-1121

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java
      - copied, changed from r511881, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java
Removed:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java
(from r511881, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java?view=diff&rev=513455&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java&r1=511881&p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java Thu
Mar  1 11:24:17 2007
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.kaha.impl.container;
+package org.apache.activemq.kaha;
 
 import java.io.Externalizable;
 import java.io.IOException;
@@ -31,6 +31,15 @@
     private Object key;
     private String dataContainerName;
 
+    public ContainerId() {
+    }
+    
+    public ContainerId(Object key,String dataContainerName) {
+        this.key=key;
+        this.dataContainerName=dataContainerName;
+    }
+    
+    
     /**
      * @return Returns the dataContainerPrefix.
      */
@@ -39,10 +48,10 @@
     }
 
     /**
-     * @param dataContainerPrefix The dataContainerPrefix to set.
+     * @param dataContainerName The dataContainerPrefix to set.
      */
-    public void setDataContainerName(String dataContainerPrefix){
-        this.dataContainerName=dataContainerPrefix;
+    public void setDataContainerName(String dataContainerName){
+        this.dataContainerName=dataContainerName;
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java Thu Mar
 1 11:24:17 2007
@@ -138,6 +138,13 @@
      * @throws IOException
      */
     public void deleteMapContainer(Object id,String containerName) throws IOException;
+    
+    /**
+     * Delete Map container
+     * @param id
+     * @throws IOException
+     */
+    public void deleteMapContainer(ContainerId id) throws IOException;
 
     /**
      * Get a Set of call MapContainer Ids
@@ -145,7 +152,7 @@
      * @return the set of ids
      * @throws IOException
      */
-    public Set getMapContainerIds() throws IOException;
+    public Set<ContainerId> getMapContainerIds() throws IOException;
 
     /**
      * Checks if a ListContainer exists in the default container
@@ -213,6 +220,12 @@
      */
     public void deleteListContainer(Object id,String containerName) throws IOException;
 
+    /**
+     * delete a list container
+     * @param id
+     * @throws IOException
+     */
+    public void deleteListContainer(ContainerId id) throws IOException;
 
     /**
      * Get a Set of call ListContainer Ids
@@ -220,7 +233,7 @@
      * @return the set of ids
      * @throws IOException
      */
-    public Set getListContainerIds() throws IOException;
+    public Set<ContainerId> getListContainerIds() throws IOException;
     
     /**
      * @return the maxDataFileLength

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
Thu Mar  1 11:24:17 2007
@@ -22,11 +22,11 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.kaha.StoreLocation;
-import org.apache.activemq.kaha.impl.container.ContainerId;
 import org.apache.activemq.kaha.impl.data.Item;
 import org.apache.activemq.kaha.impl.index.IndexItem;
 import org.apache.activemq.kaha.impl.index.IndexManager;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
Thu Mar  1 11:24:17 2007
@@ -27,6 +27,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.RuntimeStoreException;
@@ -34,7 +35,6 @@
 import org.apache.activemq.kaha.StoreLocation;
 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
 import org.apache.activemq.kaha.impl.async.DataManagerFacade;
-import org.apache.activemq.kaha.impl.container.ContainerId;
 import org.apache.activemq.kaha.impl.container.ListContainerImpl;
 import org.apache.activemq.kaha.impl.container.MapContainerImpl;
 import org.apache.activemq.kaha.impl.data.DataManagerImpl;
@@ -218,12 +218,14 @@
     public void deleteMapContainer(Object id) throws IOException{
         deleteMapContainer(id,DEFAULT_CONTAINER_NAME);
     }
+    
+    public void deleteMapContainer(Object id,String containerName) throws IOException{
+        ContainerId containerId = new ContainerId(id,containerName);
+        deleteMapContainer(containerId);
+    }
 
-    public synchronized void deleteMapContainer(Object id,String containerName) throws IOException{
+    public synchronized void deleteMapContainer(ContainerId containerId) throws IOException{
         initialize();
-        ContainerId containerId=new ContainerId();
-        containerId.setKey(id);
-        containerId.setDataContainerName(containerName);
         MapContainerImpl container=maps.remove(containerId);
         if(container!=null){
             container.clear();
@@ -232,12 +234,12 @@
         }
     }
 
-    public synchronized Set<Object> getMapContainerIds() throws IOException{
+    public synchronized Set<ContainerId> getMapContainerIds() throws IOException{
         initialize();
-        Set<Object> set = new HashSet<Object>();
+        Set<ContainerId> set = new HashSet<ContainerId>();
         for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
             ContainerId id = (ContainerId)i.next();
-            set.add(id.getKey());
+            set.add(id);
         }
         return set;
     }
@@ -286,12 +288,14 @@
     public void deleteListContainer(Object id) throws IOException{
         deleteListContainer(id,DEFAULT_CONTAINER_NAME);
     }
-
+    
     public synchronized void deleteListContainer(Object id,String containerName) throws IOException{
+        ContainerId containerId=new ContainerId(id,containerName);
+        deleteListContainer(containerId);
+    }
+
+    public synchronized void deleteListContainer(ContainerId containerId) throws IOException{
         initialize();
-        ContainerId containerId=new ContainerId();
-        containerId.setKey(id);
-        containerId.setDataContainerName(containerName);
         ListContainerImpl container=lists.remove(containerId);
         if(container!=null){
             listsContainer.removeRoot(container.getIndexManager(),containerId);
@@ -300,12 +304,12 @@
         }
     }
 
-    public synchronized Set<Object> getListContainerIds() throws IOException{
+    public synchronized Set<ContainerId> getListContainerIds() throws IOException{
         initialize();
-        Set<Object> set = new HashSet<Object>();
+        Set<ContainerId> set = new HashSet<ContainerId>();
         for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
             ContainerId id = (ContainerId)i.next();
-            set.add(id.getKey());
+            set.add(id);
         }
         return set;
     }
@@ -333,7 +337,7 @@
         	if( isUseAsyncDataManager() ) {
 	        	AsyncDataManager t=new AsyncDataManager();
 	        	t.setDirectory(directory);
-	        	t.setFilePrefix("data-"+name+"-");
+	        	t.setFilePrefix("async-data-"+name+"-");
 	        	t.setMaxFileLength((int) maxDataFileLength);
 	        	t.start();
 	            dm=new DataManagerFacade(t, name);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
Thu Mar  1 11:24:17 2007
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.RuntimeStoreException;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreEntry;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
Thu Mar  1 11:24:17 2007
@@ -21,6 +21,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
+import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.RuntimeStoreException;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
Thu Mar  1 11:24:17 2007
@@ -20,6 +20,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.RuntimeStoreException;
@@ -69,7 +70,7 @@
                     throw new RuntimeException(e);
                 }
             }else{
-                this.index=new VMIndex();
+                this.index=new VMIndex(indexManager);
             }
         }
         index.setKeyMarshaller(keyMarshaller);
@@ -505,7 +506,7 @@
                 StoreLocation data=dataManager.storeDataItem(valueMarshaller,value);
                 index.setValueData(data);
             }
-            IndexItem prev=indexList.getLast();
+            IndexItem prev=indexList.getLast();           
             prev=prev!=null?prev:indexList.getRoot();
             IndexItem next=indexList.getNextEntry(prev);
             prev.setNextItem(index.getOffset());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
Thu Mar  1 11:24:17 2007
@@ -34,6 +34,7 @@
     private RandomAccessFile randomAcessFile;
     private Object writerData;
     long length=0;
+    private boolean dirty;
 
     DataFile(File file,int number){
         this.file=file;
@@ -107,6 +108,15 @@
 	 */
 	public synchronized void setWriterData(Object writerData) {
 		this.writerData = writerData;
+        dirty=true;
 	}
+    
+    public synchronized boolean isDirty() {
+        return dirty;
+    }
+    
+    public synchronized void setDirty(boolean value) {
+        this.dirty = value;
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
Thu Mar  1 11:24:17 2007
@@ -57,7 +57,7 @@
     
     Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
     private String dataFilePrefix;
-
+   
     public DataManagerImpl(File dir, final String name){
         this.dir=dir;
         this.name=name;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
Thu Mar  1 11:24:17 2007
@@ -96,9 +96,10 @@
 
 	public synchronized void force(DataFile dataFile) throws IOException {
 		// If our dirty marker was set.. then we need to sync
-		if( dataFile.getWriterData()!=null ) {
+		if( dataFile.getWriterData()!=null && dataFile.isDirty()) {
 			dataFile.getRandomAccessFile().getFD().sync();
 	        dataFile.setWriterData(null);
+            dataFile.setDirty(false);
 		}
 	}
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
Thu Mar  1 11:24:17 2007
@@ -65,6 +65,10 @@
     public synchronized IndexItem getLast(){
         if(size==0)
             return null;
+        if(last!=null){
+            last.next=null;
+            last.setNextItem(IndexItem.POSITION_NOT_SET);
+        }
         return last;
     }
 
@@ -323,6 +327,7 @@
             return;
         if(e==last||e.equals(last)){
             if(size>1){
+                last = (IndexItem)refreshEntry(last);
                 last=getPrevEntry(last);
             }else{
                 last=null;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
Thu Mar  1 11:24:17 2007
@@ -45,6 +45,7 @@
     private long length=0;
     private IndexItem firstFree;
     private IndexItem lastFree;
+    private boolean dirty;
 
     public IndexManager(File directory,String name,String mode,DataManager redoLog) throws
IOException{
         this.directory=directory;
@@ -76,10 +77,12 @@
             lastFree.setNextItem(item.getOffset());
         }
         writer.updateIndexes(item);
+        dirty=true;
     }
 
     public synchronized void storeIndex(IndexItem index) throws IOException{
         writer.storeItem(index);
+        dirty=true;
     }
 
     public synchronized void updateIndexes(IndexItem index) throws IOException{
@@ -88,10 +91,12 @@
         }catch(Throwable e){
             log.error(name+" error updating indexes ",e);
         }
+        dirty=true;
     }
 
     public synchronized void redo(final RedoStoreIndexItem redo) throws IOException{
         writer.redoStoreItem(redo);
+        dirty=true;
     }
 
     public synchronized IndexItem createNewIndex() throws IOException{
@@ -113,8 +118,9 @@
     }
 
     public synchronized void force() throws IOException{
-        if(indexFile!=null){
+        if(indexFile!=null && dirty){
             indexFile.getFD().sync();
+            dirty=false;
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
Thu Mar  1 11:24:17 2007
@@ -14,10 +14,14 @@
 
 package org.apache.activemq.kaha.impl.index;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.kaha.impl.container.MapContainerImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Index implementation using a HashMap
@@ -25,9 +29,13 @@
  * @version $Revision: 1.2 $
  */
 public class VMIndex implements Index{
-
+    private static final Log log=LogFactory.getLog(VMIndex.class);
+    private IndexManager indexManager;
     private Map<Object,StoreEntry> map=new HashMap<Object,StoreEntry>();
 
+    public VMIndex(IndexManager manager) {
+        this.indexManager= manager;
+    }
     /**
      * 
      * @see org.apache.activemq.kaha.impl.index.Index#clear()
@@ -47,10 +55,20 @@
 
     /**
      * @param key
+     * @return store entry
      * @see org.apache.activemq.kaha.impl.index.Index#removeKey(java.lang.Object)
      */
     public StoreEntry remove(Object key){
-       return  map.remove(key);
+        StoreEntry result =   map.remove(key);
+        if (result != null) {
+            try{
+                result=indexManager.refreshIndex((IndexItem)result);
+            }catch(IOException e){
+                log.error("Failed to refresh entry",e);
+               throw new RuntimeException("Failed to refresh entry");
+            }
+        }
+        return result;
     }
 
     /**
@@ -68,7 +86,16 @@
      * @return the entry
      */
     public StoreEntry get(Object key){
-        return map.get(key);
+        StoreEntry result =  map.get(key);
+        if (result != null) {
+            try{
+                result=indexManager.refreshIndex((IndexItem)result);
+            }catch(IOException e){
+                log.error("Failed to refresh entry",e);
+               throw new RuntimeException("Failed to refresh entry");
+            }
+        }
+        return result;
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
Thu Mar  1 11:24:17 2007
@@ -84,7 +84,7 @@
     private final ConcurrentHashMap<ActiveMQTopic, AMQMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic,
AMQMessageStore>();
     
     private AsyncDataManager asyncDataManager;
-    private ReferenceStoreAdapter referenceStoreAdapter;
+    private KahaReferenceStoreAdapter referenceStoreAdapter;
 	private TaskRunnerFactory taskRunnerFactory; 
     private WireFormat wireFormat = new OpenWireFormat();
 
@@ -106,7 +106,7 @@
 
 	private Runnable periodicCleanupTask;
 	private boolean deleteAllMessages;
-	private File directory = new File(IOHelper.getDefaultDataDirectory() + "/quick");
+	private File directory = new File(IOHelper.getDefaultDataDirectory() + "/amq");
 
 
     
@@ -242,7 +242,9 @@
             checkpointTask.wakeup();
             
             if (sync) {
-                log.debug("Waitng for checkpoint to complete.");
+                if(log.isDebugEnabled()){
+                    log.debug("Waitng for checkpoint to complete.");
+                }
                 latch.await();
             }
         }
@@ -264,7 +266,10 @@
         }        
         try {
 
-            log.debug("Checkpoint started.");
+            if(log.isDebugEnabled()){
+                log.debug("Checkpoint started.");
+            }
+            referenceStoreAdapter.sync();
             Location newMark = null;
 
             Iterator<AMQMessageStore> iterator = queues.values().iterator();
@@ -287,7 +292,9 @@
 
             try {
                 if (newMark != null) {
-                    log.debug("Marking journal at: " + newMark);
+                    if(log.isDebugEnabled()){
+                        log.debug("Marking journal at: " + newMark);
+                    }
                     asyncDataManager.setMark(newMark, false);
                     writeTraceMessage("CHECKPOINT "+new Date(), true);
                 }
@@ -296,17 +303,12 @@
                 log.error("Failed to mark the Journal: " + e, e);
             }
     
-//                if (referenceStoreAdapter instanceof JDBCReferenceStoreAdapter) {
-//                    // We may be check pointing more often than the checkpointInterval
if under high use
-//                    // But we don't want to clean up the db that often.
-//                    long now = System.currentTimeMillis();
-//                    if( now > lastCleanup+checkpointInterval ) {
-//                        lastCleanup = now;
-//                        ((JDBCReferenceStoreAdapter) referenceStoreAdapter).cleanup();
-//                    }
-//                }
-
-            log.debug("Checkpoint done.");
+            if(log.isDebugEnabled()){
+                log.debug("Checkpoint done.");
+            }
+        }
+        catch(IOException e) {
+            log.error("Failed to sync reference store",e);
         }
         finally {
             latch.countDown();
@@ -603,7 +605,7 @@
 		return manager;
 	}
     
-    protected ReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
+    protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException
{
     	KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(directory); 
 		return adaptor;
 	}
@@ -627,9 +629,7 @@
 	public ReferenceStoreAdapter getReferenceStoreAdapter() {
 		return referenceStoreAdapter;
 	}
-	public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
-		this.referenceStoreAdapter = referenceStoreAdapter;
-	}
+	
 
 	public TaskRunnerFactory getTaskRunnerFactory() {
 		return taskRunnerFactory;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
Thu Mar  1 11:24:17 2007
@@ -31,9 +31,11 @@
 
     protected final ActiveMQDestination destination;
     protected final MapContainer<MessageId,ReferenceRecord> messageContainer;
+    protected KahaReferenceStoreAdapter adapter;
     protected StoreEntry batchEntry=null;
 
-    public KahaReferenceStore(MapContainer container,ActiveMQDestination destination) throws
IOException{
+    public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer container,ActiveMQDestination
destination) throws IOException{
+        this.adapter = adapter;
         this.messageContainer=container;
         this.destination=destination;
     }
@@ -109,10 +111,10 @@
         return result.data;
     }
 
-    public void addReferenceFileIdsInUse(Set<Integer> rc){
+    public void addReferenceFileIdsInUse(){
         for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
             ReferenceRecord msg=(ReferenceRecord)messageContainer.getValue(entry);
-            rc.add(msg.data.getFileId());
+            addInterest(msg);
         }
     }
 
@@ -172,10 +174,10 @@
     }
     
     void removeInterest(ReferenceRecord rr) {
-        
+        adapter.removeInterestInRecordFile(rr.data.getFileId());
     }
     
     void addInterest(ReferenceRecord rr) {
-        
+        adapter.addInterestInRecordFile(rr.data.getFileId());
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
Thu Mar  1 11:24:17 2007
@@ -17,34 +17,39 @@
  */
 package org.apache.activemq.store.kahadaptor;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
-
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
-import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.MessageIdMarshaller;
-import org.apache.activemq.kaha.MessageMarshaller;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.ReferenceStore;
 import org.apache.activemq.store.ReferenceStoreAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TopicReferenceStore;
-import org.apache.activemq.store.ReferenceStore.ReferenceData;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter
{
-
-	private MapContainer<Integer, Integer> fileReferences;
+    private static final Log log = LogFactory.getLog(KahaPersistenceAdapter.class);
+   private static final String STORE_STATE = "store-state";
+   private static final String RECORD_REFERENCES = "record-references";
+    private MapContainer stateMap;
+	private Map<Integer,AtomicInteger>recordReferences = new HashMap<Integer,AtomicInteger>();
+    private boolean storeValid;
 
 	public KahaReferenceStoreAdapter(File dir) throws IOException {
 		super(dir);
@@ -59,22 +64,63 @@
     }
     
     @Override
-    public void start() throws Exception {
-    	super.start();
-    	
+    public void start() throws Exception{
+        super.start();
         Store store=getStore();
-        fileReferences=store.getMapContainer("file-references");
-        fileReferences.setKeyMarshaller(new IntegerMarshaller());
-        fileReferences.setValueMarshaller(new IntegerMarshaller());
-        fileReferences.load();        
+        boolean empty=store.getMapContainerIds().isEmpty();
+        stateMap=store.getMapContainer("state",STORE_STATE);
+        stateMap.load();
+        if(!empty){
+            
+            AtomicBoolean status=(AtomicBoolean)stateMap.get(STORE_STATE);
+            if(status!=null){
+                storeValid=status.get();
+            }
+           
+            if(storeValid){
+                if(stateMap.containsKey(RECORD_REFERENCES)){
+                    recordReferences=(Map<Integer,AtomicInteger>)stateMap.get(RECORD_REFERENCES);
+                }
+            }else {
+                /*
+                log.warn("Store Not shutdown cleanly - clearing out unsafe records ...");
+                Set<ContainerId> set = store.getListContainerIds();
+                for (ContainerId cid:set) {
+                    if (!cid.getDataContainerName().equals(STORE_STATE)) {
+                        store.deleteListContainer(cid);
+                    }
+                }
+                set = store.getMapContainerIds();
+                for (ContainerId cid:set) {
+                    if (!cid.getDataContainerName().equals(STORE_STATE)) {
+                        store.deleteMapContainer(cid);
+                    }
+                }
+                */
+                buildReferenceFileIdsInUse();
+            }
+            
+        }
+        stateMap.put(STORE_STATE,new AtomicBoolean());
+    }
+    
+    @Override
+    public void stop() throws Exception {
+        stateMap.put(RECORD_REFERENCES,recordReferences);
+        stateMap.put(STORE_STATE,new AtomicBoolean(true));
+        super.stop();        
     }
     
     
+    public boolean isStoreValid() {
+        return storeValid;
+    }
+    
 
 	public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException
{
 		ReferenceStore rc=(ReferenceStore)queues.get(destination);
         if(rc==null){
-            rc=new KahaReferenceStore(getMapReferenceContainer(destination,"queue-data"),destination);
+            rc=new KahaReferenceStore(this,getMapReferenceContainer(destination,"queue-data"),destination);
             messageStores.put(destination,rc);
 //            if(transactionStore!=null){
 //                rc=transactionStore.proxy(rc);
@@ -89,10 +135,10 @@
         if(rc==null){
             Store store=getStore();
             MapContainer messageContainer=getMapReferenceContainer(destination,"topic-data");
-            MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","topic-subs");
+            MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","blob");
             ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
             ackContainer.setMarshaller(new TopicSubAckMarshaller());
-            rc=new KahaTopicReferenceStore(store,messageContainer,ackContainer,subsContainer,destination);
+            rc=new KahaTopicReferenceStore(store,this,messageContainer,ackContainer,subsContainer,destination);
             messageStores.put(destination,rc);
 //            if(transactionStore!=null){
 //                rc=transactionStore.proxy(rc);
@@ -102,25 +148,26 @@
         return rc;
 	}
 
-	public Set<Integer> getReferenceFileIdsInUse() throws IOException {
+	public void buildReferenceFileIdsInUse() throws IOException {
 		
-		Set<Integer> rc = new HashSet<Integer>();
+        recordReferences = new HashMap<Integer,AtomicInteger>();
 		
 		Set<ActiveMQDestination> destinations = getDestinations();
 		for (ActiveMQDestination destination : destinations) {
 			if( destination.isQueue() ) {
 				KahaReferenceStore store = (KahaReferenceStore) createQueueReferenceStore((ActiveMQQueue)
destination);
-				store.addReferenceFileIdsInUse(rc);
+				store.addReferenceFileIdsInUse();
 			} else {
 				KahaTopicReferenceStore store = (KahaTopicReferenceStore) createTopicReferenceStore((ActiveMQTopic)
destination);
-				store.addReferenceFileIdsInUse(rc);
+				store.addReferenceFileIdsInUse();
 			}
-		}
-		
-		return rc;
-		
+        }		
 	}
     
+    public void sync() throws IOException {
+        getStore().force();
+    }
+    
     protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object
id,String containerName) throws IOException{
         Store store=getStore();
         MapContainer<MessageId, ReferenceRecord> container=store.getMapContainer(id,containerName);
@@ -128,6 +175,33 @@
         container.setValueMarshaller(new ReferenceRecordMarshaller());        
         container.load();
         return container;
+    }
+    
+    synchronized void addInterestInRecordFile(int recordNumber) {
+        Integer key = new Integer(recordNumber);
+        AtomicInteger rr = recordReferences.get(key);
+        if (rr == null) {
+            rr = new AtomicInteger();
+            recordReferences.put(key,rr);
+        }
+        rr.incrementAndGet();
+    }
+    
+    synchronized void removeInterestInRecordFile(int recordNumber) {
+        Integer key = new Integer(recordNumber);
+        AtomicInteger rr = recordReferences.get(key);
+        if (rr != null && rr.decrementAndGet() <= 0) {
+            recordReferences.remove(key);
+        }
+    }
+
+    /**
+     * @return
+     * @throws IOException
+     * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
+     */
+    public Set<Integer> getReferenceFileIdsInUse() throws IOException{
+        return recordReferences.keySet();
     }
 
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
Thu Mar  1 11:24:17 2007
@@ -86,8 +86,12 @@
                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
                 if(tsa!=null){
                     if(tsa.decrementCount()<=0){
-                        ackContainer.remove(ref.getAckEntry());
-                        messageContainer.remove(tsa.getMessageEntry());
+                        StoreEntry entry = ref.getAckEntry();
+                        entry = ackContainer.refresh(entry);
+                        ackContainer.remove(entry);
+                        entry = tsa.getMessageEntry();
+                        entry =messageContainer.refresh(entry);
+                        messageContainer.remove(entry);
                     }else{
                         ackContainer.update(ref.getAckEntry(),tsa);
                     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
Thu Mar  1 11:24:17 2007
@@ -39,9 +39,9 @@
     private Store store;
     protected Map subscriberMessages=new ConcurrentHashMap();
 
-    public KahaTopicReferenceStore(Store store,MapContainer messageContainer,ListContainer
ackContainer,
+    public KahaTopicReferenceStore(Store store,KahaReferenceStoreAdapter adapter,MapContainer
messageContainer,ListContainer ackContainer,
             MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
-        super(messageContainer,destination);
+        super(adapter,messageContainer,destination);
         this.store=store;
         this.ackContainer=ackContainer;
         subscriberContainer=subsContainer;
@@ -97,18 +97,18 @@
         return result.data;
     }
 
-    public void addReferenceFileIdsInUse(Set<Integer> rc){
+    public void addReferenceFileIdsInUse(){
         for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
             TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
             if(subAck.getCount()>0){
                 ReferenceRecord rr=(ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry());
-                rc.add(rr.data.getFileId());
+                addInterest(rr);
             }
         }
     }
 
     protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{
-        ListContainer container=store.getListContainer(key,"topic-subs");
+        ListContainer container=store.getListContainer(key,"topic-subs-references");
         Marshaller marshaller=new ConsumerMessageRefMarshaller();
         container.setMarshaller(marshaller);
         TopicSubContainer tsc=new TopicSubContainer(container);
@@ -129,11 +129,15 @@
                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
                 if(tsa!=null){
                     if(tsa.decrementCount()<=0){
-                        ackContainer.remove(ref.getAckEntry());
-                        ReferenceRecord rr = messageContainer.get(messageId);
-                        if (rr != null) {
-                        messageContainer.remove(tsa.getMessageEntry());
-                        removeInterest(rr);
+                        StoreEntry entry=ref.getAckEntry();
+                        entry=ackContainer.refresh(entry);
+                        ackContainer.remove(entry);
+                        ReferenceRecord rr=messageContainer.get(messageId);
+                        if(rr!=null){
+                            entry=tsa.getMessageEntry();
+                            entry=messageContainer.refresh(entry);
+                            messageContainer.remove(entry);
+                            removeInterest(rr);
                         }
                     }else{
                         ackContainer.update(ref.getAckEntry(),tsa);
@@ -261,7 +265,7 @@
                 }
             }
         }
-        store.deleteListContainer(key,"topic-subs");
+        store.deleteListContainer(key,"topic-subs-references");
     }
 
     protected String getSubscriptionKey(String clientId,String subscriberName){

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java
Thu Mar  1 11:24:17 2007
@@ -37,10 +37,23 @@
     protected MapContainer container;
     protected Map testMap;
     protected static final int COUNT = 10;
+    
+    public void testBasicAllocations() throws Exception{
+        String key = "key";
+        Object value = testMap;
+        MapContainer test = store.getMapContainer("test","test");
+        test.put(key,value);
+        store.close();
+        store = getStore();
+        assertTrue(store.getMapContainerIds().isEmpty()==false);
+        test = store.getMapContainer("test","test");
+        assertEquals(value,test.get(key));
+        
+    }
     /*
      * Test method for 'org.apache.activemq.kaha.MapContainer.size()'
      */
-    public void XtestSize() throws Exception {
+    public void testSize() throws Exception {
         container.putAll(testMap);
         assertTrue(container.size()==testMap.size());
     }
@@ -48,14 +61,14 @@
     /*
      * Test method for 'org.apache.activemq.kaha.MapContainer.isEmpty()'
      */
-    public void XtestIsEmpty() throws Exception {
+    public void testIsEmpty() throws Exception {
        assertTrue(container.isEmpty());
     }
 
     /*
      * Test method for 'org.apache.activemq.kaha.MapContainer.clear()'
      */
-    public void XtestClear() throws Exception {
+    public void testClear() throws Exception {
         container.putAll(testMap);
         assertTrue(container.size()==testMap.size());
         container.clear();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java?view=diff&rev=513455&r1=513454&r2=513455
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java Thu
Mar  1 11:24:17 2007
@@ -100,16 +100,7 @@
         assertFalse(store.doesMapContainerExist(containerId));
     }
 
-    /*
-     * Test method for 'org.apache.activemq.kaha.Store.getMapContainerIds()'
-     */
-    public void testGetMapContainerIds()throws Exception {
-        String containerId = "test";
-        MapContainer container = store.getMapContainer(containerId);
-        Set set = store.getMapContainerIds();
-        assertTrue(set.contains(containerId));
-    }
-
+    
     
 
     /*
@@ -139,16 +130,7 @@
         assertFalse(store.doesListContainerExist(containerId));
     }
 
-    /*
-     * Test method for 'org.apache.activemq.kaha.Store.getListContainerIds()'
-     */
-    public void testGetListContainerIds()throws Exception {
-        String containerId = "test";
-        ListContainer container = store.getListContainer(containerId);
-        Set set = store.getListContainerIds();
-        assertTrue(set.contains(containerId));
-    }
-    
+        
     public void testBasicAllocations() throws Exception{
         Map testMap = new HashMap();
         int count = 1000;



Mime
View raw message