activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1144340 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc: JDBCMessageStore.java JDBCTopicMessageStore.java
Date Fri, 08 Jul 2011 14:37:10 GMT
Author: gtully
Date: Fri Jul  8 14:37:10 2011
New Revision: 1144340

URL: http://svn.apache.org/viewvc?rev=1144340&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3397: Improve scalability of active durable subs
with JDBC message store. Cache topic message sequence ids to avoid each ack going to the store
twice, boost through put for active durable subs; 100 subs, 550->2200 for 2min test with
mysql

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=1144340&r1=1144339&r2=1144340&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
Fri Jul  8 14:37:10 2011
@@ -109,10 +109,10 @@ public class JDBCMessageStore extends Ab
         } finally {
             c.close();
         }
-        onAdd(sequenceId, message.getPriority());
+        onAdd(messageId, sequenceId, message.getPriority());
     }
 
-    protected void onAdd(long sequenceId, byte priority) {
+    protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
     }
 
     public void addMessageReference(ConnectionContext context, MessageId messageId, long
expirationTime, String messageRef) throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1144340&r1=1144339&r2=1144340&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
Fri Jul  8 14:37:10 2011
@@ -20,11 +20,14 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -46,6 +49,17 @@ public class JDBCTopicMessageStore exten
     private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
     private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String,
LastRecovered>();
 
+    public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE";
+    private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty(
+               PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10);
+    private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock();
+    private Map<MessageId, long[]> sequenceIdCache = new LinkedHashMap<MessageId,
long[]>() {
+         protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) {
+           return size() > SEQUENCE_ID_CACHE_SIZE;
+        }
+    };
+
+
     public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter,
WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
         super(persistenceAdapter, adapter, wireFormat, topic, audit);
     }
@@ -59,7 +73,7 @@ public class JDBCTopicMessageStore exten
         }
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {
-            long[] res = adapter.getStoreSequenceId(c, destination, messageId);
+            long[] res = getCachedStoreSequenceId(c, destination, messageId);
             if (this.isPrioritizedMessages()) {
                 adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName,
res[0], res[1]);
             } else {
@@ -76,6 +90,20 @@ public class JDBCTopicMessageStore exten
         }
     }
 
+    private long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination
destination, MessageId messageId) throws SQLException, IOException {
+        long[] val = null;
+        sequenceIdCacheSizeLock.readLock().lock();
+        try {
+            val = sequenceIdCache.get(messageId);
+        } finally {
+            sequenceIdCacheSizeLock.readLock().unlock();
+        }
+        if (val == null) {
+            val = adapter.getStoreSequenceId(transactionContext, destination, messageId);
+        }
+        return val;
+    }
+
     /**
      * @throws Exception
      */
@@ -266,11 +294,17 @@ public class JDBCTopicMessageStore exten
         subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName));
     }
 
-    protected void onAdd(long sequenceId, byte priority) {
+    protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
         // update last recovered state
         for (LastRecovered last : subscriberLastRecoveredMap.values()) {
             last.updateStored(sequenceId, priority);
         }
+        sequenceIdCacheSizeLock.writeLock().lock();
+        try {
+            sequenceIdCache.put(messageId, new long[]{sequenceId, priority});
+        } finally {
+            sequenceIdCacheSizeLock.writeLock().unlock();
+        }
     }
 
 



Mime
View raw message