Author: gtully Date: Fri Jul 8 14:37:10 2011 New Revision: 1144340 URL: http://svn.apache.org/viewvc?rev=1144340&view=rev Log: https://issues.apache.org/jira/browse/AMQ-3397: Improve scalability of active durable subs with JDBC message store. Cache topic message sequence ids to avoid each ack going to the store twice, boost through put for active durable subs; 100 subs, 550->2200 for 2min test with mysql Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=1144340&r1=1144339&r2=1144340&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Fri Jul 8 14:37:10 2011 @@ -109,10 +109,10 @@ public class JDBCMessageStore extends Ab } finally { c.close(); } - onAdd(sequenceId, message.getPriority()); + onAdd(messageId, sequenceId, message.getPriority()); } - protected void onAdd(long sequenceId, byte priority) { + protected void onAdd(MessageId messageId, long sequenceId, byte priority) { } public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1144340&r1=1144339&r2=1144340&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Fri Jul 8 14:37:10 2011 @@ -20,11 +20,14 @@ import java.io.IOException; import java.sql.SQLException; import java.util.Arrays; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; @@ -46,6 +49,17 @@ public class JDBCTopicMessageStore exten private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class); private Map subscriberLastRecoveredMap = new ConcurrentHashMap(); + public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE"; + private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty( + PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10); + private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock(); + private Map sequenceIdCache = new LinkedHashMap() { + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > SEQUENCE_ID_CACHE_SIZE; + } + }; + + public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) { super(persistenceAdapter, adapter, wireFormat, topic, audit); } @@ -59,7 +73,7 @@ public class JDBCTopicMessageStore exten } TransactionContext c = persistenceAdapter.getTransactionContext(context); try { - long[] res = adapter.getStoreSequenceId(c, destination, messageId); + long[] res = getCachedStoreSequenceId(c, destination, messageId); if (this.isPrioritizedMessages()) { adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]); } else { @@ -76,6 +90,20 @@ public class JDBCTopicMessageStore exten } } + private long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException { + long[] val = null; + sequenceIdCacheSizeLock.readLock().lock(); + try { + val = sequenceIdCache.get(messageId); + } finally { + sequenceIdCacheSizeLock.readLock().unlock(); + } + if (val == null) { + val = adapter.getStoreSequenceId(transactionContext, destination, messageId); + } + return val; + } + /** * @throws Exception */ @@ -266,11 +294,17 @@ public class JDBCTopicMessageStore exten subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName)); } - protected void onAdd(long sequenceId, byte priority) { + protected void onAdd(MessageId messageId, long sequenceId, byte priority) { // update last recovered state for (LastRecovered last : subscriberLastRecoveredMap.values()) { last.updateStored(sequenceId, priority); } + sequenceIdCacheSizeLock.writeLock().lock(); + try { + sequenceIdCache.put(messageId, new long[]{sequenceId, priority}); + } finally { + sequenceIdCacheSizeLock.writeLock().unlock(); + } }