From commits-return-6821-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Wed Aug 08 19:00:35 2007 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 49871 invoked from network); 8 Aug 2007 19:00:34 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 8 Aug 2007 19:00:34 -0000 Received: (qmail 68045 invoked by uid 500); 8 Aug 2007 19:00:32 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 68015 invoked by uid 500); 8 Aug 2007 19:00:32 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 67994 invoked by uid 99); 8 Aug 2007 19:00:32 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Aug 2007 12:00:32 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Aug 2007 19:00:26 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 715911A987E; Wed, 8 Aug 2007 11:59:20 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r563982 [19/32] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jm... Date: Wed, 08 Aug 2007 18:58:13 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070808185920.715911A987E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Wed Aug 8 11:56:59 2007 @@ -60,62 +60,63 @@ /** A MessageStore that we can use to retrieve messages quickly. */ private LinkedHashMap cpAddedMessageIds; - + protected RecordLocation lastLocation; protected HashSet inFlightTxLocations = new HashSet(); private UsageManager usageManager; - - public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) { + + public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, + ActiveMQDestination destination) { this.peristenceAdapter = adapter; this.transactionStore = adapter.getTransactionStore(); this.longTermStore = checkpointStore; this.destination = destination; this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); } - + public void setUsageManager(UsageManager usageManager) { this.usageManager = usageManager; longTermStore.setUsageManager(usageManager); } - /** * Not synchronized since the Journal has better throughput if you increase * the number of concurrent writes that it is doing. */ public void addMessage(ConnectionContext context, final Message message) throws IOException { - + final MessageId id = message.getMessageId(); - + final boolean debug = log.isDebugEnabled(); message.incrementReferenceCount(); - + final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired()); - if( !context.isInTransaction() ) { - if( debug ) - log.debug("Journalled message add for: "+id+", at: "+location); + if (!context.isInTransaction()) { + if (debug) + log.debug("Journalled message add for: " + id + ", at: " + location); addMessage(message, location); } else { - if( debug ) - log.debug("Journalled transacted message add for: "+id+", at: "+location); - synchronized( this ) { + if (debug) + log.debug("Journalled transacted message add for: " + id + ", at: " + location); + synchronized (this) { inFlightTxLocations.add(location); } transactionStore.addMessage(this, message, location); - context.getTransaction().addSynchronization(new Synchronization(){ - public void afterCommit() throws Exception { - if( debug ) - log.debug("Transacted message add commit for: "+id+", at: "+location); - synchronized( JournalMessageStore.this ) { + context.getTransaction().addSynchronization(new Synchronization() { + public void afterCommit() throws Exception { + if (debug) + log.debug("Transacted message add commit for: " + id + ", at: " + location); + synchronized (JournalMessageStore.this) { inFlightTxLocations.remove(location); addMessage(message, location); } } - public void afterRollback() throws Exception { - if( debug ) - log.debug("Transacted message add rollback for: "+id+", at: "+location); - synchronized( JournalMessageStore.this ) { + + public void afterRollback() throws Exception { + if (debug) + log.debug("Transacted message add rollback for: " + id + ", at: " + location); + synchronized (JournalMessageStore.this) { inFlightTxLocations.remove(location); } message.decrementReferenceCount(); @@ -131,17 +132,17 @@ messages.put(id, message); } } - + public void replayAddMessage(ConnectionContext context, Message message) { try { // Only add the message if it has not already been added. Message t = longTermStore.getMessage(message.getMessageId()); - if( t==null ) { + if (t == null) { longTermStore.addMessage(context, message); } - } - catch (Throwable e) { - log.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e); + } catch (Throwable e) { + log.warn("Could not replay add for message '" + message.getMessageId() + + "'. Message may have already been added. reason: " + e); } } @@ -152,32 +153,36 @@ JournalQueueAck remove = new JournalQueueAck(); remove.setDestination(destination); remove.setMessageAck(ack); - + final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired()); - if( !context.isInTransaction() ) { - if( debug ) - log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location); + if (!context.isInTransaction()) { + if (debug) + log.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location); removeMessage(ack, location); } else { - if( debug ) - log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location); - synchronized( this ) { + if (debug) + log.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + + location); + synchronized (this) { inFlightTxLocations.add(location); } transactionStore.removeMessage(this, ack, location); - context.getTransaction().addSynchronization(new Synchronization(){ - public void afterCommit() throws Exception { - if( debug ) - log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location); - synchronized( JournalMessageStore.this ) { + context.getTransaction().addSynchronization(new Synchronization() { + public void afterCommit() throws Exception { + if (debug) + log.debug("Transacted message remove commit for: " + ack.getLastMessageId() + + ", at: " + location); + synchronized (JournalMessageStore.this) { inFlightTxLocations.remove(location); removeMessage(ack, location); } } - public void afterRollback() throws Exception { - if( debug ) - log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location); - synchronized( JournalMessageStore.this ) { + + public void afterRollback() throws Exception { + if (debug) + log.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + + ", at: " + location); + synchronized (JournalMessageStore.this) { inFlightTxLocations.remove(location); } } @@ -185,12 +190,12 @@ } } - + final void removeMessage(final MessageAck ack, final RecordLocation location) { synchronized (this) { lastLocation = location; MessageId id = ack.getLastMessageId(); - Message message = (Message) messages.remove(id); + Message message = (Message)messages.remove(id); if (message == null) { messageAcks.add(ack); } else { @@ -198,17 +203,17 @@ } } } - + public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) { try { // Only remove the message if it has not already been removed. Message t = longTermStore.getMessage(messageAck.getLastMessageId()); - if( t!=null ) { + if (t != null) { longTermStore.removeMessage(context, messageAck); } - } - catch (Throwable e) { - log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e); + } catch (Throwable e) { + log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + + "'. Message may have already been acknowledged. reason: " + e); } } @@ -219,14 +224,13 @@ public RecordLocation checkpoint() throws IOException { return checkpoint(null); } - + /** * @return * @throws IOException */ public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException { - RecordLocation rc; final ArrayList cpRemovedMessageLocations; final ArrayList cpActiveJournalLocations; @@ -237,37 +241,37 @@ cpAddedMessageIds = this.messages; cpRemovedMessageLocations = this.messageAcks; - cpActiveJournalLocations=new ArrayList(inFlightTxLocations); - + cpActiveJournalLocations = new ArrayList(inFlightTxLocations); + this.messages = new LinkedHashMap(); - this.messageAcks = new ArrayList(); + this.messageAcks = new ArrayList(); } transactionTemplate.run(new Callback() { public void execute() throws Exception { int size = 0; - + PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter(); ConnectionContext context = transactionTemplate.getContext(); - + // Checkpoint the added messages. - synchronized(JournalMessageStore.this){ - Iterator iterator=cpAddedMessageIds.values().iterator(); - while(iterator.hasNext()){ - Message message=(Message)iterator.next(); - try{ - longTermStore.addMessage(context,message); - }catch(Throwable e){ - log.warn("Message could not be added to long term store: "+e.getMessage(),e); + synchronized (JournalMessageStore.this) { + Iterator iterator = cpAddedMessageIds.values().iterator(); + while (iterator.hasNext()) { + Message message = (Message)iterator.next(); + try { + longTermStore.addMessage(context, message); + } catch (Throwable e) { + log.warn("Message could not be added to long term store: " + e.getMessage(), e); } - size+=message.getSize(); + size += message.getSize(); message.decrementReferenceCount(); // Commit the batch if it's getting too big - if(size>=maxCheckpointMessageAddSize){ + if (size >= maxCheckpointMessageAddSize) { persitanceAdapter.commitTransaction(context); persitanceAdapter.beginTransaction(context); - size=0; + size = 0; } } } @@ -279,14 +283,14 @@ Iterator iterator = cpRemovedMessageLocations.iterator(); while (iterator.hasNext()) { try { - MessageAck ack = (MessageAck) iterator.next(); + MessageAck ack = (MessageAck)iterator.next(); longTermStore.removeMessage(transactionTemplate.getContext(), ack); } catch (Throwable e) { log.debug("Message could not be removed from long term store: " + e.getMessage(), e); } } - - if( postCheckpointTest!= null ) { + + if (postCheckpointTest != null) { postCheckpointTest.execute(); } } @@ -296,12 +300,12 @@ synchronized (this) { cpAddedMessageIds = null; } - - if( cpActiveJournalLocations.size() > 0 ) { + + if (cpActiveJournalLocations.size() > 0) { Collections.sort(cpActiveJournalLocations); - return (RecordLocation) cpActiveJournalLocations.get(0); + return (RecordLocation)cpActiveJournalLocations.get(0); } - synchronized (this){ + synchronized (this) { return lastLocation; } } @@ -314,15 +318,15 @@ synchronized (this) { // Do we have a still have it in the journal? - answer = (Message) messages.get(identity); - if( answer==null && cpAddedMessageIds!=null ) - answer = (Message) cpAddedMessageIds.get(identity); + answer = (Message)messages.get(identity); + if (answer == null && cpAddedMessageIds != null) + answer = (Message)cpAddedMessageIds.get(identity); } - - if (answer != null ) { + + if (answer != null) { return answer; } - + // If all else fails try the long term message store. return longTermStore.getMessage(identity); } @@ -333,7 +337,7 @@ * updated. * * @param listener - * @throws Exception + * @throws Exception */ public void recover(final MessageRecoveryListener listener) throws Exception { peristenceAdapter.checkpoint(true, true); @@ -341,14 +345,14 @@ } public void start() throws Exception { - if( this.usageManager != null ) + if (this.usageManager != null) this.usageManager.addUsageListener(peristenceAdapter); longTermStore.start(); } public void stop() throws Exception { longTermStore.stop(); - if( this.usageManager != null ) + if (this.usageManager != null) this.usageManager.removeUsageListener(peristenceAdapter); } @@ -366,12 +370,13 @@ peristenceAdapter.checkpoint(true, true); longTermStore.removeAllMessages(context); } - + public ActiveMQDestination getDestination() { return destination; } - public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { + public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, + String messageRef) throws IOException { throw new IOException("The journal does not support message references."); } @@ -381,25 +386,23 @@ /** * @return - * @throws IOException + * @throws IOException * @see org.apache.activemq.store.MessageStore#getMessageCount() */ - public int getMessageCount() throws IOException{ + public int getMessageCount() throws IOException { peristenceAdapter.checkpoint(true, true); return longTermStore.getMessageCount(); } - - public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ + public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { peristenceAdapter.checkpoint(true, true); - longTermStore.recoverNextMessages(maxReturned,listener); - + longTermStore.recoverNextMessages(maxReturned, listener); + } - - public void resetBatching(){ + public void resetBatching() { longTermStore.resetBatching(); - + } -} \ No newline at end of file +} Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Wed Aug 8 11:56:59 2007 @@ -75,7 +75,6 @@ * other long term persistent storage. * * @org.apache.xbean.XBean - * * @version $Revision: 1.17 $ */ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener { @@ -89,45 +88,45 @@ private final ConcurrentHashMap queues = new ConcurrentHashMap(); private final ConcurrentHashMap topics = new ConcurrentHashMap(); - + private UsageManager usageManager; long checkpointInterval = 1000 * 60 * 5; long lastCheckpointRequest = System.currentTimeMillis(); private long lastCleanup = System.currentTimeMillis(); private int maxCheckpointWorkers = 10; - private int maxCheckpointMessageAddSize = 1024*1024; + private int maxCheckpointMessageAddSize = 1024 * 1024; private JournalTransactionStore transactionStore = new JournalTransactionStore(this); private ThreadPoolExecutor checkpointExecutor; - + private TaskRunner checkpointTask; private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); private boolean fullCheckPoint; - + private AtomicBoolean started = new AtomicBoolean(false); - private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask(); - + private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask(); + final Runnable createPeriodicCheckpointTask() { - return new Runnable() { - public void run() { + return new Runnable() { + public void run() { long lastTime = 0; - synchronized(this) { + synchronized (this) { lastTime = lastCheckpointRequest; } - if( System.currentTimeMillis()>lastTime+checkpointInterval ) { - checkpoint(false, true); - } - } - }; + if (System.currentTimeMillis() > lastTime + checkpointInterval) { + checkpoint(false, true); + } + } + }; } - + public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException { this.journal = journal; journal.setJournalEventListener(this); - - checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){ + + checkpointTask = taskRunnerFactory.createTaskRunner(new Task() { public boolean iterate() { return doCheckpoint(); } @@ -137,7 +136,8 @@ } /** - * @param usageManager The UsageManager that is controlling the destination's memory usage. + * @param usageManager The UsageManager that is controlling the + * destination's memory usage. */ public void setUsageManager(UsageManager usageManager) { this.usageManager = usageManager; @@ -153,15 +153,14 @@ private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException { if (destination.isQueue()) { - return createQueueMessageStore((ActiveMQQueue) destination); - } - else { - return createTopicMessageStore((ActiveMQTopic) destination); + return createQueueMessageStore((ActiveMQQueue)destination); + } else { + return createTopicMessageStore((ActiveMQTopic)destination); } } public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { - JournalMessageStore store = (JournalMessageStore) queues.get(destination); + JournalMessageStore store = (JournalMessageStore)queues.get(destination); if (store == null) { MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination); store = new JournalMessageStore(this, checkpointStore, destination); @@ -171,7 +170,7 @@ } public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException { - JournalTopicMessageStore store = (JournalTopicMessageStore) topics.get(destinationName); + JournalTopicMessageStore store = (JournalTopicMessageStore)topics.get(destinationName); if (store == null) { TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName); store = new JournalTopicMessageStore(this, checkpointStore, destinationName); @@ -201,24 +200,25 @@ } public synchronized void start() throws Exception { - if( !started.compareAndSet(false, true) ) + if (!started.compareAndSet(false, true)) { return; - + } + checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { public Thread newThread(Runnable runable) { Thread t = new Thread(runable, "Journal checkpoint worker"); t.setPriority(7); return t; - } + } }); - //checkpointExecutor.allowCoreThreadTimeOut(true); - + // checkpointExecutor.allowCoreThreadTimeOut(true); + this.usageManager.addUsageListener(this); if (longTermPersistence instanceof JDBCPersistenceAdapter) { // Disabled periodic clean up as it deadlocks with the checkpoint // operations. - ((JDBCPersistenceAdapter) longTermPersistence).setCleanupPeriod(0); + ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0); } longTermPersistence.start(); @@ -226,23 +226,23 @@ recover(); // Do a checkpoint periodically. - Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval/10); + Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10); } public void stop() throws Exception { - + this.usageManager.removeUsageListener(this); - if( !started.compareAndSet(true, false) ) + if (!started.compareAndSet(true, false)) return; - + Scheduler.cancel(periodicCheckpointTask); // Take one final checkpoint and stop checkpoint processing. checkpoint(true, true); - checkpointTask.shutdown(); + checkpointTask.shutdown(); checkpointExecutor.shutdown(); - + queues.clear(); topics.clear(); @@ -253,7 +253,7 @@ firstException = IOExceptionSupport.create("Failed to close journals: " + e, e); } longTermPersistence.stop(); - + if (firstException != null) { throw firstException; } @@ -287,83 +287,86 @@ /** * When we checkpoint we move all the journalled data to long term storage. - * @param stopping * + * @param stopping * @param b */ public void checkpoint(boolean sync, boolean fullCheckpoint) { try { - if (journal == null ) + if (journal == null) throw new IllegalStateException("Journal is closed."); - + long now = System.currentTimeMillis(); CountDownLatch latch = null; - synchronized(this) { + synchronized (this) { latch = nextCheckpointCountDownLatch; lastCheckpointRequest = now; - if( fullCheckpoint ) { - this.fullCheckPoint = true; + if (fullCheckpoint) { + this.fullCheckPoint = true; } } - + checkpointTask.wakeup(); - + if (sync) { log.debug("Waking for checkpoint to complete."); latch.await(); } - } - catch (InterruptedException e) { + } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("Request to start checkpoint failed: " + e, e); } } - + public void checkpoint(boolean sync) { - checkpoint(sync,sync); + checkpoint(sync, sync); } - + /** * This does the actual checkpoint. - * @return + * + * @return */ public boolean doCheckpoint() { CountDownLatch latch = null; boolean fullCheckpoint; - synchronized(this) { + synchronized (this) { latch = nextCheckpointCountDownLatch; nextCheckpointCountDownLatch = new CountDownLatch(1); fullCheckpoint = this.fullCheckPoint; - this.fullCheckPoint=false; - } + this.fullCheckPoint = false; + } try { log.debug("Checkpoint started."); RecordLocation newMark = null; - ArrayList futureTasks = new ArrayList(queues.size()+topics.size()); - + ArrayList futureTasks = new ArrayList(queues.size() + topics.size()); + // - // We do many partial checkpoints (fullCheckpoint==false) to move topic messages - // to long term store as soon as possible. + // We do many partial checkpoints (fullCheckpoint==false) to move + // topic messages + // to long term store as soon as possible. // - // We want to avoid doing that for queue messages since removes the come in the same - // checkpoint cycle will nullify the previous message add. Therefore, we only + // We want to avoid doing that for queue messages since removes the + // come in the same + // checkpoint cycle will nullify the previous message add. + // Therefore, we only // checkpoint queues on the fullCheckpoint cycles. // - if( fullCheckpoint ) { + if (fullCheckpoint) { Iterator iterator = queues.values().iterator(); while (iterator.hasNext()) { try { - final JournalMessageStore ms = (JournalMessageStore) iterator.next(); + final JournalMessageStore ms = (JournalMessageStore)iterator.next(); FutureTask task = new FutureTask(new Callable() { public Object call() throws Exception { return ms.checkpoint(); - }}); + } + }); futureTasks.add(task); - checkpointExecutor.execute(task); - } - catch (Exception e) { + checkpointExecutor.execute(task); + } catch (Exception e) { log.error("Failed to checkpoint a message store: " + e, e); } } @@ -372,25 +375,25 @@ Iterator iterator = topics.values().iterator(); while (iterator.hasNext()) { try { - final JournalTopicMessageStore ms = (JournalTopicMessageStore) iterator.next(); + final JournalTopicMessageStore ms = (JournalTopicMessageStore)iterator.next(); FutureTask task = new FutureTask(new Callable() { public Object call() throws Exception { return ms.checkpoint(); - }}); + } + }); futureTasks.add(task); - checkpointExecutor.execute(task); - } - catch (Exception e) { + checkpointExecutor.execute(task); + } catch (Exception e) { log.error("Failed to checkpoint a message store: " + e, e); } } try { for (Iterator iter = futureTasks.iterator(); iter.hasNext();) { - FutureTask ft = (FutureTask) iter.next(); - RecordLocation mark = (RecordLocation) ft.get(); + FutureTask ft = (FutureTask)iter.next(); + RecordLocation mark = (RecordLocation)ft.get(); // We only set a newMark on full checkpoints. - if( fullCheckpoint ) { + if (fullCheckpoint) { if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { newMark = mark; } @@ -399,38 +402,36 @@ } catch (Throwable e) { log.error("Failed to checkpoint a message store: " + e, e); } - - if( fullCheckpoint ) { + if (fullCheckpoint) { try { if (newMark != null) { log.debug("Marking journal at: " + newMark); journal.setMark(newMark, true); } - } - catch (Exception e) { + } catch (Exception e) { log.error("Failed to mark the Journal: " + e, e); } - + if (longTermPersistence instanceof JDBCPersistenceAdapter) { - // We may be check pointing more often than the checkpointInterval if under high use + // We may be check pointing more often than the + // checkpointInterval if under high use // But we don't want to clean up the db that often. long now = System.currentTimeMillis(); - if( now > lastCleanup+checkpointInterval ) { + if (now > lastCleanup + checkpointInterval) { lastCleanup = now; - ((JDBCPersistenceAdapter) longTermPersistence).cleanup(); + ((JDBCPersistenceAdapter)longTermPersistence).cleanup(); } } } log.debug("Checkpoint done."); - } - finally { + } finally { latch.countDown(); } - synchronized(this) { + synchronized (this) { return this.fullCheckPoint; - } + } } @@ -441,13 +442,11 @@ */ public DataStructure readCommand(RecordLocation location) throws IOException { try { - Packet packet = journal.read(location); - return (DataStructure) wireFormat.unmarshal(toByteSequence(packet)); - } - catch (InvalidRecordLocationException e) { + Packet packet = journal.read(location); + return (DataStructure)wireFormat.unmarshal(toByteSequence(packet)); + } catch (InvalidRecordLocationException e) { throw createReadException(location, e); - } - catch (IOException e) { + } catch (IOException e) { throw createReadException(location, e); } } @@ -472,49 +471,43 @@ // While we have records in the journal. while ((pos = journal.getNextRecordLocation(pos)) != null) { Packet data = journal.read(pos); - DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data)); + DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data)); - if (c instanceof Message ) { - Message message = (Message) c; - JournalMessageStore store = (JournalMessageStore) createMessageStore(message.getDestination()); - if ( message.isInTransaction()) { + if (c instanceof Message) { + Message message = (Message)c; + JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination()); + if (message.isInTransaction()) { transactionStore.addMessage(store, message, pos); - } - else { + } else { store.replayAddMessage(context, message); transactionCounter++; } } else { switch (c.getDataStructureType()) { - case JournalQueueAck.DATA_STRUCTURE_TYPE: - { - JournalQueueAck command = (JournalQueueAck) c; - JournalMessageStore store = (JournalMessageStore) createMessageStore(command.getDestination()); + case JournalQueueAck.DATA_STRUCTURE_TYPE: { + JournalQueueAck command = (JournalQueueAck)c; + JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination()); if (command.getMessageAck().isInTransaction()) { transactionStore.removeMessage(store, command.getMessageAck(), pos); - } - else { + } else { store.replayRemoveMessage(context, command.getMessageAck()); transactionCounter++; } } - break; - case JournalTopicAck.DATA_STRUCTURE_TYPE: - { - JournalTopicAck command = (JournalTopicAck) c; - JournalTopicMessageStore store = (JournalTopicMessageStore) createMessageStore(command.getDestination()); + break; + case JournalTopicAck.DATA_STRUCTURE_TYPE: { + JournalTopicAck command = (JournalTopicAck)c; + JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination()); if (command.getTransactionId() != null) { transactionStore.acknowledge(store, command, pos); - } - else { + } else { store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()); transactionCounter++; } } - break; - case JournalTransaction.DATA_STRUCTURE_TYPE: - { - JournalTransaction command = (JournalTransaction) c; + break; + case JournalTransaction.DATA_STRUCTURE_TYPE: { + JournalTransaction command = (JournalTransaction)c; try { // Try to replay the packet. switch (command.getType()) { @@ -525,23 +518,23 @@ case JournalTransaction.LOCAL_COMMIT: Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared()); if (tx == null) - break; // We may be trying to replay a commit that - // was already committed. + break; // We may be trying to replay a commit + // that + // was already committed. // Replay the committed operations. tx.getOperations(); for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) { - TxOperation op = (TxOperation) iter.next(); + TxOperation op = (TxOperation)iter.next(); if (op.operationType == TxOperation.ADD_OPERATION_TYPE) { - op.store.replayAddMessage(context, (Message) op.data); + op.store.replayAddMessage(context, (Message)op.data); } if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) { - op.store.replayRemoveMessage(context, (MessageAck) op.data); + op.store.replayRemoveMessage(context, (MessageAck)op.data); } if (op.operationType == TxOperation.ACK_OPERATION_TYPE) { - JournalTopicAck ack = (JournalTopicAck) op.data; - ((JournalTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack - .getMessageId()); + JournalTopicAck ack = (JournalTopicAck)op.data; + ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId()); } } transactionCounter++; @@ -551,14 +544,13 @@ transactionStore.replayRollback(command.getTransactionId()); break; } - } - catch (IOException e) { + } catch (IOException e) { log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e); } } - break; + break; case JournalTrace.DATA_STRUCTURE_TYPE: - JournalTrace trace = (JournalTrace) c; + JournalTrace trace = (JournalTrace)c; log.debug("TRACE Entry: " + trace.getMessage()); break; default: @@ -590,14 +582,13 @@ } /** - * * @param command * @param sync * @return * @throws IOException */ public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException { - if( started.get() ) + if (started.get()) return journal.write(toPacket(wireFormat.marshal(command)), sync); throw new IOException("closed"); } @@ -609,19 +600,19 @@ } public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) { - newPercentUsage = ((newPercentUsage)/10)*10; - oldPercentUsage = ((oldPercentUsage)/10)*10; + newPercentUsage = ((newPercentUsage) / 10) * 10; + oldPercentUsage = ((oldPercentUsage) / 10) * 10; if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) { boolean sync = newPercentUsage >= 90; checkpoint(sync, true); } } - + public JournalTransactionStore getTransactionStore() { return transactionStore; } - public void deleteAllMessages() throws IOException { + public void deleteAllMessages() throws IOException { try { JournalTrace trace = new JournalTrace(); trace.setMessage("DELETED"); @@ -661,28 +652,28 @@ } public void setUseExternalMessageReferences(boolean enable) { - if( enable ) + if (enable) throw new IllegalArgumentException("The journal does not support message references."); } - + public Packet toPacket(ByteSequence sequence) { - return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length)); + return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length)); } - + public ByteSequence toByteSequence(Packet packet) { - org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence(); - return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength()); + org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence(); + return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength()); } - - public void setBrokerName(String brokerName){ + + public void setBrokerName(String brokerName) { longTermPersistence.setBrokerName(brokerName); } - - public String toString(){ + + public String toString() { return "JournalPersistenceAdapator(" + longTermPersistence + ")"; } - public void setDirectory(File dir){ + public void setDirectory(File dir) { } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java Wed Aug 8 11:56:59 2007 @@ -34,34 +34,34 @@ /** * Factory class that can create PersistenceAdapter objects. - * + * * @version $Revision: 1.4 $ */ public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory { - - private static final int JOURNAL_LOCKED_WAIT_DELAY = 10*1000; + + private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000; private static final Log log = LogFactory.getLog(JournalPersistenceAdapterFactory.class); - - private int journalLogFileSize = 1024*1024*20; + + private int journalLogFileSize = 1024 * 1024 * 20; private int journalLogFiles = 2; private TaskRunnerFactory taskRunnerFactory; private Journal journal; - private boolean useJournal=true; - private boolean useQuickJournal=false; + private boolean useJournal = true; + private boolean useQuickJournal = false; private File journalArchiveDirectory; - private boolean failIfJournalIsLocked=false; + private boolean failIfJournalIsLocked = false; private int journalThreadPriority = Thread.MAX_PRIORITY; private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); - + public PersistenceAdapter createPersistenceAdapter() throws IOException { jdbcPersistenceAdapter.setDataSource(getDataSource()); - - if( !useJournal ) { + + if (!useJournal) { return jdbcPersistenceAdapter; } return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory()); - + } public int getJournalLogFiles() { @@ -81,13 +81,13 @@ /** * Sets the size of the journal log files - * + * * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" */ public void setJournalLogFileSize(int journalLogFileSize) { this.journalLogFileSize = journalLogFileSize; } - + public JDBCPersistenceAdapter getJdbcAdapter() { return jdbcPersistenceAdapter; } @@ -101,8 +101,9 @@ } /** - * Enables or disables the use of the journal. The default is to use the journal - * + * Enables or disables the use of the journal. The default is to use the + * journal + * * @param useJournal */ public void setUseJournal(boolean useJournal) { @@ -110,8 +111,9 @@ } public TaskRunnerFactory getTaskRunnerFactory() { - if( taskRunnerFactory == null ) { - taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, true, 1000); + if (taskRunnerFactory == null) { + taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, + true, 1000); } return taskRunnerFactory; } @@ -121,7 +123,7 @@ } public Journal getJournal() throws IOException { - if( journal == null ) { + if (journal == null) { createJournal(); } return journal; @@ -132,7 +134,7 @@ } public File getJournalArchiveDirectory() { - if( journalArchiveDirectory == null && useQuickJournal ) { + if (journalArchiveDirectory == null && useQuickJournal) { journalArchiveDirectory = new File(getDataDirectoryFile(), "journal"); } return journalArchiveDirectory; @@ -142,15 +144,14 @@ this.journalArchiveDirectory = journalArchiveDirectory; } - public boolean isUseQuickJournal() { return useQuickJournal; } /** - * Enables or disables the use of quick journal, which keeps messages in the journal and just - * stores a reference to the messages in JDBC. Defaults to false so that messages actually reside - * long term in the JDBC database. + * Enables or disables the use of quick journal, which keeps messages in the + * journal and just stores a reference to the messages in JDBC. Defaults to + * false so that messages actually reside long term in the JDBC database. */ public void setUseQuickJournal(boolean useQuickJournal) { this.useQuickJournal = useQuickJournal; @@ -167,6 +168,7 @@ public Statements getStatements() { return jdbcPersistenceAdapter.getStatements(); } + public void setStatements(Statements statements) { jdbcPersistenceAdapter.setStatements(statements); } @@ -176,7 +178,8 @@ } /** - * Sets whether or not an exclusive database lock should be used to enable JDBC Master/Slave. Enabled by default. + * Sets whether or not an exclusive database lock should be used to enable + * JDBC Master/Slave. Enabled by default. */ public void setUseDatabaseLock(boolean useDatabaseLock) { jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock); @@ -192,16 +195,16 @@ public void setCreateTablesOnStartup(boolean createTablesOnStartup) { jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup); } - - public int getJournalThreadPriority(){ + + public int getJournalThreadPriority() { return journalThreadPriority; } /** * Sets the thread priority of the journal thread */ - public void setJournalThreadPriority(int journalThreadPriority){ - this.journalThreadPriority=journalThreadPriority; + public void setJournalThreadPriority(int journalThreadPriority) { + this.journalThreadPriority = journalThreadPriority; } /** @@ -209,15 +212,18 @@ */ protected void createJournal() throws IOException { File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile(); - if( failIfJournalIsLocked ) { - journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, getJournalArchiveDirectory()); + if (failIfJournalIsLocked) { + journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, + getJournalArchiveDirectory()); } else { - while( true ) { + while (true) { try { - journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, getJournalArchiveDirectory()); + journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, + getJournalArchiveDirectory()); break; } catch (JournalLockedException e) { - log.info("Journal is locked... waiting "+(JOURNAL_LOCKED_WAIT_DELAY/1000)+" seconds for the journal to be unlocked."); + log.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + + " seconds for the journal to be unlocked."); try { Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY); } catch (InterruptedException e1) { @@ -226,7 +232,5 @@ } } } - - } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java Wed Aug 8 11:56:59 2007 @@ -41,26 +41,29 @@ * @version $Revision: 1.13 $ */ public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore { - + private static final Log log = LogFactory.getLog(JournalTopicMessageStore.class); private TopicMessageStore longTermStore; - private HashMap ackedLastAckLocations = new HashMap(); - - public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore, ActiveMQTopic destinationName) { + private HashMap ackedLastAckLocations = new HashMap(); + + public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore, + ActiveMQTopic destinationName) { super(adapter, checkpointStore, destinationName); this.longTermStore = checkpointStore; } - - public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { + + public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) + throws Exception { this.peristenceAdapter.checkpoint(true, true); longTermStore.recoverSubscription(clientId, subscriptionName, listener); } - - public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception{ + + public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, + MessageRecoveryListener listener) throws Exception { this.peristenceAdapter.checkpoint(true, true); - longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned,listener); - + longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned, listener); + } public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { @@ -75,66 +78,69 @@ public void addMessage(ConnectionContext context, Message message) throws IOException { super.addMessage(context, message); } - + /** */ - public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException { + public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, + final MessageId messageId) throws IOException { final boolean debug = log.isDebugEnabled(); - + JournalTopicAck ack = new JournalTopicAck(); ack.setDestination(destination); ack.setMessageId(messageId); ack.setMessageSequenceId(messageId.getBrokerSequenceId()); ack.setSubscritionName(subscriptionName); ack.setClientId(clientId); - ack.setTransactionId( context.getTransaction()!=null ? context.getTransaction().getTransactionId():null); + ack.setTransactionId(context.getTransaction() != null + ? context.getTransaction().getTransactionId() : null); final RecordLocation location = peristenceAdapter.writeCommand(ack, false); - - final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); - if( !context.isInTransaction() ) { - if( debug ) - log.debug("Journalled acknowledge for: "+messageId+", at: "+location); + + final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); + if (!context.isInTransaction()) { + if (debug) + log.debug("Journalled acknowledge for: " + messageId + ", at: " + location); acknowledge(messageId, location, key); } else { - if( debug ) - log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location); + if (debug) + log.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location); synchronized (this) { inFlightTxLocations.add(location); } transactionStore.acknowledge(this, ack, location); - context.getTransaction().addSynchronization(new Synchronization(){ - public void afterCommit() throws Exception { - if( debug ) - log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location); + context.getTransaction().addSynchronization(new Synchronization() { + public void afterCommit() throws Exception { + if (debug) + log.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location); synchronized (JournalTopicMessageStore.this) { inFlightTxLocations.remove(location); acknowledge(messageId, location, key); } } - public void afterRollback() throws Exception { - if( debug ) - log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location); + + public void afterRollback() throws Exception { + if (debug) + log.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location); synchronized (JournalTopicMessageStore.this) { inFlightTxLocations.remove(location); } } }); } - + } - - public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) { + + public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, + MessageId messageId) { try { SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName); - if( sub != null ) { + if (sub != null) { longTermStore.acknowledge(context, clientId, subscritionName, messageId); } - } - catch (Throwable e) { - log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e); + } catch (Throwable e) { + log.debug("Could not replay acknowledge for message '" + messageId + + "'. Message may have already been acknowledged. reason: " + e); } } - /** * @param messageId @@ -142,15 +148,15 @@ * @param key */ protected void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) { - synchronized(this) { - lastLocation = location; - ackedLastAckLocations.put(key, messageId); - } + synchronized (this) { + lastLocation = location; + ackedLastAckLocations.put(key, messageId); + } } - + public RecordLocation checkpoint() throws IOException { - - final HashMap cpAckedLastAckLocations; + + final HashMap cpAckedLastAckLocations; // swap out the hash maps.. synchronized (this) { @@ -158,15 +164,16 @@ this.ackedLastAckLocations = new HashMap(); } - return super.checkpoint( new Callback() { + return super.checkpoint(new Callback() { public void execute() throws Exception { // Checkpoint the acknowledged messages. Iterator iterator = cpAckedLastAckLocations.keySet().iterator(); while (iterator.hasNext()) { - SubscriptionKey subscriptionKey = (SubscriptionKey) iterator.next(); - MessageId identity = (MessageId) cpAckedLastAckLocations.get(subscriptionKey); - longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity); + SubscriptionKey subscriptionKey = (SubscriptionKey)iterator.next(); + MessageId identity = (MessageId)cpAckedLastAckLocations.get(subscriptionKey); + longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, + subscriptionKey.subscriptionName, identity); } } @@ -175,30 +182,27 @@ } /** - * @return Returns the longTermStore. - */ - public TopicMessageStore getLongTermTopicMessageStore() { - return longTermStore; - } + * @return Returns the longTermStore. + */ + public TopicMessageStore getLongTermTopicMessageStore() { + return longTermStore; + } public void deleteSubscription(String clientId, String subscriptionName) throws IOException { longTermStore.deleteSubscription(clientId, subscriptionName); } - + public SubscriptionInfo[] getAllSubscriptions() throws IOException { return longTermStore.getAllSubscriptions(); } - - public int getMessageCount(String clientId,String subscriberName) throws IOException{ + public int getMessageCount(String clientId, String subscriberName) throws IOException { this.peristenceAdapter.checkpoint(true, true); - return longTermStore.getMessageCount(clientId,subscriberName); - } - - public void resetBatching(String clientId,String subscriptionName) { - longTermStore.resetBatching(clientId,subscriptionName); + return longTermStore.getMessageCount(clientId, subscriberName); } - + public void resetBatching(String clientId, String subscriptionName) { + longTermStore.resetBatching(clientId, subscriptionName); + } -} \ No newline at end of file +} Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java Wed Aug 8 11:56:59 2007 @@ -36,7 +36,6 @@ import org.apache.activemq.store.TransactionRecoveryListener; import org.apache.activemq.store.TransactionStore; - /** */ public class JournalTransactionStore implements TransactionStore { @@ -46,26 +45,27 @@ Map preparedTransactions = new LinkedHashMap(); private boolean doingRecover; - public static class TxOperation { - - static final byte ADD_OPERATION_TYPE = 0; - static final byte REMOVE_OPERATION_TYPE = 1; - static final byte ACK_OPERATION_TYPE = 3; - + + static final byte ADD_OPERATION_TYPE = 0; + static final byte REMOVE_OPERATION_TYPE = 1; + static final byte ACK_OPERATION_TYPE = 3; + public byte operationType; public JournalMessageStore store; public Object data; - + public TxOperation(byte operationType, JournalMessageStore store, Object data) { - this.operationType=operationType; - this.store=store; - this.data=data; + this.operationType = operationType; + this.store = store; + this.data = data; } - + } + /** * Operations + * * @version $Revision: 1.6 $ */ public static class Tx { @@ -74,7 +74,7 @@ private ArrayList operations = new ArrayList(); public Tx(RecordLocation location) { - this.location=location; + this.location = location; } public void add(JournalMessageStore store, Message msg) { @@ -88,12 +88,12 @@ public void add(JournalTopicMessageStore store, JournalTopicAck ack) { operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack)); } - + public Message[] getMessages() { ArrayList list = new ArrayList(); for (Iterator iter = operations.iterator(); iter.hasNext();) { - TxOperation op = (TxOperation) iter.next(); - if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) { + TxOperation op = (TxOperation)iter.next(); + if (op.operationType == TxOperation.ADD_OPERATION_TYPE) { list.add(op.data); } } @@ -105,8 +105,8 @@ public MessageAck[] getAcks() { ArrayList list = new ArrayList(); for (Iterator iter = operations.iterator(); iter.hasNext();) { - TxOperation op = (TxOperation) iter.next(); - if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) { + TxOperation op = (TxOperation)iter.next(); + if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) { list.add(op.data); } } @@ -129,43 +129,44 @@ * @throws IOException * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) */ - public void prepare(TransactionId txid) throws IOException{ - Tx tx=null; - synchronized(inflightTransactions){ - tx=(Tx)inflightTransactions.remove(txid); + public void prepare(TransactionId txid) throws IOException { + Tx tx = null; + synchronized (inflightTransactions) { + tx = (Tx)inflightTransactions.remove(txid); } - if(tx==null) + if (tx == null) return; - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true); - synchronized(preparedTransactions){ - preparedTransactions.put(txid,tx); + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), + true); + synchronized (preparedTransactions) { + preparedTransactions.put(txid, tx); } } - + /** * @throws IOException * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) */ - public void replayPrepare(TransactionId txid) throws IOException{ - Tx tx=null; - synchronized(inflightTransactions){ - tx=(Tx)inflightTransactions.remove(txid); + public void replayPrepare(TransactionId txid) throws IOException { + Tx tx = null; + synchronized (inflightTransactions) { + tx = (Tx)inflightTransactions.remove(txid); } - if(tx==null) + if (tx == null) return; - synchronized(preparedTransactions){ - preparedTransactions.put(txid,tx); + synchronized (preparedTransactions) { + preparedTransactions.put(txid, tx); } } - public Tx getTx(Object txid,RecordLocation location){ - Tx tx=null; - synchronized(inflightTransactions){ - tx=(Tx)inflightTransactions.get(txid); + public Tx getTx(Object txid, RecordLocation location) { + Tx tx = null; + synchronized (inflightTransactions) { + tx = (Tx)inflightTransactions.get(txid); } - if(tx==null){ - tx=new Tx(location); - inflightTransactions.put(txid,tx); + if (tx == null) { + tx = new Tx(location); + inflightTransactions.put(txid, tx); } return tx; } @@ -174,24 +175,25 @@ * @throws XAException * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) */ - public void commit(TransactionId txid,boolean wasPrepared) throws IOException{ + public void commit(TransactionId txid, boolean wasPrepared) throws IOException { Tx tx; - if(wasPrepared){ - synchronized(preparedTransactions){ - tx=(Tx)preparedTransactions.remove(txid); + if (wasPrepared) { + synchronized (preparedTransactions) { + tx = (Tx)preparedTransactions.remove(txid); } - }else{ - synchronized(inflightTransactions){ - tx=(Tx)inflightTransactions.remove(txid); + } else { + synchronized (inflightTransactions) { + tx = (Tx)inflightTransactions.remove(txid); } } - if(tx==null) + if (tx == null) return; - if(txid.isXATransaction()){ - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true); - }else{ - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared), - true); + if (txid.isXATransaction()) { + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, + wasPrepared), true); + } else { + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, + wasPrepared), true); } } @@ -199,13 +201,13 @@ * @throws XAException * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) */ - public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{ - if(wasPrepared){ - synchronized(preparedTransactions){ + public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException { + if (wasPrepared) { + synchronized (preparedTransactions) { return (Tx)preparedTransactions.remove(txid); } - }else{ - synchronized(inflightTransactions){ + } else { + synchronized (inflightTransactions) { return (Tx)inflightTransactions.remove(txid); } } @@ -215,21 +217,22 @@ * @throws IOException * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) */ - public void rollback(TransactionId txid) throws IOException{ - Tx tx=null; - synchronized(inflightTransactions){ - tx=(Tx)inflightTransactions.remove(txid); - } - if(tx!=null) - synchronized(preparedTransactions){ - tx=(Tx)preparedTransactions.remove(txid); - } - if(tx!=null){ - if(txid.isXATransaction()){ - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true); - }else{ - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false), - true); + public void rollback(TransactionId txid) throws IOException { + Tx tx = null; + synchronized (inflightTransactions) { + tx = (Tx)inflightTransactions.remove(txid); + } + if (tx != null) + synchronized (preparedTransactions) { + tx = (Tx)preparedTransactions.remove(txid); + } + if (tx != null) { + if (txid.isXATransaction()) { + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, + false), true); + } else { + peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, + txid, false), true); } } } @@ -238,42 +241,42 @@ * @throws IOException * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) */ - public void replayRollback(TransactionId txid) throws IOException{ - boolean inflight=false; - synchronized(inflightTransactions){ - inflight=inflightTransactions.remove(txid)!=null; + public void replayRollback(TransactionId txid) throws IOException { + boolean inflight = false; + synchronized (inflightTransactions) { + inflight = inflightTransactions.remove(txid) != null; } - if(inflight){ - synchronized(preparedTransactions){ + if (inflight) { + synchronized (preparedTransactions) { preparedTransactions.remove(txid); } } } - + public void start() throws Exception { } public void stop() throws Exception { } - - synchronized public void recover(TransactionRecoveryListener listener) throws IOException{ + + synchronized public void recover(TransactionRecoveryListener listener) throws IOException { // All the in-flight transactions get rolled back.. - synchronized(inflightTransactions){ + synchronized (inflightTransactions) { inflightTransactions.clear(); } - this.doingRecover=true; - try{ - Map txs=null; - synchronized(preparedTransactions){ - txs=new LinkedHashMap(preparedTransactions); - } - for(Iterator iter=txs.keySet().iterator();iter.hasNext();){ - Object txid=(Object)iter.next(); - Tx tx=(Tx)txs.get(txid); - listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks()); + this.doingRecover = true; + try { + Map txs = null; + synchronized (preparedTransactions) { + txs = new LinkedHashMap(preparedTransactions); + } + for (Iterator iter = txs.keySet().iterator(); iter.hasNext();) { + Object txid = (Object)iter.next(); + Tx tx = (Tx)txs.get(txid); + listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks()); } - }finally{ - this.doingRecover=false; + } finally { + this.doingRecover = false; } } @@ -290,40 +293,40 @@ * @param ack * @throws IOException */ - public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location) throws IOException { + public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location) + throws IOException { Tx tx = getTx(ack.getTransactionId(), location); tx.add(store, ack); } - - + public void acknowledge(JournalTopicMessageStore store, JournalTopicAck ack, RecordLocation location) { Tx tx = getTx(ack.getTransactionId(), location); tx.add(store, ack); } - - public RecordLocation checkpoint() throws IOException{ + public RecordLocation checkpoint() throws IOException { // Nothing really to checkpoint.. since, we don't - // checkpoint tx operations in to long term store until they are committed. + // checkpoint tx operations in to long term store until they are + // committed. // But we keep track of the first location of an operation // that was associated with an active tx. The journal can not // roll over active tx records. - RecordLocation rc=null; - synchronized(inflightTransactions){ - for(Iterator iter=inflightTransactions.values().iterator();iter.hasNext();){ - Tx tx=(Tx)iter.next(); - RecordLocation location=tx.location; - if(rc==null||rc.compareTo(location)<0){ - rc=location; + RecordLocation rc = null; + synchronized (inflightTransactions) { + for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) { + Tx tx = (Tx)iter.next(); + RecordLocation location = tx.location; + if (rc == null || rc.compareTo(location) < 0) { + rc = location; } } } - synchronized(preparedTransactions){ - for(Iterator iter=preparedTransactions.values().iterator();iter.hasNext();){ - Tx tx=(Tx)iter.next(); - RecordLocation location=tx.location; - if(rc==null||rc.compareTo(location)<0){ - rc=location; + synchronized (preparedTransactions) { + for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) { + Tx tx = (Tx)iter.next(); + RecordLocation location = tx.location; + if (rc == null || rc.compareTo(location) < 0) { + rc = location; } } return rc; @@ -333,6 +336,5 @@ public boolean isDoingRecover() { return doingRecover; } - } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java Wed Aug 8 11:56:59 2007 @@ -28,35 +28,36 @@ /** * Marshall an AMQTx + * * @version $Revision: 1.10 $ */ -public class AMQTxMarshaller implements Marshaller{ +public class AMQTxMarshaller implements Marshaller { private WireFormat wireFormat; - public AMQTxMarshaller(WireFormat wireFormat){ - this.wireFormat=wireFormat; + public AMQTxMarshaller(WireFormat wireFormat) { + this.wireFormat = wireFormat; } - public AMQTx readPayload(DataInput dataIn) throws IOException{ - Location location=new Location(); + public AMQTx readPayload(DataInput dataIn) throws IOException { + Location location = new Location(); location.readExternal(dataIn); - AMQTx result=new AMQTx(location); - int size=dataIn.readInt(); - for(int i=0;i list=amqtx.getOperations(); + List list = amqtx.getOperations(); dataOut.writeInt(list.size()); - for(AMQTxOperation op:list){ - op.writeExternal(wireFormat,dataOut); + for (AMQTxOperation op : list) { + op.writeExternal(wireFormat, dataOut); } } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java Wed Aug 8 11:56:59 2007 @@ -22,20 +22,19 @@ import org.apache.activemq.kaha.Marshaller; import java.util.concurrent.atomic.AtomicInteger; - /** * Marshall an AtomicInteger + * * @version $Revision: 1.10 $ */ -public class AtomicIntegerMarshaller implements Marshaller{ - +public class AtomicIntegerMarshaller implements Marshaller { + + public void writePayload(AtomicInteger ai, DataOutput dataOut) throws IOException { + dataOut.writeInt(ai.get()); - public void writePayload(AtomicInteger ai,DataOutput dataOut) throws IOException{ - dataOut.writeInt(ai.get()); - } - public AtomicInteger readPayload(DataInput dataIn) throws IOException{ + public AtomicInteger readPayload(DataInput dataIn) throws IOException { int value = dataIn.readInt(); return new AtomicInteger(value); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java Wed Aug 8 11:56:59 2007 @@ -22,59 +22,56 @@ * * @version $Revision: 1.10 $ */ -public class ConsumerMessageRef{ +public class ConsumerMessageRef { private MessageId messageId; private StoreEntry messageEntry; private StoreEntry ackEntry; - + /** * @return the ackEntry */ - public StoreEntry getAckEntry(){ + public StoreEntry getAckEntry() { return this.ackEntry; } - + /** * @param ackEntry the ackEntry to set */ - public void setAckEntry(StoreEntry ackEntry){ - this.ackEntry=ackEntry; + public void setAckEntry(StoreEntry ackEntry) { + this.ackEntry = ackEntry; } - + /** * @return the messageEntry */ - public StoreEntry getMessageEntry(){ + public StoreEntry getMessageEntry() { return this.messageEntry; } - + /** * @param messageEntry the messageEntry to set */ - public void setMessageEntry(StoreEntry messageEntry){ - this.messageEntry=messageEntry; + public void setMessageEntry(StoreEntry messageEntry) { + this.messageEntry = messageEntry; } - /** * @return the messageId */ - public MessageId getMessageId(){ + public MessageId getMessageId() { return this.messageId; } - /** * @param messageId the messageId to set */ - public void setMessageId(MessageId messageId){ - this.messageId=messageId; + public void setMessageId(MessageId messageId) { + this.messageId = messageId; } - + public String toString() { - return "ConsumerMessageRef[" + messageId +"]"; + return "ConsumerMessageRef[" + messageId + "]"; } - } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java Wed Aug 8 11:56:59 2007 @@ -23,31 +23,30 @@ import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.impl.index.IndexItem; - /** * Marshall a TopicSubAck + * * @version $Revision: 1.10 $ */ -public class ConsumerMessageRefMarshaller implements Marshaller{ - +public class ConsumerMessageRefMarshaller implements Marshaller { /** * @param object * @param dataOut * @throws IOException - * @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object, java.io.DataOutput) + * @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object, + * java.io.DataOutput) */ - public void writePayload(Object object,DataOutput dataOut) throws IOException{ - ConsumerMessageRef ref = (ConsumerMessageRef) object; - dataOut.writeUTF(ref.getMessageId().toString()); - IndexItem item = (IndexItem)ref.getMessageEntry(); - dataOut.writeLong(item.getOffset()); - item.write(dataOut); - item = (IndexItem)ref.getAckEntry(); - dataOut.writeLong(item.getOffset()); - item.write(dataOut); - - + public void writePayload(Object object, DataOutput dataOut) throws IOException { + ConsumerMessageRef ref = (ConsumerMessageRef)object; + dataOut.writeUTF(ref.getMessageId().toString()); + IndexItem item = (IndexItem)ref.getMessageEntry(); + dataOut.writeLong(item.getOffset()); + item.write(dataOut); + item = (IndexItem)ref.getAckEntry(); + dataOut.writeLong(item.getOffset()); + item.write(dataOut); + } /** @@ -56,7 +55,7 @@ * @throws IOException * @see org.apache.activemq.kaha.Marshaller#readPayload(java.io.DataInput) */ - public Object readPayload(DataInput dataIn) throws IOException{ + public Object readPayload(DataInput dataIn) throws IOException { ConsumerMessageRef ref = new ConsumerMessageRef(); ref.setMessageId(new MessageId(dataIn.readUTF())); IndexItem item = new IndexItem(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java Wed Aug 8 11:56:59 2007 @@ -22,18 +22,18 @@ import org.apache.activemq.kaha.Marshaller; - /** * Marshall an Integer + * * @version $Revision: 1.10 $ */ public class IntegerMarshaller implements Marshaller { - - public void writePayload(Integer object,DataOutput dataOut) throws IOException{ - dataOut.writeInt(object.intValue()); + + public void writePayload(Integer object, DataOutput dataOut) throws IOException { + dataOut.writeInt(object.intValue()); } - public Integer readPayload(DataInput dataIn) throws IOException{ + public Integer readPayload(DataInput dataIn) throws IOException { return dataIn.readInt(); } }