Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 67916 invoked from network); 20 Apr 2006 15:11:49 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 20 Apr 2006 15:11:49 -0000 Received: (qmail 85932 invoked by uid 500); 20 Apr 2006 14:59:19 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 85909 invoked by uid 500); 20 Apr 2006 14:59:19 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 85891 invoked by uid 99); 20 Apr 2006 14:59:19 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Apr 2006 07:59:19 -0700 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 20 Apr 2006 07:59:15 -0700 Received: (qmail 60325 invoked by uid 65534); 20 Apr 2006 14:58:54 -0000 Message-ID: <20060420145854.60313.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r395611 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/kaha/impl/ test/java/org/apache/activemq/ Date: Thu, 20 Apr 2006 14:58:52 -0000 To: activemq-commits@geronimo.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: rajdavies Date: Thu Apr 20 07:58:50 2006 New Revision: 395611 URL: http://svn.apache.org/viewcvs?rev=395611&view=rev Log: finese tuning Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=395611&r1=395610&r2=395611&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Apr 20 07:58:50 2006 @@ -625,7 +625,7 @@ if(optimizeAcknowledge){ if(deliveryingAcknowledgements.compareAndSet(false,true)){ ackCounter++; - if(ackCounter>=(info.getCurrentPrefetchSize()*.50)){ + if(ackCounter>=(info.getCurrentPrefetchSize()*.65)){ MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); session.asyncSendPacket(ack); ackCounter=0; Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java?rev=395611&r1=395610&r2=395611&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java Thu Apr 20 07:58:50 2006 @@ -32,6 +32,7 @@ private int queueBrowserPrefetch; private int topicPrefetch; private int durableTopicPrefetch; + private int optimizeDurableTopicPrefetch; private int inputStreamPrefetch; private int maximumPendingMessageLimit; @@ -43,6 +44,7 @@ this.queueBrowserPrefetch = 500; this.topicPrefetch = MAX_PREFETCH_SIZE; this.durableTopicPrefetch = 100; + this.optimizeDurableTopicPrefetch=1000; this.inputStreamPrefetch = 100; } @@ -102,6 +104,20 @@ this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch); } + /** + * @return Returns the optimizeDurableTopicPrefetch. + */ + public int getOptimizeDurableTopicPrefetch(){ + return optimizeDurableTopicPrefetch; + } + + /** + * @param optimizeDurableTopicPrefetch The optimizeDurableTopicPrefetch to set. + */ + public void setOptimizeDurableTopicPrefetch(int optimizeAcknowledgePrefetch){ + this.optimizeDurableTopicPrefetch=optimizeAcknowledgePrefetch; + } + public int getMaximumPendingMessageLimit() { return maximumPendingMessageLimit; } @@ -129,6 +145,7 @@ this.queuePrefetch=i; this.topicPrefetch=i; this.inputStreamPrefetch=1; + this.optimizeDurableTopicPrefetch=i; } public int getInputStreamPrefetch() { @@ -138,4 +155,6 @@ public void setInputStreamPrefetch(int inputStreamPrefetch) { this.inputStreamPrefetch = getMaxPrefetchLimit(inputStreamPrefetch); } + + } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=395611&r1=395610&r2=395611&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu Apr 20 07:58:50 2006 @@ -1057,14 +1057,17 @@ * if the message selector is invalid. * @since 1.1 */ - public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) - throws JMSException { + public TopicSubscriber createDurableSubscriber(Topic topic,String name,String messageSelector,boolean noLocal) + throws JMSException{ checkClosed(); connection.checkClientIDWasManuallySpecified(); - ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); - return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation - .transformDestination(topic), name, messageSelector, prefetchPolicy.getDurableTopicPrefetch(), - prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); + ActiveMQPrefetchPolicy prefetchPolicy=this.connection.getPrefetchPolicy(); + int prefetch=isAutoAcknowledge()&&connection.isOptimizedMessageDispatch()?prefetchPolicy + .getOptimizeDurableTopicPrefetch():prefetchPolicy.getDurableTopicPrefetch(); + int maxPrendingLimit=prefetchPolicy.getMaximumPendingMessageLimit(); + return new ActiveMQTopicSubscriber(this,getNextConsumerId(),ActiveMQMessageTransformation + .transformDestination(topic),name,messageSelector,prefetch,maxPrendingLimit,noLocal,false, + asyncDispatch); } /** Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java?rev=395611&r1=395610&r2=395611&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java Thu Apr 20 07:58:50 2006 @@ -32,7 +32,7 @@ */ final class DataManager{ private static final Log log=LogFactory.getLog(DataManager.class); - protected static long MAX_FILE_LENGTH=1024*1024*16; + protected static long MAX_FILE_LENGTH=1024*1024*32; private final File dir; private final String prefix; private StoreDataReader reader; Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java?rev=395611&r1=395610&r2=395611&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java (original) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java Thu Apr 20 07:58:50 2006 @@ -281,7 +281,7 @@ session.createTextMessage("Second Message") }; - // lets consume any outstanding messages from previous test runs + // lets consume any outstanding messages from prev test runs while (consumer.receive(1000) != null) { } session.commit(); @@ -306,7 +306,7 @@ assertEquals(outbound[1], message); session.rollback(); - // Consume again.. the previous message should + // Consume again.. the prev message should // get redelivered. message = consumer.receive(5000); assertNotNull("Should have re-received the message again!", message); @@ -329,7 +329,7 @@ session.createTextMessage("Second Message") }; - // lets consume any outstanding messages from previous test runs + // lets consume any outstanding messages from prev test runs while (consumer.receive(1000) != null) { } session.commit(); @@ -351,7 +351,7 @@ assertEquals(outbound[1], message); session.rollback(); - // Consume again.. the previous message should + // Consume again.. the prev message should // get redelivered. message = consumer.receive(5000); assertNotNull("Should have re-received the first message again!", message); @@ -445,7 +445,7 @@ session.createTextMessage("Second Message") }; - // lets consume any outstanding messages from previous test runs + // lets consume any outstanding messages from prev test runs while (consumer.receiveNoWait() != null) { } @@ -529,7 +529,7 @@ protected void reconnect() throws JMSException { if (connection != null) { - // Close the previous connection. + // Close the prev connection. connection.close(); } session = null; @@ -562,6 +562,7 @@ prefetchPolicy.setQueuePrefetch(1); prefetchPolicy.setTopicPrefetch(1); prefetchPolicy.setDurableTopicPrefetch(1); + prefetchPolicy.setOptimizeDurableTopicPrefetch(1); } public void testMessageListener() throws Exception { Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java?rev=395611&r1=395610&r2=395611&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java (original) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java Thu Apr 20 07:58:50 2006 @@ -130,6 +130,7 @@ activeMQConnection.getPrefetchPolicy().setQueuePrefetch(prefetchValue); activeMQConnection.getPrefetchPolicy().setDurableTopicPrefetch(prefetchValue); activeMQConnection.getPrefetchPolicy().setQueueBrowserPrefetch(prefetchValue); + activeMQConnection.getPrefetchPolicy().setOptimizeDurableTopicPrefetch(prefetchValue); } public void tearDown() throws Exception {