activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r477567 [1/2] - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadaptor/ main/java/org/apache/activemq/store/rapid/ test/java/org/apache/activemq/perf/
Date Tue, 21 Nov 2006 08:11:04 GMT
Author: rajdavies
Date: Tue Nov 21 00:11:03 2006
New Revision: 477567

URL: http://svn.apache.org/viewvc?view=rev&rev=477567
Log:
Updated Rapid Persistence Adaptor to do batching for cursors

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReferenceMarshaller.java   (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RapidStoreQueueTest.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=477567&r1=477566&r2=477567
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Tue Nov 21 00:11:03 2006
@@ -91,7 +91,7 @@
                 Message msg=(Message)messageContainer.get(entry);
                 if(msg.getMessageId().equals(identity)){
                     result=msg;
-                    cache.put(identity,msg);
+                    cache.put(identity,entry);
                     break;
                 }
             }
@@ -186,7 +186,7 @@
      * @throws Exception
      * @see org.apache.activemq.store.MessageStore#recoverNextMessages(org.apache.activemq.command.MessageId, int, org.apache.activemq.store.MessageRecoveryListener)
      */
-    public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+    public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
         StoreEntry entry = batchEntry;
         if (entry == null) {
             entry= messageContainer.getFirst();
@@ -239,10 +239,9 @@
      * @see org.apache.activemq.memory.UsageListener#onMemoryUseChanged(org.apache.activemq.memory.UsageManager, int, int)
      */
     public synchronized void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
-        if (newPercentUsage == 100) {
+        if(newPercentUsage==100){
             cache.clear();
         }
-        
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=477567&r1=477566&r2=477567
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Tue Nov 21 00:11:03 2006
@@ -32,6 +32,7 @@
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.rapid.RapidMessageReference;
 
 /**
  * @version $Revision: 1.5 $
@@ -149,11 +150,9 @@
                         listener.recoverMessage((Message)msg);
                     }
                 }
-                listener.finished();
             }
-        }else{
-            listener.finished();
         }
+        listener.finished();
     }
 
     public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
@@ -236,31 +235,7 @@
         messageContainer.add(messageRef);
     }
 
-    /**
-     * @return the destination
-     * @see org.apache.activemq.store.MessageStore#getDestination()
-     */
-    public ActiveMQDestination getDestination(){
-        return destination;
-    }
-
-    /**
-     * @param identity
-     * @return the Message
-     * @throws IOException
-     * @see org.apache.activemq.store.MessageStore#getMessage(org.apache.activemq.command.MessageId)
-     */
-    public Message getMessage(MessageId identity) throws IOException{
-        Message result=null;
-        for(Iterator i=messageContainer.iterator();i.hasNext();){
-            Message msg=(Message)i.next();
-            if(msg.getMessageId().equals(identity)){
-                result=msg;
-                break;
-            }
-        }
-        return result;
-    }
+   
 
     /**
      * @param identity
@@ -272,22 +247,7 @@
         return null;
     }
 
-    /**
-     * @throws Exception
-     * @see org.apache.activemq.store.MessageStore#recover(org.apache.activemq.store.MessageRecoveryListener)
-     */
-    public void recover(MessageRecoveryListener listener) throws Exception{
-        for(Iterator iter=messageContainer.iterator();iter.hasNext();){
-            Object msg=iter.next();
-            if(msg.getClass()==String.class){
-                listener.recoverMessageReference((String)msg);
-            }else{
-                listener.recoverMessage((Message)msg);
-            }
-        }
-        listener.finished();
-    }
-
+  
     /**
      * @param context
      * @throws IOException
@@ -302,49 +262,12 @@
         }
     }
 
-    /**
-     * @param context
-     * @param ack
-     * @throws IOException
-     * @see org.apache.activemq.store.MessageStore#removeMessage(org.apache.activemq.broker.ConnectionContext,
-     *      org.apache.activemq.command.MessageAck)
-     */
-    public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
-        for(Iterator i=messageContainer.iterator();i.hasNext();){
-            Message msg=(Message)i.next();
-            if(msg.getMessageId().equals(ack.getLastMessageId())){
-                i.remove();
-                break;
-            }
-        }
-    }
-
+   
     public synchronized void resetBatching(String clientId,String subscriptionName){
         String key=getSubscriptionKey(clientId,subscriptionName);
         TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
         if(topicSubContainer!=null){
             topicSubContainer.reset();
         }
-    }
-
-   
-    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName) throws IOException{
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    /**
-     * @param clientId
-     * @param subscriptionName
-     * @param id
-     * @return previous messageId
-     * @throws IOException
-     * @see org.apache.activemq.store.TopicMessageStore#getPreviousMessageIdToDeliver(java.lang.String,
-     *      java.lang.String, org.apache.activemq.command.MessageId)
-     */
-    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id)
-            throws IOException{
-        // TODO Auto-generated method stub
-        return null;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?view=diff&rev=477567&r1=477566&r2=477567
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Tue Nov 21 00:11:03 2006
@@ -22,43 +22,43 @@
  * 
  * @version $Revision: 1.10 $
  */
- class TopicSubContainer{
+ public class TopicSubContainer{
 
     private ListContainer listContainer;
     private StoreEntry batchEntry;
     
-    TopicSubContainer(ListContainer container){
+    public TopicSubContainer(ListContainer container){
         this.listContainer = container;
     }
     /**
      * @return the batchEntry
      */
-     StoreEntry getBatchEntry(){
+     public StoreEntry getBatchEntry(){
         return this.batchEntry;
     }
     
     /**
      * @param batchEntry the batchEntry to set
      */
-     void setBatchEntry(StoreEntry batchEntry){
+     public void setBatchEntry(StoreEntry batchEntry){
         this.batchEntry=batchEntry;
     }
     
     /**
      * @return the listContainer
      */
-     ListContainer getListContainer(){
+     public ListContainer getListContainer(){
         return this.listContainer;
     }
     
     /**
      * @param listContainer the listContainer to set
      */
-     void setListContainer(ListContainer container){
+     public void setListContainer(ListContainer container){
         this.listContainer=container;
     }
     
-     void reset() {
+     public void reset() {
         batchEntry = null;
     }
    

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java?view=diff&rev=477567&r1=477566&r2=477567
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java Tue Nov 21 00:11:03 2006
@@ -26,6 +26,10 @@
     public final MessageId messageId;
     public final Location location;
     
+    public RapidMessageReference(MessageId messageId, Location location) {
+        this.messageId = messageId;
+        this.location=location;
+    }
     public RapidMessageReference(Message message, Location location) {
         this.messageId = message.getMessageId();
         this.location=location;

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReferenceMarshaller.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReferenceMarshaller.java?view=auto&rev=477567
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReferenceMarshaller.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReferenceMarshaller.java Tue Nov 21 00:11:03 2006
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.rapid;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.activeio.journal.active.Location;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.Marshaller;
+
+public class RapidMessageReferenceMarshaller  implements Marshaller{
+    
+
+    
+    public Object readPayload(DataInput dataIn) throws IOException{
+        MessageId mid = new MessageId(dataIn.readUTF());
+        Location loc = new Location(dataIn.readInt(),dataIn.readInt());
+        RapidMessageReference rmr = new RapidMessageReference(mid,loc);
+        return rmr;
+    }
+
+    public void writePayload(Object object,DataOutput dataOut) throws IOException{
+        RapidMessageReference rmr = (RapidMessageReference)object;
+        dataOut.writeUTF(rmr.getMessageId().toString());
+        dataOut.writeInt(rmr.getLocation().getLogFileId());
+        dataOut.writeInt(rmr.getLocation().getLogFileOffset());
+        
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReferenceMarshaller.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java?view=diff&rev=477567&r1=477566&r2=477567
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java Tue Nov 21 00:11:03 2006
@@ -30,11 +30,15 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.memory.UsageListener;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.LRUCache;
 import org.apache.activemq.util.TransactionTemplate;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,31 +48,42 @@
  * 
  * @version $Revision: 1.14 $
  */
-public class RapidMessageStore implements MessageStore {
+public class RapidMessageStore implements MessageStore, UsageListener {
 
     private static final Log log = LogFactory.getLog(RapidMessageStore.class);
 
     protected final RapidPersistenceAdapter peristenceAdapter;
     protected final RapidTransactionStore transactionStore;
-    protected final MapContainer messageContainer;
+    protected final ListContainer messageContainer;
     protected final ActiveMQDestination destination;
     protected final TransactionTemplate transactionTemplate;
-
-//    private LinkedHashMap messages = new LinkedHashMap();
-//    private ArrayList messageAcks = new ArrayList();
-
-//    /** A MessageStore that we can use to retrieve messages quickly. */
-//    private LinkedHashMap cpAddedMessageIds;
+    protected final LRUCache cache;
+    protected UsageManager usageManager;
+    protected StoreEntry batchEntry = null;
+    
+    
     
     protected Location lastLocation;
     protected HashSet inFlightTxLocations = new HashSet();
     
-    public RapidMessageStore(RapidPersistenceAdapter adapter, ActiveMQDestination destination, MapContainer container) {
+    public RapidMessageStore(RapidPersistenceAdapter adapter, ActiveMQDestination destination, ListContainer container, int maximumCacheSize) {
         this.peristenceAdapter = adapter;
         this.transactionStore = adapter.getTransactionStore();
         this.messageContainer = container;
         this.destination = destination;
         this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
+        this.cache=new LRUCache(maximumCacheSize,maximumCacheSize,0.75f,false);
+//      populate the cache
+        StoreEntry entry=messageContainer.getFirst();
+        int count = 0;
+        if(entry!=null){
+            do{
+                RapidMessageReference msg = (RapidMessageReference)messageContainer.get(entry);
+                cache.put(msg.getMessageId(),entry);
+                entry = messageContainer.getNext(entry);
+                count++;
+            }while(entry!=null && count < maximumCacheSize);
+        }
     }
     
 
@@ -76,7 +91,7 @@
      * Not synchronized since the Journal has better throughput if you increase
      * the number of concurrent writes that it is doing.
      */
-    public void addMessage(ConnectionContext context, final Message message) throws IOException {
+    public synchronized void addMessage(ConnectionContext context, final Message message) throws IOException {
         
         final MessageId id = message.getMessageId();
         
@@ -118,12 +133,9 @@
         }
     }
 
-    private void addMessage(final RapidMessageReference messageReference) {
-        synchronized (this) {
-            lastLocation = messageReference.getLocation();
-            MessageId id = messageReference.getMessageId();
-            messageContainer.put(id.toString(), messageReference);
-        }
+    private synchronized void addMessage(final RapidMessageReference messageReference){
+        StoreEntry item=messageContainer.placeLast(messageReference);
+        cache.put(messageReference.getMessageId(),item);
     }
     
     static protected String toString(Location location) {
@@ -141,7 +153,7 @@
     public void replayAddMessage(ConnectionContext context, Message message, Location location) {
         try {
             RapidMessageReference messageReference = new RapidMessageReference(message, location);
-            messageContainer.put(message.getMessageId().toString(), messageReference);
+            addMessage(messageReference);
         }
         catch (Throwable e) {
             log.warn("Could not replay add for message '" + message.getMessageId() + "'.  Message may have already been added. reason: " + e);
@@ -160,7 +172,7 @@
         if( !context.isInTransaction() ) {
             if( debug )
                 log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
-            removeMessage(ack, location);
+            removeMessage(ack.getLastMessageId());
         } else {
             if( debug )
                 log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
@@ -174,7 +186,7 @@
                         log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
                     synchronized( RapidMessageStore.this ) {
                         inFlightTxLocations.remove(location);
-                        removeMessage(ack, location);
+                        removeMessage(ack.getLastMessageId());
                     }
                 }
                 public void afterRollback() throws Exception {                    
@@ -189,32 +201,53 @@
         }
     }
     
-    private void removeMessage(final MessageAck ack, final Location location) {
-        synchronized (this) {
-            lastLocation = location;
-            MessageId id = ack.getLastMessageId();
-            messageContainer.remove(id.toString());
+        
+    public synchronized void removeMessage(MessageId msgId) throws IOException{
+        StoreEntry entry=(StoreEntry)cache.remove(msgId);
+        if(entry!=null){
+            entry = messageContainer.refresh(entry);
+            messageContainer.remove(entry);
+        }else{
+            for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
+                RapidMessageReference msg=(RapidMessageReference)messageContainer.get(entry);
+                if(msg.getMessageId().equals(msgId)){
+                    messageContainer.remove(entry);
+                    break;
+                }
+            }
         }
     }
     
     public void replayRemoveMessage(ConnectionContext context, MessageAck ack) {
         try {
             MessageId id = ack.getLastMessageId();
-            messageContainer.remove(id.toString());
+           removeMessage(id);
         }
         catch (Throwable e) {
             log.warn("Could not replay acknowledge for message '" + ack.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + e);
         }
     }
 
-    /**
-     * 
-     */
-    public Message getMessage(MessageId id) throws IOException {
-        RapidMessageReference messageReference = (RapidMessageReference) messageContainer.get(id.toString());
-        if (messageReference == null )
+   
+    public synchronized Message getMessage(MessageId identity) throws IOException{
+        RapidMessageReference result=null;
+        StoreEntry entry=(StoreEntry)cache.get(identity);
+        if(entry!=null){
+            entry = messageContainer.refresh(entry);
+            result = (RapidMessageReference)messageContainer.get(entry);
+        }else{    
+            for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
+                RapidMessageReference msg=(RapidMessageReference)messageContainer.get(entry);
+                if(msg.getMessageId().equals(identity)){
+                    result=msg;
+                    cache.put(identity,entry);
+                    break;
+                }
+            }
+        }
+        if (result == null )
             return null;
-        return (Message) peristenceAdapter.readCommand(messageReference.getLocation());
+        return (Message) peristenceAdapter.readCommand(result.getLocation());
     }
 
     /**
@@ -225,28 +258,32 @@
      * @param listener
      * @throws Exception 
      */
-    public void recover(final MessageRecoveryListener listener) throws Exception {
         
-        for(Iterator iter=messageContainer.values().iterator();iter.hasNext();){
+    public synchronized void recover(MessageRecoveryListener listener) throws Exception{
+        for(Iterator iter=messageContainer.iterator();iter.hasNext();){
             RapidMessageReference messageReference=(RapidMessageReference) iter.next();
             Message m = (Message) peristenceAdapter.readCommand(messageReference.getLocation());
             listener.recoverMessage(m);
         }
         listener.finished();
-        
     }
 
-    public void start() throws Exception {
+    public void start() {
+        if( this.usageManager != null )
+            this.usageManager.addUsageListener(this);
     }
 
-    public void stop() throws Exception {
+    public void stop() {
+        if( this.usageManager != null )
+            this.usageManager.removeUsageListener(this);
     }
 
     /**
      * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
      */
-    public void removeAllMessages(ConnectionContext context) throws IOException {
+    public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
         messageContainer.clear();
+        cache.clear();
     }
     
     public ActiveMQDestination getDestination() {
@@ -254,15 +291,16 @@
     }
 
     public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
-        throw new IOException("The journal does not support message references.");
+        throw new IOException("Does not support message references.");
     }
 
     public String getMessageReference(MessageId identity) throws IOException {
-        throw new IOException("The journal does not support message references.");
+        throw new IOException("Does not support message references.");
     }
 
 
     public void setUsageManager(UsageManager usageManager) {
+        this.usageManager = usageManager;
     }
 
     /**
@@ -289,13 +327,50 @@
 
    
     public int getMessageCount(){
-        return 0;
+        return messageContainer.size();
     }
 
-    public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+    public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+        StoreEntry entry=batchEntry;
+        if(entry==null){
+            entry=messageContainer.getFirst();
+        }else{
+            entry=messageContainer.refresh(entry);
+            entry=messageContainer.getNext(entry);
+        }
+        if(entry!=null){
+            int count=0;
+            do{
+                RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(entry);
+                Message msg=(Message)peristenceAdapter.readCommand(messageReference.getLocation());
+                if(msg!=null){
+                    Message message=(Message)msg;
+                    listener.recoverMessage(message);
+                    count++;
+                }
+                batchEntry=entry;
+                entry=messageContainer.getNext(entry);
+            }while(entry!=null&&count<maxReturned);
+        }
+        listener.finished();
     }
 
     public void resetBatching(){
+        batchEntry = null;
+    }
+    
+    /**
+     * @return true if the store supports cursors
+     */
+    public boolean isSupportForCursors() {
+        return true;
+    }
+    
+    public synchronized void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
+        if (newPercentUsage == 100) {
+            cache.clear();
+        }
+        
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java?view=diff&rev=477567&r1=477566&r2=477567
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java Tue Nov 21 00:11:03 2006
@@ -1,20 +1,17 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.store.rapid;
 
 import java.io.File;
@@ -32,7 +29,6 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.activeio.journal.InvalidRecordLocationException;
 import org.apache.activeio.journal.Journal;
 import org.apache.activeio.journal.JournalEventListener;
@@ -52,6 +48,7 @@
 import org.apache.activemq.command.JournalTransaction;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreFactory;
@@ -65,6 +62,8 @@
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadaptor.AtomicIntegerMarshaller;
 import org.apache.activemq.store.kahadaptor.CommandMarshaller;
+import org.apache.activemq.store.kahadaptor.KahaTopicMessageStore;
+import org.apache.activemq.store.kahadaptor.TopicSubAckMarshaller;
 import org.apache.activemq.store.rapid.RapidTransactionStore.Tx;
 import org.apache.activemq.store.rapid.RapidTransactionStore.TxOperation;
 import org.apache.activemq.thread.Scheduler;
@@ -78,101 +77,89 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * An implementation of {@link PersistenceAdapter} designed for use with a
- * {@link Journal} and then check pointing asynchronously on a timeout with some
- * other long term persistent storage.
+ * An implementation of {@link PersistenceAdapter} designed for use with a {@link Journal} and then check pointing
+ * asynchronously on a timeout with some other long term persistent storage.
  * 
  * @org.apache.xbean.XBean
  * 
  * @version $Revision: 1.17 $
  */
-public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener {
+public class RapidPersistenceAdapter implements PersistenceAdapter,JournalEventListener,UsageListener{
 
-    private static final Log log = LogFactory.getLog(RapidPersistenceAdapter.class);
+    private static final Log log=LogFactory.getLog(RapidPersistenceAdapter.class);
     private final Journal journal;
-
-    private final WireFormat wireFormat = new OpenWireFormat();
-
-    private final ConcurrentHashMap queues = new ConcurrentHashMap();
-    private final ConcurrentHashMap topics = new ConcurrentHashMap();
-    
-    private long checkpointInterval = 1000 * 60 * 5;
-    private long lastCheckpointRequest = System.currentTimeMillis();
-    private int maxCheckpointWorkers = 10;
-    private int maxCheckpointMessageAddSize = 5000;
-
-    private RapidTransactionStore transactionStore = new RapidTransactionStore(this);
+    private final WireFormat wireFormat=new OpenWireFormat();
+    private final ConcurrentHashMap queues=new ConcurrentHashMap();
+    private final ConcurrentHashMap topics=new ConcurrentHashMap();
+    private long checkpointInterval=1000*60*5;
+    private long lastCheckpointRequest=System.currentTimeMillis();
+    private int maxCheckpointWorkers=10;
+    private int maxCheckpointMessageAddSize=5000;
+    private RapidTransactionStore transactionStore=new RapidTransactionStore(this);
     private ThreadPoolExecutor checkpointExecutor;
-    
     private TaskRunner checkpointTask;
-    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
+    private CountDownLatch nextCheckpointCountDownLatch=new CountDownLatch(1);
     private boolean fullCheckPoint;
-    
-    private AtomicBoolean started = new AtomicBoolean(false);
-    
+    private AtomicBoolean started=new AtomicBoolean(false);
     Store store;
     private boolean useExternalMessageReferences;
+    private final Runnable periodicCheckpointTask=createPeriodicCheckpointTask();
+    private int maximumDestinationCacheSize=2000;
 
+    final Runnable createPeriodicCheckpointTask(){
+        return new Runnable(){
 
-    private final Runnable periodicCheckpointTask  = createPeriodicCheckpointTask(); 
-    	
-    final Runnable createPeriodicCheckpointTask() {
-    	return new Runnable() {
-    		public void run() {
-	            if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
-	                checkpoint(false, true);
-	            }
-	        }
-	    };
+            public void run(){
+                if(System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval){
+                    checkpoint(false,true);
+                }
+            }
+        };
     }
-    
-    public RapidPersistenceAdapter(Journal journal, TaskRunnerFactory taskRunnerFactory) throws IOException {
 
-        this.journal = journal;
+    public RapidPersistenceAdapter(Journal journal,TaskRunnerFactory taskRunnerFactory) throws IOException{
+        this.journal=journal;
         journal.setJournalEventListener(this);
-
-        File dir = ((JournalImpl)journal).getLogDirectory();
+        File dir=((JournalImpl)journal).getLogDirectory();
         String name=dir.getAbsolutePath()+File.separator+"kaha.db";
         store=StoreFactory.open(name,"rw");
-        
-        checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
-            public boolean iterate() {
+        checkpointTask=taskRunnerFactory.createTaskRunner(new Task(){
+
+            public boolean iterate(){
                 return doCheckpoint();
             }
-        }, "ActiveMQ Checkpoint Worker");
-
+        },"ActiveMQ Checkpoint Worker");
     }
 
-    public Set getDestinations() {
+    public Set getDestinations(){
         Set rc=new HashSet();
-        try {
-        for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
-            Object obj=i.next();
-            if(obj instanceof ActiveMQDestination){
-                rc.add(obj);
+        try{
+            for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
+                Object obj=i.next();
+                if(obj instanceof ActiveMQDestination){
+                    rc.add(obj);
+                }
             }
-        }
         }catch(IOException e){
-            log.error("Failed to get destinations " ,e);
+            log.error("Failed to get destinations ",e);
         }
         return rc;
     }
 
-    private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
-        if (destination.isQueue()) {
-            return createQueueMessageStore((ActiveMQQueue) destination);
-        }
-        else {
-            return createTopicMessageStore((ActiveMQTopic) destination);
+    private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{
+        if(destination.isQueue()){
+            return createQueueMessageStore((ActiveMQQueue)destination);
+        }else{
+            return createTopicMessageStore((ActiveMQTopic)destination);
         }
     }
 
-    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
-        RapidMessageStore store = (RapidMessageStore) queues.get(destination);
-        if (store == null) {
-            MapContainer messageContainer=getMapContainer(destination,"topic-data");
-            store = new RapidMessageStore(this, destination, messageContainer);
-            queues.put(destination, store);
+    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
+        RapidMessageStore store=(RapidMessageStore)queues.get(destination);
+        if(store==null){
+            ListContainer messageContainer=getListContainer(destination,"topic-data");
+            store=new RapidMessageStore(this,destination,messageContainer,maximumDestinationCacheSize);
+            queues.put(destination,store);
         }
         return store;
     }
@@ -189,257 +176,241 @@
         return container;
     }
 
-    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
-        RapidTopicMessageStore store = (RapidTopicMessageStore) topics.get(destination);
-        if (store == null) {
-            
-            MapContainer messageContainer=getMapContainer(destination,"topic-data");
-            MapContainer subsContainer=getMapContainer(destination.toString()+"-subscriptions","topic-subs");
-            MapContainer ackContainer=this.store.getMapContainer(destination.toString(),"topic-acks");
-            
-            ackContainer.setKeyMarshaller(new StringMarshaller());
-            ackContainer.setValueMarshaller(new AtomicIntegerMarshaller());
+    protected ListContainer getListContainer(Object id,String containerName) throws IOException{
+        Store store=getStore();
+        ListContainer container=store.getListContainer(id,containerName);
+        container.setMaximumCacheSize(0);
+        container.setMarshaller(new RapidMessageReferenceMarshaller());
+        container.load();
+        return container;
+    }
 
-            store = new RapidTopicMessageStore(this, destination, messageContainer, subsContainer, ackContainer);
-            topics.put(destination, store);
+    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
+        TopicMessageStore rc=(TopicMessageStore)topics.get(destination);
+        if(rc==null){
+            Store store=getStore();
+            ListContainer messageContainer=getListContainer(destination,"topic-data");
+            MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs");
+            ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
+            ackContainer.setMarshaller(new TopicSubAckMarshaller());
+            rc=new RapidTopicMessageStore(this,store,messageContainer,ackContainer,subsContainer,destination,
+                    maximumDestinationCacheSize);
+            topics.put(destination,rc);
         }
-        return store;
+        return rc;
     }
 
-    public TransactionStore createTransactionStore() throws IOException {
+    public TransactionStore createTransactionStore() throws IOException{
         return transactionStore;
     }
 
-    public long getLastMessageBrokerSequenceId() throws IOException {
+    public long getLastMessageBrokerSequenceId() throws IOException{
         // TODO: implement this.
         return 0;
     }
 
-    public void beginTransaction(ConnectionContext context) throws IOException {
+    public void beginTransaction(ConnectionContext context) throws IOException{
     }
 
-    public void commitTransaction(ConnectionContext context) throws IOException {
+    public void commitTransaction(ConnectionContext context) throws IOException{
     }
 
-    public void rollbackTransaction(ConnectionContext context) throws IOException {
+    public void rollbackTransaction(ConnectionContext context) throws IOException{
     }
 
-    public synchronized void start() throws Exception {
-        if( !started.compareAndSet(false, true) )
+    public synchronized void start() throws Exception{
+        if(!started.compareAndSet(false,true))
             return;
-        
-        checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
-            public Thread newThread(Runnable runable) {
-                Thread t = new Thread(runable, "Journal checkpoint worker");
-                t.setPriority(7);
-                return t;
-            }            
-        });
-        //checkpointExecutor.allowCoreThreadTimeOut(true);
-        
+        checkpointExecutor=new ThreadPoolExecutor(maxCheckpointWorkers,maxCheckpointWorkers,30,TimeUnit.SECONDS,
+                new LinkedBlockingQueue(),new ThreadFactory(){
+
+                    public Thread newThread(Runnable runable){
+                        Thread t=new Thread(runable,"Journal checkpoint worker");
+                        t.setPriority(7);
+                        return t;
+                    }
+                });
+        // checkpointExecutor.allowCoreThreadTimeOut(true);
         createTransactionStore();
         recover();
-
         // Do a checkpoint periodically.
-        Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval/10);
-
+        Scheduler.executePeriodically(periodicCheckpointTask,checkpointInterval/10);
     }
 
-    public void stop() throws Exception {
-        
-        if( !started.compareAndSet(true, false) )
+    public void stop() throws Exception{
+        if(!started.compareAndSet(true,false))
             return;
-        
         Scheduler.cancel(periodicCheckpointTask);
-
         // Take one final checkpoint and stop checkpoint processing.
-        checkpoint(false, true);
-        checkpointTask.shutdown();        
+        checkpoint(false,true);
+        checkpointTask.shutdown();
         checkpointExecutor.shutdown();
-        
         queues.clear();
         topics.clear();
-
-        IOException firstException = null;
-        try {
+        IOException firstException=null;
+        try{
             journal.close();
-        } catch (Exception e) {
-            firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
+        }catch(Exception e){
+            firstException=IOExceptionSupport.create("Failed to close journals: "+e,e);
         }
         store.close();
-        
-        if (firstException != null) {
+        if(firstException!=null){
             throw firstException;
         }
     }
 
     // Properties
     // -------------------------------------------------------------------------
-
     /**
      * @return Returns the wireFormat.
      */
-    public WireFormat getWireFormat() {
+    public WireFormat getWireFormat(){
         return wireFormat;
     }
 
     // Implementation methods
     // -------------------------------------------------------------------------
-
     /**
-     * The Journal give us a call back so that we can move old data out of the
-     * journal. Taking a checkpoint does this for us.
+     * The Journal give us a call back so that we can move old data out of the journal. Taking a checkpoint does this
+     * for us.
      * 
      * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
      */
-    public void overflowNotification(RecordLocation safeLocation) {
-        checkpoint(false, true);
+    public void overflowNotification(RecordLocation safeLocation){
+        checkpoint(false,true);
     }
 
     /**
      * When we checkpoint we move all the journalled data to long term storage.
-     * @param stopping 
+     * 
+     * @param stopping
      * 
      * @param b
      */
-    public void checkpoint(boolean sync, boolean fullCheckpoint) {
-        try {
-            if (journal == null )
+    public void checkpoint(boolean sync,boolean fullCheckpoint){
+        try{
+            if(journal==null)
                 throw new IllegalStateException("Journal is closed.");
-            
-            long now = System.currentTimeMillis();
-            CountDownLatch latch = null;
-            synchronized(this) {
-                latch = nextCheckpointCountDownLatch;
-                lastCheckpointRequest = now;
-                if( fullCheckpoint ) {
-                    this.fullCheckPoint = true; 
+            long now=System.currentTimeMillis();
+            CountDownLatch latch=null;
+            synchronized(this){
+                latch=nextCheckpointCountDownLatch;
+                lastCheckpointRequest=now;
+                if(fullCheckpoint){
+                    this.fullCheckPoint=true;
                 }
             }
-            
             checkpointTask.wakeup();
-            
-            if (sync) {
+            if(sync){
                 log.debug("Waking for checkpoint to complete.");
                 latch.await();
             }
-        }
-        catch (InterruptedException e) {
-            log.warn("Request to start checkpoint failed: " + e, e);
+        }catch(InterruptedException e){
+            log.warn("Request to start checkpoint failed: "+e,e);
         }
     }
-        
+
     /**
      * This does the actual checkpoint.
-     * @return 
+     * 
+     * @return
      */
-    public boolean doCheckpoint() {
-        CountDownLatch latch = null;
+    public boolean doCheckpoint(){
+        CountDownLatch latch=null;
         boolean fullCheckpoint;
-        synchronized(this) {                       
-            latch = nextCheckpointCountDownLatch;
-            nextCheckpointCountDownLatch = new CountDownLatch(1);
-            fullCheckpoint = this.fullCheckPoint;
-            this.fullCheckPoint=false;            
-        }        
-        try {
-
+        synchronized(this){
+            latch=nextCheckpointCountDownLatch;
+            nextCheckpointCountDownLatch=new CountDownLatch(1);
+            fullCheckpoint=this.fullCheckPoint;
+            this.fullCheckPoint=false;
+        }
+        try{
             log.debug("Checkpoint started.");
-            RecordLocation newMark = null;
-
-            ArrayList futureTasks = new ArrayList(queues.size()+topics.size());
-            
+            RecordLocation newMark=null;
+            ArrayList futureTasks=new ArrayList(queues.size()+topics.size());
             //
             // We do many partial checkpoints (fullCheckpoint==false) to move topic messages
-            // to long term store as soon as possible.  
+            // to long term store as soon as possible.
             // 
             // We want to avoid doing that for queue messages since removes the come in the same
-            // checkpoint cycle will nullify the previous message add.  Therefore, we only
+            // checkpoint cycle will nullify the previous message add. Therefore, we only
             // checkpoint queues on the fullCheckpoint cycles.
             //
-            if( fullCheckpoint ) {                
-                Iterator iterator = queues.values().iterator();
-                while (iterator.hasNext()) {
-                    try {
-                        final RapidMessageStore ms = (RapidMessageStore) iterator.next();
-                        FutureTask task = new FutureTask(new Callable() {
-                            public Object call() throws Exception {
+            if(fullCheckpoint){
+                Iterator iterator=queues.values().iterator();
+                while(iterator.hasNext()){
+                    try{
+                        final RapidMessageStore ms=(RapidMessageStore)iterator.next();
+                        FutureTask task=new FutureTask(new Callable(){
+
+                            public Object call() throws Exception{
                                 return ms.checkpoint();
-                            }});
+                            }
+                        });
                         futureTasks.add(task);
-                        checkpointExecutor.execute(task);                        
-                    }
-                    catch (Exception e) {
-                        log.error("Failed to checkpoint a message store: " + e, e);
+                        checkpointExecutor.execute(task);
+                    }catch(Exception e){
+                        log.error("Failed to checkpoint a message store: "+e,e);
                     }
                 }
             }
+            Iterator iterator=topics.values().iterator();
+            while(iterator.hasNext()){
+                try{
+                    final RapidTopicMessageStore ms=(RapidTopicMessageStore)iterator.next();
+                    FutureTask task=new FutureTask(new Callable(){
 
-            Iterator iterator = topics.values().iterator();
-            while (iterator.hasNext()) {
-                try {
-                    final RapidTopicMessageStore ms = (RapidTopicMessageStore) iterator.next();
-                    FutureTask task = new FutureTask(new Callable() {
-                        public Object call() throws Exception {
+                        public Object call() throws Exception{
                             return ms.checkpoint();
-                        }});
+                        }
+                    });
                     futureTasks.add(task);
-                    checkpointExecutor.execute(task);                        
-                }
-                catch (Exception e) {
-                    log.error("Failed to checkpoint a message store: " + e, e);
+                    checkpointExecutor.execute(task);
+                }catch(Exception e){
+                    log.error("Failed to checkpoint a message store: "+e,e);
                 }
             }
-
-            try {
-                for (Iterator iter = futureTasks.iterator(); iter.hasNext();) {
-                    FutureTask ft = (FutureTask) iter.next();
-                    RecordLocation mark = (RecordLocation) ft.get();
+            try{
+                for(Iterator iter=futureTasks.iterator();iter.hasNext();){
+                    FutureTask ft=(FutureTask)iter.next();
+                    RecordLocation mark=(RecordLocation)ft.get();
                     // We only set a newMark on full checkpoints.
-                    if( fullCheckpoint ) {
-                        if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
-                            newMark = mark;
+                    if(fullCheckpoint){
+                        if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){
+                            newMark=mark;
                         }
                     }
                 }
-            } catch (Throwable e) {
-                log.error("Failed to checkpoint a message store: " + e, e);
+            }catch(Throwable e){
+                log.error("Failed to checkpoint a message store: "+e,e);
             }
-            
-
-            if( fullCheckpoint ) {
-                try {
-                    if (newMark != null) {
-                        log.debug("Marking journal at: " + newMark);
-                        journal.setMark(newMark, true);
+            if(fullCheckpoint){
+                try{
+                    if(newMark!=null){
+                        log.debug("Marking journal at: "+newMark);
+                        journal.setMark(newMark,true);
                     }
+                }catch(Exception e){
+                    log.error("Failed to mark the Journal: "+e,e);
                 }
-                catch (Exception e) {
-                    log.error("Failed to mark the Journal: " + e, e);
-                }
-                
-// TODO: do we need to implement a periodic clean up?
-               
-//                if (longTermPersistence instanceof JDBCPersistenceAdapter) {
-//                    // We may be check pointing more often than the checkpointInterval if under high use
-//                    // But we don't want to clean up the db that often.
-//                    long now = System.currentTimeMillis();
-//                    if( now > lastCleanup+checkpointInterval ) {
-//                        lastCleanup = now;
-//                        ((JDBCPersistenceAdapter) longTermPersistence).cleanup();
-//                    }
-//                }
+                // TODO: do we need to implement a periodic clean up?
+                // if (longTermPersistence instanceof JDBCPersistenceAdapter) {
+                // // We may be check pointing more often than the checkpointInterval if under high use
+                // // But we don't want to clean up the db that often.
+                // long now = System.currentTimeMillis();
+                // if( now > lastCleanup+checkpointInterval ) {
+                // lastCleanup = now;
+                // ((JDBCPersistenceAdapter) longTermPersistence).cleanup();
+                // }
+                // }
             }
-
             log.debug("Checkpoint done.");
-        }
-        finally {
+        }finally{
             latch.countDown();
         }
-        synchronized(this) {
+        synchronized(this){
             return this.fullCheckPoint;
-        }        
-
+        }
     }
 
     /**
@@ -447,108 +418,95 @@
      * @return
      * @throws IOException
      */
-    public DataStructure readCommand(RecordLocation location) throws IOException {
-        try {
-            Packet data = journal.read(location);
-            return (DataStructure) wireFormat.unmarshal(toByteSequence(data));
-        }
-        catch (InvalidRecordLocationException e) {
-            throw createReadException(location, e);
-        }
-        catch (IOException e) {
-            throw createReadException(location, e);
+    public DataStructure readCommand(RecordLocation location) throws IOException{
+        try{
+            Packet data=journal.read(location);
+            return (DataStructure)wireFormat.unmarshal(toByteSequence(data));
+        }catch(InvalidRecordLocationException e){
+            throw createReadException(location,e);
+        }catch(IOException e){
+            throw createReadException(location,e);
         }
     }
 
     /**
-     * Move all the messages that were in the journal into long term storage. We
-     * just replay and do a checkpoint.
+     * Move all the messages that were in the journal into long term storage. We just replay and do a checkpoint.
      * 
      * @throws IOException
      * @throws IOException
      * @throws InvalidRecordLocationException
      * @throws IllegalStateException
      */
-    private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
-
-        Location pos = null;
-        int transactionCounter = 0;
-
+    private void recover() throws IllegalStateException,InvalidRecordLocationException,IOException,IOException{
+        Location pos=null;
+        int transactionCounter=0;
         log.info("Journal Recovery Started.");
-        ConnectionContext context = new ConnectionContext();
-
+        ConnectionContext context=new ConnectionContext();
         // While we have records in the journal.
-        while ((pos = (Location) journal.getNextRecordLocation(pos)) != null) {
-            Packet data = journal.read(pos);
-            DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data));
-
-            if (c instanceof Message ) {
-                Message message = (Message) c;
-                RapidMessageStore store = (RapidMessageStore) createMessageStore(message.getDestination());
-                if ( message.isInTransaction()) {
-                    transactionStore.addMessage(store, message, pos);
-                }
-                else {
-                    store.replayAddMessage(context, message, pos);
+        while((pos=(Location)journal.getNextRecordLocation(pos))!=null){
+            Packet data=journal.read(pos);
+            DataStructure c=(DataStructure)wireFormat.unmarshal(toByteSequence(data));
+            if(c instanceof Message){
+                Message message=(Message)c;
+                RapidMessageStore store=(RapidMessageStore)createMessageStore(message.getDestination());
+                if(message.isInTransaction()){
+                    transactionStore.addMessage(store,message,pos);
+                }else{
+                    store.replayAddMessage(context,message,pos);
                     transactionCounter++;
                 }
-            } else {
-                switch (c.getDataStructureType()) {
-                case JournalQueueAck.DATA_STRUCTURE_TYPE:
-                {
-                    JournalQueueAck command = (JournalQueueAck) c;
-                    RapidMessageStore store = (RapidMessageStore) createMessageStore(command.getDestination());
-                    if (command.getMessageAck().isInTransaction()) {
-                        transactionStore.removeMessage(store, command.getMessageAck(), pos);
-                    }
-                    else {
-                        store.replayRemoveMessage(context, command.getMessageAck());
+            }else{
+                switch(c.getDataStructureType()){
+                case JournalQueueAck.DATA_STRUCTURE_TYPE: {
+                    JournalQueueAck command=(JournalQueueAck)c;
+                    RapidMessageStore store=(RapidMessageStore)createMessageStore(command.getDestination());
+                    if(command.getMessageAck().isInTransaction()){
+                        transactionStore.removeMessage(store,command.getMessageAck(),pos);
+                    }else{
+                        store.replayRemoveMessage(context,command.getMessageAck());
                         transactionCounter++;
                     }
                 }
-                break;
-                case JournalTopicAck.DATA_STRUCTURE_TYPE: 
-                {
-                    JournalTopicAck command = (JournalTopicAck) c;
-                    RapidTopicMessageStore store = (RapidTopicMessageStore) createMessageStore(command.getDestination());
-                    if (command.getTransactionId() != null) {
-                        transactionStore.acknowledge(store, command, pos);
-                    }
-                    else {
-                        store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
+                    break;
+                case JournalTopicAck.DATA_STRUCTURE_TYPE: {
+                    JournalTopicAck command=(JournalTopicAck)c;
+                    RapidTopicMessageStore store=(RapidTopicMessageStore)createMessageStore(command.getDestination());
+                    if(command.getTransactionId()!=null){
+                        transactionStore.acknowledge(store,command,pos);
+                    }else{
+                        store.replayAcknowledge(context,command.getClientId(),command.getSubscritionName(),command
+                                .getMessageId());
                         transactionCounter++;
                     }
                 }
-                break;
-                case JournalTransaction.DATA_STRUCTURE_TYPE:
-                {
-                    JournalTransaction command = (JournalTransaction) c;
-                    try {
+                    break;
+                case JournalTransaction.DATA_STRUCTURE_TYPE: {
+                    JournalTransaction command=(JournalTransaction)c;
+                    try{
                         // Try to replay the packet.
-                        switch (command.getType()) {
+                        switch(command.getType()){
                         case JournalTransaction.XA_PREPARE:
                             transactionStore.replayPrepare(command.getTransactionId());
                             break;
                         case JournalTransaction.XA_COMMIT:
                         case JournalTransaction.LOCAL_COMMIT:
-                            Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
-                            if (tx == null)
+                            Tx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared());
+                            if(tx==null)
                                 break; // We may be trying to replay a commit that
-                                        // was already committed.
-
+                            // was already committed.
                             // Replay the committed operations.
-                            for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
-                                TxOperation op = (TxOperation) iter.next();
-                                if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
-                                    op.store.replayAddMessage(context, (Message) op.data, op.location);
+                            for(Iterator iter=tx.getOperations().iterator();iter.hasNext();){
+                                TxOperation op=(TxOperation)iter.next();
+                                if(op.operationType==TxOperation.ADD_OPERATION_TYPE){
+                                    op.store.replayAddMessage(context,(Message)op.data,op.location);
                                 }
-                                if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
-                                    op.store.replayRemoveMessage(context, (MessageAck) op.data);
+                                if(op.operationType==TxOperation.REMOVE_OPERATION_TYPE){
+                                    op.store.replayRemoveMessage(context,(MessageAck)op.data);
                                 }
-                                if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
-                                    JournalTopicAck ack = (JournalTopicAck) op.data;
-                                    ((RapidTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack
-                                            .getMessageId());
+                                if(op.operationType==TxOperation.ACK_OPERATION_TYPE){
+                                    JournalTopicAck ack=(JournalTopicAck)op.data;
+                                    ((RapidTopicMessageStore)op.store).replayAcknowledge(context,ack.getClientId(),ack
+                                            .getSubscritionName(),ack.getMessageId());
                                 }
                             }
                             transactionCounter++;
@@ -558,42 +516,39 @@
                             transactionStore.replayRollback(command.getTransactionId());
                             break;
                         }
-                    }
-                    catch (IOException e) {
-                        log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
+                    }catch(IOException e){
+                        log.error("Recovery Failure: Could not replay: "+c+", reason: "+e,e);
                     }
                 }
-                break;
+                    break;
                 case JournalTrace.DATA_STRUCTURE_TYPE:
-                    JournalTrace trace = (JournalTrace) c;
-                    log.debug("TRACE Entry: " + trace.getMessage());
+                    JournalTrace trace=(JournalTrace)c;
+                    log.debug("TRACE Entry: "+trace.getMessage());
                     break;
                 default:
-                    log.error("Unknown type of record in transaction log which will be discarded: " + c);
+                    log.error("Unknown type of record in transaction log which will be discarded: "+c);
                 }
             }
         }
-
-        RecordLocation location = writeTraceMessage("RECOVERED", true);
-        journal.setMark(location, true);
-
-        log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
+        RecordLocation location=writeTraceMessage("RECOVERED",true);
+        journal.setMark(location,true);
+        log.info("Journal Recovered: "+transactionCounter+" message(s) in transactions recovered.");
     }
 
-    private IOException createReadException(RecordLocation location, Exception e) {
-        return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
+    private IOException createReadException(RecordLocation location,Exception e){
+        return IOExceptionSupport.create("Failed to read to journal for: "+location+". Reason: "+e,e);
     }
 
-    protected IOException createWriteException(DataStructure packet, Exception e) {
-        return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
+    protected IOException createWriteException(DataStructure packet,Exception e){
+        return IOExceptionSupport.create("Failed to write to journal for: "+packet+". Reason: "+e,e);
     }
 
-    protected IOException createWriteException(String command, Exception e) {
-        return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
+    protected IOException createWriteException(String command,Exception e){
+        return IOExceptionSupport.create("Failed to write to journal for command: "+command+". Reason: "+e,e);
     }
 
-    protected IOException createRecoveryFailedException(Exception e) {
-        return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
+    protected IOException createRecoveryFailedException(Exception e){
+        return IOExceptionSupport.create("Failed to recover from journal. Reason: "+e,e);
     }
 
     /**
@@ -603,85 +558,102 @@
      * @return
      * @throws IOException
      */
-    public Location writeCommand(DataStructure command, boolean sync) throws IOException {
-        if( started.get() )
-            return (Location) journal.write(toPacket(wireFormat.marshal(command)), sync);
+    public Location writeCommand(DataStructure command,boolean sync) throws IOException{
+        if(started.get())
+            return (Location)journal.write(toPacket(wireFormat.marshal(command)),sync);
         throw new IOException("closed");
     }
 
-    private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
-        JournalTrace trace = new JournalTrace();
+    private RecordLocation writeTraceMessage(String message,boolean sync) throws IOException{
+        JournalTrace trace=new JournalTrace();
         trace.setMessage(message);
-        return writeCommand(trace, sync);
+        return writeCommand(trace,sync);
     }
 
-    public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
-        if (newPercentUsage > 80 && oldPercentUsage < newPercentUsage) {
-            checkpoint(false, true);
+    public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
+        if(newPercentUsage>80&&oldPercentUsage<newPercentUsage){
+            checkpoint(false,true);
         }
     }
 
-    public RapidTransactionStore getTransactionStore() {
+    public RapidTransactionStore getTransactionStore(){
         return transactionStore;
     }
 
-    public void deleteAllMessages() throws IOException {        
-        try {
-            JournalTrace trace = new JournalTrace();
+    public void deleteAllMessages() throws IOException{
+        try{
+            JournalTrace trace=new JournalTrace();
             trace.setMessage("DELETED");
-            RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
-            journal.setMark(location, true);
+            RecordLocation location=journal.write(toPacket(wireFormat.marshal(trace)),false);
+            journal.setMark(location,true);
             log.info("Journal deleted: ");
-        } catch (IOException e) {
+        }catch(IOException e){
             throw e;
-        } catch (Throwable e) {
+        }catch(Throwable e){
             throw IOExceptionSupport.create(e);
         }
-        
         if(store!=null){
-            store.delete();
+            if(store.isInitialized()){
+                store.clear();
+            }else{
+                store.delete();
+            }
         }
     }
 
-    public int getMaxCheckpointMessageAddSize() {
+    public int getMaxCheckpointMessageAddSize(){
         return maxCheckpointMessageAddSize;
     }
 
-    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
-        this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
+    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize){
+        this.maxCheckpointMessageAddSize=maxCheckpointMessageAddSize;
     }
 
-    public int getMaxCheckpointWorkers() {
+    public int getMaxCheckpointWorkers(){
         return maxCheckpointWorkers;
     }
 
-    public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
-        this.maxCheckpointWorkers = maxCheckpointWorkers;
+    public void setMaxCheckpointWorkers(int maxCheckpointWorkers){
+        this.maxCheckpointWorkers=maxCheckpointWorkers;
     }
 
-    public boolean isUseExternalMessageReferences() {
+    public boolean isUseExternalMessageReferences(){
         return false;
     }
 
-    public void setUseExternalMessageReferences(boolean enable) {
-        if( enable )
+    public void setUseExternalMessageReferences(boolean enable){
+        if(enable)
             throw new IllegalArgumentException("The journal does not support message references.");
     }
 
-    public void setUsageManager(UsageManager usageManager) {
+    public void setUsageManager(UsageManager usageManager){
     }
 
-    public Store getStore() {
+    public Store getStore(){
         return store;
     }
-    
-    public Packet toPacket(ByteSequence sequence) {
-    	return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
-    }
-    
-    public ByteSequence toByteSequence(Packet packet) {
-    	org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
-    	return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
+
+    /**
+     * @return the maximumDestinationCacheSize
+     */
+    public int getMaximumDestinationCacheSize(){
+        return this.maximumDestinationCacheSize;
     }
 
+    /**
+     * @param maximumDestinationCacheSize the maximumDestinationCacheSize to set
+     */
+    public void setMaximumDestinationCacheSize(int maximumDestinationCacheSize){
+        this.maximumDestinationCacheSize=maximumDestinationCacheSize;
+    }
+
+    public Packet toPacket(ByteSequence sequence){
+        return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data,sequence.offset,
+                sequence.length));
+    }
+
+    public ByteSequence toByteSequence(Packet packet){
+        org.apache.activeio.packet.ByteSequence sequence=packet.asByteSequence();
+        return new ByteSequence(sequence.getData(),sequence.getOffset(),sequence.getLength());
+    }
 }



Mime
View raw message