Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Wed Aug 8 11:56:59 2007
@@ -24,6 +24,7 @@
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.DataStructure;
@@ -51,78 +52,82 @@
*
* @version $Revision: 1.14 $
*/
-public class AMQMessageStore implements MessageStore{
+public class AMQMessageStore implements MessageStore {
- private static final Log log=LogFactory.getLog(AMQMessageStore.class);
+ private static final Log LOG = LogFactory.getLog(AMQMessageStore.class);
protected final AMQPersistenceAdapter peristenceAdapter;
protected final AMQTransactionStore transactionStore;
protected final ReferenceStore referenceStore;
protected final ActiveMQDestination destination;
protected final TransactionTemplate transactionTemplate;
- private LinkedHashMap<MessageId,ReferenceData> messages=new LinkedHashMap<MessageId,ReferenceData>();
- private ArrayList<MessageAck> messageAcks=new ArrayList<MessageAck>();
+ private LinkedHashMap<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>();
+ private ArrayList<MessageAck> messageAcks = new ArrayList<MessageAck>();
/** A MessageStore that we can use to retrieve messages quickly. */
- private LinkedHashMap<MessageId,ReferenceData> cpAddedMessageIds;
+ private LinkedHashMap<MessageId, ReferenceData> cpAddedMessageIds;
protected Location lastLocation;
protected Location lastWrittenLocation;
- protected HashSet<Location> inFlightTxLocations=new HashSet<Location>();
+ protected HashSet<Location> inFlightTxLocations = new HashSet<Location>();
protected final TaskRunner asyncWriteTask;
protected CountDownLatch flushLatch;
- private final boolean debug=log.isDebugEnabled();
- private final AtomicReference<Location> mark=new AtomicReference<Location>();
-
- public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore,ActiveMQDestination destination){
- this.peristenceAdapter=adapter;
- this.transactionStore=adapter.getTransactionStore();
- this.referenceStore=referenceStore;
- this.destination=destination;
- this.transactionTemplate=new TransactionTemplate(adapter,new ConnectionContext());
- asyncWriteTask=adapter.getTaskRunnerFactory().createTaskRunner(new Task(){
+ private final boolean debug = LOG.isDebugEnabled();
+ private final AtomicReference<Location> mark = new AtomicReference<Location>();
+
+ public AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
+ this.peristenceAdapter = adapter;
+ this.transactionStore = adapter.getTransactionStore();
+ this.referenceStore = referenceStore;
+ this.destination = destination;
+ this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
+ asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() {
- public boolean iterate(){
+ public boolean iterate() {
asyncWrite();
return false;
}
- },"Checkpoint: "+destination);
+ }, "Checkpoint: " + destination);
}
- public void setUsageManager(UsageManager usageManager){
+ public void setUsageManager(UsageManager usageManager) {
referenceStore.setUsageManager(usageManager);
}
/**
- * Not synchronized since the Journal has better throughput if you increase the number of concurrent writes that it
- * is doing.
+ * Not synchronized since the Journal has better throughput if you increase
+ * the number of concurrent writes that it is doing.
*/
- public void addMessage(ConnectionContext context,final Message message) throws IOException{
- final MessageId id=message.getMessageId();
- final Location location=peristenceAdapter.writeCommand(message,message.isResponseRequired());
- if(!context.isInTransaction()){
- if(debug)
- log.debug("Journalled message add for: "+id+", at: "+location);
- addMessage(message,location);
- }else{
- if(debug)
- log.debug("Journalled transacted message add for: "+id+", at: "+location);
- synchronized(this){
+ public void addMessage(ConnectionContext context, final Message message) throws IOException {
+ final MessageId id = message.getMessageId();
+ final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
+ if (!context.isInTransaction()) {
+ if (debug) {
+ LOG.debug("Journalled message add for: " + id + ", at: " + location);
+ }
+ addMessage(message, location);
+ } else {
+ if (debug) {
+ LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
+ }
+ synchronized (this) {
inFlightTxLocations.add(location);
}
- transactionStore.addMessage(this,message,location);
- context.getTransaction().addSynchronization(new Synchronization(){
+ transactionStore.addMessage(this, message, location);
+ context.getTransaction().addSynchronization(new Synchronization() {
- public void afterCommit() throws Exception{
- if(debug)
- log.debug("Transacted message add commit for: "+id+", at: "+location);
- synchronized(AMQMessageStore.this){
+ public void afterCommit() throws Exception {
+ if (debug) {
+ LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
+ }
+ synchronized (AMQMessageStore.this) {
inFlightTxLocations.remove(location);
- addMessage(message,location);
+ addMessage(message, location);
}
}
- public void afterRollback() throws Exception{
- if(debug)
- log.debug("Transacted message add rollback for: "+id+", at: "+location);
- synchronized(AMQMessageStore.this){
+ public void afterRollback() throws Exception {
+ if (debug) {
+ LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
+ }
+ synchronized (AMQMessageStore.this) {
inFlightTxLocations.remove(location);
}
}
@@ -130,74 +135,78 @@
}
}
- void addMessage(final Message message,final Location location) throws InterruptedIOException{
- ReferenceData data=new ReferenceData();
+ void addMessage(final Message message, final Location location) throws InterruptedIOException {
+ ReferenceData data = new ReferenceData();
data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId());
data.setOffset(location.getOffset());
- synchronized(this){
- lastLocation=location;
- messages.put(message.getMessageId(),data);
+ synchronized (this) {
+ lastLocation = location;
+ messages.put(message.getMessageId(), data);
}
- try{
+ try {
asyncWriteTask.wakeup();
- }catch(InterruptedException e){
+ } catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
- public boolean replayAddMessage(ConnectionContext context,Message message,Location location){
- MessageId id=message.getMessageId();
- try{
+ public boolean replayAddMessage(ConnectionContext context, Message message, Location location) {
+ MessageId id = message.getMessageId();
+ try {
// Only add the message if it has not already been added.
- ReferenceData data=referenceStore.getMessageReference(id);
- if(data==null){
- data=new ReferenceData();
+ ReferenceData data = referenceStore.getMessageReference(id);
+ if (data == null) {
+ data = new ReferenceData();
data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId());
data.setOffset(location.getOffset());
- referenceStore.addMessageReference(context,id,data);
+ referenceStore.addMessageReference(context, id, data);
return true;
}
- }catch(Throwable e){
- log.warn("Could not replay add for message '"+id+"'. Message may have already been added. reason: "+e,e);
+ } catch (Throwable e) {
+ LOG.warn("Could not replay add for message '" + id + "'. Message may have already been added. reason: " + e, e);
}
return false;
}
/**
*/
- public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException{
- JournalQueueAck remove=new JournalQueueAck();
+ public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+ JournalQueueAck remove = new JournalQueueAck();
remove.setDestination(destination);
remove.setMessageAck(ack);
- final Location location=peristenceAdapter.writeCommand(remove,ack.isResponseRequired());
- if(!context.isInTransaction()){
- if(debug)
- log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
- removeMessage(ack,location);
- }else{
- if(debug)
- log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
- synchronized(this){
+ final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
+ if (!context.isInTransaction()) {
+ if (debug) {
+ LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
+ }
+ removeMessage(ack, location);
+ } else {
+ if (debug) {
+ LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
+ }
+ synchronized (this) {
inFlightTxLocations.add(location);
}
- transactionStore.removeMessage(this,ack,location);
- context.getTransaction().addSynchronization(new Synchronization(){
+ transactionStore.removeMessage(this, ack, location);
+ context.getTransaction().addSynchronization(new Synchronization() {
- public void afterCommit() throws Exception{
- if(debug)
- log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
- synchronized(AMQMessageStore.this){
+ public void afterCommit() throws Exception {
+ if (debug) {
+ LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
+ }
+ synchronized (AMQMessageStore.this) {
inFlightTxLocations.remove(location);
- removeMessage(ack,location);
+ removeMessage(ack, location);
}
}
- public void afterRollback() throws Exception{
- if(debug)
- log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location);
- synchronized(AMQMessageStore.this){
+ public void afterRollback() throws Exception {
+ if (debug) {
+ LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
+ }
+ synchronized (AMQMessageStore.this) {
inFlightTxLocations.remove(location);
}
}
@@ -205,36 +214,35 @@
}
}
- final void removeMessage(final MessageAck ack,final Location location) throws InterruptedIOException{
+ final void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException {
ReferenceData data;
- synchronized(this){
- lastLocation=location;
- MessageId id=ack.getLastMessageId();
- data=messages.remove(id);
- if(data==null){
+ synchronized (this) {
+ lastLocation = location;
+ MessageId id = ack.getLastMessageId();
+ data = messages.remove(id);
+ if (data == null) {
messageAcks.add(ack);
}
}
- if(data==null){
- try{
+ if (data == null) {
+ try {
asyncWriteTask.wakeup();
- }catch(InterruptedException e){
+ } catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
}
- public boolean replayRemoveMessage(ConnectionContext context,MessageAck messageAck){
- try{
+ public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
+ try {
// Only remove the message if it has not already been removed.
- ReferenceData t=referenceStore.getMessageReference(messageAck.getLastMessageId());
- if(t!=null){
- referenceStore.removeMessage(context,messageAck);
+ ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId());
+ if (t != null) {
+ referenceStore.removeMessage(context, messageAck);
return true;
}
- }catch(Throwable e){
- log.warn("Could not replay acknowledge for message '"+messageAck.getLastMessageId()
- +"'. Message may have already been acknowledged. reason: "+e);
+ } catch (Throwable e) {
+ LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
}
return false;
}
@@ -244,28 +252,28 @@
*
* @throws InterruptedIOException
*/
- public void flush() throws InterruptedIOException{
- if(log.isDebugEnabled()){
- log.debug("flush starting ...");
+ public void flush() throws InterruptedIOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("flush starting ...");
}
CountDownLatch countDown;
- synchronized(this){
- if(lastWrittenLocation==lastLocation){
+ synchronized (this) {
+ if (lastWrittenLocation == lastLocation) {
return;
}
- if(flushLatch==null){
- flushLatch=new CountDownLatch(1);
+ if (flushLatch == null) {
+ flushLatch = new CountDownLatch(1);
}
- countDown=flushLatch;
+ countDown = flushLatch;
}
- try{
+ try {
asyncWriteTask.wakeup();
countDown.await();
- }catch(InterruptedException e){
+ } catch (InterruptedException e) {
throw new InterruptedIOException();
}
- if(log.isDebugEnabled()){
- log.debug("flush finished");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("flush finished");
}
}
@@ -273,19 +281,19 @@
* @return
* @throws IOException
*/
- void asyncWrite(){
- try{
+ void asyncWrite() {
+ try {
CountDownLatch countDown;
- synchronized(this){
- countDown=flushLatch;
- flushLatch=null;
+ synchronized (this) {
+ countDown = flushLatch;
+ flushLatch = null;
}
mark.set(doAsyncWrite());
- if(countDown!=null){
+ if (countDown != null) {
countDown.countDown();
}
- }catch(IOException e){
- log.error("Checkpoint failed: "+e,e);
+ } catch (IOException e) {
+ LOG.error("Checkpoint failed: " + e, e);
}
}
@@ -293,67 +301,67 @@
* @return
* @throws IOException
*/
- protected Location doAsyncWrite() throws IOException{
+ protected Location doAsyncWrite() throws IOException {
final ArrayList<MessageAck> cpRemovedMessageLocations;
final ArrayList<Location> cpActiveJournalLocations;
- final int maxCheckpointMessageAddSize=peristenceAdapter.getMaxCheckpointMessageAddSize();
+ final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
final Location lastLocation;
// swap out the message hash maps..
- synchronized(this){
- cpAddedMessageIds=this.messages;
- cpRemovedMessageLocations=this.messageAcks;
- cpActiveJournalLocations=new ArrayList<Location>(inFlightTxLocations);
- this.messages=new LinkedHashMap<MessageId,ReferenceData>();
- this.messageAcks=new ArrayList<MessageAck>();
- lastLocation=this.lastLocation;
- }
- if(log.isDebugEnabled())
- log.debug("Doing batch update... adding: "+cpAddedMessageIds.size()+" removing: "
- +cpRemovedMessageLocations.size()+" ");
- transactionTemplate.run(new Callback(){
-
- public void execute() throws Exception{
- int size=0;
- PersistenceAdapter persitanceAdapter=transactionTemplate.getPersistenceAdapter();
- ConnectionContext context=transactionTemplate.getContext();
+ synchronized (this) {
+ cpAddedMessageIds = this.messages;
+ cpRemovedMessageLocations = this.messageAcks;
+ cpActiveJournalLocations = new ArrayList<Location>(inFlightTxLocations);
+ this.messages = new LinkedHashMap<MessageId, ReferenceData>();
+ this.messageAcks = new ArrayList<MessageAck>();
+ lastLocation = this.lastLocation;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: " + cpRemovedMessageLocations.size() + " ");
+ }
+ transactionTemplate.run(new Callback() {
+
+ public void execute() throws Exception {
+ int size = 0;
+ PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
+ ConnectionContext context = transactionTemplate.getContext();
// Checkpoint the added messages.
- Iterator<Entry<MessageId,ReferenceData>> iterator=cpAddedMessageIds.entrySet().iterator();
- while(iterator.hasNext()){
- Entry<MessageId,ReferenceData> entry=iterator.next();
- try{
- referenceStore.addMessageReference(context,entry.getKey(),entry.getValue());
- }catch(Throwable e){
- log.warn("Message could not be added to long term store: "+e.getMessage(),e);
+ Iterator<Entry<MessageId, ReferenceData>> iterator = cpAddedMessageIds.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<MessageId, ReferenceData> entry = iterator.next();
+ try {
+ referenceStore.addMessageReference(context, entry.getKey(), entry.getValue());
+ } catch (Throwable e) {
+ LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
}
size++;
// Commit the batch if it's getting too big
- if(size>=maxCheckpointMessageAddSize){
+ if (size >= maxCheckpointMessageAddSize) {
persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context);
- size=0;
+ size = 0;
}
}
persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context);
// Checkpoint the removed messages.
- for(MessageAck ack:cpRemovedMessageLocations){
- try{
- referenceStore.removeMessage(transactionTemplate.getContext(),ack);
- }catch(Throwable e){
- log.warn("Message could not be removed from long term store: "+e.getMessage(),e);
+ for (MessageAck ack : cpRemovedMessageLocations) {
+ try {
+ referenceStore.removeMessage(transactionTemplate.getContext(), ack);
+ } catch (Throwable e) {
+ LOG.warn("Message could not be removed from long term store: " + e.getMessage(), e);
}
}
}
});
- log.debug("Batch update done.");
- synchronized(this){
- cpAddedMessageIds=null;
- lastWrittenLocation=lastLocation;
+ LOG.debug("Batch update done.");
+ synchronized (this) {
+ cpAddedMessageIds = null;
+ lastWrittenLocation = lastLocation;
}
- if(cpActiveJournalLocations.size()>0){
+ if (cpActiveJournalLocations.size() > 0) {
Collections.sort(cpActiveJournalLocations);
return cpActiveJournalLocations.get(0);
- }else{
+ } else {
return lastLocation;
}
}
@@ -361,50 +369,50 @@
/**
*
*/
- public Message getMessage(MessageId identity) throws IOException{
- ReferenceData data=null;
- synchronized(this){
+ public Message getMessage(MessageId identity) throws IOException {
+ ReferenceData data = null;
+ synchronized (this) {
// Is it still in flight???
- data=messages.get(identity);
- if(data==null&&cpAddedMessageIds!=null){
- data=cpAddedMessageIds.get(identity);
+ data = messages.get(identity);
+ if (data == null && cpAddedMessageIds != null) {
+ data = cpAddedMessageIds.get(identity);
}
}
- if(data==null){
- data=referenceStore.getMessageReference(identity);
- if(data==null){
+ if (data == null) {
+ data = referenceStore.getMessageReference(identity);
+ if (data == null) {
return null;
}
}
- Location location=new Location();
+ Location location = new Location();
location.setDataFileId(data.getFileId());
location.setOffset(data.getOffset());
- DataStructure rc=peristenceAdapter.readCommand(location);
- try{
+ DataStructure rc = peristenceAdapter.readCommand(location);
+ try {
return (Message)rc;
- }catch(ClassCastException e){
- throw new IOException("Could not read message "+identity+" at location "+location
- +", expected a message, but got: "+rc);
+ } catch (ClassCastException e) {
+ throw new IOException("Could not read message " + identity + " at location " + location + ", expected a message, but got: " + rc);
}
}
/**
- * Replays the referenceStore first as those messages are the oldest ones, then messages are replayed from the
- * transaction log and then the cache is updated.
+ * Replays the referenceStore first as those messages are the oldest ones,
+ * then messages are replayed from the transaction log and then the cache is
+ * updated.
*
* @param listener
* @throws Exception
*/
- public void recover(final MessageRecoveryListener listener) throws Exception{
+ public void recover(final MessageRecoveryListener listener) throws Exception {
flush();
- referenceStore.recover(new RecoveryListenerAdapter(this,listener));
+ referenceStore.recover(new RecoveryListenerAdapter(this, listener));
}
- public void start() throws Exception{
+ public void start() throws Exception {
referenceStore.start();
}
- public void stop() throws Exception{
+ public void stop() throws Exception {
flush();
asyncWriteTask.shutdown();
referenceStore.stop();
@@ -413,28 +421,27 @@
/**
* @return Returns the longTermStore.
*/
- public ReferenceStore getReferenceStore(){
+ public ReferenceStore getReferenceStore() {
return referenceStore;
}
/**
* @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
*/
- public void removeAllMessages(ConnectionContext context) throws IOException{
+ public void removeAllMessages(ConnectionContext context) throws IOException {
flush();
referenceStore.removeAllMessages(context);
}
- public ActiveMQDestination getDestination(){
+ public ActiveMQDestination getDestination() {
return destination;
}
- public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
- throws IOException{
+ public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
throw new IOException("The journal does not support message references.");
}
- public String getMessageReference(MessageId identity) throws IOException{
+ public String getMessageReference(MessageId identity) throws IOException {
throw new IOException("The journal does not support message references.");
}
@@ -443,61 +450,54 @@
* @throws IOException
* @see org.apache.activemq.store.MessageStore#getMessageCount()
*/
- public int getMessageCount() throws IOException{
+ public int getMessageCount() throws IOException {
flush();
return referenceStore.getMessageCount();
}
- public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
/*
- RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
- if(referenceStore.supportsExternalBatchControl()){
- synchronized(this){
- referenceStore.recoverNextMessages(maxReturned,recoveryListener);
- if(recoveryListener.size()==0&&recoveryListener.hasSpace()){
- // check for inflight messages
- int count=0;
- Iterator<Entry<MessageId,ReferenceData>> iterator=messages.entrySet().iterator();
- while(iterator.hasNext()&&count<maxReturned&&recoveryListener.hasSpace()){
- Entry<MessageId,ReferenceData> entry=iterator.next();
- ReferenceData data=entry.getValue();
- Message message=getMessage(data);
- recoveryListener.recoverMessage(message);
- count++;
- }
- referenceStore.setBatch(recoveryListener.getLastRecoveredMessageId());
- }
- }
- }else{
- flush();
- referenceStore.recoverNextMessages(maxReturned,recoveryListener);
- }
- */
- RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
- referenceStore.recoverNextMessages(maxReturned,recoveryListener);
- if(recoveryListener.size()==0&&recoveryListener.hasSpace()){
+ * RecoveryListenerAdapter recoveryListener=new
+ * RecoveryListenerAdapter(this,listener);
+ * if(referenceStore.supportsExternalBatchControl()){
+ * synchronized(this){
+ * referenceStore.recoverNextMessages(maxReturned,recoveryListener);
+ * if(recoveryListener.size()==0&&recoveryListener.hasSpace()){ // check
+ * for inflight messages int count=0; Iterator<Entry<MessageId,ReferenceData>>
+ * iterator=messages.entrySet().iterator();
+ * while(iterator.hasNext()&&count<maxReturned&&recoveryListener.hasSpace()){
+ * Entry<MessageId,ReferenceData> entry=iterator.next(); ReferenceData
+ * data=entry.getValue(); Message message=getMessage(data);
+ * recoveryListener.recoverMessage(message); count++; }
+ * referenceStore.setBatch(recoveryListener.getLastRecoveredMessageId()); } }
+ * }else{ flush();
+ * referenceStore.recoverNextMessages(maxReturned,recoveryListener); }
+ */
+ RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
+ referenceStore.recoverNextMessages(maxReturned, recoveryListener);
+ if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
flush();
- referenceStore.recoverNextMessages(maxReturned,recoveryListener);
+ referenceStore.recoverNextMessages(maxReturned, recoveryListener);
}
}
- Message getMessage(ReferenceData data) throws IOException{
- Location location=new Location();
+ Message getMessage(ReferenceData data) throws IOException {
+ Location location = new Location();
location.setDataFileId(data.getFileId());
location.setOffset(data.getOffset());
- DataStructure rc=peristenceAdapter.readCommand(location);
- try{
+ DataStructure rc = peristenceAdapter.readCommand(location);
+ try {
return (Message)rc;
- }catch(ClassCastException e){
- throw new IOException("Could not read message at location "+location+", expected a message, but got: "+rc);
+ } catch (ClassCastException e) {
+ throw new IOException("Could not read message at location " + location + ", expected a message, but got: " + rc);
}
}
- public void resetBatching(){
+ public void resetBatching() {
referenceStore.resetBatching();
}
- public Location getMark(){
+ public Location getMark() {
return mark.get();
}
-}
\ No newline at end of file
+}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Wed Aug 8 11:56:59 2007
@@ -22,10 +22,11 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.activeio.journal.Journal;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -35,7 +36,6 @@
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.memory.UsageListener;
@@ -62,46 +62,46 @@
import org.apache.commons.logging.LogFactory;
/**
- * An implementation of {@link PersistenceAdapter} designed for use with a {@link Journal} and then check pointing
- * asynchronously on a timeout with some other long term persistent storage.
+ * An implementation of {@link PersistenceAdapter} designed for use with a
+ * {@link Journal} and then check pointing asynchronously on a timeout with some
+ * other long term persistent storage.
*
* @org.apache.xbean.XBean element="amqPersistenceAdapter"
- *
* @version $Revision: 1.17 $
*/
public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
- private static final Log log=LogFactory.getLog(AMQPersistenceAdapter.class);
- private final ConcurrentHashMap<ActiveMQQueue,AMQMessageStore> queues=new ConcurrentHashMap<ActiveMQQueue,AMQMessageStore>();
- private final ConcurrentHashMap<ActiveMQTopic,AMQMessageStore> topics=new ConcurrentHashMap<ActiveMQTopic,AMQMessageStore>();
+ private static final Log log = LogFactory.getLog(AMQPersistenceAdapter.class);
+ private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
+ private final ConcurrentHashMap<ActiveMQTopic, AMQMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQMessageStore>();
private AsyncDataManager asyncDataManager;
private ReferenceStoreAdapter referenceStoreAdapter;
private TaskRunnerFactory taskRunnerFactory;
- private WireFormat wireFormat=new OpenWireFormat();
+ private WireFormat wireFormat = new OpenWireFormat();
private UsageManager usageManager;
- private long cleanupInterval=1000*60;
- private long checkpointInterval=1000*10;
- private int maxCheckpointWorkers=1;
- private int maxCheckpointMessageAddSize=1024*4;
- private AMQTransactionStore transactionStore=new AMQTransactionStore(this);
+ private long cleanupInterval = 1000 * 60;
+ private long checkpointInterval = 1000 * 10;
+ private int maxCheckpointWorkers = 1;
+ private int maxCheckpointMessageAddSize = 1024 * 4;
+ private AMQTransactionStore transactionStore = new AMQTransactionStore(this);
private TaskRunner checkpointTask;
- private CountDownLatch nextCheckpointCountDownLatch=new CountDownLatch(1);
- private final AtomicBoolean started=new AtomicBoolean(false);
+ private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
+ private final AtomicBoolean started = new AtomicBoolean(false);
private Runnable periodicCheckpointTask;
private Runnable periodicCleanupTask;
private boolean deleteAllMessages;
private boolean syncOnWrite;
- private String brokerName="";
+ private String brokerName = "";
private File directory;
private BrokerService brokerService;
- public String getBrokerName(){
+ public String getBrokerName() {
return this.brokerName;
}
- public void setBrokerName(String brokerName){
- this.brokerName=brokerName;
- if(this.referenceStoreAdapter!=null){
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
+ if (this.referenceStoreAdapter != null) {
this.referenceStoreAdapter.setBrokerName(brokerName);
}
}
@@ -114,165 +114,170 @@
this.brokerService = brokerService;
}
- public synchronized void start() throws Exception{
- if(!started.compareAndSet(false,true))
+ public synchronized void start() throws Exception {
+ if (!started.compareAndSet(false, true)) {
return;
- if(this.directory==null) {
+ }
+ if (this.directory == null) {
if (brokerService != null) {
this.directory = brokerService.getBrokerDataDirectory();
- }
- else {
- this.directory=new File(IOHelper.getDefaultDataDirectory(),brokerName);
- this.directory=new File(directory,"amqstore");
+ } else {
+ this.directory = new File(IOHelper.getDefaultDataDirectory(), brokerName);
+ this.directory = new File(directory, "amqstore");
}
}
log.info("AMQStore starting using directory: " + directory);
this.directory.mkdirs();
- if(this.usageManager!=null){
+ if (this.usageManager != null) {
this.usageManager.addUsageListener(this);
}
- if(asyncDataManager==null){
- asyncDataManager=createAsyncDataManager();
+ if (asyncDataManager == null) {
+ asyncDataManager = createAsyncDataManager();
}
- if(referenceStoreAdapter==null){
- referenceStoreAdapter=createReferenceStoreAdapter();
+ if (referenceStoreAdapter == null) {
+ referenceStoreAdapter = createReferenceStoreAdapter();
}
- referenceStoreAdapter.setDirectory(new File(directory,"kr-store"));
+ referenceStoreAdapter.setDirectory(new File(directory, "kr-store"));
referenceStoreAdapter.setBrokerName(getBrokerName());
referenceStoreAdapter.setUsageManager(usageManager);
- if(taskRunnerFactory==null){
- taskRunnerFactory=createTaskRunnerFactory();
+ if (taskRunnerFactory == null) {
+ taskRunnerFactory = createTaskRunnerFactory();
}
asyncDataManager.start();
- if(deleteAllMessages){
+ if (deleteAllMessages) {
asyncDataManager.delete();
- try{
- JournalTrace trace=new JournalTrace();
- trace.setMessage("DELETED "+new Date());
- Location location=asyncDataManager.write(wireFormat.marshal(trace),false);
- asyncDataManager.setMark(location,true);
+ try {
+ JournalTrace trace = new JournalTrace();
+ trace.setMessage("DELETED " + new Date());
+ Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
+ asyncDataManager.setMark(location, true);
log.info("Journal deleted: ");
- deleteAllMessages=false;
- }catch(IOException e){
+ deleteAllMessages = false;
+ } catch (IOException e) {
throw e;
- }catch(Throwable e){
+ } catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
referenceStoreAdapter.deleteAllMessages();
}
referenceStoreAdapter.start();
- Set<Integer> files=referenceStoreAdapter.getReferenceFileIdsInUse();
- log.info("Active data files: "+files);
- checkpointTask=taskRunnerFactory.createTaskRunner(new Task(){
+ Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
+ log.info("Active data files: " + files);
+ checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
- public boolean iterate(){
+ public boolean iterate() {
doCheckpoint();
return false;
}
- },"ActiveMQ Journal Checkpoint Worker");
+ }, "ActiveMQ Journal Checkpoint Worker");
createTransactionStore();
-
-//
-// The following was attempting to reduce startup times by avoiding the log
-// file scanning that recovery performs. The problem with it is that XA transactions
-// only live in transaction log and are not stored in the reference store, but they still
-// need to be recovered when the broker starts up.
-
- if(referenceStoreAdapter.isStoreValid()==false){
+
+ //
+ // The following was attempting to reduce startup times by avoiding the
+ // log
+ // file scanning that recovery performs. The problem with it is that XA
+ // transactions
+ // only live in transaction log and are not stored in the reference
+ // store, but they still
+ // need to be recovered when the broker starts up.
+
+ if (referenceStoreAdapter.isStoreValid() == false) {
log.warn("The ReferenceStore is not valid - recovering ...");
recover();
log.info("Finished recovering the ReferenceStore");
- }else {
- Location location=writeTraceMessage("RECOVERED "+new Date(),true);
- asyncDataManager.setMark(location,true);
- //recover transactions
+ } else {
+ Location location = writeTraceMessage("RECOVERED " + new Date(), true);
+ asyncDataManager.setMark(location, true);
+ // recover transactions
getTransactionStore().setPreparedTransactions(referenceStoreAdapter.retrievePreparedState());
- }
-
-
+ }
+
// Do a checkpoint periodically.
- periodicCheckpointTask=new Runnable(){
+ periodicCheckpointTask = new Runnable() {
- public void run(){
+ public void run() {
checkpoint(false);
}
};
- Scheduler.executePeriodically(periodicCheckpointTask,checkpointInterval);
- periodicCleanupTask=new Runnable(){
+ Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval);
+ periodicCleanupTask = new Runnable() {
- public void run(){
+ public void run() {
cleanup();
}
};
- Scheduler.executePeriodically(periodicCleanupTask,cleanupInterval);
+ Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval);
}
- public void stop() throws Exception{
-
- if(!started.compareAndSet(true,false))
+ public void stop() throws Exception {
+
+ if (!started.compareAndSet(true, false)) {
return;
+ }
this.usageManager.removeUsageListener(this);
- synchronized(this){
+ synchronized (this) {
Scheduler.cancel(periodicCheckpointTask);
Scheduler.cancel(periodicCleanupTask);
}
- Iterator<AMQMessageStore> iterator=queues.values().iterator();
- while(iterator.hasNext()){
- AMQMessageStore ms=iterator.next();
+ Iterator<AMQMessageStore> iterator = queues.values().iterator();
+ while (iterator.hasNext()) {
+ AMQMessageStore ms = iterator.next();
ms.stop();
}
- iterator=topics.values().iterator();
- while(iterator.hasNext()){
- final AMQTopicMessageStore ms=(AMQTopicMessageStore)iterator.next();
+ iterator = topics.values().iterator();
+ while (iterator.hasNext()) {
+ final AMQTopicMessageStore ms = (AMQTopicMessageStore)iterator.next();
ms.stop();
}
// Take one final checkpoint and stop checkpoint processing.
checkpoint(true);
- synchronized(this){
+ synchronized (this) {
checkpointTask.shutdown();
}
referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions());
queues.clear();
topics.clear();
- IOException firstException=null;
+ IOException firstException = null;
referenceStoreAdapter.stop();
- try{
+ try {
log.debug("Journal close");
asyncDataManager.close();
- }catch(Exception e){
- firstException=IOExceptionSupport.create("Failed to close journals: "+e,e);
+ } catch (Exception e) {
+ firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
}
- if(firstException!=null){
+ if (firstException != null) {
throw firstException;
}
}
/**
* When we checkpoint we move all the journalled data to long term storage.
- * @param sync
+ *
+ * @param sync
*/
- public void checkpoint(boolean sync){
- try{
- if(asyncDataManager==null)
+ public void checkpoint(boolean sync) {
+ try {
+ if (asyncDataManager == null) {
throw new IllegalStateException("Journal is closed.");
- CountDownLatch latch=null;
- synchronized(this){
- latch=nextCheckpointCountDownLatch;
+ }
+ CountDownLatch latch = null;
+ synchronized (this) {
+ latch = nextCheckpointCountDownLatch;
checkpointTask.wakeup();
}
- if(sync){
- if(log.isDebugEnabled()){
+ if (sync) {
+ if (log.isDebugEnabled()) {
log.debug("Waitng for checkpoint to complete.");
}
latch.await();
}
referenceStoreAdapter.checkpoint(sync);
- }catch(InterruptedException e){
+ } catch (InterruptedException e) {
Thread.currentThread().interrupt();
- log.warn("Request to start checkpoint failed: "+e,e);
- }catch(IOException e){
- log.error("checkpoint failed: "+e,e);
+ log.warn("Request to start checkpoint failed: " + e, e);
+ } catch (IOException e) {
+ log.error("checkpoint failed: " + e, e);
}
}
@@ -281,49 +286,49 @@
*
* @return true if successful
*/
- public boolean doCheckpoint(){
- CountDownLatch latch=null;
- synchronized(this){
- latch=nextCheckpointCountDownLatch;
- nextCheckpointCountDownLatch=new CountDownLatch(1);
+ public boolean doCheckpoint() {
+ CountDownLatch latch = null;
+ synchronized (this) {
+ latch = nextCheckpointCountDownLatch;
+ nextCheckpointCountDownLatch = new CountDownLatch(1);
}
- try{
- if(log.isDebugEnabled()){
+ try {
+ if (log.isDebugEnabled()) {
log.debug("Checkpoint started.");
}
-
- Location newMark=null;
- Iterator<AMQMessageStore> iterator=queues.values().iterator();
- while(iterator.hasNext()){
- final AMQMessageStore ms=iterator.next();
- Location mark=(Location)ms.getMark();
- if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){
- newMark=mark;
+
+ Location newMark = null;
+ Iterator<AMQMessageStore> iterator = queues.values().iterator();
+ while (iterator.hasNext()) {
+ final AMQMessageStore ms = iterator.next();
+ Location mark = (Location)ms.getMark();
+ if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
+ newMark = mark;
}
}
- iterator=topics.values().iterator();
- while(iterator.hasNext()){
- final AMQTopicMessageStore ms=(AMQTopicMessageStore)iterator.next();
- Location mark=(Location)ms.getMark();
- if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){
- newMark=mark;
+ iterator = topics.values().iterator();
+ while (iterator.hasNext()) {
+ final AMQTopicMessageStore ms = (AMQTopicMessageStore)iterator.next();
+ Location mark = (Location)ms.getMark();
+ if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
+ newMark = mark;
}
}
- try{
- if(newMark!=null){
- if(log.isDebugEnabled()){
- log.debug("Marking journal at: "+newMark);
+ try {
+ if (newMark != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Marking journal at: " + newMark);
}
- asyncDataManager.setMark(newMark,false);
- writeTraceMessage("CHECKPOINT "+new Date(),true);
+ asyncDataManager.setMark(newMark, false);
+ writeTraceMessage("CHECKPOINT " + new Date(), true);
}
- }catch(Exception e){
- log.error("Failed to mark the Journal: "+e,e);
+ } catch (Exception e) {
+ log.error("Failed to mark the Journal: " + e, e);
}
- if(log.isDebugEnabled()){
+ if (log.isDebugEnabled()) {
log.debug("Checkpoint done.");
}
- }finally{
+ } finally {
latch.countDown();
}
return true;
@@ -335,79 +340,77 @@
* @return
* @throws IOException
*/
- public void cleanup(){
- try{
- Set<Integer> inUse=referenceStoreAdapter.getReferenceFileIdsInUse();
+ public void cleanup() {
+ try {
+ Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse();
asyncDataManager.consolidateDataFilesNotIn(inUse);
- }catch(IOException e){
- log.error("Could not cleanup data files: "+e,e);
+ } catch (IOException e) {
+ log.error("Could not cleanup data files: " + e, e);
}
}
- public Set<ActiveMQDestination> getDestinations(){
- Set<ActiveMQDestination> destinations=new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
+ public Set<ActiveMQDestination> getDestinations() {
+ Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
destinations.addAll(queues.keySet());
destinations.addAll(topics.keySet());
return destinations;
}
- MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{
- if(destination.isQueue()){
+ MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
+ if (destination.isQueue()) {
return createQueueMessageStore((ActiveMQQueue)destination);
- }else{
+ } else {
return createTopicMessageStore((ActiveMQTopic)destination);
}
}
-
-
- public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
- AMQMessageStore store=queues.get(destination);
- if(store==null){
- ReferenceStore checkpointStore=referenceStoreAdapter.createQueueReferenceStore(destination);
- store=new AMQMessageStore(this,checkpointStore,destination);
- try{
+ public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+ AMQMessageStore store = queues.get(destination);
+ if (store == null) {
+ ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
+ store = new AMQMessageStore(this, checkpointStore, destination);
+ try {
store.start();
- }catch(Exception e){
+ } catch (Exception e) {
throw IOExceptionSupport.create(e);
}
- queues.put(destination,store);
+ queues.put(destination, store);
}
return store;
}
- public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException{
- AMQTopicMessageStore store=(AMQTopicMessageStore)topics.get(destinationName);
- if(store==null){
- TopicReferenceStore checkpointStore=referenceStoreAdapter.createTopicReferenceStore(destinationName);
- store=new AMQTopicMessageStore(this,checkpointStore,destinationName);
- try{
+ public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
+ AMQTopicMessageStore store = (AMQTopicMessageStore)topics.get(destinationName);
+ if (store == null) {
+ TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
+ store = new AMQTopicMessageStore(this, checkpointStore, destinationName);
+ try {
store.start();
- }catch(Exception e){
+ } catch (Exception e) {
throw IOExceptionSupport.create(e);
}
- topics.put(destinationName,store);
+ topics.put(destinationName, store);
}
return store;
}
- public TransactionStore createTransactionStore() throws IOException{
+ public TransactionStore createTransactionStore() throws IOException {
return transactionStore;
}
- public long getLastMessageBrokerSequenceId() throws IOException{
+ public long getLastMessageBrokerSequenceId() throws IOException {
return referenceStoreAdapter.getLastMessageBrokerSequenceId();
}
- public void beginTransaction(ConnectionContext context) throws IOException{
+ public void beginTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.beginTransaction(context);
}
- public void commitTransaction(ConnectionContext context) throws IOException{
+ public void commitTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.commitTransaction(context);
}
- public void rollbackTransaction(ConnectionContext context) throws IOException{
+ public void rollbackTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.rollbackTransaction(context);
}
@@ -416,91 +419,93 @@
* @return
* @throws IOException
*/
- public DataStructure readCommand(Location location) throws IOException{
- try{
- ByteSequence packet=asyncDataManager.read(location);
+ public DataStructure readCommand(Location location) throws IOException {
+ try {
+ ByteSequence packet = asyncDataManager.read(location);
return (DataStructure)wireFormat.unmarshal(packet);
- }catch(IOException e){
- throw createReadException(location,e);
+ } catch (IOException e) {
+ throw createReadException(location, e);
}
}
/**
- * Move all the messages that were in the journal into long term storage. We just replay and do a checkpoint.
+ * Move all the messages that were in the journal into long term storage. We
+ * just replay and do a checkpoint.
*
* @throws IOException
* @throws IOException
* @throws InvalidLocationException
* @throws IllegalStateException
*/
- private void recover() throws IllegalStateException,IOException{
+ private void recover() throws IllegalStateException, IOException {
referenceStoreAdapter.clearMessages();
referenceStoreAdapter.recoverState();
- Location pos=null;
- int redoCounter=0;
- log.info("Journal Recovery Started from: "+asyncDataManager);
- long start=System.currentTimeMillis();
- ConnectionContext context=new ConnectionContext();
+ Location pos = null;
+ int redoCounter = 0;
+ log.info("Journal Recovery Started from: " + asyncDataManager);
+ long start = System.currentTimeMillis();
+ ConnectionContext context = new ConnectionContext();
// While we have records in the journal.
- while((pos=asyncDataManager.getNextLocation(pos))!=null){
- ByteSequence data=asyncDataManager.read(pos);
- DataStructure c=(DataStructure)wireFormat.unmarshal(data);
- if(c instanceof Message){
- Message message=(Message)c;
- AMQMessageStore store=(AMQMessageStore)createMessageStore(message.getDestination());
- if(message.isInTransaction()){
- transactionStore.addMessage(store,message,pos);
- }else{
- if(store.replayAddMessage(context,message,pos)){
+ while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
+ ByteSequence data = asyncDataManager.read(pos);
+ DataStructure c = (DataStructure)wireFormat.unmarshal(data);
+ if (c instanceof Message) {
+ Message message = (Message)c;
+ AMQMessageStore store = (AMQMessageStore)createMessageStore(message.getDestination());
+ if (message.isInTransaction()) {
+ transactionStore.addMessage(store, message, pos);
+ } else {
+ if (store.replayAddMessage(context, message, pos)) {
redoCounter++;
}
}
- }else{
- switch(c.getDataStructureType()){
+ } else {
+ switch (c.getDataStructureType()) {
case JournalQueueAck.DATA_STRUCTURE_TYPE: {
- JournalQueueAck command=(JournalQueueAck)c;
- AMQMessageStore store=(AMQMessageStore)createMessageStore(command.getDestination());
- if(command.getMessageAck().isInTransaction()){
- transactionStore.removeMessage(store,command.getMessageAck(),pos);
- }else{
- if(store.replayRemoveMessage(context,command.getMessageAck())){
+ JournalQueueAck command = (JournalQueueAck)c;
+ AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination());
+ if (command.getMessageAck().isInTransaction()) {
+ transactionStore.removeMessage(store, command.getMessageAck(), pos);
+ } else {
+ if (store.replayRemoveMessage(context, command.getMessageAck())) {
redoCounter++;
}
}
}
break;
case JournalTopicAck.DATA_STRUCTURE_TYPE: {
- JournalTopicAck command=(JournalTopicAck)c;
- AMQTopicMessageStore store=(AMQTopicMessageStore)createMessageStore(command.getDestination());
- if(command.getTransactionId()!=null){
- transactionStore.acknowledge(store,command,pos);
- }else{
- if(store.replayAcknowledge(context,command.getClientId(),command.getSubscritionName(),command
- .getMessageId())){
+ JournalTopicAck command = (JournalTopicAck)c;
+ AMQTopicMessageStore store = (AMQTopicMessageStore)createMessageStore(command.getDestination());
+ if (command.getTransactionId() != null) {
+ transactionStore.acknowledge(store, command, pos);
+ } else {
+ if (store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId())) {
redoCounter++;
}
}
}
break;
case JournalTransaction.DATA_STRUCTURE_TYPE: {
- JournalTransaction command=(JournalTransaction)c;
- try{
+ JournalTransaction command = (JournalTransaction)c;
+ try {
// Try to replay the packet.
- switch(command.getType()){
+ switch (command.getType()) {
case JournalTransaction.XA_PREPARE:
transactionStore.replayPrepare(command.getTransactionId());
break;
case JournalTransaction.XA_COMMIT:
case JournalTransaction.LOCAL_COMMIT:
- AMQTx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared());
- if(tx==null)
- break; // We may be trying to replay a commit that
+ AMQTx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
+ if (tx == null) {
+ break; // We may be trying to replay a commit
+ }
+ // that
// was already committed.
// Replay the committed operations.
tx.getOperations();
- for(Iterator iter=tx.getOperations().iterator();iter.hasNext();){
- AMQTxOperation op=(AMQTxOperation)iter.next();
- if (op.replay(this,context)) {
+ for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
+ AMQTxOperation op = (AMQTxOperation)iter.next();
+ if (op.replay(this, context)) {
redoCounter++;
}
}
@@ -510,176 +515,174 @@
transactionStore.replayRollback(command.getTransactionId());
break;
}
- }catch(IOException e){
- log.error("Recovery Failure: Could not replay: "+c+", reason: "+e,e);
+ } catch (IOException e) {
+ log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
}
}
break;
case JournalTrace.DATA_STRUCTURE_TYPE:
- JournalTrace trace=(JournalTrace)c;
- log.debug("TRACE Entry: "+trace.getMessage());
+ JournalTrace trace = (JournalTrace)c;
+ log.debug("TRACE Entry: " + trace.getMessage());
break;
default:
- log.error("Unknown type of record in transaction log which will be discarded: "+c);
+ log.error("Unknown type of record in transaction log which will be discarded: " + c);
}
}
}
- Location location=writeTraceMessage("RECOVERED "+new Date(),true);
- asyncDataManager.setMark(location,true);
- long end=System.currentTimeMillis();
- log.info("Recovered "+redoCounter+" operations from redo log in "+((end-start)/1000.0f)+" seconds.");
+ Location location = writeTraceMessage("RECOVERED " + new Date(), true);
+ asyncDataManager.setMark(location, true);
+ long end = System.currentTimeMillis();
+ log.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
}
- private IOException createReadException(Location location,Exception e){
- return IOExceptionSupport.create("Failed to read to journal for: "+location+". Reason: "+e,e);
+ private IOException createReadException(Location location, Exception e) {
+ return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
}
- protected IOException createWriteException(DataStructure packet,Exception e){
- return IOExceptionSupport.create("Failed to write to journal for: "+packet+". Reason: "+e,e);
+ protected IOException createWriteException(DataStructure packet, Exception e) {
+ return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
}
- protected IOException createWriteException(String command,Exception e){
- return IOExceptionSupport.create("Failed to write to journal for command: "+command+". Reason: "+e,e);
+ protected IOException createWriteException(String command, Exception e) {
+ return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
}
- protected IOException createRecoveryFailedException(Exception e){
- return IOExceptionSupport.create("Failed to recover from journal. Reason: "+e,e);
+ protected IOException createRecoveryFailedException(Exception e) {
+ return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
}
/**
- *
* @param command
* @param syncHint
* @return
* @throws IOException
*/
- public Location writeCommand(DataStructure command,boolean syncHint) throws IOException{
- return asyncDataManager.write(wireFormat.marshal(command),(syncHint&&syncOnWrite));
+ public Location writeCommand(DataStructure command, boolean syncHint) throws IOException {
+ return asyncDataManager.write(wireFormat.marshal(command), (syncHint && syncOnWrite));
}
- private Location writeTraceMessage(String message,boolean sync) throws IOException{
- JournalTrace trace=new JournalTrace();
+ private Location writeTraceMessage(String message, boolean sync) throws IOException {
+ JournalTrace trace = new JournalTrace();
trace.setMessage(message);
- return writeCommand(trace,sync);
+ return writeCommand(trace, sync);
}
- public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
- newPercentUsage=((newPercentUsage)/10)*10;
- oldPercentUsage=((oldPercentUsage)/10)*10;
- if(newPercentUsage>=70&&oldPercentUsage<newPercentUsage){
+ public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
+ newPercentUsage = ((newPercentUsage) / 10) * 10;
+ oldPercentUsage = ((oldPercentUsage) / 10) * 10;
+ if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
checkpoint(false);
}
}
- public AMQTransactionStore getTransactionStore(){
+ public AMQTransactionStore getTransactionStore() {
return transactionStore;
}
- public synchronized void deleteAllMessages() throws IOException{
- deleteAllMessages=true;
+ public synchronized void deleteAllMessages() throws IOException {
+ deleteAllMessages = true;
}
- public String toString(){
- return "AMQPersistenceAdapter("+directory+")";
+ public String toString() {
+ return "AMQPersistenceAdapter(" + directory + ")";
}
// /////////////////////////////////////////////////////////////////
// Subclass overridables
// /////////////////////////////////////////////////////////////////
- protected AsyncDataManager createAsyncDataManager(){
- AsyncDataManager manager=new AsyncDataManager();
- manager.setDirectory(new File(directory,"journal"));
+ protected AsyncDataManager createAsyncDataManager() {
+ AsyncDataManager manager = new AsyncDataManager();
+ manager.setDirectory(new File(directory, "journal"));
return manager;
}
- protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException{
- KahaReferenceStoreAdapter adaptor=new KahaReferenceStoreAdapter();
+ protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
+ KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter();
return adaptor;
}
- protected TaskRunnerFactory createTaskRunnerFactory(){
+ protected TaskRunnerFactory createTaskRunnerFactory() {
return DefaultThreadPools.getDefaultTaskRunnerFactory();
}
// /////////////////////////////////////////////////////////////////
// Property Accessors
// /////////////////////////////////////////////////////////////////
- public AsyncDataManager getAsyncDataManager(){
+ public AsyncDataManager getAsyncDataManager() {
return asyncDataManager;
}
- public void setAsyncDataManager(AsyncDataManager asyncDataManager){
- this.asyncDataManager=asyncDataManager;
+ public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
+ this.asyncDataManager = asyncDataManager;
}
- public ReferenceStoreAdapter getReferenceStoreAdapter(){
+ public ReferenceStoreAdapter getReferenceStoreAdapter() {
return referenceStoreAdapter;
}
- public TaskRunnerFactory getTaskRunnerFactory(){
+ public TaskRunnerFactory getTaskRunnerFactory() {
return taskRunnerFactory;
}
- public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory){
- this.taskRunnerFactory=taskRunnerFactory;
+ public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
+ this.taskRunnerFactory = taskRunnerFactory;
}
/**
* @return Returns the wireFormat.
*/
- public WireFormat getWireFormat(){
+ public WireFormat getWireFormat() {
return wireFormat;
}
- public void setWireFormat(WireFormat wireFormat){
- this.wireFormat=wireFormat;
+ public void setWireFormat(WireFormat wireFormat) {
+ this.wireFormat = wireFormat;
}
- public UsageManager getUsageManager(){
+ public UsageManager getUsageManager() {
return usageManager;
}
- public void setUsageManager(UsageManager usageManager){
- this.usageManager=usageManager;
+ public void setUsageManager(UsageManager usageManager) {
+ this.usageManager = usageManager;
}
- public int getMaxCheckpointMessageAddSize(){
+ public int getMaxCheckpointMessageAddSize() {
return maxCheckpointMessageAddSize;
}
- public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize){
- this.maxCheckpointMessageAddSize=maxCheckpointMessageAddSize;
+ public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
+ this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
}
- public int getMaxCheckpointWorkers(){
+ public int getMaxCheckpointWorkers() {
return maxCheckpointWorkers;
}
- public void setMaxCheckpointWorkers(int maxCheckpointWorkers){
- this.maxCheckpointWorkers=maxCheckpointWorkers;
+ public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
+ this.maxCheckpointWorkers = maxCheckpointWorkers;
}
- public synchronized File getDirectory(){
+ public synchronized File getDirectory() {
return directory;
}
- public synchronized void setDirectory(File directory){
- this.directory=directory;
+ public synchronized void setDirectory(File directory) {
+ this.directory = directory;
}
- public boolean isSyncOnWrite(){
+ public boolean isSyncOnWrite() {
return this.syncOnWrite;
}
- public void setSyncOnWrite(boolean syncOnWrite){
- this.syncOnWrite=syncOnWrite;
+ public void setSyncOnWrite(boolean syncOnWrite) {
+ this.syncOnWrite = syncOnWrite;
}
-
/**
* @param referenceStoreAdapter the referenceStoreAdapter to set
*/
- public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter){
- this.referenceStoreAdapter=referenceStoreAdapter;
+ public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
+ this.referenceStoreAdapter = referenceStoreAdapter;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java Wed Aug 8 11:56:59 2007
@@ -28,106 +28,101 @@
*
* @version $Revision: 1.17 $
*/
-public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory{
+public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
private TaskRunnerFactory taskRunnerFactory;
private File dataDirectory;
private int journalThreadPriority = Thread.MAX_PRIORITY;
- private String brokerName="localhost";
+ private String brokerName = "localhost";
private ReferenceStoreAdapter referenceStoreAdapter;
-
+
/**
* @return a AMQPersistenceAdapter
* @see org.apache.activemq.store.PersistenceAdapterFactory#createPersistenceAdapter()
*/
- public PersistenceAdapter createPersistenceAdapter(){
- AMQPersistenceAdapter result = new AMQPersistenceAdapter();
+ public PersistenceAdapter createPersistenceAdapter() {
+ AMQPersistenceAdapter result = new AMQPersistenceAdapter();
result.setDirectory(getDataDirectory());
result.setTaskRunnerFactory(getTaskRunnerFactory());
result.setBrokerName(getBrokerName());
result.setReferenceStoreAdapter(getReferenceStoreAdapter());
return result;
}
-
+
/**
* @return the dataDirectory
*/
- public File getDataDirectory(){
- if(this.dataDirectory==null){
- this.dataDirectory=new File(IOHelper.getDefaultDataDirectory(),brokerName);
+ public File getDataDirectory() {
+ if (this.dataDirectory == null) {
+ this.dataDirectory = new File(IOHelper.getDefaultDataDirectory(), brokerName);
}
return this.dataDirectory;
}
-
+
/**
* @param dataDirectory the dataDirectory to set
*/
- public void setDataDirectory(File dataDirectory){
- this.dataDirectory=dataDirectory;
+ public void setDataDirectory(File dataDirectory) {
+ this.dataDirectory = dataDirectory;
}
-
+
/**
* @return the taskRunnerFactory
*/
- public TaskRunnerFactory getTaskRunnerFactory(){
- if( taskRunnerFactory == null ) {
- taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority, true, 1000);
+ public TaskRunnerFactory getTaskRunnerFactory() {
+ if (taskRunnerFactory == null) {
+ taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority,
+ true, 1000);
}
return taskRunnerFactory;
}
-
+
/**
* @param taskRunnerFactory the taskRunnerFactory to set
*/
- public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory){
- this.taskRunnerFactory=taskRunnerFactory;
+ public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
+ this.taskRunnerFactory = taskRunnerFactory;
}
-
/**
* @return the journalThreadPriority
*/
- public int getJournalThreadPriority(){
+ public int getJournalThreadPriority() {
return this.journalThreadPriority;
}
-
/**
* @param journalThreadPriority the journalThreadPriority to set
*/
- public void setJournalThreadPriority(int journalThreadPriority){
- this.journalThreadPriority=journalThreadPriority;
+ public void setJournalThreadPriority(int journalThreadPriority) {
+ this.journalThreadPriority = journalThreadPriority;
}
-
/**
* @return the brokerName
*/
- public String getBrokerName(){
+ public String getBrokerName() {
return this.brokerName;
}
-
/**
* @param brokerName the brokerName to set
*/
- public void setBrokerName(String brokerName){
- this.brokerName=brokerName;
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
}
-
/**
* @return the referenceStoreAdapter
*/
- public ReferenceStoreAdapter getReferenceStoreAdapter(){
+ public ReferenceStoreAdapter getReferenceStoreAdapter() {
return this.referenceStoreAdapter;
}
-
/**
* @param referenceStoreAdapter the referenceStoreAdapter to set
*/
- public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter){
- this.referenceStoreAdapter=referenceStoreAdapter;
+ public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
+ this.referenceStoreAdapter = referenceStoreAdapter;
}
}
|