Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 49798 invoked from network); 8 Aug 2007 19:00:29 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 8 Aug 2007 19:00:29 -0000 Received: (qmail 67920 invoked by uid 500); 8 Aug 2007 19:00:28 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 67903 invoked by uid 500); 8 Aug 2007 19:00:28 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 67894 invoked by uid 99); 8 Aug 2007 19:00:28 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Aug 2007 12:00:28 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Aug 2007 19:00:25 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 498341A9877; Wed, 8 Aug 2007 11:59:20 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r563982 [16/32] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jm... Date: Wed, 08 Aug 2007 18:58:13 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070808185920.498341A9877@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Wed Aug 8 11:56:59 2007 @@ -24,6 +24,7 @@ import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; + import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.DataStructure; @@ -51,78 +52,82 @@ * * @version $Revision: 1.14 $ */ -public class AMQMessageStore implements MessageStore{ +public class AMQMessageStore implements MessageStore { - private static final Log log=LogFactory.getLog(AMQMessageStore.class); + private static final Log LOG = LogFactory.getLog(AMQMessageStore.class); protected final AMQPersistenceAdapter peristenceAdapter; protected final AMQTransactionStore transactionStore; protected final ReferenceStore referenceStore; protected final ActiveMQDestination destination; protected final TransactionTemplate transactionTemplate; - private LinkedHashMap messages=new LinkedHashMap(); - private ArrayList messageAcks=new ArrayList(); + private LinkedHashMap messages = new LinkedHashMap(); + private ArrayList messageAcks = new ArrayList(); /** A MessageStore that we can use to retrieve messages quickly. */ - private LinkedHashMap cpAddedMessageIds; + private LinkedHashMap cpAddedMessageIds; protected Location lastLocation; protected Location lastWrittenLocation; - protected HashSet inFlightTxLocations=new HashSet(); + protected HashSet inFlightTxLocations = new HashSet(); protected final TaskRunner asyncWriteTask; protected CountDownLatch flushLatch; - private final boolean debug=log.isDebugEnabled(); - private final AtomicReference mark=new AtomicReference(); - - public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore,ActiveMQDestination destination){ - this.peristenceAdapter=adapter; - this.transactionStore=adapter.getTransactionStore(); - this.referenceStore=referenceStore; - this.destination=destination; - this.transactionTemplate=new TransactionTemplate(adapter,new ConnectionContext()); - asyncWriteTask=adapter.getTaskRunnerFactory().createTaskRunner(new Task(){ + private final boolean debug = LOG.isDebugEnabled(); + private final AtomicReference mark = new AtomicReference(); + + public AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) { + this.peristenceAdapter = adapter; + this.transactionStore = adapter.getTransactionStore(); + this.referenceStore = referenceStore; + this.destination = destination; + this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); + asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() { - public boolean iterate(){ + public boolean iterate() { asyncWrite(); return false; } - },"Checkpoint: "+destination); + }, "Checkpoint: " + destination); } - public void setUsageManager(UsageManager usageManager){ + public void setUsageManager(UsageManager usageManager) { referenceStore.setUsageManager(usageManager); } /** - * Not synchronized since the Journal has better throughput if you increase the number of concurrent writes that it - * is doing. + * Not synchronized since the Journal has better throughput if you increase + * the number of concurrent writes that it is doing. */ - public void addMessage(ConnectionContext context,final Message message) throws IOException{ - final MessageId id=message.getMessageId(); - final Location location=peristenceAdapter.writeCommand(message,message.isResponseRequired()); - if(!context.isInTransaction()){ - if(debug) - log.debug("Journalled message add for: "+id+", at: "+location); - addMessage(message,location); - }else{ - if(debug) - log.debug("Journalled transacted message add for: "+id+", at: "+location); - synchronized(this){ + public void addMessage(ConnectionContext context, final Message message) throws IOException { + final MessageId id = message.getMessageId(); + final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired()); + if (!context.isInTransaction()) { + if (debug) { + LOG.debug("Journalled message add for: " + id + ", at: " + location); + } + addMessage(message, location); + } else { + if (debug) { + LOG.debug("Journalled transacted message add for: " + id + ", at: " + location); + } + synchronized (this) { inFlightTxLocations.add(location); } - transactionStore.addMessage(this,message,location); - context.getTransaction().addSynchronization(new Synchronization(){ + transactionStore.addMessage(this, message, location); + context.getTransaction().addSynchronization(new Synchronization() { - public void afterCommit() throws Exception{ - if(debug) - log.debug("Transacted message add commit for: "+id+", at: "+location); - synchronized(AMQMessageStore.this){ + public void afterCommit() throws Exception { + if (debug) { + LOG.debug("Transacted message add commit for: " + id + ", at: " + location); + } + synchronized (AMQMessageStore.this) { inFlightTxLocations.remove(location); - addMessage(message,location); + addMessage(message, location); } } - public void afterRollback() throws Exception{ - if(debug) - log.debug("Transacted message add rollback for: "+id+", at: "+location); - synchronized(AMQMessageStore.this){ + public void afterRollback() throws Exception { + if (debug) { + LOG.debug("Transacted message add rollback for: " + id + ", at: " + location); + } + synchronized (AMQMessageStore.this) { inFlightTxLocations.remove(location); } } @@ -130,74 +135,78 @@ } } - void addMessage(final Message message,final Location location) throws InterruptedIOException{ - ReferenceData data=new ReferenceData(); + void addMessage(final Message message, final Location location) throws InterruptedIOException { + ReferenceData data = new ReferenceData(); data.setExpiration(message.getExpiration()); data.setFileId(location.getDataFileId()); data.setOffset(location.getOffset()); - synchronized(this){ - lastLocation=location; - messages.put(message.getMessageId(),data); + synchronized (this) { + lastLocation = location; + messages.put(message.getMessageId(), data); } - try{ + try { asyncWriteTask.wakeup(); - }catch(InterruptedException e){ + } catch (InterruptedException e) { throw new InterruptedIOException(); } } - public boolean replayAddMessage(ConnectionContext context,Message message,Location location){ - MessageId id=message.getMessageId(); - try{ + public boolean replayAddMessage(ConnectionContext context, Message message, Location location) { + MessageId id = message.getMessageId(); + try { // Only add the message if it has not already been added. - ReferenceData data=referenceStore.getMessageReference(id); - if(data==null){ - data=new ReferenceData(); + ReferenceData data = referenceStore.getMessageReference(id); + if (data == null) { + data = new ReferenceData(); data.setExpiration(message.getExpiration()); data.setFileId(location.getDataFileId()); data.setOffset(location.getOffset()); - referenceStore.addMessageReference(context,id,data); + referenceStore.addMessageReference(context, id, data); return true; } - }catch(Throwable e){ - log.warn("Could not replay add for message '"+id+"'. Message may have already been added. reason: "+e,e); + } catch (Throwable e) { + LOG.warn("Could not replay add for message '" + id + "'. Message may have already been added. reason: " + e, e); } return false; } /** */ - public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException{ - JournalQueueAck remove=new JournalQueueAck(); + public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { + JournalQueueAck remove = new JournalQueueAck(); remove.setDestination(destination); remove.setMessageAck(ack); - final Location location=peristenceAdapter.writeCommand(remove,ack.isResponseRequired()); - if(!context.isInTransaction()){ - if(debug) - log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location); - removeMessage(ack,location); - }else{ - if(debug) - log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location); - synchronized(this){ + final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired()); + if (!context.isInTransaction()) { + if (debug) { + LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location); + } + removeMessage(ack, location); + } else { + if (debug) { + LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location); + } + synchronized (this) { inFlightTxLocations.add(location); } - transactionStore.removeMessage(this,ack,location); - context.getTransaction().addSynchronization(new Synchronization(){ + transactionStore.removeMessage(this, ack, location); + context.getTransaction().addSynchronization(new Synchronization() { - public void afterCommit() throws Exception{ - if(debug) - log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location); - synchronized(AMQMessageStore.this){ + public void afterCommit() throws Exception { + if (debug) { + LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location); + } + synchronized (AMQMessageStore.this) { inFlightTxLocations.remove(location); - removeMessage(ack,location); + removeMessage(ack, location); } } - public void afterRollback() throws Exception{ - if(debug) - log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location); - synchronized(AMQMessageStore.this){ + public void afterRollback() throws Exception { + if (debug) { + LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location); + } + synchronized (AMQMessageStore.this) { inFlightTxLocations.remove(location); } } @@ -205,36 +214,35 @@ } } - final void removeMessage(final MessageAck ack,final Location location) throws InterruptedIOException{ + final void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException { ReferenceData data; - synchronized(this){ - lastLocation=location; - MessageId id=ack.getLastMessageId(); - data=messages.remove(id); - if(data==null){ + synchronized (this) { + lastLocation = location; + MessageId id = ack.getLastMessageId(); + data = messages.remove(id); + if (data == null) { messageAcks.add(ack); } } - if(data==null){ - try{ + if (data == null) { + try { asyncWriteTask.wakeup(); - }catch(InterruptedException e){ + } catch (InterruptedException e) { throw new InterruptedIOException(); } } } - public boolean replayRemoveMessage(ConnectionContext context,MessageAck messageAck){ - try{ + public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) { + try { // Only remove the message if it has not already been removed. - ReferenceData t=referenceStore.getMessageReference(messageAck.getLastMessageId()); - if(t!=null){ - referenceStore.removeMessage(context,messageAck); + ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId()); + if (t != null) { + referenceStore.removeMessage(context, messageAck); return true; } - }catch(Throwable e){ - log.warn("Could not replay acknowledge for message '"+messageAck.getLastMessageId() - +"'. Message may have already been acknowledged. reason: "+e); + } catch (Throwable e) { + LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e); } return false; } @@ -244,28 +252,28 @@ * * @throws InterruptedIOException */ - public void flush() throws InterruptedIOException{ - if(log.isDebugEnabled()){ - log.debug("flush starting ..."); + public void flush() throws InterruptedIOException { + if (LOG.isDebugEnabled()) { + LOG.debug("flush starting ..."); } CountDownLatch countDown; - synchronized(this){ - if(lastWrittenLocation==lastLocation){ + synchronized (this) { + if (lastWrittenLocation == lastLocation) { return; } - if(flushLatch==null){ - flushLatch=new CountDownLatch(1); + if (flushLatch == null) { + flushLatch = new CountDownLatch(1); } - countDown=flushLatch; + countDown = flushLatch; } - try{ + try { asyncWriteTask.wakeup(); countDown.await(); - }catch(InterruptedException e){ + } catch (InterruptedException e) { throw new InterruptedIOException(); } - if(log.isDebugEnabled()){ - log.debug("flush finished"); + if (LOG.isDebugEnabled()) { + LOG.debug("flush finished"); } } @@ -273,19 +281,19 @@ * @return * @throws IOException */ - void asyncWrite(){ - try{ + void asyncWrite() { + try { CountDownLatch countDown; - synchronized(this){ - countDown=flushLatch; - flushLatch=null; + synchronized (this) { + countDown = flushLatch; + flushLatch = null; } mark.set(doAsyncWrite()); - if(countDown!=null){ + if (countDown != null) { countDown.countDown(); } - }catch(IOException e){ - log.error("Checkpoint failed: "+e,e); + } catch (IOException e) { + LOG.error("Checkpoint failed: " + e, e); } } @@ -293,67 +301,67 @@ * @return * @throws IOException */ - protected Location doAsyncWrite() throws IOException{ + protected Location doAsyncWrite() throws IOException { final ArrayList cpRemovedMessageLocations; final ArrayList cpActiveJournalLocations; - final int maxCheckpointMessageAddSize=peristenceAdapter.getMaxCheckpointMessageAddSize(); + final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize(); final Location lastLocation; // swap out the message hash maps.. - synchronized(this){ - cpAddedMessageIds=this.messages; - cpRemovedMessageLocations=this.messageAcks; - cpActiveJournalLocations=new ArrayList(inFlightTxLocations); - this.messages=new LinkedHashMap(); - this.messageAcks=new ArrayList(); - lastLocation=this.lastLocation; - } - if(log.isDebugEnabled()) - log.debug("Doing batch update... adding: "+cpAddedMessageIds.size()+" removing: " - +cpRemovedMessageLocations.size()+" "); - transactionTemplate.run(new Callback(){ - - public void execute() throws Exception{ - int size=0; - PersistenceAdapter persitanceAdapter=transactionTemplate.getPersistenceAdapter(); - ConnectionContext context=transactionTemplate.getContext(); + synchronized (this) { + cpAddedMessageIds = this.messages; + cpRemovedMessageLocations = this.messageAcks; + cpActiveJournalLocations = new ArrayList(inFlightTxLocations); + this.messages = new LinkedHashMap(); + this.messageAcks = new ArrayList(); + lastLocation = this.lastLocation; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: " + cpRemovedMessageLocations.size() + " "); + } + transactionTemplate.run(new Callback() { + + public void execute() throws Exception { + int size = 0; + PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter(); + ConnectionContext context = transactionTemplate.getContext(); // Checkpoint the added messages. - Iterator> iterator=cpAddedMessageIds.entrySet().iterator(); - while(iterator.hasNext()){ - Entry entry=iterator.next(); - try{ - referenceStore.addMessageReference(context,entry.getKey(),entry.getValue()); - }catch(Throwable e){ - log.warn("Message could not be added to long term store: "+e.getMessage(),e); + Iterator> iterator = cpAddedMessageIds.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + try { + referenceStore.addMessageReference(context, entry.getKey(), entry.getValue()); + } catch (Throwable e) { + LOG.warn("Message could not be added to long term store: " + e.getMessage(), e); } size++; // Commit the batch if it's getting too big - if(size>=maxCheckpointMessageAddSize){ + if (size >= maxCheckpointMessageAddSize) { persitanceAdapter.commitTransaction(context); persitanceAdapter.beginTransaction(context); - size=0; + size = 0; } } persitanceAdapter.commitTransaction(context); persitanceAdapter.beginTransaction(context); // Checkpoint the removed messages. - for(MessageAck ack:cpRemovedMessageLocations){ - try{ - referenceStore.removeMessage(transactionTemplate.getContext(),ack); - }catch(Throwable e){ - log.warn("Message could not be removed from long term store: "+e.getMessage(),e); + for (MessageAck ack : cpRemovedMessageLocations) { + try { + referenceStore.removeMessage(transactionTemplate.getContext(), ack); + } catch (Throwable e) { + LOG.warn("Message could not be removed from long term store: " + e.getMessage(), e); } } } }); - log.debug("Batch update done."); - synchronized(this){ - cpAddedMessageIds=null; - lastWrittenLocation=lastLocation; + LOG.debug("Batch update done."); + synchronized (this) { + cpAddedMessageIds = null; + lastWrittenLocation = lastLocation; } - if(cpActiveJournalLocations.size()>0){ + if (cpActiveJournalLocations.size() > 0) { Collections.sort(cpActiveJournalLocations); return cpActiveJournalLocations.get(0); - }else{ + } else { return lastLocation; } } @@ -361,50 +369,50 @@ /** * */ - public Message getMessage(MessageId identity) throws IOException{ - ReferenceData data=null; - synchronized(this){ + public Message getMessage(MessageId identity) throws IOException { + ReferenceData data = null; + synchronized (this) { // Is it still in flight??? - data=messages.get(identity); - if(data==null&&cpAddedMessageIds!=null){ - data=cpAddedMessageIds.get(identity); + data = messages.get(identity); + if (data == null && cpAddedMessageIds != null) { + data = cpAddedMessageIds.get(identity); } } - if(data==null){ - data=referenceStore.getMessageReference(identity); - if(data==null){ + if (data == null) { + data = referenceStore.getMessageReference(identity); + if (data == null) { return null; } } - Location location=new Location(); + Location location = new Location(); location.setDataFileId(data.getFileId()); location.setOffset(data.getOffset()); - DataStructure rc=peristenceAdapter.readCommand(location); - try{ + DataStructure rc = peristenceAdapter.readCommand(location); + try { return (Message)rc; - }catch(ClassCastException e){ - throw new IOException("Could not read message "+identity+" at location "+location - +", expected a message, but got: "+rc); + } catch (ClassCastException e) { + throw new IOException("Could not read message " + identity + " at location " + location + ", expected a message, but got: " + rc); } } /** - * Replays the referenceStore first as those messages are the oldest ones, then messages are replayed from the - * transaction log and then the cache is updated. + * Replays the referenceStore first as those messages are the oldest ones, + * then messages are replayed from the transaction log and then the cache is + * updated. * * @param listener * @throws Exception */ - public void recover(final MessageRecoveryListener listener) throws Exception{ + public void recover(final MessageRecoveryListener listener) throws Exception { flush(); - referenceStore.recover(new RecoveryListenerAdapter(this,listener)); + referenceStore.recover(new RecoveryListenerAdapter(this, listener)); } - public void start() throws Exception{ + public void start() throws Exception { referenceStore.start(); } - public void stop() throws Exception{ + public void stop() throws Exception { flush(); asyncWriteTask.shutdown(); referenceStore.stop(); @@ -413,28 +421,27 @@ /** * @return Returns the longTermStore. */ - public ReferenceStore getReferenceStore(){ + public ReferenceStore getReferenceStore() { return referenceStore; } /** * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) */ - public void removeAllMessages(ConnectionContext context) throws IOException{ + public void removeAllMessages(ConnectionContext context) throws IOException { flush(); referenceStore.removeAllMessages(context); } - public ActiveMQDestination getDestination(){ + public ActiveMQDestination getDestination() { return destination; } - public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef) - throws IOException{ + public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { throw new IOException("The journal does not support message references."); } - public String getMessageReference(MessageId identity) throws IOException{ + public String getMessageReference(MessageId identity) throws IOException { throw new IOException("The journal does not support message references."); } @@ -443,61 +450,54 @@ * @throws IOException * @see org.apache.activemq.store.MessageStore#getMessageCount() */ - public int getMessageCount() throws IOException{ + public int getMessageCount() throws IOException { flush(); return referenceStore.getMessageCount(); } - public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ + public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { /* - RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener); - if(referenceStore.supportsExternalBatchControl()){ - synchronized(this){ - referenceStore.recoverNextMessages(maxReturned,recoveryListener); - if(recoveryListener.size()==0&&recoveryListener.hasSpace()){ - // check for inflight messages - int count=0; - Iterator> iterator=messages.entrySet().iterator(); - while(iterator.hasNext()&&count entry=iterator.next(); - ReferenceData data=entry.getValue(); - Message message=getMessage(data); - recoveryListener.recoverMessage(message); - count++; - } - referenceStore.setBatch(recoveryListener.getLastRecoveredMessageId()); - } - } - }else{ - flush(); - referenceStore.recoverNextMessages(maxReturned,recoveryListener); - } - */ - RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener); - referenceStore.recoverNextMessages(maxReturned,recoveryListener); - if(recoveryListener.size()==0&&recoveryListener.hasSpace()){ + * RecoveryListenerAdapter recoveryListener=new + * RecoveryListenerAdapter(this,listener); + * if(referenceStore.supportsExternalBatchControl()){ + * synchronized(this){ + * referenceStore.recoverNextMessages(maxReturned,recoveryListener); + * if(recoveryListener.size()==0&&recoveryListener.hasSpace()){ // check + * for inflight messages int count=0; Iterator> + * iterator=messages.entrySet().iterator(); + * while(iterator.hasNext()&&count entry=iterator.next(); ReferenceData + * data=entry.getValue(); Message message=getMessage(data); + * recoveryListener.recoverMessage(message); count++; } + * referenceStore.setBatch(recoveryListener.getLastRecoveredMessageId()); } } + * }else{ flush(); + * referenceStore.recoverNextMessages(maxReturned,recoveryListener); } + */ + RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener); + referenceStore.recoverNextMessages(maxReturned, recoveryListener); + if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) { flush(); - referenceStore.recoverNextMessages(maxReturned,recoveryListener); + referenceStore.recoverNextMessages(maxReturned, recoveryListener); } } - Message getMessage(ReferenceData data) throws IOException{ - Location location=new Location(); + Message getMessage(ReferenceData data) throws IOException { + Location location = new Location(); location.setDataFileId(data.getFileId()); location.setOffset(data.getOffset()); - DataStructure rc=peristenceAdapter.readCommand(location); - try{ + DataStructure rc = peristenceAdapter.readCommand(location); + try { return (Message)rc; - }catch(ClassCastException e){ - throw new IOException("Could not read message at location "+location+", expected a message, but got: "+rc); + } catch (ClassCastException e) { + throw new IOException("Could not read message at location " + location + ", expected a message, but got: " + rc); } } - public void resetBatching(){ + public void resetBatching() { referenceStore.resetBatching(); } - public Location getMark(){ + public Location getMark() { return mark.get(); } -} \ No newline at end of file +} Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Wed Aug 8 11:56:59 2007 @@ -22,10 +22,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.activeio.journal.Journal; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -35,7 +36,6 @@ import org.apache.activemq.command.JournalTrace; import org.apache.activemq.command.JournalTransaction; import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; import org.apache.activemq.kaha.impl.async.AsyncDataManager; import org.apache.activemq.kaha.impl.async.Location; import org.apache.activemq.memory.UsageListener; @@ -62,46 +62,46 @@ import org.apache.commons.logging.LogFactory; /** - * An implementation of {@link PersistenceAdapter} designed for use with a {@link Journal} and then check pointing - * asynchronously on a timeout with some other long term persistent storage. + * An implementation of {@link PersistenceAdapter} designed for use with a + * {@link Journal} and then check pointing asynchronously on a timeout with some + * other long term persistent storage. * * @org.apache.xbean.XBean element="amqPersistenceAdapter" - * * @version $Revision: 1.17 $ */ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware { - private static final Log log=LogFactory.getLog(AMQPersistenceAdapter.class); - private final ConcurrentHashMap queues=new ConcurrentHashMap(); - private final ConcurrentHashMap topics=new ConcurrentHashMap(); + private static final Log log = LogFactory.getLog(AMQPersistenceAdapter.class); + private final ConcurrentHashMap queues = new ConcurrentHashMap(); + private final ConcurrentHashMap topics = new ConcurrentHashMap(); private AsyncDataManager asyncDataManager; private ReferenceStoreAdapter referenceStoreAdapter; private TaskRunnerFactory taskRunnerFactory; - private WireFormat wireFormat=new OpenWireFormat(); + private WireFormat wireFormat = new OpenWireFormat(); private UsageManager usageManager; - private long cleanupInterval=1000*60; - private long checkpointInterval=1000*10; - private int maxCheckpointWorkers=1; - private int maxCheckpointMessageAddSize=1024*4; - private AMQTransactionStore transactionStore=new AMQTransactionStore(this); + private long cleanupInterval = 1000 * 60; + private long checkpointInterval = 1000 * 10; + private int maxCheckpointWorkers = 1; + private int maxCheckpointMessageAddSize = 1024 * 4; + private AMQTransactionStore transactionStore = new AMQTransactionStore(this); private TaskRunner checkpointTask; - private CountDownLatch nextCheckpointCountDownLatch=new CountDownLatch(1); - private final AtomicBoolean started=new AtomicBoolean(false); + private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); + private final AtomicBoolean started = new AtomicBoolean(false); private Runnable periodicCheckpointTask; private Runnable periodicCleanupTask; private boolean deleteAllMessages; private boolean syncOnWrite; - private String brokerName=""; + private String brokerName = ""; private File directory; private BrokerService brokerService; - public String getBrokerName(){ + public String getBrokerName() { return this.brokerName; } - public void setBrokerName(String brokerName){ - this.brokerName=brokerName; - if(this.referenceStoreAdapter!=null){ + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + if (this.referenceStoreAdapter != null) { this.referenceStoreAdapter.setBrokerName(brokerName); } } @@ -114,165 +114,170 @@ this.brokerService = brokerService; } - public synchronized void start() throws Exception{ - if(!started.compareAndSet(false,true)) + public synchronized void start() throws Exception { + if (!started.compareAndSet(false, true)) { return; - if(this.directory==null) { + } + if (this.directory == null) { if (brokerService != null) { this.directory = brokerService.getBrokerDataDirectory(); - } - else { - this.directory=new File(IOHelper.getDefaultDataDirectory(),brokerName); - this.directory=new File(directory,"amqstore"); + } else { + this.directory = new File(IOHelper.getDefaultDataDirectory(), brokerName); + this.directory = new File(directory, "amqstore"); } } log.info("AMQStore starting using directory: " + directory); this.directory.mkdirs(); - if(this.usageManager!=null){ + if (this.usageManager != null) { this.usageManager.addUsageListener(this); } - if(asyncDataManager==null){ - asyncDataManager=createAsyncDataManager(); + if (asyncDataManager == null) { + asyncDataManager = createAsyncDataManager(); } - if(referenceStoreAdapter==null){ - referenceStoreAdapter=createReferenceStoreAdapter(); + if (referenceStoreAdapter == null) { + referenceStoreAdapter = createReferenceStoreAdapter(); } - referenceStoreAdapter.setDirectory(new File(directory,"kr-store")); + referenceStoreAdapter.setDirectory(new File(directory, "kr-store")); referenceStoreAdapter.setBrokerName(getBrokerName()); referenceStoreAdapter.setUsageManager(usageManager); - if(taskRunnerFactory==null){ - taskRunnerFactory=createTaskRunnerFactory(); + if (taskRunnerFactory == null) { + taskRunnerFactory = createTaskRunnerFactory(); } asyncDataManager.start(); - if(deleteAllMessages){ + if (deleteAllMessages) { asyncDataManager.delete(); - try{ - JournalTrace trace=new JournalTrace(); - trace.setMessage("DELETED "+new Date()); - Location location=asyncDataManager.write(wireFormat.marshal(trace),false); - asyncDataManager.setMark(location,true); + try { + JournalTrace trace = new JournalTrace(); + trace.setMessage("DELETED " + new Date()); + Location location = asyncDataManager.write(wireFormat.marshal(trace), false); + asyncDataManager.setMark(location, true); log.info("Journal deleted: "); - deleteAllMessages=false; - }catch(IOException e){ + deleteAllMessages = false; + } catch (IOException e) { throw e; - }catch(Throwable e){ + } catch (Throwable e) { throw IOExceptionSupport.create(e); } referenceStoreAdapter.deleteAllMessages(); } referenceStoreAdapter.start(); - Set files=referenceStoreAdapter.getReferenceFileIdsInUse(); - log.info("Active data files: "+files); - checkpointTask=taskRunnerFactory.createTaskRunner(new Task(){ + Set files = referenceStoreAdapter.getReferenceFileIdsInUse(); + log.info("Active data files: " + files); + checkpointTask = taskRunnerFactory.createTaskRunner(new Task() { - public boolean iterate(){ + public boolean iterate() { doCheckpoint(); return false; } - },"ActiveMQ Journal Checkpoint Worker"); + }, "ActiveMQ Journal Checkpoint Worker"); createTransactionStore(); - -// -// The following was attempting to reduce startup times by avoiding the log -// file scanning that recovery performs. The problem with it is that XA transactions -// only live in transaction log and are not stored in the reference store, but they still -// need to be recovered when the broker starts up. - - if(referenceStoreAdapter.isStoreValid()==false){ + + // + // The following was attempting to reduce startup times by avoiding the + // log + // file scanning that recovery performs. The problem with it is that XA + // transactions + // only live in transaction log and are not stored in the reference + // store, but they still + // need to be recovered when the broker starts up. + + if (referenceStoreAdapter.isStoreValid() == false) { log.warn("The ReferenceStore is not valid - recovering ..."); recover(); log.info("Finished recovering the ReferenceStore"); - }else { - Location location=writeTraceMessage("RECOVERED "+new Date(),true); - asyncDataManager.setMark(location,true); - //recover transactions + } else { + Location location = writeTraceMessage("RECOVERED " + new Date(), true); + asyncDataManager.setMark(location, true); + // recover transactions getTransactionStore().setPreparedTransactions(referenceStoreAdapter.retrievePreparedState()); - } - - + } + // Do a checkpoint periodically. - periodicCheckpointTask=new Runnable(){ + periodicCheckpointTask = new Runnable() { - public void run(){ + public void run() { checkpoint(false); } }; - Scheduler.executePeriodically(periodicCheckpointTask,checkpointInterval); - periodicCleanupTask=new Runnable(){ + Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval); + periodicCleanupTask = new Runnable() { - public void run(){ + public void run() { cleanup(); } }; - Scheduler.executePeriodically(periodicCleanupTask,cleanupInterval); + Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval); } - public void stop() throws Exception{ - - if(!started.compareAndSet(true,false)) + public void stop() throws Exception { + + if (!started.compareAndSet(true, false)) { return; + } this.usageManager.removeUsageListener(this); - synchronized(this){ + synchronized (this) { Scheduler.cancel(periodicCheckpointTask); Scheduler.cancel(periodicCleanupTask); } - Iterator iterator=queues.values().iterator(); - while(iterator.hasNext()){ - AMQMessageStore ms=iterator.next(); + Iterator iterator = queues.values().iterator(); + while (iterator.hasNext()) { + AMQMessageStore ms = iterator.next(); ms.stop(); } - iterator=topics.values().iterator(); - while(iterator.hasNext()){ - final AMQTopicMessageStore ms=(AMQTopicMessageStore)iterator.next(); + iterator = topics.values().iterator(); + while (iterator.hasNext()) { + final AMQTopicMessageStore ms = (AMQTopicMessageStore)iterator.next(); ms.stop(); } // Take one final checkpoint and stop checkpoint processing. checkpoint(true); - synchronized(this){ + synchronized (this) { checkpointTask.shutdown(); } referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions()); queues.clear(); topics.clear(); - IOException firstException=null; + IOException firstException = null; referenceStoreAdapter.stop(); - try{ + try { log.debug("Journal close"); asyncDataManager.close(); - }catch(Exception e){ - firstException=IOExceptionSupport.create("Failed to close journals: "+e,e); + } catch (Exception e) { + firstException = IOExceptionSupport.create("Failed to close journals: " + e, e); } - if(firstException!=null){ + if (firstException != null) { throw firstException; } } /** * When we checkpoint we move all the journalled data to long term storage. - * @param sync + * + * @param sync */ - public void checkpoint(boolean sync){ - try{ - if(asyncDataManager==null) + public void checkpoint(boolean sync) { + try { + if (asyncDataManager == null) { throw new IllegalStateException("Journal is closed."); - CountDownLatch latch=null; - synchronized(this){ - latch=nextCheckpointCountDownLatch; + } + CountDownLatch latch = null; + synchronized (this) { + latch = nextCheckpointCountDownLatch; checkpointTask.wakeup(); } - if(sync){ - if(log.isDebugEnabled()){ + if (sync) { + if (log.isDebugEnabled()) { log.debug("Waitng for checkpoint to complete."); } latch.await(); } referenceStoreAdapter.checkpoint(sync); - }catch(InterruptedException e){ + } catch (InterruptedException e) { Thread.currentThread().interrupt(); - log.warn("Request to start checkpoint failed: "+e,e); - }catch(IOException e){ - log.error("checkpoint failed: "+e,e); + log.warn("Request to start checkpoint failed: " + e, e); + } catch (IOException e) { + log.error("checkpoint failed: " + e, e); } } @@ -281,49 +286,49 @@ * * @return true if successful */ - public boolean doCheckpoint(){ - CountDownLatch latch=null; - synchronized(this){ - latch=nextCheckpointCountDownLatch; - nextCheckpointCountDownLatch=new CountDownLatch(1); + public boolean doCheckpoint() { + CountDownLatch latch = null; + synchronized (this) { + latch = nextCheckpointCountDownLatch; + nextCheckpointCountDownLatch = new CountDownLatch(1); } - try{ - if(log.isDebugEnabled()){ + try { + if (log.isDebugEnabled()) { log.debug("Checkpoint started."); } - - Location newMark=null; - Iterator iterator=queues.values().iterator(); - while(iterator.hasNext()){ - final AMQMessageStore ms=iterator.next(); - Location mark=(Location)ms.getMark(); - if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){ - newMark=mark; + + Location newMark = null; + Iterator iterator = queues.values().iterator(); + while (iterator.hasNext()) { + final AMQMessageStore ms = iterator.next(); + Location mark = (Location)ms.getMark(); + if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { + newMark = mark; } } - iterator=topics.values().iterator(); - while(iterator.hasNext()){ - final AMQTopicMessageStore ms=(AMQTopicMessageStore)iterator.next(); - Location mark=(Location)ms.getMark(); - if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){ - newMark=mark; + iterator = topics.values().iterator(); + while (iterator.hasNext()) { + final AMQTopicMessageStore ms = (AMQTopicMessageStore)iterator.next(); + Location mark = (Location)ms.getMark(); + if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { + newMark = mark; } } - try{ - if(newMark!=null){ - if(log.isDebugEnabled()){ - log.debug("Marking journal at: "+newMark); + try { + if (newMark != null) { + if (log.isDebugEnabled()) { + log.debug("Marking journal at: " + newMark); } - asyncDataManager.setMark(newMark,false); - writeTraceMessage("CHECKPOINT "+new Date(),true); + asyncDataManager.setMark(newMark, false); + writeTraceMessage("CHECKPOINT " + new Date(), true); } - }catch(Exception e){ - log.error("Failed to mark the Journal: "+e,e); + } catch (Exception e) { + log.error("Failed to mark the Journal: " + e, e); } - if(log.isDebugEnabled()){ + if (log.isDebugEnabled()) { log.debug("Checkpoint done."); } - }finally{ + } finally { latch.countDown(); } return true; @@ -335,79 +340,77 @@ * @return * @throws IOException */ - public void cleanup(){ - try{ - Set inUse=referenceStoreAdapter.getReferenceFileIdsInUse(); + public void cleanup() { + try { + Set inUse = referenceStoreAdapter.getReferenceFileIdsInUse(); asyncDataManager.consolidateDataFilesNotIn(inUse); - }catch(IOException e){ - log.error("Could not cleanup data files: "+e,e); + } catch (IOException e) { + log.error("Could not cleanup data files: " + e, e); } } - public Set getDestinations(){ - Set destinations=new HashSet(referenceStoreAdapter.getDestinations()); + public Set getDestinations() { + Set destinations = new HashSet(referenceStoreAdapter.getDestinations()); destinations.addAll(queues.keySet()); destinations.addAll(topics.keySet()); return destinations; } - MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{ - if(destination.isQueue()){ + MessageStore createMessageStore(ActiveMQDestination destination) throws IOException { + if (destination.isQueue()) { return createQueueMessageStore((ActiveMQQueue)destination); - }else{ + } else { return createTopicMessageStore((ActiveMQTopic)destination); } } - - - public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{ - AMQMessageStore store=queues.get(destination); - if(store==null){ - ReferenceStore checkpointStore=referenceStoreAdapter.createQueueReferenceStore(destination); - store=new AMQMessageStore(this,checkpointStore,destination); - try{ + public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { + AMQMessageStore store = queues.get(destination); + if (store == null) { + ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination); + store = new AMQMessageStore(this, checkpointStore, destination); + try { store.start(); - }catch(Exception e){ + } catch (Exception e) { throw IOExceptionSupport.create(e); } - queues.put(destination,store); + queues.put(destination, store); } return store; } - public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException{ - AMQTopicMessageStore store=(AMQTopicMessageStore)topics.get(destinationName); - if(store==null){ - TopicReferenceStore checkpointStore=referenceStoreAdapter.createTopicReferenceStore(destinationName); - store=new AMQTopicMessageStore(this,checkpointStore,destinationName); - try{ + public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException { + AMQTopicMessageStore store = (AMQTopicMessageStore)topics.get(destinationName); + if (store == null) { + TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName); + store = new AMQTopicMessageStore(this, checkpointStore, destinationName); + try { store.start(); - }catch(Exception e){ + } catch (Exception e) { throw IOExceptionSupport.create(e); } - topics.put(destinationName,store); + topics.put(destinationName, store); } return store; } - public TransactionStore createTransactionStore() throws IOException{ + public TransactionStore createTransactionStore() throws IOException { return transactionStore; } - public long getLastMessageBrokerSequenceId() throws IOException{ + public long getLastMessageBrokerSequenceId() throws IOException { return referenceStoreAdapter.getLastMessageBrokerSequenceId(); } - public void beginTransaction(ConnectionContext context) throws IOException{ + public void beginTransaction(ConnectionContext context) throws IOException { referenceStoreAdapter.beginTransaction(context); } - public void commitTransaction(ConnectionContext context) throws IOException{ + public void commitTransaction(ConnectionContext context) throws IOException { referenceStoreAdapter.commitTransaction(context); } - public void rollbackTransaction(ConnectionContext context) throws IOException{ + public void rollbackTransaction(ConnectionContext context) throws IOException { referenceStoreAdapter.rollbackTransaction(context); } @@ -416,91 +419,93 @@ * @return * @throws IOException */ - public DataStructure readCommand(Location location) throws IOException{ - try{ - ByteSequence packet=asyncDataManager.read(location); + public DataStructure readCommand(Location location) throws IOException { + try { + ByteSequence packet = asyncDataManager.read(location); return (DataStructure)wireFormat.unmarshal(packet); - }catch(IOException e){ - throw createReadException(location,e); + } catch (IOException e) { + throw createReadException(location, e); } } /** - * Move all the messages that were in the journal into long term storage. We just replay and do a checkpoint. + * Move all the messages that were in the journal into long term storage. We + * just replay and do a checkpoint. * * @throws IOException * @throws IOException * @throws InvalidLocationException * @throws IllegalStateException */ - private void recover() throws IllegalStateException,IOException{ + private void recover() throws IllegalStateException, IOException { referenceStoreAdapter.clearMessages(); referenceStoreAdapter.recoverState(); - Location pos=null; - int redoCounter=0; - log.info("Journal Recovery Started from: "+asyncDataManager); - long start=System.currentTimeMillis(); - ConnectionContext context=new ConnectionContext(); + Location pos = null; + int redoCounter = 0; + log.info("Journal Recovery Started from: " + asyncDataManager); + long start = System.currentTimeMillis(); + ConnectionContext context = new ConnectionContext(); // While we have records in the journal. - while((pos=asyncDataManager.getNextLocation(pos))!=null){ - ByteSequence data=asyncDataManager.read(pos); - DataStructure c=(DataStructure)wireFormat.unmarshal(data); - if(c instanceof Message){ - Message message=(Message)c; - AMQMessageStore store=(AMQMessageStore)createMessageStore(message.getDestination()); - if(message.isInTransaction()){ - transactionStore.addMessage(store,message,pos); - }else{ - if(store.replayAddMessage(context,message,pos)){ + while ((pos = asyncDataManager.getNextLocation(pos)) != null) { + ByteSequence data = asyncDataManager.read(pos); + DataStructure c = (DataStructure)wireFormat.unmarshal(data); + if (c instanceof Message) { + Message message = (Message)c; + AMQMessageStore store = (AMQMessageStore)createMessageStore(message.getDestination()); + if (message.isInTransaction()) { + transactionStore.addMessage(store, message, pos); + } else { + if (store.replayAddMessage(context, message, pos)) { redoCounter++; } } - }else{ - switch(c.getDataStructureType()){ + } else { + switch (c.getDataStructureType()) { case JournalQueueAck.DATA_STRUCTURE_TYPE: { - JournalQueueAck command=(JournalQueueAck)c; - AMQMessageStore store=(AMQMessageStore)createMessageStore(command.getDestination()); - if(command.getMessageAck().isInTransaction()){ - transactionStore.removeMessage(store,command.getMessageAck(),pos); - }else{ - if(store.replayRemoveMessage(context,command.getMessageAck())){ + JournalQueueAck command = (JournalQueueAck)c; + AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination()); + if (command.getMessageAck().isInTransaction()) { + transactionStore.removeMessage(store, command.getMessageAck(), pos); + } else { + if (store.replayRemoveMessage(context, command.getMessageAck())) { redoCounter++; } } } break; case JournalTopicAck.DATA_STRUCTURE_TYPE: { - JournalTopicAck command=(JournalTopicAck)c; - AMQTopicMessageStore store=(AMQTopicMessageStore)createMessageStore(command.getDestination()); - if(command.getTransactionId()!=null){ - transactionStore.acknowledge(store,command,pos); - }else{ - if(store.replayAcknowledge(context,command.getClientId(),command.getSubscritionName(),command - .getMessageId())){ + JournalTopicAck command = (JournalTopicAck)c; + AMQTopicMessageStore store = (AMQTopicMessageStore)createMessageStore(command.getDestination()); + if (command.getTransactionId() != null) { + transactionStore.acknowledge(store, command, pos); + } else { + if (store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId())) { redoCounter++; } } } break; case JournalTransaction.DATA_STRUCTURE_TYPE: { - JournalTransaction command=(JournalTransaction)c; - try{ + JournalTransaction command = (JournalTransaction)c; + try { // Try to replay the packet. - switch(command.getType()){ + switch (command.getType()) { case JournalTransaction.XA_PREPARE: transactionStore.replayPrepare(command.getTransactionId()); break; case JournalTransaction.XA_COMMIT: case JournalTransaction.LOCAL_COMMIT: - AMQTx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared()); - if(tx==null) - break; // We may be trying to replay a commit that + AMQTx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared()); + if (tx == null) { + break; // We may be trying to replay a commit + } + // that // was already committed. // Replay the committed operations. tx.getOperations(); - for(Iterator iter=tx.getOperations().iterator();iter.hasNext();){ - AMQTxOperation op=(AMQTxOperation)iter.next(); - if (op.replay(this,context)) { + for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) { + AMQTxOperation op = (AMQTxOperation)iter.next(); + if (op.replay(this, context)) { redoCounter++; } } @@ -510,176 +515,174 @@ transactionStore.replayRollback(command.getTransactionId()); break; } - }catch(IOException e){ - log.error("Recovery Failure: Could not replay: "+c+", reason: "+e,e); + } catch (IOException e) { + log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e); } } break; case JournalTrace.DATA_STRUCTURE_TYPE: - JournalTrace trace=(JournalTrace)c; - log.debug("TRACE Entry: "+trace.getMessage()); + JournalTrace trace = (JournalTrace)c; + log.debug("TRACE Entry: " + trace.getMessage()); break; default: - log.error("Unknown type of record in transaction log which will be discarded: "+c); + log.error("Unknown type of record in transaction log which will be discarded: " + c); } } } - Location location=writeTraceMessage("RECOVERED "+new Date(),true); - asyncDataManager.setMark(location,true); - long end=System.currentTimeMillis(); - log.info("Recovered "+redoCounter+" operations from redo log in "+((end-start)/1000.0f)+" seconds."); + Location location = writeTraceMessage("RECOVERED " + new Date(), true); + asyncDataManager.setMark(location, true); + long end = System.currentTimeMillis(); + log.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds."); } - private IOException createReadException(Location location,Exception e){ - return IOExceptionSupport.create("Failed to read to journal for: "+location+". Reason: "+e,e); + private IOException createReadException(Location location, Exception e) { + return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e); } - protected IOException createWriteException(DataStructure packet,Exception e){ - return IOExceptionSupport.create("Failed to write to journal for: "+packet+". Reason: "+e,e); + protected IOException createWriteException(DataStructure packet, Exception e) { + return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e); } - protected IOException createWriteException(String command,Exception e){ - return IOExceptionSupport.create("Failed to write to journal for command: "+command+". Reason: "+e,e); + protected IOException createWriteException(String command, Exception e) { + return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e); } - protected IOException createRecoveryFailedException(Exception e){ - return IOExceptionSupport.create("Failed to recover from journal. Reason: "+e,e); + protected IOException createRecoveryFailedException(Exception e) { + return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e); } /** - * * @param command * @param syncHint * @return * @throws IOException */ - public Location writeCommand(DataStructure command,boolean syncHint) throws IOException{ - return asyncDataManager.write(wireFormat.marshal(command),(syncHint&&syncOnWrite)); + public Location writeCommand(DataStructure command, boolean syncHint) throws IOException { + return asyncDataManager.write(wireFormat.marshal(command), (syncHint && syncOnWrite)); } - private Location writeTraceMessage(String message,boolean sync) throws IOException{ - JournalTrace trace=new JournalTrace(); + private Location writeTraceMessage(String message, boolean sync) throws IOException { + JournalTrace trace = new JournalTrace(); trace.setMessage(message); - return writeCommand(trace,sync); + return writeCommand(trace, sync); } - public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){ - newPercentUsage=((newPercentUsage)/10)*10; - oldPercentUsage=((oldPercentUsage)/10)*10; - if(newPercentUsage>=70&&oldPercentUsage= 70 && oldPercentUsage < newPercentUsage) { checkpoint(false); } } - public AMQTransactionStore getTransactionStore(){ + public AMQTransactionStore getTransactionStore() { return transactionStore; } - public synchronized void deleteAllMessages() throws IOException{ - deleteAllMessages=true; + public synchronized void deleteAllMessages() throws IOException { + deleteAllMessages = true; } - public String toString(){ - return "AMQPersistenceAdapter("+directory+")"; + public String toString() { + return "AMQPersistenceAdapter(" + directory + ")"; } // ///////////////////////////////////////////////////////////////// // Subclass overridables // ///////////////////////////////////////////////////////////////// - protected AsyncDataManager createAsyncDataManager(){ - AsyncDataManager manager=new AsyncDataManager(); - manager.setDirectory(new File(directory,"journal")); + protected AsyncDataManager createAsyncDataManager() { + AsyncDataManager manager = new AsyncDataManager(); + manager.setDirectory(new File(directory, "journal")); return manager; } - protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException{ - KahaReferenceStoreAdapter adaptor=new KahaReferenceStoreAdapter(); + protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException { + KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(); return adaptor; } - protected TaskRunnerFactory createTaskRunnerFactory(){ + protected TaskRunnerFactory createTaskRunnerFactory() { return DefaultThreadPools.getDefaultTaskRunnerFactory(); } // ///////////////////////////////////////////////////////////////// // Property Accessors // ///////////////////////////////////////////////////////////////// - public AsyncDataManager getAsyncDataManager(){ + public AsyncDataManager getAsyncDataManager() { return asyncDataManager; } - public void setAsyncDataManager(AsyncDataManager asyncDataManager){ - this.asyncDataManager=asyncDataManager; + public void setAsyncDataManager(AsyncDataManager asyncDataManager) { + this.asyncDataManager = asyncDataManager; } - public ReferenceStoreAdapter getReferenceStoreAdapter(){ + public ReferenceStoreAdapter getReferenceStoreAdapter() { return referenceStoreAdapter; } - public TaskRunnerFactory getTaskRunnerFactory(){ + public TaskRunnerFactory getTaskRunnerFactory() { return taskRunnerFactory; } - public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory){ - this.taskRunnerFactory=taskRunnerFactory; + public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { + this.taskRunnerFactory = taskRunnerFactory; } /** * @return Returns the wireFormat. */ - public WireFormat getWireFormat(){ + public WireFormat getWireFormat() { return wireFormat; } - public void setWireFormat(WireFormat wireFormat){ - this.wireFormat=wireFormat; + public void setWireFormat(WireFormat wireFormat) { + this.wireFormat = wireFormat; } - public UsageManager getUsageManager(){ + public UsageManager getUsageManager() { return usageManager; } - public void setUsageManager(UsageManager usageManager){ - this.usageManager=usageManager; + public void setUsageManager(UsageManager usageManager) { + this.usageManager = usageManager; } - public int getMaxCheckpointMessageAddSize(){ + public int getMaxCheckpointMessageAddSize() { return maxCheckpointMessageAddSize; } - public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize){ - this.maxCheckpointMessageAddSize=maxCheckpointMessageAddSize; + public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) { + this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize; } - public int getMaxCheckpointWorkers(){ + public int getMaxCheckpointWorkers() { return maxCheckpointWorkers; } - public void setMaxCheckpointWorkers(int maxCheckpointWorkers){ - this.maxCheckpointWorkers=maxCheckpointWorkers; + public void setMaxCheckpointWorkers(int maxCheckpointWorkers) { + this.maxCheckpointWorkers = maxCheckpointWorkers; } - public synchronized File getDirectory(){ + public synchronized File getDirectory() { return directory; } - public synchronized void setDirectory(File directory){ - this.directory=directory; + public synchronized void setDirectory(File directory) { + this.directory = directory; } - public boolean isSyncOnWrite(){ + public boolean isSyncOnWrite() { return this.syncOnWrite; } - public void setSyncOnWrite(boolean syncOnWrite){ - this.syncOnWrite=syncOnWrite; + public void setSyncOnWrite(boolean syncOnWrite) { + this.syncOnWrite = syncOnWrite; } - /** * @param referenceStoreAdapter the referenceStoreAdapter to set */ - public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter){ - this.referenceStoreAdapter=referenceStoreAdapter; + public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) { + this.referenceStoreAdapter = referenceStoreAdapter; } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java Wed Aug 8 11:56:59 2007 @@ -28,106 +28,101 @@ * * @version $Revision: 1.17 $ */ -public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory{ +public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory { private TaskRunnerFactory taskRunnerFactory; private File dataDirectory; private int journalThreadPriority = Thread.MAX_PRIORITY; - private String brokerName="localhost"; + private String brokerName = "localhost"; private ReferenceStoreAdapter referenceStoreAdapter; - + /** * @return a AMQPersistenceAdapter * @see org.apache.activemq.store.PersistenceAdapterFactory#createPersistenceAdapter() */ - public PersistenceAdapter createPersistenceAdapter(){ - AMQPersistenceAdapter result = new AMQPersistenceAdapter(); + public PersistenceAdapter createPersistenceAdapter() { + AMQPersistenceAdapter result = new AMQPersistenceAdapter(); result.setDirectory(getDataDirectory()); result.setTaskRunnerFactory(getTaskRunnerFactory()); result.setBrokerName(getBrokerName()); result.setReferenceStoreAdapter(getReferenceStoreAdapter()); return result; } - + /** * @return the dataDirectory */ - public File getDataDirectory(){ - if(this.dataDirectory==null){ - this.dataDirectory=new File(IOHelper.getDefaultDataDirectory(),brokerName); + public File getDataDirectory() { + if (this.dataDirectory == null) { + this.dataDirectory = new File(IOHelper.getDefaultDataDirectory(), brokerName); } return this.dataDirectory; } - + /** * @param dataDirectory the dataDirectory to set */ - public void setDataDirectory(File dataDirectory){ - this.dataDirectory=dataDirectory; + public void setDataDirectory(File dataDirectory) { + this.dataDirectory = dataDirectory; } - + /** * @return the taskRunnerFactory */ - public TaskRunnerFactory getTaskRunnerFactory(){ - if( taskRunnerFactory == null ) { - taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority, true, 1000); + public TaskRunnerFactory getTaskRunnerFactory() { + if (taskRunnerFactory == null) { + taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority, + true, 1000); } return taskRunnerFactory; } - + /** * @param taskRunnerFactory the taskRunnerFactory to set */ - public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory){ - this.taskRunnerFactory=taskRunnerFactory; + public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { + this.taskRunnerFactory = taskRunnerFactory; } - /** * @return the journalThreadPriority */ - public int getJournalThreadPriority(){ + public int getJournalThreadPriority() { return this.journalThreadPriority; } - /** * @param journalThreadPriority the journalThreadPriority to set */ - public void setJournalThreadPriority(int journalThreadPriority){ - this.journalThreadPriority=journalThreadPriority; + public void setJournalThreadPriority(int journalThreadPriority) { + this.journalThreadPriority = journalThreadPriority; } - /** * @return the brokerName */ - public String getBrokerName(){ + public String getBrokerName() { return this.brokerName; } - /** * @param brokerName the brokerName to set */ - public void setBrokerName(String brokerName){ - this.brokerName=brokerName; + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; } - /** * @return the referenceStoreAdapter */ - public ReferenceStoreAdapter getReferenceStoreAdapter(){ + public ReferenceStoreAdapter getReferenceStoreAdapter() { return this.referenceStoreAdapter; } - /** * @param referenceStoreAdapter the referenceStoreAdapter to set */ - public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter){ - this.referenceStoreAdapter=referenceStoreAdapter; + public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) { + this.referenceStoreAdapter = referenceStoreAdapter; } }