activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r477166 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/kaha/impl/ main/java/org/apache/activemq/kaha/impl/container/ main/java...
Date Mon, 20 Nov 2006 13:11:32 GMT
Author: rajdavies
Date: Mon Nov 20 05:11:26 2006
New Revision: 477166

URL: http://svn.apache.org/viewvc?view=rev&rev=477166
Log:
fixed some timing issues using the Kaha Persistence Adaptor

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Mon Nov 20 05:11:26 2006
@@ -89,8 +89,8 @@
     
     synchronized public int incrementReferenceCount() {
         int rc = ++referenceCount;
-        if( persistent && rc==1 ) {
-            assert message == null;            
+        if( persistent && rc==1 && message == null) {
+                 
             try {
                 message = destinationStore.getMessage(messageId);
                 if( message == null ) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java Mon Nov 20 05:11:26 2006
@@ -240,4 +240,9 @@
      * @see org.apache.activemq.kaha.IndexTypes
      */
     public void setIndexType(String type);
+    
+    /**
+     * @return true if the store has been initialized
+     */
+    public boolean isInitialized();
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Mon Nov 20 05:11:26 2006
@@ -20,13 +20,13 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.activemq.kaha.IndexTypes;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
@@ -45,7 +45,6 @@
 import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Store Implementation
@@ -74,7 +73,7 @@
     private String mode;
     private boolean initialized;
     private boolean logIndexChanges=false;
-    private boolean useAsyncWriter=true;
+    private boolean useAsyncWriter=false;
     private long maxDataFileLength=DataManager.MAX_FILE_LENGTH;
     private FileLock lock;
     private String indexType=IndexTypes.DISK_INDEX;
@@ -154,17 +153,21 @@
                     }
                 }
             }
-            log.info("Kaha Store deleted data directory "+directory);
+            String str=result?"successfully deleted":"failed to delete";
+            log.info("Kaha Store "+str+" data directory "+directory);
         }
-        initialized=false;
         return result;
     }
+    
+    public synchronized boolean isInitialized(){
+        return initialized;
+    }
 
     public boolean doesMapContainerExist(Object id) throws IOException{
         return doesMapContainerExist(id,DEFAULT_CONTAINER_NAME);
     }
 
-    public boolean doesMapContainerExist(Object id,String containerName) throws IOException{
+    public synchronized boolean doesMapContainerExist(Object id,String containerName) throws IOException{
         initialize();
         ContainerId containerId=new ContainerId();
         containerId.setKey(id);
@@ -204,7 +207,7 @@
         deleteMapContainer(id,DEFAULT_CONTAINER_NAME);
     }
 
-    public void deleteMapContainer(Object id,String containerName) throws IOException{
+    public synchronized void deleteMapContainer(Object id,String containerName) throws IOException{
         initialize();
         ContainerId containerId=new ContainerId();
         containerId.setKey(id);
@@ -216,7 +219,7 @@
         }
     }
 
-    public Set getMapContainerIds() throws IOException{
+    public synchronized Set getMapContainerIds() throws IOException{
         initialize();
         Set set = new HashSet();
         for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
@@ -230,7 +233,7 @@
         return doesListContainerExist(id,DEFAULT_CONTAINER_NAME);
     }
 
-    public boolean doesListContainerExist(Object id,String containerName) throws IOException{
+    public synchronized boolean doesListContainerExist(Object id,String containerName) throws IOException{
         initialize();
         ContainerId containerId=new ContainerId();
         containerId.setKey(id);
@@ -271,7 +274,7 @@
         deleteListContainer(id,DEFAULT_CONTAINER_NAME);
     }
 
-    public void deleteListContainer(Object id,String containerName) throws IOException{
+    public synchronized void deleteListContainer(Object id,String containerName) throws IOException{
         initialize();
         ContainerId containerId=new ContainerId();
         containerId.setKey(id);
@@ -283,7 +286,7 @@
         }
     }
 
-    public Set getListContainerIds() throws IOException{
+    public synchronized Set getListContainerIds() throws IOException{
         initialize();
         Set set = new HashSet();
         for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
@@ -310,7 +313,7 @@
         return this.mapsContainer;
     }
 
-    public DataManager getDataManager(String name) throws IOException{
+    public synchronized DataManager getDataManager(String name) throws IOException{
         DataManager dm=(DataManager)dataManagers.get(name);
         if(dm==null){
             dm=new DataManager(directory,name);
@@ -322,7 +325,7 @@
         return dm;
     }
 
-    public IndexManager getIndexManager(DataManager dm,String name) throws IOException{
+    public synchronized IndexManager getIndexManager(DataManager dm,String name) throws IOException{
         IndexManager im=(IndexManager)indexManagers.get(name);
         if(im==null){
             im=new IndexManager(directory,name,mode,logIndexChanges?dm:null);
@@ -343,18 +346,18 @@
         });
     }
 
-    public boolean isLogIndexChanges(){
+    public synchronized boolean isLogIndexChanges(){
         return logIndexChanges;
     }
 
-    public void setLogIndexChanges(boolean logIndexChanges){
+    public synchronized void setLogIndexChanges(boolean logIndexChanges){
         this.logIndexChanges=logIndexChanges;
     }
 
     /**
      * @return the maxDataFileLength
      */
-    public long getMaxDataFileLength(){
+    public synchronized long getMaxDataFileLength(){
         return maxDataFileLength;
     }
 
@@ -362,7 +365,7 @@
      * @param maxDataFileLength
      *            the maxDataFileLength to set
      */
-    public void setMaxDataFileLength(long maxDataFileLength){
+    public synchronized void setMaxDataFileLength(long maxDataFileLength){
         this.maxDataFileLength=maxDataFileLength;
     }
 
@@ -370,7 +373,7 @@
      * @see org.apache.activemq.kaha.IndexTypes
      * @return the default index type
      */
-    public String getIndexType(){
+    public synchronized String getIndexType(){
         return indexType;
     }
 
@@ -380,7 +383,7 @@
      * @param type
      * @see org.apache.activemq.kaha.IndexTypes
      */
-    public void setIndexType(String type){
+    public synchronized void setIndexType(String type){
         if(type==null||(!type.equals(IndexTypes.DISK_INDEX)&&!type.equals(IndexTypes.IN_MEMORY_INDEX))){
             throw new RuntimeException("Unknown IndexType: "+type);
         }
@@ -444,7 +447,7 @@
         }
     }
 
-    private void unlock() throws IOException{
+    private synchronized void unlock() throws IOException{
         if(!disableLocking&&directory!=null){
             Set set=getVmLockSet();
             synchronized(set){
@@ -485,11 +488,11 @@
      * scans the directory and builds up the IndexManager and DataManager
      * @throws IOException 
      */
-    private void generateInterestInListDataFiles() throws IOException {
-        for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
-            ContainerId id = (ContainerId)i.next();
-            DataManager dm = getDataManager(id.getDataContainerName());
-            IndexManager im = getIndexManager(dm,id.getDataContainerName());
+    private void generateInterestInListDataFiles() throws IOException{
+        for(Iterator i=listsContainer.getKeys().iterator();i.hasNext();){
+            ContainerId id=(ContainerId)i.next();
+            DataManager dm=getDataManager(id.getDataContainerName());
+            IndexManager im=getIndexManager(dm,id.getDataContainerName());
             IndexItem theRoot=listsContainer.getRoot(im,id);
             long nextItem=theRoot.getNextItem();
             while(nextItem!=Item.POSITION_NOT_SET){
@@ -499,13 +502,13 @@
                 dm.addInterestInFile(item.getValueFile());
                 nextItem=item.getNextItem();
             }
-            
         }
     }
     
     /**
      * scans the directory and builds up the IndexManager and DataManager
-     * @throws IOException 
+     * 
+     * @throws IOException
      */
     private void generateInterestInMapDataFiles() throws IOException {
         for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java Mon Nov 20 05:11:26 2006
@@ -305,7 +305,7 @@
         return result;
     }
 
-    protected void remove(IndexItem item){
+    protected synchronized void remove(IndexItem item){
         IndexItem prev=indexList.getPrevEntry(item);
         IndexItem next=indexList.getNextEntry(item);
         indexList.remove(item);
@@ -424,7 +424,7 @@
      * 
      * @see java.util.List#set(int, E)
      */
-    public Object set(int index,Object element){
+    public synchronized Object set(int index,Object element){
         load();
         Object result=null;
         IndexItem replace=indexList.isEmpty()?null:(IndexItem)indexList.get(index);
@@ -438,7 +438,7 @@
         return result;
     }
 
-    protected IndexItem internalSet(int index,Object element){
+    protected synchronized IndexItem internalSet(int index,Object element){
         IndexItem replace=indexList.isEmpty()?null:(IndexItem)indexList.get(index);
         IndexItem prev=(indexList.isEmpty()||(index-1)<0)?null:(IndexItem)indexList.get(index-1);
         IndexItem next=(indexList.isEmpty()||(index+1)>=size())?null:(IndexItem)indexList.get(index+1);
@@ -460,7 +460,7 @@
         itemAdded(item,index,element);
     }
 
-    protected StoreEntry internalAddLast(Object o){
+    protected synchronized StoreEntry internalAddLast(Object o){
         load();
         IndexItem item=writeLast(o);
         indexList.addLast(item);
@@ -468,7 +468,7 @@
         return item;
     }
 
-    protected StoreEntry internalAddFirst(Object o){
+    protected synchronized StoreEntry internalAddFirst(Object o){
         load();
         IndexItem item=writeFirst(o);
         indexList.addFirst(item);
@@ -476,7 +476,7 @@
         return item;
     }
 
-    protected IndexItem internalAdd(int index,Object element){
+    protected synchronized IndexItem internalAdd(int index,Object element){
         load();
         IndexItem item=insert(index,element);
         indexList.add(index,item);
@@ -484,7 +484,7 @@
         return item;
     }
 
-    protected StoreEntry internalGet(int index){
+    protected synchronized StoreEntry internalGet(int index){
         load();
         if(index>=0&&index<indexList.size()){
             return indexList.get(index);
@@ -646,7 +646,7 @@
      * @param object
      * @see org.apache.activemq.kaha.ListContainer#update(org.apache.activemq.kaha.StoreEntry, java.lang.Object)
      */
-    public void update(StoreEntry entry,Object object){
+    public synchronized void update(StoreEntry entry,Object object){
         try{
             dataManager.updateItem(entry.getValueDataItem(),marshaller,object);
         }catch(IOException e){
@@ -733,7 +733,7 @@
         return indexList.getEntry(entry);
     }
 
-    protected IndexItem writeLast(Object value){
+    protected synchronized IndexItem writeLast(Object value){
         IndexItem index=null;
         try{
             if(value!=null){
@@ -760,7 +760,7 @@
         return index;
     }
 
-    protected IndexItem writeFirst(Object value){
+    protected synchronized IndexItem writeFirst(Object value){
         IndexItem index=null;
         try{
             if(value!=null){
@@ -786,7 +786,7 @@
         return index;
     }
 
-    protected IndexItem insert(int insertPos,Object value){
+    protected synchronized IndexItem insert(int insertPos,Object value){
         IndexItem index=null;
         try{
             if(value!=null){
@@ -823,7 +823,7 @@
         return index;
     }
 
-    protected Object getValue(StoreEntry item){
+    protected synchronized Object getValue(StoreEntry item){
         Object result=null;
         if(item!=null){
             try{
@@ -858,7 +858,7 @@
         return result.toString();
     }
 
-    protected void itemAdded(IndexItem item,int pos,Object value){
+    protected synchronized void itemAdded(IndexItem item,int pos,Object value){
         if(cacheEnabled){
             int cachePosition=pos-offset;
             // if pos is before the cache offset
@@ -882,7 +882,7 @@
         }
     }
 
-    protected void itemRemoved(int pos){
+    protected synchronized void itemRemoved(int pos){
         if(cacheEnabled){
             int lastPosition=offset+cacheList.size()-1;
             int cachePosition=pos-offset;
@@ -900,7 +900,7 @@
         }
     }
 
-    protected Object getCachedItem(int pos){
+    protected synchronized Object getCachedItem(int pos){
         Object result=null;
         if(cacheEnabled) {
         int cachePosition=pos-offset;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java Mon Nov 20 05:11:26 2006
@@ -293,7 +293,7 @@
         return result;
     }
 
-    protected void remove(IndexItem item){
+    protected synchronized void remove(IndexItem item){
         Object key=getKey(item);
         if(key!=null){
             remove(key);
@@ -321,7 +321,7 @@
      * @param value
      * @return the StoreEntry associated with the entry
      */
-    public StoreEntry place(Object key, Object value) {
+    public synchronized StoreEntry place(Object key, Object value) {
         load();
         if(indexMap.containsKey(key)){
             remove(key);
@@ -336,7 +336,7 @@
      * Remove an Entry from ther Map
      * @param entry
      */
-    public void remove(StoreEntry entry) {
+    public synchronized void remove(StoreEntry entry) {
         load();
         IndexItem item=(IndexItem)entry;
         if(item!=null){
@@ -393,7 +393,7 @@
     }
     
 
-    protected Set getInternalKeySet(){
+    protected synchronized Set getInternalKeySet(){
         return new HashSet(indexMap.keySet());
     }
 
@@ -401,7 +401,7 @@
         return indexList;
     }
 
-    protected IndexItem write(Object key,Object value){
+    protected synchronized IndexItem write(Object key,Object value){
         IndexItem index=null;
         try{
             if(key!=null){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java Mon Nov 20 05:11:26 2006
@@ -54,7 +54,7 @@
     /* (non-Javadoc)
 	 * @see org.apache.activemq.kaha.impl.data.DataFileReader#readDataItemSize(org.apache.activemq.kaha.impl.data.DataItem)
 	 */
-    public byte readDataItemSize(DataItem item) throws IOException {
+    public synchronized byte readDataItemSize(DataItem item) throws IOException {
     	WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(item));
     	if( asyncWrite!= null ) {
     		item.setSize(asyncWrite.location.getSize());
@@ -73,7 +73,7 @@
     /* (non-Javadoc)
 	 * @see org.apache.activemq.kaha.impl.data.DataFileReader#readItem(org.apache.activemq.kaha.Marshaller, org.apache.activemq.kaha.StoreLocation)
 	 */
-    public Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
+    public synchronized Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
     	WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(item));
     	if( asyncWrite!= null ) {
             ByteArrayInputStream stream = new ByteArrayInputStream(asyncWrite.data, DataManager.ITEM_HEAD_SIZE, item.getSize());

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java Mon Nov 20 05:11:26 2006
@@ -135,7 +135,7 @@
      * @return
      * @throws IOException
      */
-    public DataItem storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
+    public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
     	// We may need to slow down if we are pounding the async thread too 
     	// hard..
     	try {
@@ -177,37 +177,36 @@
     /**
      * 
      */
-    public void updateItem(final DataItem item, Marshaller marshaller, Object payload, byte type) throws IOException {
-    	// We may need to slow down if we are pounding the async thread too 
-    	// hard..
-    	try {
-			usage.waitForSpace();
-		} catch (InterruptedException e) {
-			throw new InterruptedIOException();
-		}
-
-		//Write the packet our internal buffer.
-    	final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
-        buffer.position(DataManager.ITEM_HEAD_SIZE);
-        marshaller.writePayload(payload,buffer);
-        final int size=buffer.size();
-        int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
-        buffer.reset();
-        buffer.writeByte(type);
-        buffer.writeInt(payloadSize);        
-        item.setSize(payloadSize);
-        final DataFile  dataFile = dataManager.getDataFile(item);                
-        
-        usage.increaseUsage(size);
-        
-    	WriteCommand write = new WriteCommand(item, dataFile.getRandomAccessFile(), buffer.getData(), latchAssignedToNewWrites);
-    	
-        // Equeue the write to an async thread.
-        synchronized(enqueueMutex) {
-        	dataFile.setWriterData(latchAssignedToNewWrites);
-        	enqueue(write);
+    public void updateItem(final DataItem item,Marshaller marshaller,Object payload,byte type) throws IOException{
+        // We may need to slow down if we are pounding the async thread too
+        // hard..
+        try{
+            usage.waitForSpace();
+        }catch(InterruptedException e){
+            throw new InterruptedIOException();
+        }
+        synchronized(enqueueMutex){
+            // Write the packet to our internal buffer.
+            final DataByteArrayOutputStream buffer=new DataByteArrayOutputStream();
+            buffer.position(DataManager.ITEM_HEAD_SIZE);
+            marshaller.writePayload(payload,buffer);
+            final int size=buffer.size();
+            int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
+            buffer.reset();
+            buffer.writeByte(type);
+            buffer.writeInt(payloadSize);
+            item.setSize(payloadSize);
+            final DataFile dataFile=dataManager.getDataFile(item);
+            usage.increaseUsage(size);
+            WriteCommand write=new WriteCommand(item,dataFile.getRandomAccessFile(),buffer.getData(),
+                    latchAssignedToNewWrites);
+            // Enqueue the write to an async thread.
+            synchronized(enqueueMutex){
+                dataFile.setWriterData(latchAssignedToNewWrites);
+                enqueue(write);
+            }
+            inflightWrites.put(new WriteKey(item),write);
         }
-    	inflightWrites.put(new WriteKey(item), write);
     }
 
     private void enqueue(Object command) throws IOException {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java Mon Nov 20 05:11:26 2006
@@ -97,7 +97,7 @@
         return name;
     }
 
-    DataFile findSpaceForData(DataItem item) throws IOException{
+    synchronized DataFile  findSpaceForData(DataItem item) throws IOException{
         if(currentWriteFile==null||((currentWriteFile.getLength()+item.getSize())>maxFileLength)){
             int nextNum=currentWriteFile!=null?currentWriteFile.getNumber().intValue()+1:1;
             if(currentWriteFile!=null&&currentWriteFile.isUnused()){
@@ -125,15 +125,15 @@
         return getReader().readItem(marshaller,item);
     }
 
-    public StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException{
+    public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException{
         return getWriter().storeItem(marshaller,payload, DATA_ITEM_TYPE);
     }
     
-    public StoreLocation storeRedoItem(Object payload) throws IOException{
+    public synchronized StoreLocation storeRedoItem(Object payload) throws IOException{
         return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
     }
     
-    public void updateItem(StoreLocation location,Marshaller marshaller, Object payload) throws IOException {
+    public synchronized void updateItem(StoreLocation location,Marshaller marshaller, Object payload) throws IOException {
         getWriter().updateItem((DataItem)location,marshaller,payload,DATA_ITEM_TYPE);
     }
 
@@ -296,7 +296,7 @@
 		}
 		return reader;
 	}
-	protected DataFileReader createReader() {
+	protected synchronized DataFileReader createReader() {
 		if( useAsyncWriter ) {
 			return new AsyncDataFileReader(this, (AsyncDataFileWriter) getWriter());
 		} else {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java Mon Nov 20 05:11:26 2006
@@ -45,7 +45,7 @@
     /* (non-Javadoc)
 	 * @see org.apache.activemq.kaha.impl.data.DataFileReader#readDataItemSize(org.apache.activemq.kaha.impl.data.DataItem)
 	 */
-    public byte readDataItemSize(DataItem item) throws IOException {
+    public synchronized byte readDataItemSize(DataItem item) throws IOException {
         RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
         file.seek(item.getOffset()); // jump to the size field
         byte rc = file.readByte();
@@ -56,7 +56,7 @@
     /* (non-Javadoc)
 	 * @see org.apache.activemq.kaha.impl.data.DataFileReader#readItem(org.apache.activemq.kaha.Marshaller, org.apache.activemq.kaha.StoreLocation)
 	 */
-    public Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
+    public synchronized  Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
         RandomAccessFile file=dataManager.getDataFile(item).getRandomAccessFile();
         
         // TODO: we could reuse the buffer in dataIn if it's big enough to avoid

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java Mon Nov 20 05:11:26 2006
@@ -24,7 +24,7 @@
  * 
  * @version $Revision$
  */
-public class DiskIndexLinkedList implements IndexLinkedList{
+public  class DiskIndexLinkedList implements IndexLinkedList{
     protected IndexManager indexManager;
     protected transient IndexItem root;
     protected transient IndexItem last;
@@ -33,12 +33,12 @@
     /**
      * Constructs an empty list.
      */
-    public DiskIndexLinkedList(IndexManager im,IndexItem header){
+    public  DiskIndexLinkedList(IndexManager im,IndexItem header){
         this.indexManager=im;
         this.root=header;
     }
 
-    public IndexItem getRoot(){
+    public synchronized IndexItem getRoot(){
         return root;
     }
 
@@ -51,7 +51,7 @@
      * 
      * @return the first element in this list.
      */
-    public IndexItem getFirst(){
+    public synchronized IndexItem getFirst(){
         if(size==0)
             return null;
         return getNextEntry(root);
@@ -62,7 +62,7 @@
      * 
      * @return the last element in this list.
      */
-    public IndexItem getLast(){
+    public synchronized IndexItem getLast(){
         if(size==0)
             return null;
         return last;
@@ -73,7 +73,7 @@
      * 
      * @return the first element from this list.
      */
-    public StoreEntry removeFirst(){
+    public synchronized StoreEntry removeFirst(){
         if(size==0){
             return null;
         }
@@ -87,7 +87,7 @@
      * 
      * @return the last element from this list.
      */
-    public Object removeLast(){
+    public synchronized Object removeLast(){
         if(size==0)
             return null;
         StoreEntry result=last;
@@ -100,7 +100,7 @@
      * 
      * @param o the element to be inserted at the beginning of this list.
      */
-    public void addFirst(IndexItem item){
+    public synchronized void addFirst(IndexItem item){
         if(size==0){
             last=item;
         }
@@ -113,7 +113,7 @@
      * 
      * @param o the element to be inserted at the end of this list.
      */
-    public void addLast(IndexItem item){
+    public synchronized void addLast(IndexItem item){
         size++;
         last=item;
     }
@@ -123,7 +123,7 @@
      * 
      * @return the number of elements in this list.
      */
-    public int size(){
+    public synchronized int size(){
         return size;
     }
 
@@ -132,7 +132,7 @@
      * 
      * @return true if there are no elements in the list
      */
-    public boolean isEmpty(){
+    public synchronized boolean isEmpty(){
         return size==0;
     }
 
@@ -142,7 +142,7 @@
      * @param o element to be appended to this list.
      * @return <tt>true</tt> (as per the general contract of <tt>Collection.add</tt>).
      */
-    public boolean add(IndexItem item){
+    public synchronized boolean add(IndexItem item){
         addLast(item);
         return true;
     }
@@ -150,7 +150,7 @@
     /**
      * Removes all of the elements from this list.
      */
-    public void clear(){
+    public synchronized void clear(){
         last=null;
         size=0;
     }
@@ -164,7 +164,7 @@
      * 
      * @throws IndexOutOfBoundsException if the specified index is is out of range (<tt>index &lt; 0 || index &gt;= size()</tt>).
      */
-    public IndexItem get(int index){
+    public synchronized IndexItem get(int index){
         return entry(index);
     }
 
@@ -177,7 +177,7 @@
      * 
      * @throws IndexOutOfBoundsException if the specified index is out of range (<tt>index &lt; 0 || index &gt; size()</tt>).
      */
-    public void add(int index,IndexItem element){
+    public synchronized void add(int index,IndexItem element){
         if(index==size-1){
             last=element;
         }
@@ -193,7 +193,7 @@
      * 
      * @throws IndexOutOfBoundsException if the specified index is out of range (<tt>index &lt; 0 || index &gt;= size()</tt>).
      */
-    public Object remove(int index){
+    public synchronized Object remove(int index){
         IndexItem e=entry(index);
         remove(e);
         return e;
@@ -225,7 +225,7 @@
      * @return the index in this list of the first occurrence of the specified element, or -1 if the list does not
      *         contain this element.
      */
-    public int indexOf(StoreEntry o){
+    public synchronized int indexOf(StoreEntry o){
         int index=0;
         if(size>0){
             for(IndexItem e=getNextEntry(root);e!=null;e=getNextEntry(e)){
@@ -244,7 +244,7 @@
      * @param entry
      * @return next entry
      */
-    public IndexItem getNextEntry(IndexItem current){
+    public synchronized IndexItem getNextEntry(IndexItem current){
         IndexItem result=null;
         if(current!=null&&current.getNextItem()>=0){
             try{
@@ -266,7 +266,7 @@
      * @param entry
      * @return prev entry
      */
-    public IndexItem getPrevEntry(IndexItem current){
+    public synchronized IndexItem getPrevEntry(IndexItem current){
         IndexItem result=null;
         if(current!=null&&current.getPreviousItem()>=0){
             try{
@@ -282,7 +282,7 @@
         return result;
     }
     
-   public  StoreEntry getEntry(StoreEntry current){
+   public synchronized  StoreEntry getEntry(StoreEntry current){
         StoreEntry result=null;
         if(current != null && current.getOffset() >= 0){
             try{
@@ -302,7 +302,7 @@
     * Update the indexes of a StoreEntry
     * @param current
     */
-   public StoreEntry refreshEntry(StoreEntry current){
+   public synchronized StoreEntry refreshEntry(StoreEntry current){
        StoreEntry result=null;
        if(current != null && current.getOffset() >= 0){
            try{
@@ -318,7 +318,7 @@
        return result;
    }
 
-    public void remove(IndexItem e){
+    public synchronized void remove(IndexItem e){
         if(e==root||e.equals(root))
             return;
         if(e==last||e.equals(last)){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java Mon Nov 20 05:11:26 2006
@@ -81,7 +81,11 @@
     }
     
     public synchronized void updateIndexes(IndexItem index) throws IOException{
+        try {
         writer.updateIndexes(index);
+        }catch(Throwable e) {
+            log.error(name + " GORT ERROR! ",e);
+        }
     }
 
     public synchronized void redo(final RedoStoreIndexItem redo) throws IOException{

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java Mon Nov 20 05:11:26 2006
@@ -30,6 +30,7 @@
 import org.apache.activemq.store.jdbc.Statements;
 import org.apache.activemq.store.journal.JournalPersistenceAdapter;
 import org.apache.activemq.store.journal.QuickJournalPersistenceAdapter;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -67,7 +68,9 @@
         if( useQuickJournal ) {
             return new QuickJournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
         }  else {
+            KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("amqstore")); 
             return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
+            //return new JournalPersistenceAdapter(getJournal(), adaptor, getTaskRunnerFactory());
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Mon Nov 20 05:11:26 2006
@@ -84,12 +84,6 @@
     public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,
             String subscriptionName) throws SQLException,IOException;
 
-    public void doGetPrevDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
-            String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception;
-
-    public void doGetNextDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
-            String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception;
-    
     public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
     
     public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Mon Nov 20 05:11:26 2006
@@ -65,7 +65,6 @@
     private String nextDurableSubscriberMessageStatement;
     private String durableSubscriberMessageCountStatement;
     private String nextDurableSubscriberMessageIdStatement;
-    private String prevDurableSubscriberMessageIdStatement;
     private String destinationMessageCountStatement;
     private String findNextMessagesStatement;
     private boolean useLockCreateWhereClause;
@@ -253,46 +252,7 @@
         return durableSubscriberMessageCountStatement;
     }
     
-    /**
-     * @return the nextDurableSubscriberMessageIdStatement
-     */
-    public String getNextDurableSubscriberMessageIdStatement(){
-        if (nextDurableSubscriberMessageIdStatement==null) {
-            nextDurableSubscriberMessageIdStatement =
-                "SELECT M.ID FROM " + getFullMessageTableName() + " M, "
-                 + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
-                + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID ";
-        }
-        return nextDurableSubscriberMessageIdStatement;
-    }
-    
-    /**
-     * @return the prevDurableSubscriberMessageIdStatement
-     */
-   /*
-    public String getPrevDurableSubscriberMessageIdStatement(){
-        if(prevDurableSubscriberMessageIdStatement==null) {
-            prevDurableSubscriberMessageIdStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
-            + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
-            + " AND M.CONTAINER=D.CONTAINER AND M.ID < ?" + " ORDER BY M.ID ";
-        }
-        return prevDurableSubscriberMessageIdStatement;
-    }
-    */
-   
-   
-    public String getPrevDurableSubscriberMessageIdStatement(){
-        if(prevDurableSubscriberMessageIdStatement==null) {
-            prevDurableSubscriberMessageIdStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M "
-            + " WHERE M.CONTAINER=? "
-            + "  AND M.ID <?" + "  ORDER BY M.ID DESC ";
-        }
-        return prevDurableSubscriberMessageIdStatement;
-    }
-    
-
-
-    public String getFindAllDestinationsStatement() {
+   public String getFindAllDestinationsStatement() {
         if (findAllDestinationsStatement == null) {
             findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName();
         }
@@ -643,15 +603,7 @@
         this.nextDurableSubscriberMessageIdStatement=nextDurableSubscriberMessageIdStatement;
     }
 
-    
-   
-    
-    /**
-     * @param prevDurableSubscriberMessageIdStatement the prevDurableSubscriberMessageIdStatement to set
-     */
-    public void setPrevDurableSubscriberMessageIdStatement(String prevDurableSubscriberMessageIdStatement){
-        this.prevDurableSubscriberMessageIdStatement=prevDurableSubscriberMessageIdStatement;
-    }
+     
     
     /**
      * @param findNextMessagesStatement the findNextMessagesStatement to set

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Mon Nov 20 05:11:26 2006
@@ -613,71 +613,7 @@
             close(rs);
             close(s);
         }
-    }
-
-    /**
-     * @param c
-     * @param destination
-     * @param clientId
-     * @param subscriberName
-     * @param id
-     * @return the previous Id
-     * @throws Exception 
-     * @see org.apache.activemq.store.jdbc.JDBCAdapter#doGetPrevDurableSubscriberMessageStatement(org.apache.activemq.store.jdbc.TransactionContext,
-     *      org.apache.activemq.command.ActiveMQDestination, java.lang.String, java.lang.String, java.lang.String)
-     */
-    public void doGetPrevDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
-            String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception{
-        PreparedStatement s=null;
-        ResultSet rs=null;
-        try{
-            s=c.getConnection().prepareStatement(statements.getPrevDurableSubscriberMessageIdStatement());
-            s.setString(1,destination.getQualifiedName());
-            s.setLong(2,id);
-            rs=s.executeQuery();
-            if (rs.next()) {
-            listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
-            }
-            listener.finished();
-           
-        }finally{
-            close(rs);
-            close(s);
-        }
-    }
-
-    /**
-     * @param c
-     * @param destination
-     * @param clientId
-     * @param subscriberName
-     * @param id
-     * @return the next id
-     * @throws SQLException
-     * @throws IOException
-     * @see org.apache.activemq.store.jdbc.JDBCAdapter#doGetNextDurableSubscriberMessageIdStatement(org.apache.activemq.store.jdbc.TransactionContext,
-     *      org.apache.activemq.command.ActiveMQDestination, java.lang.String, java.lang.String, java.lang.String)
-     */
-    public void doGetNextDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
-            String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception{
-        PreparedStatement s=null;
-        ResultSet rs=null;
-        try{
-            s=c.getConnection().prepareStatement(statements.getNextDurableSubscriberMessageIdStatement());
-            s.setString(1,destination.getQualifiedName());
-            s.setLong(2,id);
-            rs=s.executeQuery();
-            if (rs.next()) {
-            listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
-            }
-            listener.finished();
-           
-        }finally{
-            close(rs);
-            close(s);
-        }
-    }
-    
+    }    
    
     public int doGetMessageCount(TransactionContext c,ActiveMQDestination destination) throws SQLException, IOException{
         PreparedStatement s=null;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Mon Nov 20 05:11:26 2006
@@ -65,14 +65,14 @@
     }
 
     public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
-        StoreEntry item = messageContainer.placeLast(message);
-        // TODO: we should do the following but it is not need if the message is being added within a persistence transaction
-        // but since I can't tell if one is running right now.. I'll leave this out for now. 
-//        if( message.isResponseRequired() ) {
-//        	messageContainer.force();
-//        }
+        StoreEntry item=messageContainer.placeLast(message);
+        // TODO: we should do the following but it is not need if the message is being added within a persistence
+        // transaction
+        // but since I can't tell if one is running right now.. I'll leave this out for now.
+        // if( message.isResponseRequired() ) {
+        // messageContainer.force();
+        // }
         cache.put(message.getMessageId(),item);
-        
     }
 
     public synchronized void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
@@ -84,6 +84,7 @@
         Message result=null;
         StoreEntry entry=(StoreEntry)cache.get(identity);
         if(entry!=null){
+            entry = messageContainer.refresh(entry);
             result = (Message)messageContainer.get(entry);
         }else{    
             for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
@@ -109,6 +110,7 @@
     public synchronized void removeMessage(MessageId msgId) throws IOException{
         StoreEntry entry=(StoreEntry)cache.remove(msgId);
         if(entry!=null){
+            entry = messageContainer.refresh(entry);
             messageContainer.remove(entry);
         }else{
             for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Mon Nov 20 05:11:26 2006
@@ -74,7 +74,7 @@
         Set rc=new HashSet();
         try{
             Store store=getStore();
-            for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
+            for(Iterator i=store.getListContainerIds().iterator();i.hasNext();){
                 Object obj=i.next();
                 if(obj instanceof ActiveMQDestination){
                     rc.add(obj);
@@ -161,7 +161,11 @@
 
     public void deleteAllMessages() throws IOException{
         if(theStore!=null){
-            theStore.delete();
+            if(theStore.isInitialized()){
+                theStore.clear();
+            }else{
+                theStore.delete();
+            }
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java Mon Nov 20 05:11:26 2006
@@ -29,7 +29,7 @@
  */
 public class LoadTest extends TestCase{
     static final int COUNT=10000;
-    static final int NUM_LOADERS=2;
+    static final int NUM_LOADERS=5;
     protected String name="load.db";
     protected KahaStore store;
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java Mon Nov 20 05:11:26 2006
@@ -91,6 +91,7 @@
             startLoad=System.currentTimeMillis();
             value = getData(2048);
             for(int i=0;i<count;i++){
+                //System.out.println(this + " Container size = " + container.size());
                 String key="key:"+i;
                 container.put(key,value);
             }



Mime
View raw message