activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r600891 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/broker/ src/main/java/org/apache/activemq/broker/region/ src/main/java/org/apache/activemq/kaha/ src/main/java/org/apache/activemq/kaha/impl/ src/main/java/org...
Date Tue, 04 Dec 2007 11:28:00 GMT
Author: rajdavies
Date: Tue Dec  4 03:27:58 2007
New Revision: 600891

URL: http://svn.apache.org/viewvc?rev=600891&view=rev
Log:
Changes to address memory usage for large transactions for:
https://issues.apache.org/activemq/browse/AMQ-1490

Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=600891&r1=600890&r2=600891&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Tue Dec  4 03:27:58 2007
@@ -404,6 +404,11 @@
             <exclude>**/nio/**</exclude>
              <!-- A test used for memory profiling only. -->
              <exclude>**/NetworkConnectionsCleanedupTest.*/**</exclude>
+             
+             <exclude>**/NetworkConnectionsCleanedupTest.*/**</exclude>
+             
+             <exclude>**/amq1490/*</exclude>
+             <exclude>**/archive/*</exclude>
 
             <exclude>**/AMQDeadlockTest3.*</exclude>
           </excludes>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=600891&r1=600890&r2=600891&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Tue Dec  4 03:27:58 2007
@@ -113,7 +113,7 @@
     private TaskRunnerFactory persistenceTaskRunnerFactory;
     private SystemUsage systemUsage;
     private SystemUsage producerSystemUsage;
-    private SystemUsage storeSystemUsage;
+    private SystemUsage consumerSystemUsaage;
     private PersistenceAdapter persistenceAdapter;
     private PersistenceAdapterFactory persistenceFactory;
     private DestinationFactory destinationFactory;
@@ -668,23 +668,23 @@
      * @throws IOException 
      */
     public SystemUsage getConsumerSystemUsage() throws IOException {
-        if (this.storeSystemUsage == null) {
-            this.storeSystemUsage = new SystemUsage(getSystemUsage(), "Store");
-            this.storeSystemUsage.getMemoryUsage().setUsagePortion(0.5f);
-            addService(this.storeSystemUsage);
+        if (this.consumerSystemUsaage == null) {
+            this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
+            this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(0.5f);
+            addService(this.consumerSystemUsaage);
         }
-        return this.storeSystemUsage;
+        return this.consumerSystemUsaage;
     }
 
     /**
-     * @param storeSystemUsage the storeSystemUsage to set
+     * @param consumerSystemUsaage the storeSystemUsage to set
      */
-    public void setConsumerSystemUsage(SystemUsage storeSystemUsage) {
-        if (this.storeSystemUsage != null) {
-            removeService(this.storeSystemUsage);
+    public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
+        if (this.consumerSystemUsaage != null) {
+            removeService(this.consumerSystemUsaage);
         }
-        this.storeSystemUsage = storeSystemUsage;
-        addService(this.storeSystemUsage);
+        this.consumerSystemUsaage = consumerSystemUsaage;
+        addService(this.consumerSystemUsaage);
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=600891&r1=600890&r2=600891&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Dec  4 03:27:58 2007
@@ -17,7 +17,9 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.jms.InvalidSelectorException;
 import org.apache.activemq.broker.Broker;
@@ -118,9 +120,8 @@
                 topic.deactivate(context, this);
             }
         }
-        for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
+        for (final MessageReference node : dispatched) {
             // Mark the dispatched messages as redelivered for next time.
-            MessageReference node = (MessageReference)iter.next();
             Integer count = redeliveredMessages.get(node.getMessageId());
             if (count != null) {
                 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
@@ -134,8 +135,8 @@
             } else {
                 node.decrementReferenceCount();
             }
-            iter.remove();
         }
+        dispatched.clear();
         if (!keepDurableSubsActive && pending.isTransient()) {
             synchronized (pending) {
                 try {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=600891&r1=600890&r2=600891&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Dec  4 03:27:58 2007
@@ -17,8 +17,10 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -52,12 +54,12 @@
 
     private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
     protected PendingMessageCursor pending;
-    protected final LinkedList<MessageReference> dispatched = new LinkedList<MessageReference>();
+    protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
     protected int prefetchExtension;
     protected long enqueueCounter;
     protected long dispatchCounter;
     protected long dequeueCounter;
-    protected boolean optimizedDispatch=false;
+    protected boolean optimizedDispatch=true;
     private int maxProducersToAudit=32;
     private int maxAuditDepth=2048;
     protected final SystemUsage usageManager;
@@ -148,7 +150,7 @@
                 if (node.getMessageId().equals(mdn.getMessageId())) {
                     pending.remove();
                     createMessageDispatch(node, node.getMessage());
-                    dispatched.addLast(node);
+                    dispatched.add(node);
                     return;
                 }
             }
@@ -158,7 +160,8 @@
         throw new JMSException("Slave broker out of sync with master: Dispatched message (" + mdn.getMessageId() + ") was not in the pending list");
     }
 
-    public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
+    public synchronized void acknowledge(final ConnectionContext context,
+            final MessageAck ack) throws Exception {
         // Handle the standard acknowledgment case.
         boolean callDispatchMatched = false;
         if (ack.isStandardAck()) {
@@ -166,36 +169,42 @@
             // acknowledgment.
             int index = 0;
             boolean inAckRange = false;
-            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
-                final MessageReference node = iter.next();
+            List<MessageReference> removeList = new ArrayList<MessageReference>();
+            for (final MessageReference node : dispatched) {
                 MessageId messageId = node.getMessageId();
-                if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
+                if (ack.getFirstMessageId() == null
+                        || ack.getFirstMessageId().equals(messageId)) {
                     inAckRange = true;
                 }
                 if (inAckRange) {
                     // Don't remove the nodes until we are committed.
                     if (!context.isInTransaction()) {
                         dequeueCounter++;
-                        node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
-                        iter.remove();
+                        node.getRegionDestination().getDestinationStatistics()
+                                .getDequeues().increment();
+                        removeList.add(node);
                     } else {
                         // setup a Synchronization to remove nodes from the
                         // dispatched list.
-                        context.getTransaction().addSynchronization(new Synchronization() {
-
-                            public void afterCommit() throws Exception {
-                                synchronized (PrefetchSubscription.this) {
-                                    dequeueCounter++;
-                                    dispatched.remove(node);
-                                    node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
-                                    prefetchExtension--;
-                                }
-                            }
+                        context.getTransaction().addSynchronization(
+                                new Synchronization() {
 
-                            public void afterRollback() throws Exception {
-                                super.afterRollback();
-                            }
-                        });
+                                    public void afterCommit() throws Exception {
+                                        synchronized (PrefetchSubscription.this) {
+                                            dequeueCounter++;
+                                            dispatched.remove(node);
+                                            node.getRegionDestination()
+                                                    .getDestinationStatistics()
+                                                    .getDequeues().increment();
+                                            prefetchExtension--;
+                                        }
+                                    }
+
+                                    public void afterRollback()
+                                            throws Exception {
+                                        super.afterRollback();
+                                    }
+                                });
                     }
                     index++;
                     acknowledge(context, ack, node);
@@ -204,21 +213,28 @@
                             // extend prefetch window only if not a pulling
                             // consumer
                             if (getPrefetchSize() != 0) {
-                                prefetchExtension = Math.max(prefetchExtension, index + 1);
+                                prefetchExtension = Math.max(prefetchExtension,
+                                        index + 1);
                             }
                         } else {
-                            prefetchExtension = Math.max(0, prefetchExtension - (index + 1));
+                            prefetchExtension = Math.max(0, prefetchExtension
+                                    - (index + 1));
                         }
                         callDispatchMatched = true;
                         break;
                     }
                 }
             }
+            for (final MessageReference node : removeList) {
+                dispatched.remove(node);
+            }
             // this only happens after a reconnect - get an ack which is not
             // valid
             if (!callDispatchMatched) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Could not correlate acknowledgment with dispatched message: " + ack);
+                    LOG
+                            .debug("Could not correlate acknowledgment with dispatched message: "
+                                    + ack);
                 }
             }
         } else if (ack.isDeliveredAck()) {
@@ -227,7 +243,8 @@
             // Acknowledge all dispatched messages up till the message id of the
             // acknowledgment.
             int index = 0;
-            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
+            for (Iterator<MessageReference> iter = dispatched.iterator(); iter
+                    .hasNext(); index++) {
                 final MessageReference node = iter.next();
                 if (ack.getLastMessageId().equals(node.getMessageId())) {
                     prefetchExtension = Math.max(prefetchExtension, index + 1);
@@ -236,17 +253,20 @@
                 }
             }
             if (!callDispatchMatched) {
-                throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
-            }
-        } else if (ack.isRedeliveredAck() ) {
-            // Message was re-delivered but it was not yet considered to be a DLQ message.
+                throw new JMSException(
+                        "Could not correlate acknowledgment with dispatched message: "
+                                + ack);
+            }
+        } else if (ack.isRedeliveredAck()) {
+            // Message was re-delivered but it was not yet considered to be a
+            // DLQ message.
             // Acknowledge all dispatched messages up till the message id of the
             // acknowledgment.
             boolean inAckRange = false;
-            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
-                final MessageReference node = iter.next();
+            for (final MessageReference node : dispatched) {
                 MessageId messageId = node.getMessageId();
-                if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
+                if (ack.getFirstMessageId() == null
+                        || ack.getFirstMessageId().equals(messageId)) {
                     inAckRange = true;
                 }
                 if (inAckRange) {
@@ -258,49 +278,65 @@
                 }
             }
             if (!callDispatchMatched) {
-                throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
+                throw new JMSException(
+                        "Could not correlate acknowledgment with dispatched message: "
+                                + ack);
             }
         } else if (ack.isPoisonAck()) {
             // TODO: what if the message is already in a DLQ???
             // Handle the poison ACK case: we need to send the message to a DLQ
             if (ack.isInTransaction()) {
-                throw new JMSException("Poison ack cannot be transacted: " + ack);
+                throw new JMSException("Poison ack cannot be transacted: "
+                        + ack);
             }
             // Acknowledge all dispatched messages up till the message id of the
             // acknowledgment.
             int index = 0;
             boolean inAckRange = false;
-            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
-                final MessageReference node = iter.next();
+            List<MessageReference> removeList = new ArrayList<MessageReference>();
+            for (final MessageReference node : dispatched) {
                 MessageId messageId = node.getMessageId();
-                if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
+                if (ack.getFirstMessageId() == null
+                        || ack.getFirstMessageId().equals(messageId)) {
                     inAckRange = true;
                 }
                 if (inAckRange) {
                     sendToDLQ(context, node);
-                    node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
-                    iter.remove();
+                    node.getRegionDestination().getDestinationStatistics()
+                            .getDequeues().increment();
+                    removeList.add(node);
                     dequeueCounter++;
                     index++;
                     acknowledge(context, ack, node);
                     if (ack.getLastMessageId().equals(messageId)) {
-                        prefetchExtension = Math.max(0, prefetchExtension - (index + 1));
+                        prefetchExtension = Math.max(0, prefetchExtension
+                                - (index + 1));
                         callDispatchMatched = true;
                         break;
                     }
                 }
             }
+            for (final MessageReference node : removeList) {
+                dispatched.remove(node);
+            }
             if (!callDispatchMatched) {
-                throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
+                throw new JMSException(
+                        "Could not correlate acknowledgment with dispatched message: "
+                                + ack);
             }
         }
         if (callDispatchMatched) {
             dispatchMatched();
         } else {
             if (isSlave()) {
-                throw new JMSException("Slave broker out of sync with master: Acknowledgment (" + ack + ") was not in the dispatch list: " + dispatched);
+                throw new JMSException(
+                        "Slave broker out of sync with master: Acknowledgment ("
+                                + ack + ") was not in the dispatch list: "
+                                + dispatched);
             } else {
-                LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): " + ack);
+                LOG
+                        .debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
+                                + ack);
             }
         }
     }
@@ -450,7 +486,7 @@
             // NULL messages don't count... they don't get Acked.
             if (node != QueueMessageReference.NULL_MESSAGE) {
                 dispatchCounter++;
-                dispatched.addLast(node);
+                dispatched.add(node);
                 if(pending != null) {
                     pending.dispatched(message);
                 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=600891&r1=600890&r2=600891&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Dec  4 03:27:58 2007
@@ -383,11 +383,15 @@
      * @throws IOException
      * @throws Exception
      */
-    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
-        final ConnectionContext context = producerExchange.getConnectionContext();
+    synchronized void doMessageSend(
+            final ProducerBrokerExchange producerExchange, final Message message)
+            throws IOException, Exception {
+        final ConnectionContext context = producerExchange
+                .getConnectionContext();
         message.setRegionDestination(this);
 
-        if (store != null && message.isPersistent() && !canOptimizeOutPersistence()) {
+        if (store != null && message.isPersistent()
+                && !canOptimizeOutPersistence()) {
             while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
                 if (context.getStopping().get()) {
                     throw new IOException("Connection closed, send aborted.");
@@ -397,31 +401,35 @@
         }
 
         message.incrementReferenceCount();
-        try {
 
-            if (context.isInTransaction()) {
-                context.getTransaction().addSynchronization(new Synchronization() {
-                    public void afterCommit() throws Exception {
-                        // It could take while before we receive the commit
-                        // operration.. by that time the message could have
-                        // expired..
-                        if (broker.isExpired(message)) {
-                            broker.messageExpired(context, message);
-                            message.decrementReferenceCount();
-                            destinationStatistics.getMessages().decrement();
-                            return;
-                        }
+        if (context.isInTransaction()) {
+            context.getTransaction().addSynchronization(new Synchronization() {
+                public void afterCommit() throws Exception {
+                    // It could take while before we receive the commit
+                    // operration.. by that time the message could have
+                    // expired..
+                    if (broker.isExpired(message)) {
+                        broker.messageExpired(context, message);
+                        message.decrementReferenceCount();
+                        destinationStatistics.getMessages().decrement();
+                        return;
+                    }
+                    try {
                         dispatch(context, message);
+                    } finally {
+                        message.decrementReferenceCount();
                     }
-                });
+                }
+            });
 
-            } else {
+        } else {
+            try {
                 dispatch(context, message);
+            } finally {
+                message.decrementReferenceCount();
             }
-
-        } finally {
-            message.decrementReferenceCount();
         }
+
     }
 
     private boolean canOptimizeOutPersistence() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?rev=600891&r1=600890&r2=600891&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java Tue Dec  4 03:27:58 2007
@@ -49,6 +49,11 @@
      * Command Marshaller
      */
     Marshaller COMMAND_MARSHALLER = new CommandMarshaller();
+    
+    /**
+     * MessageId marshaller
+     */
+    Marshaller MESSAGEID_MARSHALLER = new MessageIdMarshaller();
 
     /**
      * close the store
@@ -270,4 +275,5 @@
     public boolean isPersistentIndex();
     
 	public void setPersistentIndex(boolean persistentIndex);
+	
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?rev=600891&r1=600890&r2=600891&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Tue Dec  4 03:27:58 2007
@@ -178,15 +178,7 @@
             }
         }
         if (directory != null && directory.isDirectory()) {
-            File[] files = directory.listFiles();
-            if (files != null) {
-                for (int i = 0; i < files.length; i++) {
-                    File file = files[i];
-                    if (!file.isDirectory()) {
-                        result &= file.delete();
-                    }
-                }
-            }
+            result =IOHelper.deleteChildren(directory);
             String str = result ? "successfully deleted" : "failed to delete";
             LOG.info("Kaha Store " + str + " data directory " + directory);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?rev=600891&r1=600890&r2=600891&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java Tue Dec  4 03:27:58 2007
@@ -37,6 +37,7 @@
 import org.apache.activemq.kaha.impl.index.IndexManager;
 import org.apache.activemq.kaha.impl.index.VMIndex;
 import org.apache.activemq.kaha.impl.index.hash.HashIndex;
+import org.apache.activemq.util.IOHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -67,7 +68,7 @@
         if (index == null) {
             if (persistentIndex) {
                 String name = containerId.getDataContainerName() + "_" + containerId.getKey();
-                name = name.replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_");
+                name=IOHelper.toFileSystemSafeName(name);
                 try {
                     HashIndex hashIndex = new HashIndex(directory, name, indexManager);
                     hashIndex.setNumberOfBins(getIndexBinSize());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=600891&r1=600890&r2=600891&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Tue Dec  4 03:27:58 2007
@@ -56,7 +56,7 @@
     private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
     private static final String STORE_STATE = "store-state";
     private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
-    private static final Integer INDEX_VERSION = new Integer(2);
+    private static final Integer INDEX_VERSION = new Integer(3);
     private static final String RECORD_REFERENCES = "record-references";
     private static final String TRANSACTIONS = "transactions-state";
     private MapContainer stateMap;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?rev=600891&r1=600890&r2=600891&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Tue Dec  4 03:27:58 2007
@@ -117,7 +117,7 @@
             subscriberContainer.put(key, info);
         }
         // add the subscriber
-        ListContainer container = addSubscriberMessageContainer(key);
+        addSubscriberMessageContainer(key);
         /*
          * if(retroactive){ for(StoreEntry
          * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
@@ -200,33 +200,39 @@
         return result;
     }
 
-    protected ListContainer addSubscriberMessageContainer(Object key) throws IOException {
-        ListContainer container = store.getListContainer(key, "topic-subs");
+    protected MapContainer addSubscriberMessageContainer(Object key) throws IOException {
+        MapContainer container = store.getMapContainer(key, "topic-subs");
+        container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
         Marshaller marshaller = new ConsumerMessageRefMarshaller();
-        container.setMarshaller(marshaller);
+        container.setValueMarshaller(marshaller);
         TopicSubContainer tsc = new TopicSubContainer(container);
         subscriberMessages.put(key, tsc);
         return container;
     }
 
-    protected void removeSubscriberMessageContainer(Object key) throws IOException {
+    protected void removeSubscriberMessageContainer(Object key)
+            throws IOException {
         subscriberContainer.remove(key);
         TopicSubContainer container = subscriberMessages.remove(key);
-        for (Iterator i = container.iterator(); i.hasNext();) {
-            ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
-            if (ref != null) {
-                TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
-                if (tsa != null) {
-                    if (tsa.decrementCount() <= 0) {
-                        ackContainer.remove(ref.getAckEntry());
-                        messageContainer.remove(tsa.getMessageEntry());
-                    } else {
-                        ackContainer.update(ref.getAckEntry(), tsa);
+        if (container != null) {
+            for (Iterator i = container.iterator(); i.hasNext();) {
+                ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
+                if (ref != null) {
+                    TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
+                    if (tsa != null) {
+                        if (tsa.decrementCount() <= 0) {
+                            ackContainer.remove(ref.getAckEntry());
+                            messageContainer.remove(tsa.getMessageEntry());
+                        } else {
+                            ackContainer.update(ref.getAckEntry(), tsa);
+                        }
                     }
                 }
             }
+            container.clear();
         }
         store.deleteListContainer(key, "topic-subs");
+
     }
 
     public int getMessageCount(String clientId, String subscriberName) throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=600891&r1=600890&r2=600891&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Tue Dec  4 03:27:58 2007
@@ -33,6 +33,7 @@
 import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.util.SubscriptionKey;
 
 public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore {
 
@@ -40,6 +41,7 @@
     protected Map<String, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<String, TopicSubContainer>();
     private Map<String, SubscriptionInfo> subscriberContainer;
     private Store store;
+    private static final String TOPIC_SUB_NAME = "tsn";
 
     public KahaTopicReferenceStore(Store store, KahaReferenceStoreAdapter adapter,
                                   MapContainer<MessageId, ReferenceRecord> messageContainer, ListContainer<TopicSubAck> ackContainer,
@@ -108,10 +110,12 @@
         }
     }
 
-    protected ListContainer addSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
-        ListContainer container = store.getListContainer(clientId+":"+subscriptionName+":"+destination.getQualifiedName(), "topic-subs-references");
+    
+    protected MapContainer addSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
+        MapContainer container = store.getMapContainer(getSubscriptionContainerName(getSubscriptionKey(clientId, subscriptionName)));
+        container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
         Marshaller marshaller = new ConsumerMessageRefMarshaller();
-        container.setMarshaller(marshaller);
+        container.setValueMarshaller(marshaller);
         TopicSubContainer tsc = new TopicSubContainer(container);
         subscriberMessages.put(getSubscriptionKey(clientId, subscriptionName), tsc);
         return container;
@@ -192,7 +196,7 @@
             adapter.addSubscriberState(info);
         }
         // add the subscriber
-        ListContainer container = addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
+        addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
         if (retroactive) {
             /*
              * for(StoreEntry
@@ -210,8 +214,7 @@
         if (info != null) {
             adapter.removeSubscriberState(info);
         }
-        String key = getSubscriptionKey(clientId, subscriptionName);
-        removeSubscriberMessageContainer(key);
+        removeSubscriberMessageContainer(clientId,subscriptionName);
     }
 
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
@@ -293,9 +296,11 @@
         }
     }
 
-    protected void removeSubscriberMessageContainer(String key) throws IOException {
-        subscriberContainer.remove(key);
-        TopicSubContainer container = subscriberMessages.remove(key);
+    protected void removeSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
+        String subscriberKey = getSubscriptionKey(clientId, subscriptionName);
+        String containerName = getSubscriptionContainerName(subscriberKey);
+        subscriberContainer.remove(subscriberKey);
+        TopicSubContainer container = subscriberMessages.remove(subscriberKey);
         for (Iterator i = container.iterator(); i.hasNext();) {
             ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
             if (ref != null) {
@@ -310,12 +315,18 @@
                 }
             }
         }
-        store.deleteListContainer(destination, "topic-subs-references-" + key);
+        store.deleteMapContainer(containerName);
     }
 
     protected String getSubscriptionKey(String clientId, String subscriberName) {
-        String result = clientId + ":";
-        result += subscriberName != null ? subscriberName : "NOT_SET";
-        return result;
+        StringBuffer buffer = new StringBuffer();
+        buffer.append(clientId).append(":");  
+        String name = subscriberName != null ? subscriberName : "NOT_SET";
+        return buffer.append(name).toString();
+    }
+    
+    private String getSubscriptionContainerName(String subscriptionKey) {
+        StringBuffer buffer = new StringBuffer(subscriptionKey);
+        return buffer.append(":").append(destination.getQualifiedName()).append(TOPIC_SUB_NAME).toString();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?rev=600891&r1=600890&r2=600891&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Tue Dec  4 03:27:58 2007
@@ -19,7 +19,7 @@
 import java.util.Iterator;
 
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.StoreEntry;
 
 /**
@@ -28,11 +28,11 @@
  * @version $Revision: 1.10 $
  */
 public class TopicSubContainer {
-    private transient ListContainer listContainer;
+    private transient MapContainer mapContainer;
     private transient StoreEntry batchEntry;
 
-    public TopicSubContainer(ListContainer container) {
-        this.listContainer = container;
+    public TopicSubContainer(MapContainer container) {
+        this.mapContainer = container;
     }
 
     /**
@@ -55,78 +55,56 @@
     }
 
     public boolean isEmpty() {
-        return listContainer.isEmpty();
+        return mapContainer.isEmpty();
     }
 
     public StoreEntry add(ConsumerMessageRef ref) {
-        return listContainer.placeLast(ref);
+        return mapContainer.place(ref.getMessageId(),ref);
     }
 
     public ConsumerMessageRef remove(MessageId id) {
         ConsumerMessageRef result = null;
-        if (!listContainer.isEmpty()) {
-            StoreEntry entry = listContainer.getFirst();
-            while (entry != null) {
-                ConsumerMessageRef ref = (ConsumerMessageRef)listContainer.get(entry);  
       
-                if (ref != null && ref.getMessageId().equals(id)) {
-                    result = ref;
-                    listContainer.remove(entry);
-                    if (batchEntry != null && batchEntry.equals(entry)) {
-                        reset();
-                    }
-                    break;
-                }
-                entry = listContainer.getNext(entry);
+        StoreEntry entry = mapContainer.getEntry(id);
+        if (entry != null) {
+            result = (ConsumerMessageRef) mapContainer.getValue(entry);
+            mapContainer.remove(entry);
+            if (batchEntry != null && batchEntry.equals(entry)) {
+                reset();
             }
         }
-        if (listContainer != null  && (listContainer.isEmpty() )) {
+        if(mapContainer.isEmpty()) {
             reset();
         }
         return result;
     }
     
-    public ConsumerMessageRef removeFirst() {
-		ConsumerMessageRef result = null;
-		if (!listContainer.isEmpty()) {
-			StoreEntry entry = listContainer.getFirst();
-
-			result = (ConsumerMessageRef) listContainer.get(entry);
-			listContainer.remove(entry);
-			if (listContainer != null && batchEntry != null
-					&& (listContainer.isEmpty() || batchEntry.equals(entry))) {
-				reset();
-			}
-
-		}
-		return result;
-	}
-
+    
     public ConsumerMessageRef get(StoreEntry entry) {
-        return (ConsumerMessageRef)listContainer.get(entry);
+        return (ConsumerMessageRef)mapContainer.getValue(entry);
     }
 
     public StoreEntry getEntry() {
-        return listContainer.getFirst();
+        return mapContainer.getFirst();
     }
 
     public StoreEntry refreshEntry(StoreEntry entry) {
-        return listContainer.refresh(entry);
+        return mapContainer.refresh(entry);
     }
 
     public StoreEntry getNextEntry(StoreEntry entry) {
-        return listContainer.getNext(entry);
+        return mapContainer.getNext(entry);
     }
 
     public Iterator iterator() {
-        return listContainer.iterator();
+        return mapContainer.values().iterator();
     }
 
     public int size() {
-        return listContainer.size();
+        return mapContainer.size();
     }
 
     public void clear() {
         reset();
-        listContainer.clear();
+        mapContainer.clear();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java?rev=600891&r1=600890&r2=600891&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java Tue Dec  4 03:27:58 2007
@@ -23,7 +23,7 @@
  * @version $Revision$
  */
 public final class IOHelper {
-
+    protected static final int MAX_FILE_NAME_LENGTH;
     private IOHelper() {
     }
 
@@ -74,6 +74,10 @@
                 rc.append(HexSupport.toHexFromInt(c, true));
             }
         }
+        String result = rc.toString();
+        if (result.length() > MAX_FILE_NAME_LENGTH) {
+            result = result.substring(0,MAX_FILE_NAME_LENGTH);
+        }
         return rc.toString();
     }
 
@@ -119,6 +123,10 @@
         if (!src.renameTo(new File(targetDirectory, src.getName()))) {
             throw new IOException("Failed to move " + src + " to " + targetDirectory);
         }
+    }
+    
+    static {
+        MAX_FILE_NAME_LENGTH = Integer.valueOf(System.getProperty("MaximumFileNameLength","200")).intValue();
            
     }
 
    



Mime
View raw message