Author: chirino Date: Fri May 12 10:52:51 2006 New Revision: 405807 URL: http://svn.apache.org/viewcvs?rev=405807&view=rev Log: Implementing a Rapid store which is a mix of the QuickJournal and the Kaha store. Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java (with props) incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java (with props) incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (with props) incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java (with props) Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java?rev=405807&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java Fri May 12 10:52:51 2006 @@ -0,0 +1,46 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.store.rapid; + +import org.apache.activeio.journal.RecordLocation; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; + +public class RapidMessageReference { + public final MessageId messageId; + public final long expiration; + public final RecordLocation location; + + public RapidMessageReference(Message message, RecordLocation location) { + this.messageId = message.getMessageId(); + this.expiration = message.getExpiration(); + this.location=location; + } + + public long getExpiration() { + return expiration; + } + + public MessageId getMessageId() { + return messageId; + } + + public RecordLocation getLocation() { + return location; + } +} \ No newline at end of file Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java?rev=405807&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java Fri May 12 10:52:51 2006 @@ -0,0 +1,289 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance 
with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.rapid; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.activeio.journal.RecordLocation; +import org.apache.activeio.journal.active.Location; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.JournalQueueAck; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.kaha.MapContainer; +import org.apache.activemq.memory.UsageManager; +import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.transaction.Synchronization; +import org.apache.activemq.util.TransactionTemplate; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A MessageStore that uses a Journal to store its messages. 
+ * + * @version $Revision: 1.14 $ + */ +public class RapidMessageStore implements MessageStore { + + private static final Log log = LogFactory.getLog(RapidMessageStore.class); + + protected final RapidPersistenceAdapter peristenceAdapter; + protected final RapidTransactionStore transactionStore; + protected final MapContainer messageContainer; + protected final ActiveMQDestination destination; + protected final TransactionTemplate transactionTemplate; + +// private LinkedHashMap messages = new LinkedHashMap(); +// private ArrayList messageAcks = new ArrayList(); + +// /** A MessageStore that we can use to retrieve messages quickly. */ +// private LinkedHashMap cpAddedMessageIds; + + protected RecordLocation lastLocation; + protected HashSet inFlightTxLocations = new HashSet(); + + public RapidMessageStore(RapidPersistenceAdapter adapter, ActiveMQDestination destination, MapContainer container) { + this.peristenceAdapter = adapter; + this.transactionStore = adapter.getTransactionStore(); + this.messageContainer = container; + this.destination = destination; + this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); + } + + + /** + * Not synchronized since the Journal has better throughput if you increase + * the number of concurrent writes that it is doing. 
+ */ + public void addMessage(ConnectionContext context, final Message message) throws IOException { + + final MessageId id = message.getMessageId(); + + final boolean debug = log.isDebugEnabled(); + final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired()); + final RapidMessageReference md = new RapidMessageReference(message, location); + + if( !context.isInTransaction() ) { + if( debug ) + log.debug("Journalled message add for: "+id+", at: "+location); + addMessage(md); + } else { + message.incrementReferenceCount(); + if( debug ) + log.debug("Journalled transacted message add for: "+id+", at: "+location); + synchronized( this ) { + inFlightTxLocations.add(location); + } + transactionStore.addMessage(this, message, location); + context.getTransaction().addSynchronization(new Synchronization(){ + public void afterCommit() throws Exception { + if( debug ) + log.debug("Transacted message add commit for: "+id+", at: "+location); + message.decrementReferenceCount(); + synchronized( RapidMessageStore.this ) { + inFlightTxLocations.remove(location); + addMessage(md); + } + } + public void afterRollback() throws Exception { + if( debug ) + log.debug("Transacted message add rollback for: "+id+", at: "+location); + message.decrementReferenceCount(); + synchronized( RapidMessageStore.this ) { + inFlightTxLocations.remove(location); + } + } + }); + } + } + + private void addMessage(final RapidMessageReference messageReference) { + synchronized (this) { + lastLocation = messageReference.getLocation(); + MessageId id = messageReference.getMessageId(); + messageContainer.put(id.toString(), messageReference); + } + } + + static protected String toString(RecordLocation location) { + Location l = (Location) location; + return l.getLogFileId()+":"+l.getLogFileOffset(); + } + + static protected RecordLocation toRecordLocation(String t) { + String[] strings = t.split(":"); + if( strings.length!=2 ) + throw new 
IllegalArgumentException("Invalid location: "+t); + return new Location(Integer.parseInt(strings[0]),Integer.parseInt(strings[1])); + } + + public void replayAddMessage(ConnectionContext context, Message message, RecordLocation location) { + try { + RapidMessageReference messageReference = new RapidMessageReference(message, location); + messageContainer.put(message.getMessageId().toString(), messageReference); + } + catch (Throwable e) { + log.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e); + } + } + + /** + */ + public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { + final boolean debug = log.isDebugEnabled(); + JournalQueueAck remove = new JournalQueueAck(); + remove.setDestination(destination); + remove.setMessageAck(ack); + + final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired()); + if( !context.isInTransaction() ) { + if( debug ) + log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location); + removeMessage(ack, location); + } else { + if( debug ) + log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location); + synchronized( this ) { + inFlightTxLocations.add(location); + } + transactionStore.removeMessage(this, ack, location); + context.getTransaction().addSynchronization(new Synchronization(){ + public void afterCommit() throws Exception { + if( debug ) + log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location); + synchronized( RapidMessageStore.this ) { + inFlightTxLocations.remove(location); + removeMessage(ack, location); + } + } + public void afterRollback() throws Exception { + if( debug ) + log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location); + synchronized( RapidMessageStore.this ) { + inFlightTxLocations.remove(location); + } + } + }); + + } + } + + 
private void removeMessage(final MessageAck ack, final RecordLocation location) { + synchronized (this) { + lastLocation = location; + MessageId id = ack.getLastMessageId(); + messageContainer.remove(id.toString()); + } + } + + public void replayRemoveMessage(ConnectionContext context, MessageAck ack) { + try { + MessageId id = ack.getLastMessageId(); + messageContainer.remove(id.toString()); + } + catch (Throwable e) { + log.warn("Could not replay acknowledge for message '" + ack.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e); + } + } + + /** + * + */ + public Message getMessage(MessageId id) throws IOException { + RapidMessageReference messageReference = (RapidMessageReference) messageContainer.get(id.toString()); + if (messageReference == null ) + return null; + return (Message) peristenceAdapter.readCommand(messageReference.getLocation()); + } + + /** + * Replays the checkpointStore first as those messages are the oldest ones, + * then messages are replayed from the transaction log and then the cache is + * updated. 
+ * + * @param listener + * @throws Exception + */ + public void recover(final MessageRecoveryListener listener) throws Exception { + + for(Iterator iter=messageContainer.values().iterator();iter.hasNext();){ + RapidMessageReference messageReference=(RapidMessageReference) iter.next(); + Message m = (Message) peristenceAdapter.readCommand(messageReference.getLocation()); + listener.recoverMessage(m); + } + listener.finished(); + + } + + public void start() throws Exception { + } + + public void stop() throws Exception { + } + + /** + * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) + */ + public void removeAllMessages(ConnectionContext context) throws IOException { + messageContainer.clear(); + } + + public ActiveMQDestination getDestination() { + return destination; + } + + public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { + throw new IOException("The journal does not support message references."); + } + + public String getMessageReference(MessageId identity) throws IOException { + throw new IOException("The journal does not support message references."); + } + + + public void setUsageManager(UsageManager usageManager) { + } + + /** + * @return + * @throws IOException + */ + public RecordLocation checkpoint() throws IOException { + + ArrayList cpActiveJournalLocations; + + // swap out the message hash maps.. 
+ synchronized (this) { + cpActiveJournalLocations=new ArrayList(inFlightTxLocations); + } + + if( cpActiveJournalLocations.size() > 0 ) { + Collections.sort(cpActiveJournalLocations); + return (RecordLocation) cpActiveJournalLocations.get(0); + } else { + return lastLocation; + } + } + +} \ No newline at end of file Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java ------------------------------------------------------------------------------ svn:executable = * Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java?rev=405807&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java Fri May 12 10:52:51 2006 @@ -0,0 +1,672 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.store.rapid; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.activeio.command.WireFormat; +import org.apache.activeio.journal.InvalidRecordLocationException; +import org.apache.activeio.journal.Journal; +import org.apache.activeio.journal.JournalEventListener; +import org.apache.activeio.journal.RecordLocation; +import org.apache.activeio.journal.active.JournalImpl; +import org.apache.activeio.packet.Packet; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.DataStructure; +import org.apache.activemq.command.JournalQueueAck; +import org.apache.activemq.command.JournalTopicAck; +import org.apache.activemq.command.JournalTrace; +import org.apache.activemq.command.JournalTransaction; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.kaha.MapContainer; +import org.apache.activemq.kaha.Store; +import org.apache.activemq.kaha.StoreFactory; +import org.apache.activemq.kaha.StringMarshaller; +import org.apache.activemq.memory.UsageListener; +import org.apache.activemq.memory.UsageManager; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.TransactionStore; +import org.apache.activemq.store.kahadaptor.AtomicIntegerMarshaller; +import org.apache.activemq.store.kahadaptor.CommandMarshaller; +import org.apache.activemq.store.rapid.RapidTransactionStore.Tx; +import org.apache.activemq.store.rapid.RapidTransactionStore.TxOperation; +import 
org.apache.activemq.thread.Scheduler; +import org.apache.activemq.thread.Task; +import org.apache.activemq.thread.TaskRunner; +import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import edu.emory.mathcs.backport.java.util.concurrent.Callable; +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; +import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; +import edu.emory.mathcs.backport.java.util.concurrent.FutureTask; +import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; +import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory; +import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor; +import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; + +/** + * An implementation of {@link PersistenceAdapter} designed for use with a + * {@link Journal} and then check pointing asynchronously on a timeout with some + * other long term persistent storage. 
+ * + * @org.apache.xbean.XBean + * + * @version $Revision: 1.17 $ + */ +public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener { + + private static final Log log = LogFactory.getLog(RapidPersistenceAdapter.class); + private final Journal journal; + + private final WireFormat wireFormat = new OpenWireFormat(); + + private final ConcurrentHashMap queues = new ConcurrentHashMap(); + private final ConcurrentHashMap topics = new ConcurrentHashMap(); + + private long checkpointInterval = 1000 * 60 * 5; + private long lastCheckpointRequest = System.currentTimeMillis(); + private long lastCleanup = System.currentTimeMillis(); + private int maxCheckpointWorkers = 10; + private int maxCheckpointMessageAddSize = 5000; + + private RapidTransactionStore transactionStore = new RapidTransactionStore(this); + private ThreadPoolExecutor checkpointExecutor; + + private TaskRunner checkpointTask; + private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); + private boolean fullCheckPoint; + + private AtomicBoolean started = new AtomicBoolean(false); + + Store store; + private boolean useExternalMessageReferences; + + + private final Runnable periodicCheckpointTask = new Runnable() { + public void run() { + if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) { + checkpoint(false, true); + } + } + }; + + public RapidPersistenceAdapter(Journal journal, TaskRunnerFactory taskRunnerFactory) throws IOException { + + this.journal = journal; + journal.setJournalEventListener(this); + + File dir = ((JournalImpl)journal).getLogDirectory(); + String name=dir.getAbsolutePath()+File.separator+"kaha.db"; + store=StoreFactory.open(name,"rw"); + + checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){ + public boolean iterate() { + return doCheckpoint(); + } + }, "ActiveMQ Checkpoint Worker"); + + } + + public Set getDestinations() { + Set rc=new HashSet(); + try { + for(Iterator 
i=store.getMapContainerIds().iterator();i.hasNext();){ + Object obj=i.next(); + if(obj instanceof ActiveMQDestination){ + rc.add(obj); + } + } + }catch(IOException e){ + log.error("Failed to get destinations " ,e); + } + return rc; + } + + private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException { + if (destination.isQueue()) { + return createQueueMessageStore((ActiveMQQueue) destination); + } + else { + return createTopicMessageStore((ActiveMQTopic) destination); + } + } + + public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { + RapidMessageStore store = (RapidMessageStore) queues.get(destination); + if (store == null) { + MapContainer messageContainer=getMapContainer(destination,"topic-data"); + store = new RapidMessageStore(this, destination, messageContainer); + queues.put(destination, store); + } + return store; + } + + protected MapContainer getMapContainer(Object id,String containerName) throws IOException{ + MapContainer container=store.getMapContainer(id,containerName); + container.setKeyMarshaller(new StringMarshaller()); + if(useExternalMessageReferences){ + container.setValueMarshaller(new StringMarshaller()); + }else{ + container.setValueMarshaller(new CommandMarshaller(wireFormat)); + } + container.load(); + return container; + } + + public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { + RapidTopicMessageStore store = (RapidTopicMessageStore) topics.get(destination); + if (store == null) { + + MapContainer messageContainer=getMapContainer(destination,"topic-data"); + MapContainer subsContainer=getMapContainer(destination.toString()+"-subscriptions","topic-subs"); + MapContainer ackContainer=this.store.getMapContainer(destination.toString(),"topic-acks"); + + ackContainer.setKeyMarshaller(new StringMarshaller()); + ackContainer.setValueMarshaller(new AtomicIntegerMarshaller()); + + store = new RapidTopicMessageStore(this, destination, 
messageContainer, subsContainer, ackContainer); + topics.put(destination, store); + } + return store; + } + + public TransactionStore createTransactionStore() throws IOException { + return transactionStore; + } + + public long getLastMessageBrokerSequenceId() throws IOException { + // TODO: implement this. + return 0; + } + + public void beginTransaction(ConnectionContext context) throws IOException { + } + + public void commitTransaction(ConnectionContext context) throws IOException { + } + + public void rollbackTransaction(ConnectionContext context) throws IOException { + } + + public synchronized void start() throws Exception { + if( !started.compareAndSet(false, true) ) + return; + + checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { + public Thread newThread(Runnable runable) { + Thread t = new Thread(runable, "Journal checkpoint worker"); + t.setPriority(7); + return t; + } + }); + checkpointExecutor.allowCoreThreadTimeOut(true); + + createTransactionStore(); + recover(); + + // Do a checkpoint periodically. + Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval/10); + + } + + public void stop() throws Exception { + + if( !started.compareAndSet(true, false) ) + return; + + Scheduler.cancel(periodicCheckpointTask); + + // Take one final checkpoint and stop checkpoint processing. + checkpoint(false, true); + checkpointTask.shutdown(); + checkpointExecutor.shutdown(); + + queues.clear(); + topics.clear(); + + IOException firstException = null; + try { + journal.close(); + } catch (Exception e) { + firstException = IOExceptionSupport.create("Failed to close journals: " + e, e); + } + store.close(); + + if (firstException != null) { + throw firstException; + } + } + + // Properties + // ------------------------------------------------------------------------- + + /** + * @return Returns the wireFormat. 
+ */ + public WireFormat getWireFormat() { + return wireFormat; + } + + // Implementation methods + // ------------------------------------------------------------------------- + + /** + * The Journal give us a call back so that we can move old data out of the + * journal. Taking a checkpoint does this for us. + * + * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation) + */ + public void overflowNotification(RecordLocation safeLocation) { + checkpoint(false, true); + } + + /** + * When we checkpoint we move all the journalled data to long term storage. + * @param stopping + * + * @param b + */ + public void checkpoint(boolean sync, boolean fullCheckpoint) { + try { + if (journal == null ) + throw new IllegalStateException("Journal is closed."); + + long now = System.currentTimeMillis(); + CountDownLatch latch = null; + synchronized(this) { + latch = nextCheckpointCountDownLatch; + lastCheckpointRequest = now; + if( fullCheckpoint ) { + this.fullCheckPoint = true; + } + } + + checkpointTask.wakeup(); + + if (sync) { + log.debug("Waking for checkpoint to complete."); + latch.await(); + } + } + catch (InterruptedException e) { + log.warn("Request to start checkpoint failed: " + e, e); + } + } + + /** + * This does the actual checkpoint. + * @return + */ + public boolean doCheckpoint() { + CountDownLatch latch = null; + boolean fullCheckpoint; + synchronized(this) { + latch = nextCheckpointCountDownLatch; + nextCheckpointCountDownLatch = new CountDownLatch(1); + fullCheckpoint = this.fullCheckPoint; + this.fullCheckPoint=false; + } + try { + + log.debug("Checkpoint started."); + RecordLocation newMark = null; + + ArrayList futureTasks = new ArrayList(queues.size()+topics.size()); + + // + // We do many partial checkpoints (fullCheckpoint==false) to move topic messages + // to long term store as soon as possible. 
+ // + // We want to avoid doing that for queue messages since removes the come in the same + // checkpoint cycle will nullify the previous message add. Therefore, we only + // checkpoint queues on the fullCheckpoint cycles. + // + if( fullCheckpoint ) { + Iterator iterator = queues.values().iterator(); + while (iterator.hasNext()) { + try { + final RapidMessageStore ms = (RapidMessageStore) iterator.next(); + FutureTask task = new FutureTask(new Callable() { + public Object call() throws Exception { + return ms.checkpoint(); + }}); + futureTasks.add(task); + checkpointExecutor.execute(task); + } + catch (Exception e) { + log.error("Failed to checkpoint a message store: " + e, e); + } + } + } + + Iterator iterator = topics.values().iterator(); + while (iterator.hasNext()) { + try { + final RapidTopicMessageStore ms = (RapidTopicMessageStore) iterator.next(); + FutureTask task = new FutureTask(new Callable() { + public Object call() throws Exception { + return ms.checkpoint(); + }}); + futureTasks.add(task); + checkpointExecutor.execute(task); + } + catch (Exception e) { + log.error("Failed to checkpoint a message store: " + e, e); + } + } + + try { + for (Iterator iter = futureTasks.iterator(); iter.hasNext();) { + FutureTask ft = (FutureTask) iter.next(); + RecordLocation mark = (RecordLocation) ft.get(); + // We only set a newMark on full checkpoints. + if( fullCheckpoint ) { + if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { + newMark = mark; + } + } + } + } catch (Throwable e) { + log.error("Failed to checkpoint a message store: " + e, e); + } + + + if( fullCheckpoint ) { + try { + if (newMark != null) { + log.debug("Marking journal at: " + newMark); + journal.setMark(newMark, true); + } + } + catch (Exception e) { + log.error("Failed to mark the Journal: " + e, e); + } + +// TODO: do we need to implement a periodic clean up? 
+ +// if (longTermPersistence instanceof JDBCPersistenceAdapter) { +// // We may be check pointing more often than the checkpointInterval if under high use +// // But we don't want to clean up the db that often. +// long now = System.currentTimeMillis(); +// if( now > lastCleanup+checkpointInterval ) { +// lastCleanup = now; +// ((JDBCPersistenceAdapter) longTermPersistence).cleanup(); +// } +// } + } + + log.debug("Checkpoint done."); + } + finally { + latch.countDown(); + } + synchronized(this) { + return this.fullCheckPoint; + } + + } + + /** + * @param location + * @return + * @throws IOException + */ + public DataStructure readCommand(RecordLocation location) throws IOException { + try { + Packet data = journal.read(location); + return (DataStructure) wireFormat.unmarshal(data); + } + catch (InvalidRecordLocationException e) { + throw createReadException(location, e); + } + catch (IOException e) { + throw createReadException(location, e); + } + } + + /** + * Move all the messages that were in the journal into long term storage. We + * just replay and do a checkpoint. + * + * @throws IOException + * @throws IOException + * @throws InvalidRecordLocationException + * @throws IllegalStateException + */ + private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException { + + RecordLocation pos = null; + int transactionCounter = 0; + + log.info("Journal Recovery Started."); + ConnectionContext context = new ConnectionContext(); + + // While we have records in the journal. 
+ while ((pos = journal.getNextRecordLocation(pos)) != null) { + Packet data = journal.read(pos); + DataStructure c = (DataStructure) wireFormat.unmarshal(data); + + if (c instanceof Message ) { + Message message = (Message) c; + RapidMessageStore store = (RapidMessageStore) createMessageStore(message.getDestination()); + if ( message.isInTransaction()) { + transactionStore.addMessage(store, message, pos); + } + else { + store.replayAddMessage(context, message, pos); + transactionCounter++; + } + } else { + switch (c.getDataStructureType()) { + case JournalQueueAck.DATA_STRUCTURE_TYPE: + { + JournalQueueAck command = (JournalQueueAck) c; + RapidMessageStore store = (RapidMessageStore) createMessageStore(command.getDestination()); + if (command.getMessageAck().isInTransaction()) { + transactionStore.removeMessage(store, command.getMessageAck(), pos); + } + else { + store.replayRemoveMessage(context, command.getMessageAck()); + transactionCounter++; + } + } + break; + case JournalTopicAck.DATA_STRUCTURE_TYPE: + { + JournalTopicAck command = (JournalTopicAck) c; + RapidTopicMessageStore store = (RapidTopicMessageStore) createMessageStore(command.getDestination()); + if (command.getTransactionId() != null) { + transactionStore.acknowledge(store, command, pos); + } + else { + store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()); + transactionCounter++; + } + } + break; + case JournalTransaction.DATA_STRUCTURE_TYPE: + { + JournalTransaction command = (JournalTransaction) c; + try { + // Try to replay the packet. + switch (command.getType()) { + case JournalTransaction.XA_PREPARE: + transactionStore.replayPrepare(command.getTransactionId()); + break; + case JournalTransaction.XA_COMMIT: + case JournalTransaction.LOCAL_COMMIT: + Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared()); + if (tx == null) + break; // We may be trying to replay a commit that + // was already committed. 
+ + // Replay the committed operations. + for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) { + TxOperation op = (TxOperation) iter.next(); + if (op.operationType == TxOperation.ADD_OPERATION_TYPE) { + op.store.replayAddMessage(context, (Message) op.data, op.location); + } + if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) { + op.store.replayRemoveMessage(context, (MessageAck) op.data); + } + if (op.operationType == TxOperation.ACK_OPERATION_TYPE) { + JournalTopicAck ack = (JournalTopicAck) op.data; + ((RapidTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack + .getMessageId()); + } + } + transactionCounter++; + break; + case JournalTransaction.LOCAL_ROLLBACK: + case JournalTransaction.XA_ROLLBACK: + transactionStore.replayRollback(command.getTransactionId()); + break; + } + } + catch (IOException e) { + log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e); + } + } + break; + case JournalTrace.DATA_STRUCTURE_TYPE: + JournalTrace trace = (JournalTrace) c; + log.debug("TRACE Entry: " + trace.getMessage()); + break; + default: + log.error("Unknown type of record in transaction log which will be discarded: " + c); + } + } + } + + RecordLocation location = writeTraceMessage("RECOVERED", true); + journal.setMark(location, true); + + log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered."); + } + + private IOException createReadException(RecordLocation location, Exception e) { + return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e); + } + + protected IOException createWriteException(DataStructure packet, Exception e) { + return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". 
Reason: " + e, e); + } + + protected IOException createWriteException(String command, Exception e) { + return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e); + } + + protected IOException createRecoveryFailedException(Exception e) { + return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e); + } + + /** + * + * @param command + * @param sync + * @return + * @throws IOException + */ + public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException { + if( started.get() ) + return journal.write(wireFormat.marshal(command), sync); + throw new IOException("closed"); + } + + private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException { + JournalTrace trace = new JournalTrace(); + trace.setMessage(message); + return writeCommand(trace, sync); + } + + public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) { + if (newPercentUsage > 80 && oldPercentUsage < newPercentUsage) { + checkpoint(false, true); + } + } + + public RapidTransactionStore getTransactionStore() { + return transactionStore; + } + + public void deleteAllMessages() throws IOException { + try { + JournalTrace trace = new JournalTrace(); + trace.setMessage("DELETED"); + RecordLocation location = journal.write(wireFormat.marshal(trace), false); + journal.setMark(location, true); + log.info("Journal deleted: "); + } catch (IOException e) { + throw e; + } catch (Throwable e) { + throw IOExceptionSupport.create(e); + } + + if(store!=null){ + store.delete(); + } + } + + public int getMaxCheckpointMessageAddSize() { + return maxCheckpointMessageAddSize; + } + + public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) { + this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize; + } + + public int getMaxCheckpointWorkers() { + return maxCheckpointWorkers; + } + + public void setMaxCheckpointWorkers(int 
maxCheckpointWorkers) { + this.maxCheckpointWorkers = maxCheckpointWorkers; + } + + public boolean isUseExternalMessageReferences() { + return false; + } + + public void setUseExternalMessageReferences(boolean enable) { + if( enable ) + throw new IllegalArgumentException("The journal does not support message references."); + } + + public void setUsageManager(UsageManager usageManager) { + } + + public Store getStore() { + return store; + } + +} Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java ------------------------------------------------------------------------------ svn:executable = * Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?rev=405807&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Fri May 12 10:52:51 2006 @@ -0,0 +1,294 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.store.rapid; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.activeio.journal.RecordLocation; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.JournalTopicAck; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.SubscriptionInfo; +import org.apache.activemq.kaha.ListContainer; +import org.apache.activemq.kaha.MapContainer; +import org.apache.activemq.kaha.Marshaller; +import org.apache.activemq.kaha.Store; +import org.apache.activemq.kaha.StringMarshaller; +import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.transaction.Synchronization; +import org.apache.activemq.util.SubscriptionKey; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; + +/** + * A MessageStore that uses a Journal to store it's messages. 
+ * + * @version $Revision: 1.13 $ + */ +public class RapidTopicMessageStore extends RapidMessageStore implements TopicMessageStore { + + private static final Log log = LogFactory.getLog(RapidTopicMessageStore.class); + + private HashMap ackedLastAckLocations = new HashMap(); + private final MapContainer subscriberContainer; + private final MapContainer ackContainer; + private final Store store; + private Map subscriberAcks=new ConcurrentHashMap(); + + public RapidTopicMessageStore(RapidPersistenceAdapter adapter, ActiveMQTopic destination, MapContainer messageContainer, MapContainer subsContainer, MapContainer ackContainer) throws IOException { + super(adapter, destination, messageContainer); + this.subscriberContainer = subsContainer; + this.ackContainer = ackContainer; + this.store=adapter.getStore(); + + for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){ + Object key=i.next(); + addSubscriberAckContainer(key); + } + } + + public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { + + String key=getSubscriptionKey(clientId,subscriptionName); + ListContainer list=(ListContainer) subscriberAcks.get(key); + if(list!=null){ + for(Iterator i=list.iterator();i.hasNext();){ + Object msg=messageContainer.get(i.next()); + if(msg!=null){ + if(msg.getClass()==String.class){ + listener.recoverMessageReference((String) msg); + }else{ + listener.recoverMessage((Message) msg); + } + } + listener.finished(); + } + } else { + listener.finished(); + } + + } + + public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { + return (SubscriptionInfo) subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName)); + } + + public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException { + SubscriptionInfo info=new SubscriptionInfo(); + info.setDestination(destination); + 
info.setClientId(clientId); + info.setSelector(selector); + info.setSubcriptionName(subscriptionName); + String key=getSubscriptionKey(clientId,subscriptionName); + // if already exists - won't add it again as it causes data files + // to hang around + if(!subscriberContainer.containsKey(key)){ + subscriberContainer.put(key,info); + } + addSubscriberAckContainer(key); + } + + public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ + int subscriberCount=subscriberAcks.size(); + if(subscriberCount>0){ + String id=message.getMessageId().toString(); + ackContainer.put(id,new AtomicInteger(subscriberCount)); + for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){ + Object key=i.next(); + ListContainer container=store.getListContainer(key,"durable-subs"); + container.add(id); + } + super.addMessage(context,message); + } + } + + + /** + */ + public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException { + final boolean debug = log.isDebugEnabled(); + + JournalTopicAck ack = new JournalTopicAck(); + ack.setDestination(destination); + ack.setMessageId(messageId); + ack.setMessageSequenceId(messageId.getBrokerSequenceId()); + ack.setSubscritionName(subscriptionName); + ack.setClientId(clientId); + ack.setTransactionId( context.getTransaction()!=null ? 
context.getTransaction().getTransactionId():null); + final RecordLocation location = peristenceAdapter.writeCommand(ack, false); + + final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); + if( !context.isInTransaction() ) { + if( debug ) + log.debug("Journalled acknowledge for: "+messageId+", at: "+location); + acknowledge(messageId, location, key); + } else { + if( debug ) + log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location); + synchronized (this) { + inFlightTxLocations.add(location); + } + transactionStore.acknowledge(this, ack, location); + context.getTransaction().addSynchronization(new Synchronization(){ + public void afterCommit() throws Exception { + if( debug ) + log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location); + synchronized (RapidTopicMessageStore.this) { + inFlightTxLocations.remove(location); + acknowledge(messageId, location, key); + } + } + public void afterRollback() throws Exception { + if( debug ) + log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location); + synchronized (RapidTopicMessageStore.this) { + inFlightTxLocations.remove(location); + } + } + }); + } + + } + + public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) { + try { + synchronized(this) { + String subcriberId=getSubscriptionKey(clientId,subscritionName); + String id=messageId.toString(); + ListContainer container=(ListContainer) subscriberAcks.get(subcriberId); + if(container!=null){ + //container.remove(id); + container.removeFirst(); + AtomicInteger count=(AtomicInteger) ackContainer.remove(id); + if(count!=null){ + if(count.decrementAndGet()>0){ + ackContainer.put(id,count); + } else { + // no more references to message messageContainer so remove it + messageContainer.remove(messageId.toString()); + } + } + } + } + } + catch (Throwable e) { + log.debug("Could not replay acknowledge for message '" + messageId + 
"'. Message may have already been acknowledged. reason: " + e); + } + } + + + /** + * @param messageId + * @param location + * @param key + */ + private void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) { + synchronized(this) { + lastLocation = location; + ackedLastAckLocations.put(key, messageId); + + String subcriberId=getSubscriptionKey(key.getClientId(),key.getSubscriptionName()); + String id=messageId.toString(); + ListContainer container=(ListContainer) subscriberAcks.get(subcriberId); + if(container!=null){ + //container.remove(id); + container.removeFirst(); + AtomicInteger count=(AtomicInteger) ackContainer.remove(id); + if(count!=null){ + if(count.decrementAndGet()>0){ + ackContainer.put(id,count); + } else { + // no more references to message messageContainer so remove it + messageContainer.remove(messageId.toString()); + } + } + } + } + } + + protected String getSubscriptionKey(String clientId,String subscriberName){ + String result=clientId+":"; + result+=subscriberName!=null?subscriberName:"NOT_SET"; + return result; + } + + + public RecordLocation checkpoint() throws IOException { + + ArrayList cpAckedLastAckLocations; + + // swap out the hash maps.. 
+ synchronized (this) { + cpAckedLastAckLocations = new ArrayList(this.ackedLastAckLocations.values()); + this.ackedLastAckLocations = new HashMap(); + } + + RecordLocation rc = super.checkpoint(); + if(!cpAckedLastAckLocations.isEmpty()) { + Collections.sort(cpAckedLastAckLocations); + RecordLocation t = (RecordLocation) cpAckedLastAckLocations.get(0); + if( rc == null || t.compareTo(rc)<0 ) { + rc = t; + } + } + + return rc; + } + + + public void deleteSubscription(String clientId, String subscriptionName) throws IOException { + String key=getSubscriptionKey(clientId,subscriptionName); + subscriberContainer.remove(key); + ListContainer list=(ListContainer) subscriberAcks.get(key); + for(Iterator i=list.iterator();i.hasNext();){ + String id=i.next().toString(); + AtomicInteger count=(AtomicInteger) ackContainer.remove(id); + if(count!=null){ + if(count.decrementAndGet()>0){ + ackContainer.put(id,count); + }else{ + // no more references to message messageContainer so remove it + messageContainer.remove(id); + } + } + } + } + + public SubscriptionInfo[] getAllSubscriptions() throws IOException { + return (SubscriptionInfo[]) subscriberContainer.values().toArray( + new SubscriptionInfo[subscriberContainer.size()]); + } + + protected void addSubscriberAckContainer(Object key) throws IOException{ + ListContainer container=store.getListContainer(key,"topic-subs"); + Marshaller marshaller=new StringMarshaller(); + container.setMarshaller(marshaller); + subscriberAcks.put(key,container); + } + +} \ No newline at end of file Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java ------------------------------------------------------------------------------ svn:executable = * Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java?rev=405807&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java Fri May 12 10:52:51 2006 @@ -0,0 +1,303 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.store.rapid; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; + +import javax.transaction.xa.XAException; + +import org.apache.activeio.journal.RecordLocation; +import org.apache.activemq.command.JournalTopicAck; +import org.apache.activemq.command.JournalTransaction; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.store.TransactionRecoveryListener; +import org.apache.activemq.store.TransactionStore; + +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; + +/** + */ +public class RapidTransactionStore implements TransactionStore { + + private final RapidPersistenceAdapter peristenceAdapter; + ConcurrentHashMap inflightTransactions = new ConcurrentHashMap(); + ConcurrentHashMap preparedTransactions = new ConcurrentHashMap(); + private boolean doingRecover; + + + public static class TxOperation { + + static final byte ADD_OPERATION_TYPE = 0; + static final byte REMOVE_OPERATION_TYPE = 1; + static final byte ACK_OPERATION_TYPE = 3; + + public byte operationType; + public RapidMessageStore store; + public Object data; + public RecordLocation location; + + public TxOperation(byte operationType, RapidMessageStore store, Object data, RecordLocation location) { + this.operationType=operationType; + this.store=store; + this.data=data; + this.location = location; + } + + } + /** + * Operations + * @version $Revision: 1.6 $ + */ + public static class Tx { + + private final RecordLocation location; + private ArrayList operations = new ArrayList(); + + public Tx(RecordLocation location) { + this.location=location; + } + + public void add(RapidMessageStore store, Message msg, RecordLocation loc) { + operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, loc)); + } + + public void 
add(RapidMessageStore store, MessageAck ack, RecordLocation loc) { + operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, loc)); + } + + public void add(RapidTopicMessageStore store, JournalTopicAck ack, RecordLocation loc) { + operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, loc)); + } + + public Message[] getMessages() { + ArrayList list = new ArrayList(); + for (Iterator iter = operations.iterator(); iter.hasNext();) { + TxOperation op = (TxOperation) iter.next(); + if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) { + list.add(op.data); + } + } + Message rc[] = new Message[list.size()]; + list.toArray(rc); + return rc; + } + + public MessageAck[] getAcks() { + ArrayList list = new ArrayList(); + for (Iterator iter = operations.iterator(); iter.hasNext();) { + TxOperation op = (TxOperation) iter.next(); + if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) { + list.add(op.data); + } + } + MessageAck rc[] = new MessageAck[list.size()]; + list.toArray(rc); + return rc; + } + + public ArrayList getOperations() { + return operations; + } + + } + + public RapidTransactionStore(RapidPersistenceAdapter adapter) { + this.peristenceAdapter = adapter; + } + + /** + * @throws IOException + * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) + */ + public void prepare(TransactionId txid) throws IOException { + Tx tx = (Tx) inflightTransactions.remove(txid); + if (tx == null) + return; + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), true); + preparedTransactions.put(txid, tx); + } + + /** + * @throws IOException + * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) + */ + public void replayPrepare(TransactionId txid) throws IOException { + Tx tx = (Tx) inflightTransactions.remove(txid); + if (tx == null) + return; + preparedTransactions.put(txid, tx); + } + + public Tx getTx(Object txid, RecordLocation location) { + Tx 
tx = (Tx) inflightTransactions.get(txid); + if (tx == null) { + tx = new Tx(location); + inflightTransactions.put(txid, tx); + } + return tx; + } + + /** + * @throws XAException + * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) + */ + public void commit(TransactionId txid, boolean wasPrepared) throws IOException { + Tx tx; + if (wasPrepared) { + tx = (Tx) preparedTransactions.remove(txid); + } else { + tx = (Tx) inflightTransactions.remove(txid); + } + + if (tx == null) + return; + + if (txid.isXATransaction()) { + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared), + true); + } else { + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared), + true); + } + } + + /** + * @throws XAException + * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) + */ + public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException { + if (wasPrepared) { + return (Tx) preparedTransactions.remove(txid); + } else { + return (Tx) inflightTransactions.remove(txid); + } + } + + /** + * @throws IOException + * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) + */ + public void rollback(TransactionId txid) throws IOException { + + Tx tx = (Tx) inflightTransactions.remove(txid); + if (tx != null) + tx = (Tx) preparedTransactions.remove(txid); + + if (tx != null) { + if (txid.isXATransaction()) { + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false), + true); + } else { + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false), + true); + } + } + + } + + /** + * @throws IOException + * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) + */ + public void replayRollback(TransactionId txid) throws IOException { + if 
(inflightTransactions.remove(txid) != null) + preparedTransactions.remove(txid); + } + + public void start() throws Exception { + } + + public void stop() throws Exception { + } + + synchronized public void recover(TransactionRecoveryListener listener) throws IOException { + // All the in-flight transactions get rolled back.. + inflightTransactions.clear(); + this.doingRecover = true; + try { + for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) { + Object txid = (Object) iter.next(); + Tx tx = (Tx) preparedTransactions.get(txid); + listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks()); + } + } finally { + this.doingRecover = false; + } + } + + /** + * @param message + * @throws IOException + */ + void addMessage(RapidMessageStore store, Message message, RecordLocation location) throws IOException { + Tx tx = getTx(message.getTransactionId(), location); + tx.add(store, message, location); + } + + /** + * @param ack + * @throws IOException + */ + public void removeMessage(RapidMessageStore store, MessageAck ack, RecordLocation location) throws IOException { + Tx tx = getTx(ack.getTransactionId(), location); + tx.add(store, ack, location); + } + + + public void acknowledge(RapidTopicMessageStore store, JournalTopicAck ack, RecordLocation location) { + Tx tx = getTx(ack.getTransactionId(), location); + tx.add(store, ack, location); + } + + + public RecordLocation checkpoint() throws IOException { + + // Nothing really to checkpoint.. since, we don't + // checkpoint tx operations in to long term store until they are committed. + + // But we keep track of the first location of an operation + // that was associated with an active tx. The journal can not + // roll over active tx records. 
+ RecordLocation rc = null; + for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) { + Tx tx = (Tx) iter.next(); + RecordLocation location = tx.location; + if (rc == null || rc.compareTo(location) < 0) { + rc = location; + } + } + for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) { + Tx tx = (Tx) iter.next(); + RecordLocation location = tx.location; + if (rc == null || rc.compareTo(location) < 0) { + rc = location; + } + } + return rc; + } + + public boolean isDoingRecover() { + return doingRecover; + } + + +} Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java ------------------------------------------------------------------------------ svn:executable = *