Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 612E66B98 for ; Fri, 8 Jul 2011 14:37:32 +0000 (UTC) Received: (qmail 87130 invoked by uid 500); 8 Jul 2011 14:37:32 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 87057 invoked by uid 500); 8 Jul 2011 14:37:31 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 87050 invoked by uid 99); 8 Jul 2011 14:37:31 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Jul 2011 14:37:31 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Jul 2011 14:37:30 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 645F823888FD for ; Fri, 8 Jul 2011 14:37:10 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1144340 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc: JDBCMessageStore.java JDBCTopicMessageStore.java Date: Fri, 08 Jul 2011 14:37:10 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110708143710.645F823888FD@eris.apache.org> Author: gtully Date: Fri Jul 8 14:37:10 2011 New Revision: 1144340 URL: http://svn.apache.org/viewvc?rev=1144340&view=rev Log: https://issues.apache.org/jira/browse/AMQ-3397: Improve scalability of active durable subs with JDBC 
message store. Cache topic message sequence ids to avoid each ack going to the store twice, boost throughput for active durable subs; 100 subs, 550->2200 for 2min test with mysql Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=1144340&r1=1144339&r2=1144340&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Fri Jul 8 14:37:10 2011 @@ -109,10 +109,10 @@ public class JDBCMessageStore extends Ab } finally { c.close(); } - onAdd(sequenceId, message.getPriority()); + onAdd(messageId, sequenceId, message.getPriority()); } - protected void onAdd(long sequenceId, byte priority) { + protected void onAdd(MessageId messageId, long sequenceId, byte priority) { } public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1144340&r1=1144339&r2=1144340&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original) +++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Fri Jul 8 14:37:10 2011 @@ -20,11 +20,14 @@ import java.io.IOException; import java.sql.SQLException; import java.util.Arrays; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; @@ -46,6 +49,17 @@ public class JDBCTopicMessageStore exten private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class); private Map subscriberLastRecoveredMap = new ConcurrentHashMap(); + public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE"; + private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty( + PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10); + private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock(); + private Map sequenceIdCache = new LinkedHashMap() { + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > SEQUENCE_ID_CACHE_SIZE; + } + }; + + public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) { super(persistenceAdapter, adapter, wireFormat, topic, audit); } @@ -59,7 +73,7 @@ public class JDBCTopicMessageStore exten } TransactionContext c = persistenceAdapter.getTransactionContext(context); try { - long[] res = adapter.getStoreSequenceId(c, destination, messageId); + long[] res = getCachedStoreSequenceId(c, destination, messageId); if (this.isPrioritizedMessages()) { 
adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]); } else { @@ -76,6 +90,20 @@ public class JDBCTopicMessageStore exten } } + private long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException { + long[] val = null; + sequenceIdCacheSizeLock.readLock().lock(); + try { + val = sequenceIdCache.get(messageId); + } finally { + sequenceIdCacheSizeLock.readLock().unlock(); + } + if (val == null) { + val = adapter.getStoreSequenceId(transactionContext, destination, messageId); + } + return val; + } + /** * @throws Exception */ @@ -266,11 +294,17 @@ public class JDBCTopicMessageStore exten subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName)); } - protected void onAdd(long sequenceId, byte priority) { + protected void onAdd(MessageId messageId, long sequenceId, byte priority) { // update last recovered state for (LastRecovered last : subscriberLastRecoveredMap.values()) { last.updateStored(sequenceId, priority); } + sequenceIdCacheSizeLock.writeLock().lock(); + try { + sequenceIdCache.put(messageId, new long[]{sequenceId, priority}); + } finally { + sequenceIdCacheSizeLock.writeLock().unlock(); + } }