activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r563982 [16/32] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jm...
Date Wed, 08 Aug 2007 18:58:13 GMT
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Wed Aug  8 11:56:59 2007
@@ -24,6 +24,7 @@
 import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.DataStructure;
@@ -51,78 +52,82 @@
  * 
  * @version $Revision: 1.14 $
  */
-public class AMQMessageStore implements MessageStore{
+public class AMQMessageStore implements MessageStore {
 
-    private static final Log log=LogFactory.getLog(AMQMessageStore.class);
+    private static final Log LOG = LogFactory.getLog(AMQMessageStore.class);
     protected final AMQPersistenceAdapter peristenceAdapter;
     protected final AMQTransactionStore transactionStore;
     protected final ReferenceStore referenceStore;
     protected final ActiveMQDestination destination;
     protected final TransactionTemplate transactionTemplate;
-    private LinkedHashMap<MessageId,ReferenceData> messages=new LinkedHashMap<MessageId,ReferenceData>();
-    private ArrayList<MessageAck> messageAcks=new ArrayList<MessageAck>();
+    private LinkedHashMap<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>();
+    private ArrayList<MessageAck> messageAcks = new ArrayList<MessageAck>();
     /** A MessageStore that we can use to retrieve messages quickly. */
-    private LinkedHashMap<MessageId,ReferenceData> cpAddedMessageIds;
+    private LinkedHashMap<MessageId, ReferenceData> cpAddedMessageIds;
     protected Location lastLocation;
     protected Location lastWrittenLocation;
-    protected HashSet<Location> inFlightTxLocations=new HashSet<Location>();
+    protected HashSet<Location> inFlightTxLocations = new HashSet<Location>();
     protected final TaskRunner asyncWriteTask;
     protected CountDownLatch flushLatch;
-    private final boolean debug=log.isDebugEnabled();
-    private final AtomicReference<Location> mark=new AtomicReference<Location>();
-    
-    public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore,ActiveMQDestination destination){
-        this.peristenceAdapter=adapter;
-        this.transactionStore=adapter.getTransactionStore();
-        this.referenceStore=referenceStore;
-        this.destination=destination;
-        this.transactionTemplate=new TransactionTemplate(adapter,new ConnectionContext());
-        asyncWriteTask=adapter.getTaskRunnerFactory().createTaskRunner(new Task(){
+    private final boolean debug = LOG.isDebugEnabled();
+    private final AtomicReference<Location> mark = new AtomicReference<Location>();
+
+    public AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
+        this.peristenceAdapter = adapter;
+        this.transactionStore = adapter.getTransactionStore();
+        this.referenceStore = referenceStore;
+        this.destination = destination;
+        this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
+        asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() {
 
-            public boolean iterate(){
+            public boolean iterate() {
                 asyncWrite();
                 return false;
             }
-        },"Checkpoint: "+destination);
+        }, "Checkpoint: " + destination);
     }
 
-    public void setUsageManager(UsageManager usageManager){
+    public void setUsageManager(UsageManager usageManager) {
         referenceStore.setUsageManager(usageManager);
     }
 
     /**
-     * Not synchronized since the Journal has better throughput if you increase the number of concurrent writes that it
-     * is doing.
+     * Not synchronized since the Journal has better throughput if you increase
+     * the number of concurrent writes that it is doing.
      */
-    public void addMessage(ConnectionContext context,final Message message) throws IOException{
-        final MessageId id=message.getMessageId();
-        final Location location=peristenceAdapter.writeCommand(message,message.isResponseRequired());
-        if(!context.isInTransaction()){
-            if(debug)
-                log.debug("Journalled message add for: "+id+", at: "+location);
-            addMessage(message,location);
-        }else{
-            if(debug)
-                log.debug("Journalled transacted message add for: "+id+", at: "+location);
-            synchronized(this){
+    public void addMessage(ConnectionContext context, final Message message) throws IOException {
+        final MessageId id = message.getMessageId();
+        final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
+        if (!context.isInTransaction()) {
+            if (debug) {
+                LOG.debug("Journalled message add for: " + id + ", at: " + location);
+            }
+            addMessage(message, location);
+        } else {
+            if (debug) {
+                LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
+            }
+            synchronized (this) {
                 inFlightTxLocations.add(location);
             }
-            transactionStore.addMessage(this,message,location);
-            context.getTransaction().addSynchronization(new Synchronization(){
+            transactionStore.addMessage(this, message, location);
+            context.getTransaction().addSynchronization(new Synchronization() {
 
-                public void afterCommit() throws Exception{
-                    if(debug)
-                        log.debug("Transacted message add commit for: "+id+", at: "+location);
-                    synchronized(AMQMessageStore.this){
+                public void afterCommit() throws Exception {
+                    if (debug) {
+                        LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
+                    }
+                    synchronized (AMQMessageStore.this) {
                         inFlightTxLocations.remove(location);
-                        addMessage(message,location);
+                        addMessage(message, location);
                     }
                 }
 
-                public void afterRollback() throws Exception{
-                    if(debug)
-                        log.debug("Transacted message add rollback for: "+id+", at: "+location);
-                    synchronized(AMQMessageStore.this){
+                public void afterRollback() throws Exception {
+                    if (debug) {
+                        LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
+                    }
+                    synchronized (AMQMessageStore.this) {
                         inFlightTxLocations.remove(location);
                     }
                 }
@@ -130,74 +135,78 @@
         }
     }
 
-    void addMessage(final Message message,final Location location) throws InterruptedIOException{
-        ReferenceData data=new ReferenceData();
+    void addMessage(final Message message, final Location location) throws InterruptedIOException {
+        ReferenceData data = new ReferenceData();
         data.setExpiration(message.getExpiration());
         data.setFileId(location.getDataFileId());
         data.setOffset(location.getOffset());
-        synchronized(this){
-            lastLocation=location;
-            messages.put(message.getMessageId(),data);
+        synchronized (this) {
+            lastLocation = location;
+            messages.put(message.getMessageId(), data);
         }
-        try{
+        try {
             asyncWriteTask.wakeup();
-        }catch(InterruptedException e){
+        } catch (InterruptedException e) {
             throw new InterruptedIOException();
         }
     }
 
-    public boolean replayAddMessage(ConnectionContext context,Message message,Location location){
-        MessageId id=message.getMessageId();
-        try{
+    public boolean replayAddMessage(ConnectionContext context, Message message, Location location) {
+        MessageId id = message.getMessageId();
+        try {
             // Only add the message if it has not already been added.
-            ReferenceData data=referenceStore.getMessageReference(id);
-            if(data==null){
-                data=new ReferenceData();
+            ReferenceData data = referenceStore.getMessageReference(id);
+            if (data == null) {
+                data = new ReferenceData();
                 data.setExpiration(message.getExpiration());
                 data.setFileId(location.getDataFileId());
                 data.setOffset(location.getOffset());
-                referenceStore.addMessageReference(context,id,data);
+                referenceStore.addMessageReference(context, id, data);
                 return true;
             }
-        }catch(Throwable e){
-            log.warn("Could not replay add for message '"+id+"'.  Message may have already been added. reason: "+e,e);
+        } catch (Throwable e) {
+            LOG.warn("Could not replay add for message '" + id + "'.  Message may have already been added. reason: " + e, e);
         }
         return false;
     }
 
     /**
      */
-    public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException{
-        JournalQueueAck remove=new JournalQueueAck();
+    public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+        JournalQueueAck remove = new JournalQueueAck();
         remove.setDestination(destination);
         remove.setMessageAck(ack);
-        final Location location=peristenceAdapter.writeCommand(remove,ack.isResponseRequired());
-        if(!context.isInTransaction()){
-            if(debug)
-                log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
-            removeMessage(ack,location);
-        }else{
-            if(debug)
-                log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
-            synchronized(this){
+        final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
+        if (!context.isInTransaction()) {
+            if (debug) {
+                LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
+            }
+            removeMessage(ack, location);
+        } else {
+            if (debug) {
+                LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
+            }
+            synchronized (this) {
                 inFlightTxLocations.add(location);
             }
-            transactionStore.removeMessage(this,ack,location);
-            context.getTransaction().addSynchronization(new Synchronization(){
+            transactionStore.removeMessage(this, ack, location);
+            context.getTransaction().addSynchronization(new Synchronization() {
 
-                public void afterCommit() throws Exception{
-                    if(debug)
-                        log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
-                    synchronized(AMQMessageStore.this){
+                public void afterCommit() throws Exception {
+                    if (debug) {
+                        LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
+                    }
+                    synchronized (AMQMessageStore.this) {
                         inFlightTxLocations.remove(location);
-                        removeMessage(ack,location);
+                        removeMessage(ack, location);
                     }
                 }
 
-                public void afterRollback() throws Exception{
-                    if(debug)
-                        log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location);
-                    synchronized(AMQMessageStore.this){
+                public void afterRollback() throws Exception {
+                    if (debug) {
+                        LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
+                    }
+                    synchronized (AMQMessageStore.this) {
                         inFlightTxLocations.remove(location);
                     }
                 }
@@ -205,36 +214,35 @@
         }
     }
 
-    final void removeMessage(final MessageAck ack,final Location location) throws InterruptedIOException{
+    final void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException {
         ReferenceData data;
-        synchronized(this){
-            lastLocation=location;
-            MessageId id=ack.getLastMessageId();
-            data=messages.remove(id);
-            if(data==null){
+        synchronized (this) {
+            lastLocation = location;
+            MessageId id = ack.getLastMessageId();
+            data = messages.remove(id);
+            if (data == null) {
                 messageAcks.add(ack);
             }
         }
-        if(data==null){
-            try{
+        if (data == null) {
+            try {
                 asyncWriteTask.wakeup();
-            }catch(InterruptedException e){
+            } catch (InterruptedException e) {
                 throw new InterruptedIOException();
             }
         }
     }
 
-    public boolean replayRemoveMessage(ConnectionContext context,MessageAck messageAck){
-        try{
+    public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
+        try {
             // Only remove the message if it has not already been removed.
-            ReferenceData t=referenceStore.getMessageReference(messageAck.getLastMessageId());
-            if(t!=null){
-                referenceStore.removeMessage(context,messageAck);
+            ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId());
+            if (t != null) {
+                referenceStore.removeMessage(context, messageAck);
                 return true;
             }
-        }catch(Throwable e){
-            log.warn("Could not replay acknowledge for message '"+messageAck.getLastMessageId()
-                    +"'.  Message may have already been acknowledged. reason: "+e);
+        } catch (Throwable e) {
+            LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + e);
         }
         return false;
     }
@@ -244,28 +252,28 @@
      * 
      * @throws InterruptedIOException
      */
-    public void flush() throws InterruptedIOException{
-        if(log.isDebugEnabled()){
-            log.debug("flush starting ...");
+    public void flush() throws InterruptedIOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("flush starting ...");
         }
         CountDownLatch countDown;
-        synchronized(this){
-            if(lastWrittenLocation==lastLocation){
+        synchronized (this) {
+            if (lastWrittenLocation == lastLocation) {
                 return;
             }
-            if(flushLatch==null){
-                flushLatch=new CountDownLatch(1);
+            if (flushLatch == null) {
+                flushLatch = new CountDownLatch(1);
             }
-            countDown=flushLatch;
+            countDown = flushLatch;
         }
-        try{
+        try {
             asyncWriteTask.wakeup();
             countDown.await();
-        }catch(InterruptedException e){
+        } catch (InterruptedException e) {
             throw new InterruptedIOException();
         }
-        if(log.isDebugEnabled()){
-            log.debug("flush finished");
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("flush finished");
         }
     }
 
@@ -273,19 +281,19 @@
      * @return
      * @throws IOException
      */
-    void asyncWrite(){
-        try{
+    void asyncWrite() {
+        try {
             CountDownLatch countDown;
-            synchronized(this){
-                countDown=flushLatch;
-                flushLatch=null;
+            synchronized (this) {
+                countDown = flushLatch;
+                flushLatch = null;
             }
             mark.set(doAsyncWrite());
-            if(countDown!=null){
+            if (countDown != null) {
                 countDown.countDown();
             }
-        }catch(IOException e){
-            log.error("Checkpoint failed: "+e,e);
+        } catch (IOException e) {
+            LOG.error("Checkpoint failed: " + e, e);
         }
     }
 
@@ -293,67 +301,67 @@
      * @return
      * @throws IOException
      */
-    protected Location doAsyncWrite() throws IOException{
+    protected Location doAsyncWrite() throws IOException {
         final ArrayList<MessageAck> cpRemovedMessageLocations;
         final ArrayList<Location> cpActiveJournalLocations;
-        final int maxCheckpointMessageAddSize=peristenceAdapter.getMaxCheckpointMessageAddSize();
+        final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
         final Location lastLocation;
         // swap out the message hash maps..
-        synchronized(this){
-            cpAddedMessageIds=this.messages;
-            cpRemovedMessageLocations=this.messageAcks;
-            cpActiveJournalLocations=new ArrayList<Location>(inFlightTxLocations);
-            this.messages=new LinkedHashMap<MessageId,ReferenceData>();
-            this.messageAcks=new ArrayList<MessageAck>();
-            lastLocation=this.lastLocation;
-        }
-        if(log.isDebugEnabled())
-            log.debug("Doing batch update... adding: "+cpAddedMessageIds.size()+" removing: "
-                    +cpRemovedMessageLocations.size()+" ");
-        transactionTemplate.run(new Callback(){
-
-            public void execute() throws Exception{
-                int size=0;
-                PersistenceAdapter persitanceAdapter=transactionTemplate.getPersistenceAdapter();
-                ConnectionContext context=transactionTemplate.getContext();
+        synchronized (this) {
+            cpAddedMessageIds = this.messages;
+            cpRemovedMessageLocations = this.messageAcks;
+            cpActiveJournalLocations = new ArrayList<Location>(inFlightTxLocations);
+            this.messages = new LinkedHashMap<MessageId, ReferenceData>();
+            this.messageAcks = new ArrayList<MessageAck>();
+            lastLocation = this.lastLocation;
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: " + cpRemovedMessageLocations.size() + " ");
+        }
+        transactionTemplate.run(new Callback() {
+
+            public void execute() throws Exception {
+                int size = 0;
+                PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
+                ConnectionContext context = transactionTemplate.getContext();
                 // Checkpoint the added messages.
-                Iterator<Entry<MessageId,ReferenceData>> iterator=cpAddedMessageIds.entrySet().iterator();
-                while(iterator.hasNext()){
-                    Entry<MessageId,ReferenceData> entry=iterator.next();
-                    try{
-                        referenceStore.addMessageReference(context,entry.getKey(),entry.getValue());
-                    }catch(Throwable e){
-                        log.warn("Message could not be added to long term store: "+e.getMessage(),e);
+                Iterator<Entry<MessageId, ReferenceData>> iterator = cpAddedMessageIds.entrySet().iterator();
+                while (iterator.hasNext()) {
+                    Entry<MessageId, ReferenceData> entry = iterator.next();
+                    try {
+                        referenceStore.addMessageReference(context, entry.getKey(), entry.getValue());
+                    } catch (Throwable e) {
+                        LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
                     }
                     size++;
                     // Commit the batch if it's getting too big
-                    if(size>=maxCheckpointMessageAddSize){
+                    if (size >= maxCheckpointMessageAddSize) {
                         persitanceAdapter.commitTransaction(context);
                         persitanceAdapter.beginTransaction(context);
-                        size=0;
+                        size = 0;
                     }
                 }
                 persitanceAdapter.commitTransaction(context);
                 persitanceAdapter.beginTransaction(context);
                 // Checkpoint the removed messages.
-                for(MessageAck ack:cpRemovedMessageLocations){
-                    try{
-                        referenceStore.removeMessage(transactionTemplate.getContext(),ack);
-                    }catch(Throwable e){
-                        log.warn("Message could not be removed from long term store: "+e.getMessage(),e);
+                for (MessageAck ack : cpRemovedMessageLocations) {
+                    try {
+                        referenceStore.removeMessage(transactionTemplate.getContext(), ack);
+                    } catch (Throwable e) {
+                        LOG.warn("Message could not be removed from long term store: " + e.getMessage(), e);
                     }
                 }
             }
         });
-        log.debug("Batch update done.");
-        synchronized(this){
-            cpAddedMessageIds=null;
-            lastWrittenLocation=lastLocation;
+        LOG.debug("Batch update done.");
+        synchronized (this) {
+            cpAddedMessageIds = null;
+            lastWrittenLocation = lastLocation;
         }
-        if(cpActiveJournalLocations.size()>0){
+        if (cpActiveJournalLocations.size() > 0) {
             Collections.sort(cpActiveJournalLocations);
             return cpActiveJournalLocations.get(0);
-        }else{
+        } else {
             return lastLocation;
         }
     }
@@ -361,50 +369,50 @@
     /**
      * 
      */
-    public Message getMessage(MessageId identity) throws IOException{
-        ReferenceData data=null;
-        synchronized(this){
+    public Message getMessage(MessageId identity) throws IOException {
+        ReferenceData data = null;
+        synchronized (this) {
             // Is it still in flight???
-            data=messages.get(identity);
-            if(data==null&&cpAddedMessageIds!=null){
-                data=cpAddedMessageIds.get(identity);
+            data = messages.get(identity);
+            if (data == null && cpAddedMessageIds != null) {
+                data = cpAddedMessageIds.get(identity);
             }
         }
-        if(data==null){
-            data=referenceStore.getMessageReference(identity);
-            if(data==null){
+        if (data == null) {
+            data = referenceStore.getMessageReference(identity);
+            if (data == null) {
                 return null;
             }
         }
-        Location location=new Location();
+        Location location = new Location();
         location.setDataFileId(data.getFileId());
         location.setOffset(data.getOffset());
-        DataStructure rc=peristenceAdapter.readCommand(location);
-        try{
+        DataStructure rc = peristenceAdapter.readCommand(location);
+        try {
             return (Message)rc;
-        }catch(ClassCastException e){
-            throw new IOException("Could not read message "+identity+" at location "+location
-                    +", expected a message, but got: "+rc);
+        } catch (ClassCastException e) {
+            throw new IOException("Could not read message " + identity + " at location " + location + ", expected a message, but got: " + rc);
         }
     }
 
     /**
-     * Replays the referenceStore first as those messages are the oldest ones, then messages are replayed from the
-     * transaction log and then the cache is updated.
+     * Replays the referenceStore first as those messages are the oldest ones,
+     * then messages are replayed from the transaction log and then the cache is
+     * updated.
      * 
      * @param listener
      * @throws Exception
      */
-    public void recover(final MessageRecoveryListener listener) throws Exception{
+    public void recover(final MessageRecoveryListener listener) throws Exception {
         flush();
-        referenceStore.recover(new RecoveryListenerAdapter(this,listener));
+        referenceStore.recover(new RecoveryListenerAdapter(this, listener));
     }
 
-    public void start() throws Exception{
+    public void start() throws Exception {
         referenceStore.start();
     }
 
-    public void stop() throws Exception{
+    public void stop() throws Exception {
         flush();
         asyncWriteTask.shutdown();
         referenceStore.stop();
@@ -413,28 +421,27 @@
     /**
      * @return Returns the longTermStore.
      */
-    public ReferenceStore getReferenceStore(){
+    public ReferenceStore getReferenceStore() {
         return referenceStore;
     }
 
     /**
      * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
      */
-    public void removeAllMessages(ConnectionContext context) throws IOException{
+    public void removeAllMessages(ConnectionContext context) throws IOException {
         flush();
         referenceStore.removeAllMessages(context);
     }
 
-    public ActiveMQDestination getDestination(){
+    public ActiveMQDestination getDestination() {
         return destination;
     }
 
-    public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
-            throws IOException{
+    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
         throw new IOException("The journal does not support message references.");
     }
 
-    public String getMessageReference(MessageId identity) throws IOException{
+    public String getMessageReference(MessageId identity) throws IOException {
         throw new IOException("The journal does not support message references.");
     }
 
@@ -443,61 +450,54 @@
      * @throws IOException
      * @see org.apache.activemq.store.MessageStore#getMessageCount()
      */
-    public int getMessageCount() throws IOException{
+    public int getMessageCount() throws IOException {
         flush();
         return referenceStore.getMessageCount();
     }
 
-    public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+    public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
         /*
-        RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
-        if(referenceStore.supportsExternalBatchControl()){
-            synchronized(this){
-                referenceStore.recoverNextMessages(maxReturned,recoveryListener);
-                if(recoveryListener.size()==0&&recoveryListener.hasSpace()){
-                    // check for inflight messages
-                    int count=0;
-                    Iterator<Entry<MessageId,ReferenceData>> iterator=messages.entrySet().iterator();
-                    while(iterator.hasNext()&&count<maxReturned&&recoveryListener.hasSpace()){
-                        Entry<MessageId,ReferenceData> entry=iterator.next();
-                        ReferenceData data=entry.getValue();
-                        Message message=getMessage(data);
-                        recoveryListener.recoverMessage(message);
-                        count++;
-                    }
-                    referenceStore.setBatch(recoveryListener.getLastRecoveredMessageId());
-                }
-            }
-        }else{
-            flush();
-            referenceStore.recoverNextMessages(maxReturned,recoveryListener);
-        }
-        */
-        RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
-        referenceStore.recoverNextMessages(maxReturned,recoveryListener);
-        if(recoveryListener.size()==0&&recoveryListener.hasSpace()){
+         * RecoveryListenerAdapter recoveryListener=new
+         * RecoveryListenerAdapter(this,listener);
+         * if(referenceStore.supportsExternalBatchControl()){
+         * synchronized(this){
+         * referenceStore.recoverNextMessages(maxReturned,recoveryListener);
+         * if(recoveryListener.size()==0&&recoveryListener.hasSpace()){ // check
+         * for inflight messages int count=0; Iterator<Entry<MessageId,ReferenceData>>
+         * iterator=messages.entrySet().iterator();
+         * while(iterator.hasNext()&&count<maxReturned&&recoveryListener.hasSpace()){
+         * Entry<MessageId,ReferenceData> entry=iterator.next(); ReferenceData
+         * data=entry.getValue(); Message message=getMessage(data);
+         * recoveryListener.recoverMessage(message); count++; }
+         * referenceStore.setBatch(recoveryListener.getLastRecoveredMessageId()); } }
+         * }else{ flush();
+         * referenceStore.recoverNextMessages(maxReturned,recoveryListener); }
+         */
+        RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
+        referenceStore.recoverNextMessages(maxReturned, recoveryListener);
+        if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
             flush();
-            referenceStore.recoverNextMessages(maxReturned,recoveryListener);
+            referenceStore.recoverNextMessages(maxReturned, recoveryListener);
         }
     }
 
-    Message getMessage(ReferenceData data) throws IOException{
-        Location location=new Location();
+    Message getMessage(ReferenceData data) throws IOException {
+        Location location = new Location();
         location.setDataFileId(data.getFileId());
         location.setOffset(data.getOffset());
-        DataStructure rc=peristenceAdapter.readCommand(location);
-        try{
+        DataStructure rc = peristenceAdapter.readCommand(location);
+        try {
             return (Message)rc;
-        }catch(ClassCastException e){
-            throw new IOException("Could not read message  at location "+location+", expected a message, but got: "+rc);
+        } catch (ClassCastException e) {
+            throw new IOException("Could not read message  at location " + location + ", expected a message, but got: " + rc);
         }
     }
 
-    public void resetBatching(){
+    public void resetBatching() {
         referenceStore.resetBatching();
     }
 
-    public Location getMark(){
+    public Location getMark() {
         return mark.get();
     }
-}
\ No newline at end of file
+}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Wed Aug  8 11:56:59 2007
@@ -22,10 +22,11 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.activeio.journal.Journal;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -35,7 +36,6 @@
 import org.apache.activemq.command.JournalTrace;
 import org.apache.activemq.command.JournalTransaction;
 import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
 import org.apache.activemq.kaha.impl.async.Location;
 import org.apache.activemq.memory.UsageListener;
@@ -62,46 +62,46 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * An implementation of {@link PersistenceAdapter} designed for use with a {@link Journal} and then check pointing
- * asynchronously on a timeout with some other long term persistent storage.
+ * An implementation of {@link PersistenceAdapter} designed for use with a
+ * {@link Journal} and then check pointing asynchronously on a timeout with some
+ * other long term persistent storage.
  * 
  * @org.apache.xbean.XBean element="amqPersistenceAdapter"
- * 
  * @version $Revision: 1.17 $
  */
 public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
 
-    private static final Log log=LogFactory.getLog(AMQPersistenceAdapter.class);
-    private final ConcurrentHashMap<ActiveMQQueue,AMQMessageStore> queues=new ConcurrentHashMap<ActiveMQQueue,AMQMessageStore>();
-    private final ConcurrentHashMap<ActiveMQTopic,AMQMessageStore> topics=new ConcurrentHashMap<ActiveMQTopic,AMQMessageStore>();
+    private static final Log log = LogFactory.getLog(AMQPersistenceAdapter.class);
+    private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
+    private final ConcurrentHashMap<ActiveMQTopic, AMQMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQMessageStore>();
     private AsyncDataManager asyncDataManager;
     private ReferenceStoreAdapter referenceStoreAdapter;
     private TaskRunnerFactory taskRunnerFactory;
-    private WireFormat wireFormat=new OpenWireFormat();
+    private WireFormat wireFormat = new OpenWireFormat();
     private UsageManager usageManager;
-    private long cleanupInterval=1000*60;
-    private long checkpointInterval=1000*10;
-    private int maxCheckpointWorkers=1;
-    private int maxCheckpointMessageAddSize=1024*4;
-    private AMQTransactionStore transactionStore=new AMQTransactionStore(this);
+    private long cleanupInterval = 1000 * 60;
+    private long checkpointInterval = 1000 * 10;
+    private int maxCheckpointWorkers = 1;
+    private int maxCheckpointMessageAddSize = 1024 * 4;
+    private AMQTransactionStore transactionStore = new AMQTransactionStore(this);
     private TaskRunner checkpointTask;
-    private CountDownLatch nextCheckpointCountDownLatch=new CountDownLatch(1);
-    private final AtomicBoolean started=new AtomicBoolean(false);
+    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
+    private final AtomicBoolean started = new AtomicBoolean(false);
     private Runnable periodicCheckpointTask;
     private Runnable periodicCleanupTask;
     private boolean deleteAllMessages;
     private boolean syncOnWrite;
-    private String brokerName="";
+    private String brokerName = "";
     private File directory;
     private BrokerService brokerService;
 
-    public String getBrokerName(){
+    public String getBrokerName() {
         return this.brokerName;
     }
 
-    public void setBrokerName(String brokerName){
-        this.brokerName=brokerName;
-        if(this.referenceStoreAdapter!=null){
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+        if (this.referenceStoreAdapter != null) {
             this.referenceStoreAdapter.setBrokerName(brokerName);
         }
     }
@@ -114,165 +114,170 @@
         this.brokerService = brokerService;
     }
 
-    public synchronized void start() throws Exception{
-        if(!started.compareAndSet(false,true))
+    public synchronized void start() throws Exception {
+        if (!started.compareAndSet(false, true)) {
             return;
-        if(this.directory==null) {
+        }
+        if (this.directory == null) {
             if (brokerService != null) {
                 this.directory = brokerService.getBrokerDataDirectory();
-            }
-            else {
-                this.directory=new File(IOHelper.getDefaultDataDirectory(),brokerName);
-                this.directory=new File(directory,"amqstore");
+            } else {
+                this.directory = new File(IOHelper.getDefaultDataDirectory(), brokerName);
+                this.directory = new File(directory, "amqstore");
             }
         }
         log.info("AMQStore starting using directory: " + directory);
         this.directory.mkdirs();
 
-        if(this.usageManager!=null){
+        if (this.usageManager != null) {
             this.usageManager.addUsageListener(this);
         }
-        if(asyncDataManager==null){
-            asyncDataManager=createAsyncDataManager();
+        if (asyncDataManager == null) {
+            asyncDataManager = createAsyncDataManager();
         }
-        if(referenceStoreAdapter==null){
-            referenceStoreAdapter=createReferenceStoreAdapter();
+        if (referenceStoreAdapter == null) {
+            referenceStoreAdapter = createReferenceStoreAdapter();
         }
-        referenceStoreAdapter.setDirectory(new File(directory,"kr-store"));
+        referenceStoreAdapter.setDirectory(new File(directory, "kr-store"));
         referenceStoreAdapter.setBrokerName(getBrokerName());
         referenceStoreAdapter.setUsageManager(usageManager);
-        if(taskRunnerFactory==null){
-            taskRunnerFactory=createTaskRunnerFactory();
+        if (taskRunnerFactory == null) {
+            taskRunnerFactory = createTaskRunnerFactory();
         }
         asyncDataManager.start();
-        if(deleteAllMessages){
+        if (deleteAllMessages) {
             asyncDataManager.delete();
-            try{
-                JournalTrace trace=new JournalTrace();
-                trace.setMessage("DELETED "+new Date());
-                Location location=asyncDataManager.write(wireFormat.marshal(trace),false);
-                asyncDataManager.setMark(location,true);
+            try {
+                JournalTrace trace = new JournalTrace();
+                trace.setMessage("DELETED " + new Date());
+                Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
+                asyncDataManager.setMark(location, true);
                 log.info("Journal deleted: ");
-                deleteAllMessages=false;
-            }catch(IOException e){
+                deleteAllMessages = false;
+            } catch (IOException e) {
                 throw e;
-            }catch(Throwable e){
+            } catch (Throwable e) {
                 throw IOExceptionSupport.create(e);
             }
             referenceStoreAdapter.deleteAllMessages();
         }
         referenceStoreAdapter.start();
-        Set<Integer> files=referenceStoreAdapter.getReferenceFileIdsInUse();
-        log.info("Active data files: "+files);
-        checkpointTask=taskRunnerFactory.createTaskRunner(new Task(){
+        Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
+        log.info("Active data files: " + files);
+        checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
 
-            public boolean iterate(){
+            public boolean iterate() {
                 doCheckpoint();
                 return false;
             }
-        },"ActiveMQ Journal Checkpoint Worker");
+        }, "ActiveMQ Journal Checkpoint Worker");
         createTransactionStore();
-        
-//
-// The following was attempting to reduce startup times by avoiding the log 
-// file scanning that recovery performs.  The problem with it is that XA transactions
-// only live in transaction log and are not stored in the reference store, but they still
-// need to be recovered when the broker starts up.  
-        
-        if(referenceStoreAdapter.isStoreValid()==false){
+
+        //
+        // The following was attempting to reduce startup times by avoiding the
+        // log
+        // file scanning that recovery performs. The problem with it is that XA
+        // transactions
+        // only live in transaction log and are not stored in the reference
+        // store, but they still
+        // need to be recovered when the broker starts up.
+
+        if (referenceStoreAdapter.isStoreValid() == false) {
             log.warn("The ReferenceStore is not valid - recovering ...");
             recover();
             log.info("Finished recovering the ReferenceStore");
-        }else {
-           Location location=writeTraceMessage("RECOVERED "+new Date(),true);
-            asyncDataManager.setMark(location,true);
-            //recover transactions
+        } else {
+            Location location = writeTraceMessage("RECOVERED " + new Date(), true);
+            asyncDataManager.setMark(location, true);
+            // recover transactions
             getTransactionStore().setPreparedTransactions(referenceStoreAdapter.retrievePreparedState());
-       }
-        
-              
+        }
+
         // Do a checkpoint periodically.
-        periodicCheckpointTask=new Runnable(){
+        periodicCheckpointTask = new Runnable() {
 
-            public void run(){
+            public void run() {
                 checkpoint(false);
             }
         };
-        Scheduler.executePeriodically(periodicCheckpointTask,checkpointInterval);
-        periodicCleanupTask=new Runnable(){
+        Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval);
+        periodicCleanupTask = new Runnable() {
 
-            public void run(){
+            public void run() {
                 cleanup();
             }
         };
-        Scheduler.executePeriodically(periodicCleanupTask,cleanupInterval);
+        Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval);
     }
 
-    public void stop() throws Exception{
-      
-        if(!started.compareAndSet(true,false))
+    public void stop() throws Exception {
+
+        if (!started.compareAndSet(true, false)) {
             return;
+        }
         this.usageManager.removeUsageListener(this);
-        synchronized(this){
+        synchronized (this) {
             Scheduler.cancel(periodicCheckpointTask);
             Scheduler.cancel(periodicCleanupTask);
         }
-        Iterator<AMQMessageStore> iterator=queues.values().iterator();
-        while(iterator.hasNext()){
-            AMQMessageStore ms=iterator.next();
+        Iterator<AMQMessageStore> iterator = queues.values().iterator();
+        while (iterator.hasNext()) {
+            AMQMessageStore ms = iterator.next();
             ms.stop();
         }
-        iterator=topics.values().iterator();
-        while(iterator.hasNext()){
-            final AMQTopicMessageStore ms=(AMQTopicMessageStore)iterator.next();
+        iterator = topics.values().iterator();
+        while (iterator.hasNext()) {
+            final AMQTopicMessageStore ms = (AMQTopicMessageStore)iterator.next();
             ms.stop();
         }
         // Take one final checkpoint and stop checkpoint processing.
         checkpoint(true);
-        synchronized(this){
+        synchronized (this) {
             checkpointTask.shutdown();
         }
         referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions());
         queues.clear();
         topics.clear();
-        IOException firstException=null;
+        IOException firstException = null;
         referenceStoreAdapter.stop();
-        try{
+        try {
             log.debug("Journal close");
             asyncDataManager.close();
-        }catch(Exception e){
-            firstException=IOExceptionSupport.create("Failed to close journals: "+e,e);
+        } catch (Exception e) {
+            firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
         }
-        if(firstException!=null){
+        if (firstException != null) {
             throw firstException;
         }
     }
 
     /**
      * When we checkpoint we move all the journalled data to long term storage.
-     * @param sync    
+     * 
+     * @param sync
      */
-    public void checkpoint(boolean sync){
-        try{
-            if(asyncDataManager==null)
+    public void checkpoint(boolean sync) {
+        try {
+            if (asyncDataManager == null) {
                 throw new IllegalStateException("Journal is closed.");
-            CountDownLatch latch=null;
-            synchronized(this){
-                latch=nextCheckpointCountDownLatch;
+            }
+            CountDownLatch latch = null;
+            synchronized (this) {
+                latch = nextCheckpointCountDownLatch;
                 checkpointTask.wakeup();
             }
-            if(sync){
-                if(log.isDebugEnabled()){
+            if (sync) {
+                if (log.isDebugEnabled()) {
                     log.debug("Waitng for checkpoint to complete.");
                 }
                 latch.await();
             }
             referenceStoreAdapter.checkpoint(sync);
-        }catch(InterruptedException e){
+        } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            log.warn("Request to start checkpoint failed: "+e,e);
-        }catch(IOException e){
-            log.error("checkpoint failed: "+e,e);
+            log.warn("Request to start checkpoint failed: " + e, e);
+        } catch (IOException e) {
+            log.error("checkpoint failed: " + e, e);
         }
     }
 
@@ -281,49 +286,49 @@
      * 
      * @return true if successful
      */
-    public boolean doCheckpoint(){
-        CountDownLatch latch=null;
-        synchronized(this){
-            latch=nextCheckpointCountDownLatch;
-            nextCheckpointCountDownLatch=new CountDownLatch(1);
+    public boolean doCheckpoint() {
+        CountDownLatch latch = null;
+        synchronized (this) {
+            latch = nextCheckpointCountDownLatch;
+            nextCheckpointCountDownLatch = new CountDownLatch(1);
         }
-        try{
-            if(log.isDebugEnabled()){
+        try {
+            if (log.isDebugEnabled()) {
                 log.debug("Checkpoint started.");
             }
-           
-            Location newMark=null;
-            Iterator<AMQMessageStore> iterator=queues.values().iterator();
-            while(iterator.hasNext()){
-                final AMQMessageStore ms=iterator.next();
-                Location mark=(Location)ms.getMark();
-                if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){
-                    newMark=mark;
+
+            Location newMark = null;
+            Iterator<AMQMessageStore> iterator = queues.values().iterator();
+            while (iterator.hasNext()) {
+                final AMQMessageStore ms = iterator.next();
+                Location mark = (Location)ms.getMark();
+                if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
+                    newMark = mark;
                 }
             }
-            iterator=topics.values().iterator();
-            while(iterator.hasNext()){
-                final AMQTopicMessageStore ms=(AMQTopicMessageStore)iterator.next();
-                Location mark=(Location)ms.getMark();
-                if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){
-                    newMark=mark;
+            iterator = topics.values().iterator();
+            while (iterator.hasNext()) {
+                final AMQTopicMessageStore ms = (AMQTopicMessageStore)iterator.next();
+                Location mark = (Location)ms.getMark();
+                if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
+                    newMark = mark;
                 }
             }
-            try{
-                if(newMark!=null){
-                    if(log.isDebugEnabled()){
-                        log.debug("Marking journal at: "+newMark);
+            try {
+                if (newMark != null) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Marking journal at: " + newMark);
                     }
-                    asyncDataManager.setMark(newMark,false);
-                    writeTraceMessage("CHECKPOINT "+new Date(),true);
+                    asyncDataManager.setMark(newMark, false);
+                    writeTraceMessage("CHECKPOINT " + new Date(), true);
                 }
-            }catch(Exception e){
-                log.error("Failed to mark the Journal: "+e,e);
+            } catch (Exception e) {
+                log.error("Failed to mark the Journal: " + e, e);
             }
-            if(log.isDebugEnabled()){
+            if (log.isDebugEnabled()) {
                 log.debug("Checkpoint done.");
             }
-        }finally{
+        } finally {
             latch.countDown();
         }
         return true;
@@ -335,79 +340,77 @@
      * @return
      * @throws IOException
      */
-    public void cleanup(){
-        try{
-            Set<Integer> inUse=referenceStoreAdapter.getReferenceFileIdsInUse();
+    public void cleanup() {
+        try {
+            Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse();
             asyncDataManager.consolidateDataFilesNotIn(inUse);
-        }catch(IOException e){
-            log.error("Could not cleanup data files: "+e,e);
+        } catch (IOException e) {
+            log.error("Could not cleanup data files: " + e, e);
         }
     }
 
-    public Set<ActiveMQDestination> getDestinations(){
-        Set<ActiveMQDestination> destinations=new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
+    public Set<ActiveMQDestination> getDestinations() {
+        Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
         destinations.addAll(queues.keySet());
         destinations.addAll(topics.keySet());
         return destinations;
     }
 
-    MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{
-        if(destination.isQueue()){
+    MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
+        if (destination.isQueue()) {
             return createQueueMessageStore((ActiveMQQueue)destination);
-        }else{
+        } else {
             return createTopicMessageStore((ActiveMQTopic)destination);
         }
     }
-    
-    
 
-    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
-        AMQMessageStore store=queues.get(destination);
-        if(store==null){
-            ReferenceStore checkpointStore=referenceStoreAdapter.createQueueReferenceStore(destination);
-            store=new AMQMessageStore(this,checkpointStore,destination);
-            try{
+    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+        AMQMessageStore store = queues.get(destination);
+        if (store == null) {
+            ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
+            store = new AMQMessageStore(this, checkpointStore, destination);
+            try {
                 store.start();
-            }catch(Exception e){
+            } catch (Exception e) {
                 throw IOExceptionSupport.create(e);
             }
-            queues.put(destination,store);
+            queues.put(destination, store);
         }
         return store;
     }
 
-    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException{
-        AMQTopicMessageStore store=(AMQTopicMessageStore)topics.get(destinationName);
-        if(store==null){
-            TopicReferenceStore checkpointStore=referenceStoreAdapter.createTopicReferenceStore(destinationName);
-            store=new AMQTopicMessageStore(this,checkpointStore,destinationName);
-            try{
+    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
+        AMQTopicMessageStore store = (AMQTopicMessageStore)topics.get(destinationName);
+        if (store == null) {
+            TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
+            store = new AMQTopicMessageStore(this, checkpointStore, destinationName);
+            try {
                 store.start();
-            }catch(Exception e){
+            } catch (Exception e) {
                 throw IOExceptionSupport.create(e);
             }
-            topics.put(destinationName,store);
+            topics.put(destinationName, store);
         }
         return store;
     }
 
-    public TransactionStore createTransactionStore() throws IOException{
+    public TransactionStore createTransactionStore() throws IOException {
         return transactionStore;
     }
 
-    public long getLastMessageBrokerSequenceId() throws IOException{
+    public long getLastMessageBrokerSequenceId() throws IOException {
         return referenceStoreAdapter.getLastMessageBrokerSequenceId();
     }
 
-    public void beginTransaction(ConnectionContext context) throws IOException{
+    public void beginTransaction(ConnectionContext context) throws IOException {
         referenceStoreAdapter.beginTransaction(context);
     }
 
-    public void commitTransaction(ConnectionContext context) throws IOException{
+    public void commitTransaction(ConnectionContext context) throws IOException {
         referenceStoreAdapter.commitTransaction(context);
     }
 
-    public void rollbackTransaction(ConnectionContext context) throws IOException{
+    public void rollbackTransaction(ConnectionContext context) throws IOException {
         referenceStoreAdapter.rollbackTransaction(context);
     }
 
@@ -416,91 +419,93 @@
      * @return
      * @throws IOException
      */
-    public DataStructure readCommand(Location location) throws IOException{
-        try{
-            ByteSequence packet=asyncDataManager.read(location);
+    public DataStructure readCommand(Location location) throws IOException {
+        try {
+            ByteSequence packet = asyncDataManager.read(location);
             return (DataStructure)wireFormat.unmarshal(packet);
-        }catch(IOException e){
-            throw createReadException(location,e);
+        } catch (IOException e) {
+            throw createReadException(location, e);
         }
     }
 
     /**
-     * Move all the messages that were in the journal into long term storage. We just replay and do a checkpoint.
+     * Move all the messages that were in the journal into long term storage. We
+     * just replay and do a checkpoint.
      * 
      * @throws IOException
      * @throws IOException
      * @throws InvalidLocationException
      * @throws IllegalStateException
      */
-    private void recover() throws IllegalStateException,IOException{
+    private void recover() throws IllegalStateException, IOException {
         referenceStoreAdapter.clearMessages();
         referenceStoreAdapter.recoverState();
-        Location pos=null;
-        int redoCounter=0;
-        log.info("Journal Recovery Started from: "+asyncDataManager);
-        long start=System.currentTimeMillis();
-        ConnectionContext context=new ConnectionContext();
+        Location pos = null;
+        int redoCounter = 0;
+        log.info("Journal Recovery Started from: " + asyncDataManager);
+        long start = System.currentTimeMillis();
+        ConnectionContext context = new ConnectionContext();
         // While we have records in the journal.
-        while((pos=asyncDataManager.getNextLocation(pos))!=null){
-            ByteSequence data=asyncDataManager.read(pos);
-            DataStructure c=(DataStructure)wireFormat.unmarshal(data);
-            if(c instanceof Message){
-                Message message=(Message)c;
-                AMQMessageStore store=(AMQMessageStore)createMessageStore(message.getDestination());
-                if(message.isInTransaction()){
-                    transactionStore.addMessage(store,message,pos);
-                }else{
-                    if(store.replayAddMessage(context,message,pos)){
+        while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
+            ByteSequence data = asyncDataManager.read(pos);
+            DataStructure c = (DataStructure)wireFormat.unmarshal(data);
+            if (c instanceof Message) {
+                Message message = (Message)c;
+                AMQMessageStore store = (AMQMessageStore)createMessageStore(message.getDestination());
+                if (message.isInTransaction()) {
+                    transactionStore.addMessage(store, message, pos);
+                } else {
+                    if (store.replayAddMessage(context, message, pos)) {
                         redoCounter++;
                     }
                 }
-            }else{
-                switch(c.getDataStructureType()){
+            } else {
+                switch (c.getDataStructureType()) {
                 case JournalQueueAck.DATA_STRUCTURE_TYPE: {
-                    JournalQueueAck command=(JournalQueueAck)c;
-                    AMQMessageStore store=(AMQMessageStore)createMessageStore(command.getDestination());
-                    if(command.getMessageAck().isInTransaction()){
-                        transactionStore.removeMessage(store,command.getMessageAck(),pos);
-                    }else{
-                        if(store.replayRemoveMessage(context,command.getMessageAck())){
+                    JournalQueueAck command = (JournalQueueAck)c;
+                    AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination());
+                    if (command.getMessageAck().isInTransaction()) {
+                        transactionStore.removeMessage(store, command.getMessageAck(), pos);
+                    } else {
+                        if (store.replayRemoveMessage(context, command.getMessageAck())) {
                             redoCounter++;
                         }
                     }
                 }
                     break;
                 case JournalTopicAck.DATA_STRUCTURE_TYPE: {
-                    JournalTopicAck command=(JournalTopicAck)c;
-                    AMQTopicMessageStore store=(AMQTopicMessageStore)createMessageStore(command.getDestination());
-                    if(command.getTransactionId()!=null){
-                        transactionStore.acknowledge(store,command,pos);
-                    }else{
-                        if(store.replayAcknowledge(context,command.getClientId(),command.getSubscritionName(),command
-                                .getMessageId())){
+                    JournalTopicAck command = (JournalTopicAck)c;
+                    AMQTopicMessageStore store = (AMQTopicMessageStore)createMessageStore(command.getDestination());
+                    if (command.getTransactionId() != null) {
+                        transactionStore.acknowledge(store, command, pos);
+                    } else {
+                        if (store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId())) {
                             redoCounter++;
                         }
                     }
                 }
                     break;
                 case JournalTransaction.DATA_STRUCTURE_TYPE: {
-                    JournalTransaction command=(JournalTransaction)c;
-                    try{
+                    JournalTransaction command = (JournalTransaction)c;
+                    try {
                         // Try to replay the packet.
-                        switch(command.getType()){
+                        switch (command.getType()) {
                         case JournalTransaction.XA_PREPARE:
                             transactionStore.replayPrepare(command.getTransactionId());
                             break;
                         case JournalTransaction.XA_COMMIT:
                         case JournalTransaction.LOCAL_COMMIT:
-                            AMQTx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared());
-                            if(tx==null)
-                                break; // We may be trying to replay a commit that
+                            AMQTx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
+                            if (tx == null) {
+                                break; // We may be trying to replay a commit
+                            }
+                            // that
                             // was already committed.
                             // Replay the committed operations.
                             tx.getOperations();
-                            for(Iterator iter=tx.getOperations().iterator();iter.hasNext();){
-                                AMQTxOperation op=(AMQTxOperation)iter.next();
-                                if (op.replay(this,context)) {
+                            for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
+                                AMQTxOperation op = (AMQTxOperation)iter.next();
+                                if (op.replay(this, context)) {
                                     redoCounter++;
                                 }
                             }
@@ -510,176 +515,174 @@
                             transactionStore.replayRollback(command.getTransactionId());
                             break;
                         }
-                    }catch(IOException e){
-                        log.error("Recovery Failure: Could not replay: "+c+", reason: "+e,e);
+                    } catch (IOException e) {
+                        log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
                     }
                 }
                     break;
                 case JournalTrace.DATA_STRUCTURE_TYPE:
-                    JournalTrace trace=(JournalTrace)c;
-                    log.debug("TRACE Entry: "+trace.getMessage());
+                    JournalTrace trace = (JournalTrace)c;
+                    log.debug("TRACE Entry: " + trace.getMessage());
                     break;
                 default:
-                    log.error("Unknown type of record in transaction log which will be discarded: "+c);
+                    log.error("Unknown type of record in transaction log which will be discarded: " + c);
                 }
             }
         }
-        Location location=writeTraceMessage("RECOVERED "+new Date(),true);
-        asyncDataManager.setMark(location,true);
-        long end=System.currentTimeMillis();
-        log.info("Recovered "+redoCounter+" operations from redo log in "+((end-start)/1000.0f)+" seconds.");
+        Location location = writeTraceMessage("RECOVERED " + new Date(), true);
+        asyncDataManager.setMark(location, true);
+        long end = System.currentTimeMillis();
+        log.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
     }
 
-    private IOException createReadException(Location location,Exception e){
-        return IOExceptionSupport.create("Failed to read to journal for: "+location+". Reason: "+e,e);
+    private IOException createReadException(Location location, Exception e) {
+        return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
     }
 
-    protected IOException createWriteException(DataStructure packet,Exception e){
-        return IOExceptionSupport.create("Failed to write to journal for: "+packet+". Reason: "+e,e);
+    protected IOException createWriteException(DataStructure packet, Exception e) {
+        return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
     }
 
-    protected IOException createWriteException(String command,Exception e){
-        return IOExceptionSupport.create("Failed to write to journal for command: "+command+". Reason: "+e,e);
+    protected IOException createWriteException(String command, Exception e) {
+        return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
     }
 
-    protected IOException createRecoveryFailedException(Exception e){
-        return IOExceptionSupport.create("Failed to recover from journal. Reason: "+e,e);
+    protected IOException createRecoveryFailedException(Exception e) {
+        return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
     }
 
     /**
-     * 
      * @param command
      * @param syncHint
      * @return
      * @throws IOException
      */
-    public Location writeCommand(DataStructure command,boolean syncHint) throws IOException{
-        return asyncDataManager.write(wireFormat.marshal(command),(syncHint&&syncOnWrite));
+    public Location writeCommand(DataStructure command, boolean syncHint) throws IOException {
+        return asyncDataManager.write(wireFormat.marshal(command), (syncHint && syncOnWrite));
     }
 
-    private Location writeTraceMessage(String message,boolean sync) throws IOException{
-        JournalTrace trace=new JournalTrace();
+    private Location writeTraceMessage(String message, boolean sync) throws IOException {
+        JournalTrace trace = new JournalTrace();
         trace.setMessage(message);
-        return writeCommand(trace,sync);
+        return writeCommand(trace, sync);
     }
 
-    public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
-        newPercentUsage=((newPercentUsage)/10)*10;
-        oldPercentUsage=((oldPercentUsage)/10)*10;
-        if(newPercentUsage>=70&&oldPercentUsage<newPercentUsage){
+    public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
+        newPercentUsage = ((newPercentUsage) / 10) * 10;
+        oldPercentUsage = ((oldPercentUsage) / 10) * 10;
+        if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
             checkpoint(false);
         }
     }
 
-    public AMQTransactionStore getTransactionStore(){
+    public AMQTransactionStore getTransactionStore() {
         return transactionStore;
     }
 
-    public synchronized void deleteAllMessages() throws IOException{
-        deleteAllMessages=true;
+    public synchronized void deleteAllMessages() throws IOException {
+        deleteAllMessages = true;
     }
 
-    public String toString(){
-        return "AMQPersistenceAdapter("+directory+")";
+    public String toString() {
+        return "AMQPersistenceAdapter(" + directory + ")";
     }
 
     // /////////////////////////////////////////////////////////////////
     // Subclass overridables
     // /////////////////////////////////////////////////////////////////
-    protected AsyncDataManager createAsyncDataManager(){
-        AsyncDataManager manager=new AsyncDataManager();
-        manager.setDirectory(new File(directory,"journal"));
+    protected AsyncDataManager createAsyncDataManager() {
+        AsyncDataManager manager = new AsyncDataManager();
+        manager.setDirectory(new File(directory, "journal"));
         return manager;
     }
 
-    protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException{
-        KahaReferenceStoreAdapter adaptor=new KahaReferenceStoreAdapter();
+    protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
+        KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter();
         return adaptor;
     }
 
-    protected TaskRunnerFactory createTaskRunnerFactory(){
+    protected TaskRunnerFactory createTaskRunnerFactory() {
         return DefaultThreadPools.getDefaultTaskRunnerFactory();
     }
 
     // /////////////////////////////////////////////////////////////////
     // Property Accessors
     // /////////////////////////////////////////////////////////////////
-    public AsyncDataManager getAsyncDataManager(){
+    public AsyncDataManager getAsyncDataManager() {
         return asyncDataManager;
     }
 
-    public void setAsyncDataManager(AsyncDataManager asyncDataManager){
-        this.asyncDataManager=asyncDataManager;
+    public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
+        this.asyncDataManager = asyncDataManager;
     }
 
-    public ReferenceStoreAdapter getReferenceStoreAdapter(){
+    public ReferenceStoreAdapter getReferenceStoreAdapter() {
         return referenceStoreAdapter;
     }
 
-    public TaskRunnerFactory getTaskRunnerFactory(){
+    public TaskRunnerFactory getTaskRunnerFactory() {
         return taskRunnerFactory;
     }
 
-    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory){
-        this.taskRunnerFactory=taskRunnerFactory;
+    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
+        this.taskRunnerFactory = taskRunnerFactory;
     }
 
     /**
      * @return Returns the wireFormat.
      */
-    public WireFormat getWireFormat(){
+    public WireFormat getWireFormat() {
         return wireFormat;
     }
 
-    public void setWireFormat(WireFormat wireFormat){
-        this.wireFormat=wireFormat;
+    public void setWireFormat(WireFormat wireFormat) {
+        this.wireFormat = wireFormat;
     }
 
-    public UsageManager getUsageManager(){
+    public UsageManager getUsageManager() {
         return usageManager;
     }
 
-    public void setUsageManager(UsageManager usageManager){
-        this.usageManager=usageManager;
+    public void setUsageManager(UsageManager usageManager) {
+        this.usageManager = usageManager;
     }
 
-    public int getMaxCheckpointMessageAddSize(){
+    public int getMaxCheckpointMessageAddSize() {
         return maxCheckpointMessageAddSize;
     }
 
-    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize){
-        this.maxCheckpointMessageAddSize=maxCheckpointMessageAddSize;
+    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
+        this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
     }
 
-    public int getMaxCheckpointWorkers(){
+    public int getMaxCheckpointWorkers() {
         return maxCheckpointWorkers;
     }
 
-    public void setMaxCheckpointWorkers(int maxCheckpointWorkers){
-        this.maxCheckpointWorkers=maxCheckpointWorkers;
+    public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
+        this.maxCheckpointWorkers = maxCheckpointWorkers;
     }
 
-    public synchronized File getDirectory(){
+    public synchronized File getDirectory() {
         return directory;
     }
 
-    public synchronized void setDirectory(File directory){
-        this.directory=directory;
+    public synchronized void setDirectory(File directory) {
+        this.directory = directory;
     }
 
-    public boolean isSyncOnWrite(){
+    public boolean isSyncOnWrite() {
         return this.syncOnWrite;
     }
 
-    public void setSyncOnWrite(boolean syncOnWrite){
-        this.syncOnWrite=syncOnWrite;
+    public void setSyncOnWrite(boolean syncOnWrite) {
+        this.syncOnWrite = syncOnWrite;
     }
 
-    
     /**
      * @param referenceStoreAdapter the referenceStoreAdapter to set
      */
-    public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter){
-        this.referenceStoreAdapter=referenceStoreAdapter;
+    public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
+        this.referenceStoreAdapter = referenceStoreAdapter;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java Wed Aug  8 11:56:59 2007
@@ -28,106 +28,101 @@
  * 
  * @version $Revision: 1.17 $
  */
-public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory{
+public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
 
     private TaskRunnerFactory taskRunnerFactory;
     private File dataDirectory;
     private int journalThreadPriority = Thread.MAX_PRIORITY;
-    private String brokerName="localhost";
+    private String brokerName = "localhost";
     private ReferenceStoreAdapter referenceStoreAdapter;
-    
+
     /**
      * @return a AMQPersistenceAdapter
      * @see org.apache.activemq.store.PersistenceAdapterFactory#createPersistenceAdapter()
      */
-    public PersistenceAdapter createPersistenceAdapter(){
-        AMQPersistenceAdapter result =  new AMQPersistenceAdapter();
+    public PersistenceAdapter createPersistenceAdapter() {
+        AMQPersistenceAdapter result = new AMQPersistenceAdapter();
         result.setDirectory(getDataDirectory());
         result.setTaskRunnerFactory(getTaskRunnerFactory());
         result.setBrokerName(getBrokerName());
         result.setReferenceStoreAdapter(getReferenceStoreAdapter());
         return result;
     }
-    
+
     /**
      * @return the dataDirectory
      */
-    public File getDataDirectory(){
-        if(this.dataDirectory==null){
-            this.dataDirectory=new File(IOHelper.getDefaultDataDirectory(),brokerName);
+    public File getDataDirectory() {
+        if (this.dataDirectory == null) {
+            this.dataDirectory = new File(IOHelper.getDefaultDataDirectory(), brokerName);
         }
         return this.dataDirectory;
     }
-    
+
     /**
      * @param dataDirectory the dataDirectory to set
      */
-    public void setDataDirectory(File dataDirectory){
-        this.dataDirectory=dataDirectory;
+    public void setDataDirectory(File dataDirectory) {
+        this.dataDirectory = dataDirectory;
     }
-    
+
     /**
      * @return the taskRunnerFactory
      */
-    public TaskRunnerFactory getTaskRunnerFactory(){
-        if( taskRunnerFactory == null ) {
-            taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority, true, 1000);
+    public TaskRunnerFactory getTaskRunnerFactory() {
+        if (taskRunnerFactory == null) {
+            taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority,
+                                                      true, 1000);
         }
         return taskRunnerFactory;
     }
-    
+
     /**
      * @param taskRunnerFactory the taskRunnerFactory to set
      */
-    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory){
-        this.taskRunnerFactory=taskRunnerFactory;
+    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
+        this.taskRunnerFactory = taskRunnerFactory;
     }
 
-    
     /**
      * @return the journalThreadPriority
      */
-    public int getJournalThreadPriority(){
+    public int getJournalThreadPriority() {
         return this.journalThreadPriority;
     }
 
-    
     /**
      * @param journalThreadPriority the journalThreadPriority to set
      */
-    public void setJournalThreadPriority(int journalThreadPriority){
-        this.journalThreadPriority=journalThreadPriority;
+    public void setJournalThreadPriority(int journalThreadPriority) {
+        this.journalThreadPriority = journalThreadPriority;
     }
 
-    
     /**
      * @return the brokerName
      */
-    public String getBrokerName(){
+    public String getBrokerName() {
         return this.brokerName;
     }
 
-    
     /**
      * @param brokerName the brokerName to set
      */
-    public void setBrokerName(String brokerName){
-        this.brokerName=brokerName;
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
     }
 
-    
     /**
      * @return the referenceStoreAdapter
      */
-    public ReferenceStoreAdapter getReferenceStoreAdapter(){
+    public ReferenceStoreAdapter getReferenceStoreAdapter() {
         return this.referenceStoreAdapter;
     }
 
-    
     /**
      * @param referenceStoreAdapter the referenceStoreAdapter to set
      */
-    public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter){
-        this.referenceStoreAdapter=referenceStoreAdapter;
+    public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
+        this.referenceStoreAdapter = referenceStoreAdapter;
     }
 }



Mime
View raw message