activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r504999 [1/2] - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: kaha/ kaha/impl/container/ kaha/impl/index/hash/ store/ store/amq/ store/kahadaptor/
Date Thu, 08 Feb 2007 18:46:56 GMT
Author: rajdavies
Date: Thu Feb  8 10:46:54 2007
New Revision: 504999

URL: http://svn.apache.org/viewvc?view=rev&rev=504999
Log:
amq store - just like the quick store (cut n' paste) - difference is optimizations for recovering in-flight messages

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/package.html
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java?view=diff&rev=504999&r1=504998&r2=504999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java Thu Feb  8 10:46:54 2007
@@ -208,4 +208,11 @@
     * @return a refreshed StoreEntry
     */
    public StoreEntry refresh(StoreEntry entry);
+   
+   /**
+    * Get the StoreEntry associated with the key
+    * @param key
+    * @return the StoreEntry
+    */
+   public StoreEntry getEntry(K key);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?view=diff&rev=504999&r1=504998&r2=504999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java Thu Feb  8 10:46:54 2007
@@ -193,6 +193,23 @@
         }
         return result;
     }
+    
+    /**
+     * Get the StoreEntry associated with the key
+     * @param key
+     * @return the StoreEntry
+     */
+    public synchronized StoreEntry getEntry(Object key) {
+        load();
+        StoreEntry item=null;
+        try{
+            item=index.get(key);
+        }catch(IOException e){
+            log.error("Failed trying to get key: "+key,e);
+            throw new RuntimeException(e);
+        }
+        return item;
+    }
 
     /*
      * (non-Javadoc)

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java?view=diff&rev=504999&r1=504998&r2=504999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java Thu Feb  8 10:46:54 2007
@@ -138,6 +138,9 @@
                 }
             }
             if(!replace){
+                if (low > size()) {
+                   System.out.println("SIZE() " + size() + " low = " + low); 
+                }
                 addHashEntry(low,newEntry);
                 size++;
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java?view=diff&rev=504999&r1=504998&r2=504999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java Thu Feb  8 10:46:54 2007
@@ -98,21 +98,21 @@
      * 
      * @param marshaller
      */
-    public void setKeyMarshaller(Marshaller marshaller){
+    public synchronized void setKeyMarshaller(Marshaller marshaller){
         this.keyMarshaller=marshaller;
     }
 
     /**
      * @return the keySize
      */
-    public int getKeySize(){
+    public  synchronized int getKeySize(){
         return this.keySize;
     }
 
     /**
      * @param keySize the keySize to set
      */
-    public void setKeySize(int keySize){
+    public synchronized void setKeySize(int keySize){
         this.keySize=keySize;
         if(loaded.get()){
             throw new RuntimeException("Pages already loaded - can't reset key size");
@@ -122,14 +122,14 @@
     /**
      * @return the pageSize
      */
-    public int getPageSize(){
+    public synchronized int getPageSize(){
         return this.pageSize;
     }
 
     /**
      * @param pageSize the pageSize to set
      */
-    public void setPageSize(int pageSize){
+    public synchronized void setPageSize(int pageSize){
         if(loaded.get()&&pageSize!=this.pageSize){
             throw new RuntimeException("Pages already loaded - can't reset page size");
         }
@@ -140,38 +140,38 @@
     /**
      * @return the enablePageCaching
      */
-    public boolean isEnablePageCaching(){
+    public synchronized boolean isEnablePageCaching(){
         return this.enablePageCaching;
     }
 
     /**
      * @param enablePageCaching the enablePageCaching to set
      */
-    public void setEnablePageCaching(boolean enablePageCaching){
+    public synchronized void setEnablePageCaching(boolean enablePageCaching){
         this.enablePageCaching=enablePageCaching;
     }
 
     /**
      * @return the pageCacheSize
      */
-    public int getPageCacheSize(){
+    public synchronized int getPageCacheSize(){
         return this.pageCacheSize;
     }
 
     /**
      * @param pageCacheSize the pageCacheSize to set
      */
-    public void setPageCacheSize(int pageCacheSize){
+    public synchronized void setPageCacheSize(int pageCacheSize){
         this.pageCacheSize=pageCacheSize;
         pageCache.setMaxCacheSize(pageCacheSize);
     }
 
 
-    public boolean isTransient(){
+    public synchronized  boolean isTransient(){
         return false;
     }
 
-    public void load(){
+    public synchronized void load(){
         if(loaded.compareAndSet(false,true)){
             keysPerPage=pageSize/keySize;
             dataIn=new DataByteArrayInputStream();
@@ -211,7 +211,7 @@
         }
     }
 
-    public void unload() throws IOException{
+    public synchronized void unload() throws IOException{
         if(loaded.compareAndSet(true,false)){
             if(indexFile!=null){
                 indexFile.close();
@@ -222,7 +222,7 @@
         }
     }
 
-    public void store(Object key,StoreEntry value) throws IOException{
+    public synchronized void store(Object key,StoreEntry value) throws IOException{
         load();
         HashEntry entry=new HashEntry();
         entry.setKey((Comparable)key);
@@ -230,7 +230,7 @@
         getBin(key).put(entry);
     }
 
-    public StoreEntry get(Object key) throws IOException{
+    public synchronized StoreEntry get(Object key) throws IOException{
         load();
         HashEntry entry=new HashEntry();
         entry.setKey((Comparable)key);
@@ -238,7 +238,7 @@
         return result!=null?indexManager.getIndex(result.getIndexOffset()):null;
     }
 
-    public StoreEntry remove(Object key) throws IOException{
+    public synchronized StoreEntry remove(Object key) throws IOException{
         load();
         HashEntry entry=new HashEntry();
         entry.setKey((Comparable)key);
@@ -246,18 +246,18 @@
         return result!=null?indexManager.getIndex(result.getIndexOffset()):null;
     }
 
-    public boolean containsKey(Object key) throws IOException{
+    public synchronized boolean containsKey(Object key) throws IOException{
         return get(key)!=null;
     }
 
-    public void clear() throws IOException{
+    public synchronized  void clear() throws IOException{
         unload();
         delete();
         openIndexFile();
         load();
     }
 
-    public void delete() throws IOException{
+    public synchronized void delete() throws IOException{
         unload();
         if(file.exists()){
             boolean result=file.delete();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java?view=diff&rev=504999&r1=504998&r2=504999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java Thu Feb  8 10:46:54 2007
@@ -192,6 +192,8 @@
         
 
     void addHashEntry(int index,HashEntry entry) throws IOException{
+        //index = index >= 0 ? index : 0;
+        //index = (index == 0 || index< size()) ? index : size()-1;
         hashIndexEntries.add(index,entry);
     }
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java?view=diff&rev=504999&r1=504998&r2=504999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java Thu Feb  8 10:46:54 2007
@@ -72,4 +72,11 @@
      */
     public ReferenceData getMessageReference(MessageId identity) throws IOException;
     
+    /**
+     * @return true if it supports external batch control
+     */
+    public boolean supportsExternalBatchControl();
+    
+    public void setBatch(MessageId startAfter);
+    
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?view=auto&rev=504999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Thu Feb  8 10:46:54 2007
@@ -0,0 +1,496 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.store.amq;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.JournalQueueAck;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.ReferenceStore.ReferenceData;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.Callback;
+import org.apache.activemq.util.TransactionTemplate;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A MessageStore that uses a Journal to store its messages.
+ * 
+ * @version $Revision: 1.14 $
+ */
+public class AMQMessageStore implements MessageStore{
+
+    private static final Log log=LogFactory.getLog(AMQMessageStore.class);
+    protected final AMQPersistenceAdapter peristenceAdapter;
+    protected final AMQTransactionStore transactionStore;
+    protected final ReferenceStore referenceStore;
+    protected final ActiveMQDestination destination;
+    protected final TransactionTemplate transactionTemplate;
+    private LinkedHashMap<MessageId,ReferenceData> messages=new LinkedHashMap<MessageId,ReferenceData>();
+    private ArrayList<MessageAck> messageAcks=new ArrayList<MessageAck>();
+    /** Messages swapped out of {@code messages} by the checkpoint in progress; null when none is running. */
+    private LinkedHashMap<MessageId,ReferenceData> cpAddedMessageIds;
+    protected Location lastLocation;
+    protected Location lastWrittenLocation;
+    protected HashSet<Location> inFlightTxLocations=new HashSet<Location>();
+    protected final TaskRunner asyncWriteTask;
+    protected CountDownLatch flushLatch;
+    private final AtomicReference<Location> mark=new AtomicReference<Location>();
+
+    public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore,ActiveMQDestination destination){
+        this.peristenceAdapter=adapter;
+        this.transactionStore=adapter.getTransactionStore();
+        this.referenceStore=referenceStore;
+        this.destination=destination;
+        this.transactionTemplate=new TransactionTemplate(adapter,new ConnectionContext());
+        asyncWriteTask=adapter.getTaskRunnerFactory().createTaskRunner(new Task(){
+
+            public boolean iterate(){
+                asyncWrite();
+                return false;
+            }
+        },"Checkpoint: "+destination);
+    }
+
+    public void setUsageManager(UsageManager usageManager){
+        referenceStore.setUsageManager(usageManager);
+    }
+
+    /**
+     * Not synchronized since the Journal has better throughput if you increase the number of concurrent writes that it
+     * is doing.
+     */
+    public void addMessage(ConnectionContext context,final Message message) throws IOException{
+        final MessageId id=message.getMessageId();
+        final boolean debug=log.isDebugEnabled();
+        final Location location=peristenceAdapter.writeCommand(message,message.isResponseRequired());
+        if(!context.isInTransaction()){
+            if(debug)
+                log.debug("Journalled message add for: "+id+", at: "+location);
+            addMessage(message,location);
+        }else{
+            if(debug)
+                log.debug("Journalled transacted message add for: "+id+", at: "+location);
+            synchronized(this){
+                inFlightTxLocations.add(location);
+            }
+            transactionStore.addMessage(this,message,location);
+            context.getTransaction().addSynchronization(new Synchronization(){
+
+                public void afterCommit() throws Exception{
+                    if(debug)
+                        log.debug("Transacted message add commit for: "+id+", at: "+location);
+                    synchronized(AMQMessageStore.this){
+                        inFlightTxLocations.remove(location);
+                        addMessage(message,location);
+                    }
+                }
+
+                public void afterRollback() throws Exception{
+                    if(debug)
+                        log.debug("Transacted message add rollback for: "+id+", at: "+location);
+                    synchronized(AMQMessageStore.this){
+                        inFlightTxLocations.remove(location);
+                    }
+                }
+            });
+        }
+    }
+
+    private void addMessage(final Message message,final Location location) throws InterruptedIOException{
+        ReferenceData data=new ReferenceData();
+        data.setExpiration(message.getExpiration());
+        data.setFileId(location.getDataFileId());
+        data.setOffset(location.getOffset());
+        synchronized(this){
+            lastLocation=location;
+            messages.put(message.getMessageId(),data);
+        }
+        try{
+            asyncWriteTask.wakeup();
+        }catch(InterruptedException e){
+            throw new InterruptedIOException();
+        }
+    }
+
+    public boolean replayAddMessage(ConnectionContext context,Message message,Location location){
+        MessageId id=message.getMessageId();
+        try{
+            // Only add the message if it has not already been added.
+            ReferenceData data=referenceStore.getMessageReference(id);
+            if(data==null){
+                data=new ReferenceData();
+                data.setExpiration(message.getExpiration());
+                data.setFileId(location.getDataFileId());
+                data.setOffset(location.getOffset());
+                referenceStore.addMessageReference(context,id,data);
+                return true;
+            }
+        }catch(Throwable e){
+            log.warn("Could not replay add for message '"+id+"'.  Message may have already been added. reason: "+e,e);
+        }
+        return false;
+    }
+
+    /**
+     */
+    public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException{
+        final boolean debug=log.isDebugEnabled();
+        JournalQueueAck remove=new JournalQueueAck();
+        remove.setDestination(destination);
+        remove.setMessageAck(ack);
+        final Location location=peristenceAdapter.writeCommand(remove,ack.isResponseRequired());
+        if(!context.isInTransaction()){
+            if(debug)
+                log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
+            removeMessage(ack,location);
+        }else{
+            if(debug)
+                log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
+            synchronized(this){
+                inFlightTxLocations.add(location);
+            }
+            transactionStore.removeMessage(this,ack,location);
+            context.getTransaction().addSynchronization(new Synchronization(){
+
+                public void afterCommit() throws Exception{
+                    if(debug)
+                        log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
+                    synchronized(AMQMessageStore.this){
+                        inFlightTxLocations.remove(location);
+                        removeMessage(ack,location);
+                    }
+                }
+
+                public void afterRollback() throws Exception{
+                    if(debug)
+                        log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location);
+                    synchronized(AMQMessageStore.this){
+                        inFlightTxLocations.remove(location);
+                    }
+                }
+            });
+        }
+    }
+
+    private void removeMessage(final MessageAck ack,final Location location) throws InterruptedIOException{
+        ReferenceData data;
+        synchronized(this){
+            lastLocation=location;
+            MessageId id=ack.getLastMessageId();
+            data=messages.remove(id);
+            if(data==null){
+                messageAcks.add(ack);
+            }
+        }
+        if(data==null){
+            try{
+                asyncWriteTask.wakeup();
+            }catch(InterruptedException e){
+                throw new InterruptedIOException();
+            }
+        }
+    }
+
+    public boolean replayRemoveMessage(ConnectionContext context,MessageAck messageAck){
+        try{
+            // Only remove the message if it has not already been removed.
+            ReferenceData t=referenceStore.getMessageReference(messageAck.getLastMessageId());
+            if(t!=null){
+                referenceStore.removeMessage(context,messageAck);
+                return true;
+            }
+        }catch(Throwable e){
+            log.warn("Could not replay acknowledge for message '"+messageAck.getLastMessageId()
+                    +"'.  Message may have already been acknowledged. reason: "+e);
+        }
+        return false;
+    }
+
+    /**
+     * Waits until the latest data has landed on the referenceStore
+     * 
+     * @throws InterruptedIOException
+     */
+    public void flush() throws InterruptedIOException{
+        if(log.isDebugEnabled()){
+            log.debug("flush starting ...");
+        }
+        CountDownLatch countDown;
+        synchronized(this){
+            if(lastWrittenLocation==lastLocation){
+                return;
+            }
+            if(flushLatch==null){
+                flushLatch=new CountDownLatch(1);
+            }
+            countDown=flushLatch;
+        }
+        try{
+            asyncWriteTask.wakeup();
+            countDown.await();
+        }catch(InterruptedException e){
+            throw new InterruptedIOException();
+        }
+        if(log.isDebugEnabled()){
+            log.debug("flush finished");
+        }
+    }
+
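+    /**
+     * Runs one checkpoint: stores the location returned by doAsyncWrite() as the mark, then counts down the flush
+     * latch captured beforehand, if any. An IOException is logged, not rethrown, and the latch is not counted down.
+     */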
+    private void asyncWrite(){
+        try{
+            CountDownLatch countDown;
+            synchronized(this){
+                countDown=flushLatch;
+                flushLatch=null;
+            }
+            mark.set(doAsyncWrite());
+            if(countDown!=null){
+                countDown.countDown();
+            }
+        }catch(IOException e){
+            log.error("Checkpoint failed: "+e,e);
+        }
+    }
+
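+    /**
+     * @return the lowest-sorted in-flight transaction location if any exist, otherwise the last location checkpointed
+     * @throws IOException if the batch update fails
+     */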
+    protected Location doAsyncWrite() throws IOException{
+        final ArrayList<MessageAck> cpRemovedMessageLocations;
+        final ArrayList<Location> cpActiveJournalLocations;
+        final int maxCheckpointMessageAddSize=peristenceAdapter.getMaxCheckpointMessageAddSize();
+        final Location lastLocation;
+        // swap out the message hash maps..
+        synchronized(this){
+            cpAddedMessageIds=this.messages;
+            cpRemovedMessageLocations=this.messageAcks;
+            cpActiveJournalLocations=new ArrayList<Location>(inFlightTxLocations);
+            this.messages=new LinkedHashMap<MessageId,ReferenceData>();
+            this.messageAcks=new ArrayList<MessageAck>();
+            lastLocation=this.lastLocation;
+        }
+        if(log.isDebugEnabled())
+            log.debug("Doing batch update... adding: "+cpAddedMessageIds.size()+" removing: "
+                    +cpRemovedMessageLocations.size()+" ");
+        transactionTemplate.run(new Callback(){
+
+            public void execute() throws Exception{
+                int size=0;
+                PersistenceAdapter persitanceAdapter=transactionTemplate.getPersistenceAdapter();
+                ConnectionContext context=transactionTemplate.getContext();
+                // Checkpoint the added messages.
+                Iterator<Entry<MessageId,ReferenceData>> iterator=cpAddedMessageIds.entrySet().iterator();
+                while(iterator.hasNext()){
+                    Entry<MessageId,ReferenceData> entry=iterator.next();
+                    try{
+                        referenceStore.addMessageReference(context,entry.getKey(),entry.getValue());
+                    }catch(Throwable e){
+                        log.warn("Message could not be added to long term store: "+e.getMessage(),e);
+                    }
+                    size++;
+                    // Commit the batch if it's getting too big
+                    if(size>=maxCheckpointMessageAddSize){
+                        persitanceAdapter.commitTransaction(context);
+                        persitanceAdapter.beginTransaction(context);
+                        size=0;
+                    }
+                }
+                persitanceAdapter.commitTransaction(context);
+                persitanceAdapter.beginTransaction(context);
+                // Checkpoint the removed messages.
+                for(MessageAck ack:cpRemovedMessageLocations){
+                    try{
+                        referenceStore.removeMessage(transactionTemplate.getContext(),ack);
+                    }catch(Throwable e){
+                        e.printStackTrace();
+                        log.debug("Message could not be removed from long term store: "+e.getMessage(),e);
+                    }
+                }
+            }
+        });
+        log.debug("Batch update done.");
+        synchronized(this){
+            cpAddedMessageIds=null;
+            lastWrittenLocation=lastLocation;
+        }
+        if(cpActiveJournalLocations.size()>0){
+            Collections.sort(cpActiveJournalLocations);
+            return cpActiveJournalLocations.get(0);
+        }else{
+            return lastLocation;
+        }
+    }
+
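+    /**
+     * Resolves the reference from the pending in-memory maps, else the referenceStore; returns null if none is found.
+     */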
+    public Message getMessage(MessageId identity) throws IOException{
+        ReferenceData data=null;
+        synchronized(this){
+            // Is it still in flight???
+            data=messages.get(identity);
+            if(data==null&&cpAddedMessageIds!=null){
+                data=cpAddedMessageIds.get(identity);
+            }
+        }
+        if(data==null){
+            data=referenceStore.getMessageReference(identity);
+            if(data==null){
+                return null;
+            }
+        }
+        Location location=new Location();
+        location.setDataFileId(data.getFileId());
+        location.setOffset(data.getOffset());
+        DataStructure rc=peristenceAdapter.readCommand(location);
+        try{
+            return (Message)rc;
+        }catch(ClassCastException e){
+            throw new IOException("Could not read message "+identity+" at location "+location
+                    +", expected a message, but got: "+rc);
+        }
+    }
+
+    /**
+     * Flushes pending adds and acks to the referenceStore, then asks the referenceStore to recover, passing it a
+     * RecoveryListenerAdapter that wraps the supplied listener.
+     * 
+     * @param listener the recovery listener wrapped by the adapter
+     * @throws Exception
+     */
+    public void recover(final MessageRecoveryListener listener) throws Exception{
+        flush();
+        referenceStore.recover(new RecoveryListenerAdapter(this,listener));
+    }
+
+    public void start() throws Exception{
+        referenceStore.start();
+    }
+
+    public void stop() throws Exception{
+        asyncWriteTask.shutdown();
+        referenceStore.stop();
+    }
+
+    /**
+     * @return the referenceStore backing this message store
+     */
+    public ReferenceStore getReferenceStore(){
+        return referenceStore;
+    }
+
+    /**
+     * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
+     */
+    public void removeAllMessages(ConnectionContext context) throws IOException{
+        flush();
+        referenceStore.removeAllMessages(context);
+    }
+
+    public ActiveMQDestination getDestination(){
+        return destination;
+    }
+
+    public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
+            throws IOException{
+        throw new IOException("The journal does not support message references.");
+    }
+
+    public String getMessageReference(MessageId identity) throws IOException{
+        throw new IOException("The journal does not support message references.");
+    }
+
+    /**
+     * @return the referenceStore's message count, taken after flushing pending writes
+     * @throws IOException
+     * @see org.apache.activemq.store.MessageStore#getMessageCount()
+     */
+    public int getMessageCount() throws IOException{
+        flush();
+        return referenceStore.getMessageCount();
+    }
+
+    public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+        RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
+        if(referenceStore.supportsExternalBatchControl()){
+            synchronized(this){
+                referenceStore.recoverNextMessages(maxReturned,recoveryListener);
+                if(recoveryListener.size()==0&&recoveryListener.hasSpace()){
+                    // check for inflight messages
+                    int count=0;
+                    Iterator<Entry<MessageId,ReferenceData>> iterator=messages.entrySet().iterator();
+                    while(iterator.hasNext()&&count<maxReturned&&recoveryListener.hasSpace()){
+                        Entry<MessageId,ReferenceData> entry=iterator.next();
+                        ReferenceData data=entry.getValue();
+                        Message message=getMessage(data);
+                        recoveryListener.recoverMessage(message);
+                        count++;
+                    }
+                    referenceStore.setBatch(recoveryListener.getLastRecoveredMessageId());
+                }
+            }
+        }else{
+            flush();
+            referenceStore.recoverNextMessages(maxReturned,recoveryListener);
+        }
+    }
+
+    Message getMessage(ReferenceData data) throws IOException{
+        Location location=new Location();
+        location.setDataFileId(data.getFileId());
+        location.setOffset(data.getOffset());
+        DataStructure rc=peristenceAdapter.readCommand(location);
+        try{
+            return (Message)rc;
+        }catch(ClassCastException e){
+            throw new IOException("Could not read message  at location "+location+", expected a message, but got: "+rc);
+        }
+    }
+
+    public void resetBatching(){
+        referenceStore.resetBatching();
+    }
+
+    public Location getMark(){
+        return mark.get();
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?view=auto&rev=504999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Thu Feb  8 10:46:54 2007
@@ -0,0 +1,679 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activeio.journal.Journal;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.JournalQueueAck;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.JournalTrace;
+import org.apache.activemq.command.JournalTransaction;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.memory.UsageListener;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.ReferenceStoreAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.amq.AMQTransactionStore.Tx;
+import org.apache.activemq.store.amq.AMQTransactionStore.TxOperation;
+import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
+
+import org.apache.activemq.thread.DefaultThreadPools;
+import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An implementation of {@link PersistenceAdapter} designed for use with a
+ * {@link Journal} and then check pointing asynchronously on a timeout with some
+ * other long term persistent storage.
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision: 1.17 $
+ */
+public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener {
+
+    private static final Log log = LogFactory.getLog(AMQPersistenceAdapter.class);
+
+    // Message stores created so far, keyed by destination. Filled by
+    // createQueueMessageStore/createTopicMessageStore and emptied by stop().
+    private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
+    private final ConcurrentHashMap<ActiveMQTopic, AMQMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQMessageStore>();
+    
+    // Collaborators; each may be injected through its setter, otherwise start() creates a default.
+    private AsyncDataManager asyncDataManager;
+    private ReferenceStoreAdapter referenceStoreAdapter;
+	private TaskRunnerFactory taskRunnerFactory; 
+    private WireFormat wireFormat = new OpenWireFormat();
+
+    // Dereferenced unconditionally by start() and stop(), so it must be set beforehand.
+    private UsageManager usageManager;
+
+    // Periods handed to Scheduler.executePeriodically (presumably milliseconds - confirm against Scheduler).
+    private long cleanupInterval = 1000 * 60;
+    private long checkpointInterval = 1000 * 10;
+    
+    // Only exposed through accessors in this class; not read by any logic visible here.
+    private int maxCheckpointWorkers = 1;
+    private int maxCheckpointMessageAddSize = 1024*4;
+
+    private AMQTransactionStore transactionStore = new AMQTransactionStore(this);
+    
+    // Runs doCheckpoint(); woken by checkpoint(boolean).
+    private TaskRunner checkpointTask;
+    // Latch for the next checkpoint; doCheckpoint() swaps in a fresh one and counts the old one down when done.
+    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
+    
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private Runnable periodicCheckpointTask;
+
+	private Runnable periodicCleanupTask;
+	// Set by deleteAllMessages(); acted upon (and reset) by start().
+	private boolean deleteAllMessages;
+	// Base directory; the journal lives in its "journal" sub-directory (see createAsyncDataManager).
+	private File directory = new File("activemq-data/quick");
+
+
+    
+    /**
+     * Starts the adapter. Registers this adapter as a usage listener, creates default
+     * collaborators for any that were not injected, starts the journal, optionally deletes
+     * all stored data, starts the reference store, replays the journal and finally
+     * schedules the periodic checkpoint and cleanup tasks.
+     * <p>
+     * Returns immediately if the adapter is already started. Note that the started flag is
+     * flipped before any work is done and is not reset if a later step throws.
+     * The usage manager must have been set; it is dereferenced unconditionally.
+     *
+     * @throws Exception if any collaborator fails to start or recovery fails
+     */
+    public synchronized void start() throws Exception {
+        if( !started.compareAndSet(false, true) )
+            return;
+        this.usageManager.addUsageListener(this);
+
+        // Fall back to default collaborators for anything that was not injected.
+        if( asyncDataManager == null ) {
+        	asyncDataManager = createAsyncDataManager();
+        }
+        
+        if( referenceStoreAdapter==null ) {
+        	referenceStoreAdapter = createReferenceStoreAdapter();
+        }
+        referenceStoreAdapter.setUsageManager(usageManager);
+
+        if( taskRunnerFactory==null ) {
+        	taskRunnerFactory = createTaskRunnerFactory();
+        }
+        
+    	asyncDataManager.start();    	
+    	// A pending deleteAllMessages() request is honoured here: wipe the journal, write a
+    	// trace record, mark it, then wipe the reference store.
+    	if( deleteAllMessages ) {
+    		asyncDataManager.delete();
+	        try {
+	            JournalTrace trace = new JournalTrace();
+	            trace.setMessage("DELETED "+new Date());
+	            Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
+	            asyncDataManager.setMark(location, true);
+	            log.info("Journal deleted: ");
+	            deleteAllMessages=false;
+	        } catch (IOException e) {
+	            throw e;
+	        } catch (Throwable e) {
+	            throw IOExceptionSupport.create(e);
+	        }
+
+	        referenceStoreAdapter.deleteAllMessages();
+        }
+        referenceStoreAdapter.start();
+    	
+    	Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
+    	log.info("Active data files: "+files);
+        
+        // Each wakeup of this task runs one checkpoint; returning false means no further
+        // iteration is requested until the next wakeup.
+        checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
+            public boolean iterate() {
+                doCheckpoint();
+                return false;
+            }
+        }, "ActiveMQ Journal Checkpoint Worker");
+                
+        // The return value is unused; createTransactionStore() just returns the existing store.
+        createTransactionStore();
+        recover();
+
+        // Do a checkpoint periodically.
+        periodicCheckpointTask = new Runnable() {
+	        public void run() {
+                checkpoint(false);
+	        }
+	    };	    
+        Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval);
+        
+        // Periodically consolidate journal data files that are no longer referenced.
+        periodicCleanupTask = new Runnable() {
+	        public void run() {
+	        	cleanup();
+	        }
+	    };
+        Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval);
+
+    }
+
+
+	/**
+	 * Stops the adapter. Cancels the periodic tasks, stops every message store, takes one
+	 * final synchronous checkpoint, then stops the reference store and closes the journal.
+	 * <p>
+	 * The journal is closed even when stopping the reference store fails; the first failure
+	 * of the two is rethrown once both have been attempted. Returns immediately if the
+	 * adapter is not started.
+	 *
+	 * @throws Exception if a message store fails to stop, or wrapping the first failure seen
+	 *                   while stopping the reference store or closing the journal
+	 */
+	public void stop() throws Exception {
+        
+        if( !started.compareAndSet(true, false) )
+            return;
+        
+        this.usageManager.removeUsageListener(this);        
+        Scheduler.cancel(periodicCheckpointTask);
+        Scheduler.cancel(periodicCleanupTask);
+
+        
+        Iterator<AMQMessageStore> iterator = queues.values().iterator();
+        while (iterator.hasNext()) {
+            AMQMessageStore ms = iterator.next();
+            ms.stop();
+        }
+
+        iterator = topics.values().iterator();
+        while (iterator.hasNext()) {
+            final AMQTopicMessageStore ms = (AMQTopicMessageStore) iterator.next();
+            ms.stop();
+        }
+        
+        // Take one final checkpoint and stop checkpoint processing.
+        checkpoint(true);
+        checkpointTask.shutdown();   
+        
+        queues.clear();
+        topics.clear();
+
+        IOException firstException = null;
+        // Guard the reference store shutdown so that a failure here cannot
+        // prevent the journal from being closed below.
+        try {
+            referenceStoreAdapter.stop();
+        } catch (Exception e) {
+            firstException = IOExceptionSupport.create("Failed to stop reference store: " + e, e);
+        }
+        try {
+            log.debug("Journal close");
+            asyncDataManager.close();
+        } catch (Exception e) {
+            if (firstException == null) {
+                firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
+            } else {
+                // Only the first failure is rethrown; make sure this one is not lost.
+                log.error("Failed to close journals: " + e, e);
+            }
+        }
+        
+        if (firstException != null) {
+            throw firstException;
+        }
+    }
+    
+
+    /**
+     * Requests a checkpoint by waking up the checkpoint task; the work itself is done by
+     * {@link #doCheckpoint()}.
+     * <p>
+     * If the calling thread is interrupted while waiting, the interrupt flag is restored
+     * and a warning is logged.
+     *
+     * @param sync when {@code true}, block until the latch captured before the wakeup has
+     *             been counted down; when {@code false}, return right after the wakeup
+     * @throws IllegalStateException if there is no journal (asyncDataManager is null)
+     */
+    public void checkpoint(boolean sync) {
+        try {
+            if (asyncDataManager == null )
+                throw new IllegalStateException("Journal is closed.");
+            
+            // Capture the current latch before waking the task: doCheckpoint() replaces the
+            // field with a fresh latch and counts the captured one down when it finishes.
+            CountDownLatch latch = null;
+            synchronized(this) {
+                latch = nextCheckpointCountDownLatch;
+            }
+            
+            checkpointTask.wakeup();
+            
+            if (sync) {
+                log.debug("Waitng for checkpoint to complete.");
+                latch.await();
+            }
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn("Request to start checkpoint failed: " + e, e);
+        }
+    }
+        
+    /**
+     * Performs one checkpoint: picks the greatest mark reported by any queue or topic
+     * store, records it as the journal mark and writes a CHECKPOINT trace record.
+     * Failures while marking the journal are logged, not propagated. The latch that was
+     * current when the checkpoint began is always released on exit.
+     *
+     * @return always {@code true}
+     */
+    public boolean doCheckpoint() {
+        final CountDownLatch completion;
+        synchronized(this) {
+            // Hand out a fresh latch for the next checkpoint; this run releases the old one.
+            completion = nextCheckpointCountDownLatch;
+            nextCheckpointCountDownLatch = new CountDownLatch(1);
+        }
+        try {
+            log.debug("Checkpoint started.");
+            Location latest = null;
+
+            for (AMQMessageStore queueStore : queues.values()) {
+                Location candidate = (Location) queueStore.getMark();
+                if (candidate == null) {
+                    continue;
+                }
+                if (latest == null || latest.compareTo(candidate) < 0) {
+                    latest = candidate;
+                }
+            }
+
+            for (AMQMessageStore store : topics.values()) {
+                final AMQTopicMessageStore topicStore = (AMQTopicMessageStore) store;
+                Location candidate = (Location) topicStore.getMark();
+                if (candidate == null) {
+                    continue;
+                }
+                if (latest == null || latest.compareTo(candidate) < 0) {
+                    latest = candidate;
+                }
+            }
+
+            if (latest != null) {
+                try {
+                    log.debug("Marking journal at: " + latest);
+                    asyncDataManager.setMark(latest, false);
+                    writeTraceMessage("CHECKPOINT "+new Date(), true);
+                }
+                catch (Exception e) {
+                    log.error("Failed to mark the Journal: " + e, e);
+                }
+            }
+
+//                if (referenceStoreAdapter instanceof JDBCReferenceStoreAdapter) {
+//                    // We may be check pointing more often than the checkpointInterval if under high use
+//                    // But we don't want to clean up the db that often.
+//                    long now = System.currentTimeMillis();
+//                    if( now > lastCleanup+checkpointInterval ) {
+//                        lastCleanup = now;
+//                        ((JDBCReferenceStoreAdapter) referenceStoreAdapter).cleanup();
+//                    }
+//                }
+
+            log.debug("Checkpoint done.");
+            return true;
+        }
+        finally {
+            completion.countDown();
+        }
+    }
+
+    /**
+     * Asks the journal to consolidate every data file that the reference store does not
+     * report as being in use. An {@link IOException} is logged rather than propagated.
+     */
+    public void cleanup() {
+        try {
+            asyncDataManager.consolidateDataFilesNotIn(referenceStoreAdapter.getReferenceFileIdsInUse());
+        } catch (IOException e) {
+            log.error("Could not cleanup data files: "+e, e);
+        }
+    }
+    
+
+    /**
+     * @return a new set holding the destinations known to the reference store adapter
+     *         plus every queue and topic this adapter has created a store for
+     */
+    public Set<ActiveMQDestination> getDestinations() {
+        final Set<ActiveMQDestination> known = referenceStoreAdapter.getDestinations();
+        final Set<ActiveMQDestination> all = new HashSet<ActiveMQDestination>(known);
+        all.addAll(queues.keySet());
+        all.addAll(topics.keySet());
+        return all;
+    }
+
+    /**
+     * Returns the message store for a destination, dispatching on whether it is a queue.
+     *
+     * @param destination the queue or topic to get a store for
+     * @return the queue store if {@code destination.isQueue()}, otherwise the topic store
+     * @throws IOException if the store cannot be created or started
+     */
+    private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
+        if (!destination.isQueue()) {
+            return createTopicMessageStore((ActiveMQTopic) destination);
+        }
+        return createQueueMessageStore((ActiveMQQueue) destination);
+    }
+
+    /**
+     * Returns the store for the given queue, creating, starting and registering one on
+     * first use.
+     *
+     * @param destination the queue
+     * @return the existing or newly created store
+     * @throws IOException if the reference store cannot be created or the new store fails to start
+     */
+    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+        final AMQMessageStore existing = queues.get(destination);
+        if (existing != null) {
+            return existing;
+        }
+
+        final ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
+        final AMQMessageStore created = new AMQMessageStore(this, checkpointStore, destination);
+        try {
+            created.start();
+        } catch (Exception e) {
+            throw IOExceptionSupport.create(e);
+        }
+        // Register only after a successful start.
+        queues.put(destination, created);
+        return created;
+    }
+
+    /**
+     * Returns the store for the given topic, creating, starting and registering one on
+     * first use.
+     *
+     * @param destinationName the topic
+     * @return the existing or newly created store
+     * @throws IOException if the reference store cannot be created or the new store fails to start
+     */
+    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
+        final AMQTopicMessageStore existing = (AMQTopicMessageStore) topics.get(destinationName);
+        if (existing != null) {
+            return existing;
+        }
+
+        final TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
+        final AMQTopicMessageStore created = new AMQTopicMessageStore(this, checkpointStore, destinationName);
+        try {
+            created.start();
+        } catch (Exception e) {
+            throw IOExceptionSupport.create(e);
+        }
+        // Register only after a successful start.
+        topics.put(destinationName, created);
+        return created;
+    }
+
+    /**
+     * Despite the name, nothing is created here: the single transaction store built with
+     * this adapter is returned on every call.
+     *
+     * @return this adapter's transaction store
+     */
+    public TransactionStore createTransactionStore() throws IOException {
+        return transactionStore;
+    }
+
+    /**
+     * @return the value reported by the reference store adapter
+     * @throws IOException if the reference store adapter fails
+     */
+    public long getLastMessageBrokerSequenceId() throws IOException {
+        return referenceStoreAdapter.getLastMessageBrokerSequenceId();
+    }
+
+    /** Delegates to the reference store adapter's {@code beginTransaction}. */
+    public void beginTransaction(ConnectionContext context) throws IOException {
+        referenceStoreAdapter.beginTransaction(context);
+    }
+
+    /** Delegates to the reference store adapter's {@code commitTransaction}. */
+    public void commitTransaction(ConnectionContext context) throws IOException {
+        referenceStoreAdapter.commitTransaction(context);
+    }
+
+    /** Delegates to the reference store adapter's {@code rollbackTransaction}. */
+    public void rollbackTransaction(ConnectionContext context) throws IOException {
+        referenceStoreAdapter.rollbackTransaction(context);
+    }
+
+
+    /**
+     * Reads the journal record at the given location and unmarshals it with the
+     * configured wire format.
+     *
+     * @param location journal location of the record to read
+     * @return the unmarshalled record
+     * @throws IOException if reading or unmarshalling fails; the original exception is
+     *                     wrapped together with the location
+     */
+    public DataStructure readCommand(Location location) throws IOException {
+        try {
+        	ByteSequence packet = asyncDataManager.read(location);
+            return (DataStructure) wireFormat.unmarshal(packet);
+        } catch (IOException e) {
+            throw createReadException(location, e);
+        }
+    }
+
+    /**
+     * Replays the journal. Every record returned by successive calls to
+     * {@code asyncDataManager.getNextLocation} is read and, depending on its type,
+     * replayed against the owning message store or handed to the transaction store.
+     * When the journal is exhausted a RECOVERED trace record is written and the journal
+     * mark is set to it.
+     *
+     * @throws IOException if a record cannot be read or replayed; failures while replaying
+     *                     a transaction record are logged instead of propagated
+     * @throws IllegalStateException declared for callers; not thrown directly by this method
+     */
+    private void recover() throws IllegalStateException, IOException {
+
+        Location pos = null;
+        // Counts the replay calls that returned true.
+        int redoCounter = 0;
+
+        log.info("Journal Recovery Started from: " + asyncDataManager);
+        long start = System.currentTimeMillis();
+        ConnectionContext context = new ConnectionContext();
+
+        // While we have records in the journal.
+        while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
+            ByteSequence data = asyncDataManager.read(pos);
+            DataStructure c = (DataStructure) wireFormat.unmarshal(data);
+
+            if (c instanceof Message ) {
+                Message message = (Message) c;
+                AMQMessageStore store = (AMQMessageStore) createMessageStore(message.getDestination());
+                // Transactional sends are parked in the transaction store until the
+                // matching commit record is replayed.
+                if ( message.isInTransaction()) {
+                    transactionStore.addMessage(store, message, pos);
+                }
+                else {
+                    if( store.replayAddMessage(context, message, pos) ) {
+                    	redoCounter++;
+                    }
+                }
+            } else {
+                switch (c.getDataStructureType()) {
+                case JournalQueueAck.DATA_STRUCTURE_TYPE:
+                {
+                    JournalQueueAck command = (JournalQueueAck) c;
+                    AMQMessageStore store = (AMQMessageStore) createMessageStore(command.getDestination());
+                    if (command.getMessageAck().isInTransaction()) {
+                        transactionStore.removeMessage(store, command.getMessageAck(), pos);
+                    }
+                    else {
+                        if( store.replayRemoveMessage(context, command.getMessageAck()) ) {
+                        	redoCounter++;
+                        }
+                    }
+                }
+                break;
+                case JournalTopicAck.DATA_STRUCTURE_TYPE: 
+                {
+                    JournalTopicAck command = (JournalTopicAck) c;
+                    AMQTopicMessageStore store = (AMQTopicMessageStore) createMessageStore(command.getDestination());
+                    if (command.getTransactionId() != null) {
+                        transactionStore.acknowledge(store, command, pos);
+                    }
+                    else {
+                        if( store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()) ) {
+                        	redoCounter++;
+                        }
+                    }
+                }
+                break;
+                case JournalTransaction.DATA_STRUCTURE_TYPE:
+                {
+                    JournalTransaction command = (JournalTransaction) c;
+                    try {
+                        // Try to replay the packet.
+                        switch (command.getType()) {
+                        case JournalTransaction.XA_PREPARE:
+                            transactionStore.replayPrepare(command.getTransactionId());
+                            break;
+                        case JournalTransaction.XA_COMMIT:
+                        case JournalTransaction.LOCAL_COMMIT:
+                            Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
+                            if (tx == null)
+                                break; // We may be trying to replay a commit that
+                                        // was already committed.
+
+                            // Replay the committed operations.
+                            // NOTE(review): the result of the next call is discarded; it looks redundant
+                            // unless getOperations() has a side effect - confirm before removing.
+                            tx.getOperations();
+                            for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
+                                TxOperation op = (TxOperation) iter.next();
+                                if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
+                                    if( op.store.replayAddMessage(context, (Message)op.data, op.location) )
+                                        redoCounter++;
+                                }
+                                if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
+                                    if( op.store.replayRemoveMessage(context, (MessageAck) op.data) )
+                                        redoCounter++;
+                                }
+                                if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
+                                    JournalTopicAck ack = (JournalTopicAck) op.data;
+                                    if( ((AMQTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId()) ) {
+                                        redoCounter++;
+                                    }
+                                }
+                            }
+                            break;
+                        case JournalTransaction.LOCAL_ROLLBACK:
+                        case JournalTransaction.XA_ROLLBACK:
+                            transactionStore.replayRollback(command.getTransactionId());
+                            break;
+                        }
+                    }
+                    catch (IOException e) {
+                        // A transaction record that cannot be replayed is logged and skipped;
+                        // recovery continues with the next record.
+                        log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
+                    }
+                }
+                break;
+                case JournalTrace.DATA_STRUCTURE_TYPE:
+                    JournalTrace trace = (JournalTrace) c;
+                    log.debug("TRACE Entry: " + trace.getMessage());
+                    break;
+                default:
+                    log.error("Unknown type of record in transaction log which will be discarded: " + c);
+                }
+            }
+        }
+        // Record the end of recovery in the journal and move the mark to that record.
+        Location location = writeTraceMessage("RECOVERED "+new Date(), true);
+        asyncDataManager.setMark(location, true);
+        long end = System.currentTimeMillis();
+
+        log.info("Recovered " + redoCounter + " operations from redo log in "+((end-start)/1000.0f)+" seconds.");
+    }
+
+    /** Wraps a failed journal read in an {@link IOException} that names the location and keeps the cause. */
+    private IOException createReadException(Location location, Exception e) {
+        return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
+    }
+
+    /** Wraps a failed journal write in an {@link IOException} that names the packet and keeps the cause. */
+    protected IOException createWriteException(DataStructure packet, Exception e) {
+        return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
+    }
+
+    /** Wraps a failed journal write in an {@link IOException} that names the command and keeps the cause. */
+    protected IOException createWriteException(String command, Exception e) {
+        return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
+    }
+
+    /** Wraps a recovery failure in an {@link IOException} that keeps the cause. */
+    protected IOException createRecoveryFailedException(Exception e) {
+        return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
+    }
+
+    /**
+     * Marshals the command with the configured wire format and writes it to the journal.
+     *
+     * @param command the record to write
+     * @param sync passed straight through to {@code asyncDataManager.write}
+     *             (presumably requests a synchronous write - confirm against AsyncDataManager)
+     * @return the journal location the record was written at
+     * @throws IOException if marshalling or writing fails
+     */
+    public Location writeCommand(DataStructure command, boolean sync) throws IOException {
+        return asyncDataManager.write(wireFormat.marshal(command), sync);
+    }
+
+    /**
+     * Writes a {@link JournalTrace} record carrying the given text to the journal.
+     *
+     * @param message text to store in the trace record
+     * @param sync passed through to {@link #writeCommand(DataStructure, boolean)}
+     * @return the journal location of the trace record
+     * @throws IOException if the write fails
+     */
+    private Location writeTraceMessage(String message, boolean sync) throws IOException {
+        JournalTrace trace = new JournalTrace();
+        trace.setMessage(message);
+        return writeCommand(trace, sync);
+    }
+
+    /**
+     * Usage listener callback. Both percentages are rounded down to a multiple of ten; an
+     * asynchronous checkpoint is requested when the rounded new value is at least 70 and
+     * greater than the rounded old value.
+     *
+     * @param memoryManager the usage manager reporting the change (not used here)
+     * @param oldPercentUsage previous usage, in percent
+     * @param newPercentUsage current usage, in percent
+     */
+    public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
+        final int newBucket = (newPercentUsage / 10) * 10;
+        final int oldBucket = (oldPercentUsage / 10) * 10;
+        final boolean highUsage = newBucket >= 70;
+        final boolean rising = oldBucket < newBucket;
+        if (highUsage && rising) {
+            checkpoint(false);
+        }
+    }
+    
+    /**
+     * @return this adapter's transaction store, with its concrete type
+     */
+    public AMQTransactionStore getTransactionStore() {
+        return transactionStore;
+    }
+
+    /**
+     * Requests deletion of all stored data. Nothing is deleted here: this only sets a flag
+     * that {@link #start()} checks, so the request has no effect on an adapter that is
+     * already started until it is started again.
+     */
+    public void deleteAllMessages() throws IOException {
+    	deleteAllMessages=true;
+    }
+
+
+
+    /**
+     * @return a short diagnostic description naming this adapter and its reference store adapter
+     */
+    public String toString(){
+        // The label used to be the misspelled, copy-pasted name of a different adapter.
+        return "AMQPersistenceAdapter(" + referenceStoreAdapter + ")";
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Subclass overridables
+    ///////////////////////////////////////////////////////////////////
+    /**
+     * Creates the default journal, rooted at the "journal" sub-directory of the configured
+     * directory. Used by {@link #start()} when no journal was injected.
+     */
+    protected AsyncDataManager createAsyncDataManager() {
+    	AsyncDataManager manager = new AsyncDataManager();
+    	manager.setDirectory(new File(directory, "journal"));
+		return manager;
+	}
+    
+    /**
+     * Creates the default reference store adapter, a {@link KahaReferenceStoreAdapter} on
+     * the configured directory. Used by {@link #start()} when none was injected.
+     *
+     * @throws IOException if the adapter cannot be created
+     */
+    protected ReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
+        return new KahaReferenceStoreAdapter(directory);
+    }
+
+    /**
+     * Supplies the task runner factory used by {@link #start()} when none was injected:
+     * the one returned by {@code DefaultThreadPools.getDefaultTaskRunnerFactory()}.
+     */
+    protected TaskRunnerFactory createTaskRunnerFactory() {
+		return DefaultThreadPools.getDefaultTaskRunnerFactory();
+	}
+
+
+    ///////////////////////////////////////////////////////////////////
+    // Property Accessors
+    ///////////////////////////////////////////////////////////////////
+    
+	/** @return the journal; {@code null} until injected or created by {@link #start()} */
+	public AsyncDataManager getAsyncDataManager() {
+		return asyncDataManager;
+	}
+	/** Injects the journal; start() only creates a default when this is still {@code null}. */
+	public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
+		this.asyncDataManager = asyncDataManager;
+	}
+
+	/** @return the reference store adapter; {@code null} until injected or created by {@link #start()} */
+	public ReferenceStoreAdapter getReferenceStoreAdapter() {
+		return referenceStoreAdapter;
+	}
+	/** Injects the reference store adapter; start() only creates a default when this is still {@code null}. */
+	public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
+		this.referenceStoreAdapter = referenceStoreAdapter;
+	}
+
+	/** @return the task runner factory; {@code null} until injected or created by {@link #start()} */
+	public TaskRunnerFactory getTaskRunnerFactory() {
+		return taskRunnerFactory;
+	}
+	/** Injects the task runner factory; start() only creates a default when this is still {@code null}. */
+	public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
+		this.taskRunnerFactory = taskRunnerFactory;
+	}
+
+    /**
+     * @return Returns the wireFormat.
+     */
+    public WireFormat getWireFormat() {
+        return wireFormat;
+    }
+	/** Replaces the wire format used to marshal and unmarshal journal records. */
+	public void setWireFormat(WireFormat wireFormat) {
+		this.wireFormat = wireFormat;
+	}
+
+    /** @return the usage manager this adapter listens to */
+    public UsageManager getUsageManager() {
+        return usageManager;
+    }
+    /** Sets the usage manager; must be called before {@link #start()}, which dereferences it. */
+    public void setUsageManager(UsageManager usageManager) {
+        this.usageManager = usageManager;
+    }
+
+    /** @return the configured maxCheckpointMessageAddSize (not read by any logic in this class) */
+    public int getMaxCheckpointMessageAddSize() {
+        return maxCheckpointMessageAddSize;
+    }
+    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
+        this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
+    }
+
+    /** @return the configured maxCheckpointWorkers (not read by any logic in this class) */
+    public int getMaxCheckpointWorkers() {
+        return maxCheckpointWorkers;
+    }
+    public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
+        this.maxCheckpointWorkers = maxCheckpointWorkers;
+    }
+
+	/** @return the base directory handed to the default journal and reference store */
+	public File getDirectory() {
+		return directory;
+	}
+
+	/** Sets the base directory; only the default collaborators created by start() read it. */
+	public void setDirectory(File directory) {
+		this.directory = directory;
+	}
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?view=auto&rev=504999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Thu Feb  8 10:46:54 2007
@@ -0,0 +1,205 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.store.amq;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.Callback;
+import org.apache.activemq.util.SubscriptionKey;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A MessageStore that uses a Journal to store it's messages.
+ * 
+ * @version $Revision: 1.13 $
+ */
+public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore{
+
+    private static final Log log=LogFactory.getLog(AMQTopicMessageStore.class);
+    private TopicReferenceStore topicReferenceStore;
+    private HashMap<SubscriptionKey,MessageId> ackedLastAckLocations=new HashMap<SubscriptionKey,MessageId>();
+
+    public AMQTopicMessageStore(AMQPersistenceAdapter adapter,TopicReferenceStore topicReferenceStore,
+            ActiveMQTopic destinationName){
+        super(adapter,topicReferenceStore,destinationName);
+        this.topicReferenceStore=topicReferenceStore;
+    }
+
+    public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
+            throws Exception{
+        flush();
+        topicReferenceStore.recoverSubscription(clientId,subscriptionName,new RecoveryListenerAdapter(this,listener));
+    }
+
+    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
+            final MessageRecoveryListener listener) throws Exception{
+        RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
+        topicReferenceStore.recoverNextMessages(clientId,subscriptionName,maxReturned,recoveryListener);
+        if(recoveryListener.size()==0){
+            flush();
+            topicReferenceStore.recoverNextMessages(clientId,subscriptionName,maxReturned,recoveryListener);
+        }
+    }
+
+    public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
+        return topicReferenceStore.lookupSubscription(clientId,subscriptionName);
+    }
+
+    public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+            throws IOException{
+        topicReferenceStore.addSubsciption(clientId,subscriptionName,selector,retroactive);
+    }
+
+    /**
+     */
+    public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,final MessageId messageId)
+            throws IOException{
+        final boolean debug=log.isDebugEnabled();
+        JournalTopicAck ack=new JournalTopicAck();
+        ack.setDestination(destination);
+        ack.setMessageId(messageId);
+        ack.setMessageSequenceId(messageId.getBrokerSequenceId());
+        ack.setSubscritionName(subscriptionName);
+        ack.setClientId(clientId);
+        ack.setTransactionId(context.getTransaction()!=null?context.getTransaction().getTransactionId():null);
+        final Location location=peristenceAdapter.writeCommand(ack,false);
+        final SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
+        if(!context.isInTransaction()){
+            if(debug)
+                log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
+            acknowledge(messageId,location,key);
+        }else{
+            if(debug)
+                log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
+            synchronized(this){
+                inFlightTxLocations.add(location);
+            }
+            transactionStore.acknowledge(this,ack,location);
+            context.getTransaction().addSynchronization(new Synchronization(){
+
+                public void afterCommit() throws Exception{
+                    if(debug)
+                        log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
+                    synchronized(AMQTopicMessageStore.this){
+                        inFlightTxLocations.remove(location);
+                        acknowledge(messageId,location,key);
+                    }
+                }
+
+                public void afterRollback() throws Exception{
+                    if(debug)
+                        log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
+                    synchronized(AMQTopicMessageStore.this){
+                        inFlightTxLocations.remove(location);
+                    }
+                }
+            });
+        }
+    }
+
+    public boolean replayAcknowledge(ConnectionContext context,String clientId,String subscritionName,
+            MessageId messageId){
+        try{
+            SubscriptionInfo sub=topicReferenceStore.lookupSubscription(clientId,subscritionName);
+            if(sub!=null){
+                topicReferenceStore.acknowledge(context,clientId,subscritionName,messageId);
+                return true;
+            }
+        }catch(Throwable e){
+            log.debug("Could not replay acknowledge for message '"+messageId
+                    +"'.  Message may have already been acknowledged. reason: "+e);
+        }
+        return false;
+    }
+
+    /**
+     * @param messageId
+     * @param location
+     * @param key
+     * @throws InterruptedIOException
+     */
+    private void acknowledge(MessageId messageId,Location location,SubscriptionKey key) throws InterruptedIOException{
+        synchronized(this){
+            lastLocation=location;
+            ackedLastAckLocations.put(key,messageId);
+        }
+        try{
+            asyncWriteTask.wakeup();
+        }catch(InterruptedException e){
+            throw new InterruptedIOException();
+        }
+    }
+
+    @Override protected Location doAsyncWrite() throws IOException{
+        final HashMap<SubscriptionKey,MessageId> cpAckedLastAckLocations;
+        // swap out the hash maps..
+        synchronized(this){
+            cpAckedLastAckLocations=this.ackedLastAckLocations;
+            this.ackedLastAckLocations=new HashMap<SubscriptionKey,MessageId>();
+        }
+        Location location=super.doAsyncWrite();
+        transactionTemplate.run(new Callback(){
+
+            public void execute() throws Exception{
+                // Checkpoint the acknowledged messages.
+                Iterator<SubscriptionKey> iterator=cpAckedLastAckLocations.keySet().iterator();
+                while(iterator.hasNext()){
+                    SubscriptionKey subscriptionKey=iterator.next();
+                    MessageId identity=cpAckedLastAckLocations.get(subscriptionKey);
+                    topicReferenceStore.acknowledge(transactionTemplate.getContext(),subscriptionKey.clientId,
+                            subscriptionKey.subscriptionName,identity);
+                }
+            }
+        });
+        return location;
+    }
+
+    /**
+     * @return Returns the longTermStore.
+     */
+    public TopicReferenceStore getTopicReferenceStore(){
+        return topicReferenceStore;
+    }
+
+    public void deleteSubscription(String clientId,String subscriptionName) throws IOException{
+        topicReferenceStore.deleteSubscription(clientId,subscriptionName);
+    }
+
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException{
+        return topicReferenceStore.getAllSubscriptions();
+    }
+
+    public int getMessageCount(String clientId,String subscriberName) throws IOException{
+        flush();
+        return topicReferenceStore.getMessageCount(clientId,subscriberName);
+    }
+
+    public void resetBatching(String clientId,String subscriptionName){
+        topicReferenceStore.resetBatching(clientId,subscriptionName);
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java?view=auto&rev=504999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java Thu Feb  8 10:46:54 2007
@@ -0,0 +1,340 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.amq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.transaction.xa.XAException;
+
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.JournalTransaction;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.store.TransactionRecoveryListener;
+import org.apache.activemq.store.TransactionStore;
+
+
+/**
+ */
+public class AMQTransactionStore implements TransactionStore {
+
+    private final AMQPersistenceAdapter peristenceAdapter;
+    Map<TransactionId, Tx> inflightTransactions = new LinkedHashMap<TransactionId, Tx>();
+    Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
+    private boolean doingRecover;
+
+    
+    public static class TxOperation {
+        
+        static final byte ADD_OPERATION_TYPE       = 0;
+        static final byte REMOVE_OPERATION_TYPE    = 1;
+        static final byte ACK_OPERATION_TYPE       = 3;
+        
+        public byte operationType;
+        public AMQMessageStore store;
+        public Object data;
+        public Location location;
+        
+        public TxOperation(byte operationType, AMQMessageStore store, Object data, Location location) {
+            this.operationType=operationType;
+            this.store=store;
+            this.data=data;
+            this.location=location;
+        }
+        
+    }
+    /**
+     * Operations
+     * @version $Revision: 1.6 $
+     */
+    public static class Tx {
+
+        private final Location location;
+        private ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
+
+        public Tx(Location location) {
+            this.location=location;
+        }
+
+        public void add(AMQMessageStore store, Message msg, Location location) {
+            operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, location));
+        }
+
+        public void add(AMQMessageStore store, MessageAck ack) {
+            operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, null));
+        }
+
+        public void add(AMQTopicMessageStore store, JournalTopicAck ack) {
+            operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, null));
+        }
+        
+        public Message[] getMessages() {
+            ArrayList<Object> list = new ArrayList<Object>();
+            for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
+                TxOperation op = iter.next();
+                if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
+                    list.add(op.data);
+                }
+            }
+            Message rc[] = new Message[list.size()];
+            list.toArray(rc);
+            return rc;
+        }
+
+        public MessageAck[] getAcks() {
+            ArrayList<Object> list = new ArrayList<Object>();
+            for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
+                TxOperation op = iter.next();
+                if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
+                    list.add(op.data);
+                }
+            }
+            MessageAck rc[] = new MessageAck[list.size()];
+            list.toArray(rc);
+            return rc;
+        }
+
+        public ArrayList<TxOperation> getOperations() {
+            return operations;
+        }
+
+    }
+
+    public AMQTransactionStore(AMQPersistenceAdapter adapter) {
+        this.peristenceAdapter = adapter;
+    }
+
+    /**
+     * @throws IOException
+     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
+     */
+    public void prepare(TransactionId txid) throws IOException{
+        Tx tx=null;
+        synchronized(inflightTransactions){
+            tx=inflightTransactions.remove(txid);
+        }
+        if(tx==null)
+            return;
+        peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true);
+        synchronized(preparedTransactions){
+            preparedTransactions.put(txid,tx);
+        }
+    }
+    
+    /**
+     * @throws IOException
+     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
+     */
+    public void replayPrepare(TransactionId txid) throws IOException{
+        Tx tx=null;
+        synchronized(inflightTransactions){
+            tx=inflightTransactions.remove(txid);
+        }
+        if(tx==null)
+            return;
+        synchronized(preparedTransactions){
+            preparedTransactions.put(txid,tx);
+        }
+    }
+
+    public Tx getTx(TransactionId txid,Location location){
+        Tx tx=null;
+        synchronized(inflightTransactions){
+            tx=inflightTransactions.get(txid);
+        }
+        if(tx==null){
+            tx=new Tx(location);
+            inflightTransactions.put(txid,tx);
+        }
+        return tx;
+    }
+
+    /**
+     * @throws XAException
+     * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
+     */
+    public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
+        Tx tx;
+        if(wasPrepared){
+            synchronized(preparedTransactions){
+                tx=preparedTransactions.remove(txid);
+            }
+        }else{
+            synchronized(inflightTransactions){
+                tx=inflightTransactions.remove(txid);
+            }
+        }
+        if(tx==null)
+            return;
+        if(txid.isXATransaction()){
+            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true);
+        }else{
+            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared),
+                    true);
+        }
+    }
+
+    /**
+     * @throws XAException
+     * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
+     */
+    public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{
+        if(wasPrepared){
+            synchronized(preparedTransactions){
+                return preparedTransactions.remove(txid);
+            }
+        }else{
+            synchronized(inflightTransactions){
+                return inflightTransactions.remove(txid);
+            }
+        }
+    }
+
+    /**
+     * @throws IOException
+     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
+     */
+    public void rollback(TransactionId txid) throws IOException{
+        Tx tx=null;
+        synchronized(inflightTransactions){
+            tx=inflightTransactions.remove(txid);
+        }
+        if(tx!=null)
+            synchronized(preparedTransactions){
+                tx=preparedTransactions.remove(txid);
+            }
+        if(tx!=null){
+            if(txid.isXATransaction()){
+                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true);
+            }else{
+                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false),
+                        true);
+            }
+        }
+    }
+
+    /**
+     * @throws IOException
+     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
+     */
+    public void replayRollback(TransactionId txid) throws IOException{
+        boolean inflight=false;
+        synchronized(inflightTransactions){
+            inflight=inflightTransactions.remove(txid)!=null;
+        }
+        if(inflight){
+            synchronized(preparedTransactions){
+                preparedTransactions.remove(txid);
+            }
+        }
+    }
+    
+    public void start() throws Exception {
+    }
+
+    public void stop() throws Exception {
+    }
+    
+    synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
+        // All the in-flight transactions get rolled back..
+        synchronized(inflightTransactions){
+            inflightTransactions.clear();
+        }
+        this.doingRecover=true;
+        try{
+            Map<TransactionId, Tx> txs=null;
+            synchronized(preparedTransactions){
+                txs=new LinkedHashMap<TransactionId, Tx>(preparedTransactions);
+            }
+            for(Iterator<TransactionId> iter=txs.keySet().iterator();iter.hasNext();){
+                Object txid=iter.next();
+                Tx tx=txs.get(txid);
+                listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
+            }
+        }finally{
+            this.doingRecover=false;
+        }
+    }
+
+    /**
+     * @param message
+     * @throws IOException
+     */
+    void addMessage(AMQMessageStore store, Message message, Location location) throws IOException {
+        Tx tx = getTx(message.getTransactionId(), location);
+        tx.add(store, message, location);
+    }
+
+    /**
+     * @param ack
+     * @throws IOException
+     */
+    public void removeMessage(AMQMessageStore store, MessageAck ack, Location location) throws IOException {
+        Tx tx = getTx(ack.getTransactionId(), location);
+        tx.add(store, ack);
+    }
+    
+    
+    public void acknowledge(AMQTopicMessageStore store, JournalTopicAck ack, Location location) {
+        Tx tx = getTx(ack.getTransactionId(), location);
+        tx.add(store, ack);
+    }
+
+
+    public Location checkpoint() throws IOException{
+        // Nothing really to checkpoint.. since, we don't
+        // checkpoint tx operations in to long term store until they are committed.
+        // But we keep track of the first location of an operation
+        // that was associated with an active tx. The journal can not
+        // roll over active tx records.
+        Location rc=null;
+        synchronized(inflightTransactions){
+            for(Iterator<Tx> iter=inflightTransactions.values().iterator();iter.hasNext();){
+                Tx tx=iter.next();
+                Location location=tx.location;
+                if(rc==null||rc.compareTo(location)<0){
+                    rc=location;
+                }
+            }
+        }
+        synchronized(preparedTransactions){
+            for(Iterator<Tx> iter=preparedTransactions.values().iterator();iter.hasNext();){
+                Tx tx=iter.next();
+                Location location=tx.location;
+                if(rc==null||rc.compareTo(location)<0){
+                    rc=location;
+                }
+            }
+            return rc;
+        }
+    }
+
+    public boolean isDoingRecover() {
+        return doingRecover;
+    }
+
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java?view=auto&rev=504999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java Thu Feb  8 10:46:54 2007
@@ -0,0 +1,73 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.store.amq;
+
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Wraps a {@link MessageRecoveryListener}: message references are resolved through a
+ * {@link MessageStore} before being passed on, and the adapter counts the messages it
+ * handed over and remembers the id of the last one.
+ */
+final class RecoveryListenerAdapter implements MessageRecoveryListener{
+
+    private static final Log log=LogFactory.getLog(RecoveryListenerAdapter.class);
+    private final MessageStore store;
+    private final MessageRecoveryListener listener;
+    private int recoveredCount=0;
+    private MessageId lastRecovered;
+
+    RecoveryListenerAdapter(MessageStore store,MessageRecoveryListener listener){
+        this.store=store;
+        this.listener=listener;
+    }
+
+    /** Forwards to the wrapped listener. */
+    public void finished(){
+        this.listener.finished();
+    }
+
+    /** Forwards to the wrapped listener. */
+    public boolean hasSpace(){
+        return this.listener.hasSpace();
+    }
+
+    /** Passes the message on, then records its id and bumps the counter. */
+    public void recoverMessage(Message message) throws Exception{
+        this.listener.recoverMessage(message);
+        final MessageId recoveredId=message.getMessageId();
+        this.lastRecovered=recoveredId;
+        this.recoveredCount+=1;
+    }
+
+    /**
+     * Loads the referenced message from the store and passes it on; a reference
+     * the store cannot resolve is logged as an error and not counted.
+     */
+    public void recoverMessageReference(MessageId ref) throws Exception{
+        final Message resolved=this.store.getMessage(ref);
+        if(resolved==null){
+            log.error("Message id "+ref+" could not be recovered from the data store!");
+            return;
+        }
+        this.listener.recoverMessage(resolved);
+        this.recoveredCount+=1;
+        this.lastRecovered=ref;
+    }
+    
+    /** @return id of the last message handed to the wrapped listener, or null if none */
+    MessageId getLastRecoveredMessageId() {
+        return this.lastRecovered;
+    }
+
+    /** @return number of messages handed to the wrapped listener since construction or the last reset */
+    int size(){
+        return this.recoveredCount;
+    }
+
+    /** Zeroes the counter; the last recovered id is left untouched. */
+    void reset(){
+        this.recoveredCount=0;
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/package.html
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/package.html?view=auto&rev=504999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/package.html (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/package.html Thu Feb  8 10:46:54 2007
@@ -0,0 +1,27 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
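+<p>
+	A journal-based message store modelled on the quick store, with optimizations for recovering in-flight messages.
+</p>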
+
+</body>
+</html>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=504999&r1=504998&r2=504999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Thu Feb  8 10:46:54 2007
@@ -99,6 +99,7 @@
             throws IOException{
         ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
         messageContainer.put(messageId,record);
+        addInterest(record);
     }
 
     public ReferenceData getMessageReference(MessageId identity) throws IOException{
@@ -120,7 +121,8 @@
     }
 
     public synchronized void removeMessage(MessageId msgId) throws IOException{
-        messageContainer.remove(msgId);
+        ReferenceRecord rr = messageContainer.remove(msgId);
+        removeInterest(rr);
         if(messageContainer.isEmpty()){
             resetBatching();
         }
@@ -151,5 +153,29 @@
 
     public boolean isSupportForCursors(){
         return true;
+    }
+
+    /**
+     * Resets batching and, when a message id is given, points the batch entry at
+     * that message's entry in the message container.
+     * 
+     * @param startAfter id whose container entry becomes the batch entry; {@code null} only resets batching
+     * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
+     */
+    public void setBatch(MessageId startAfter){
+        resetBatching();
+        if (startAfter == null) {
+            return;
+        }
+        batchEntry = messageContainer.getEntry(startAfter);
+    }
+
+    /**
+     * @return always {@code true}; this store provides {@code setBatch(MessageId)}
+     */
+    public boolean supportsExternalBatchControl(){
+        return true;
+    }
+    
+    /**
+     * Hook called by {@code removeMessage} with the record just removed from the
+     * message container. Intentionally empty in this class.
+     * NOTE(review): rr is presumably null when no record existed for the id — confirm
+     * against the container's remove() contract before dereferencing it in an override.
+     */
+    void removeInterest(ReferenceRecord rr) {
+        
+    }
+    
+    /**
+     * Hook called with a record right after it has been put into the message
+     * container. Intentionally empty in this class.
+     */
+    void addInterest(ReferenceRecord rr) {
+        
+    }
 }



Mime
View raw message