activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r395611 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/kaha/impl/ test/java/org/apache/activemq/
Date Thu, 20 Apr 2006 14:58:52 GMT
Author: rajdavies
Date: Thu Apr 20 07:58:50 2006
New Revision: 395611

URL: http://svn.apache.org/viewcvs?rev=395611&view=rev
Log:
fine tuning

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=395611&r1=395610&r2=395611&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Apr 20 07:58:50 2006
@@ -625,7 +625,7 @@
                     if(optimizeAcknowledge){
                         if(deliveryingAcknowledgements.compareAndSet(false,true)){
                             ackCounter++;
-                            if(ackCounter>=(info.getCurrentPrefetchSize()*.50)){
+                            if(ackCounter>=(info.getCurrentPrefetchSize()*.65)){
                                 MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
                                 session.asyncSendPacket(ack);
                                 ackCounter=0;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java?rev=395611&r1=395610&r2=395611&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java Thu Apr 20 07:58:50 2006
@@ -32,6 +32,7 @@
     private int queueBrowserPrefetch;
     private int topicPrefetch;
     private int durableTopicPrefetch;
+    private int optimizeDurableTopicPrefetch;
     private int inputStreamPrefetch;
     private int maximumPendingMessageLimit;
 
@@ -43,6 +44,7 @@
         this.queueBrowserPrefetch = 500;
         this.topicPrefetch = MAX_PREFETCH_SIZE;
         this.durableTopicPrefetch = 100;
+        this.optimizeDurableTopicPrefetch=1000;
         this.inputStreamPrefetch = 100;
     }
 
@@ -102,6 +104,20 @@
         this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch);
     }
     
+    /**
+     * @return Returns the optimizeDurableTopicPrefetch.
+     */
+    public int getOptimizeDurableTopicPrefetch(){
+        return optimizeDurableTopicPrefetch;
+    }
+
+    /**
+     * @param optimizeDurableTopicPrefetch The optimizeDurableTopicPrefetch to set.
+     */
+    public void setOptimizeDurableTopicPrefetch(int optimizeAcknowledgePrefetch){
+        this.optimizeDurableTopicPrefetch=optimizeAcknowledgePrefetch;
+    }
+    
     public int getMaximumPendingMessageLimit() {
         return maximumPendingMessageLimit;
     }
@@ -129,6 +145,7 @@
         this.queuePrefetch=i;
         this.topicPrefetch=i;
         this.inputStreamPrefetch=1;
+        this.optimizeDurableTopicPrefetch=i;
     }
 
     public int getInputStreamPrefetch() {
@@ -138,4 +155,6 @@
     public void setInputStreamPrefetch(int inputStreamPrefetch) {
         this.inputStreamPrefetch = getMaxPrefetchLimit(inputStreamPrefetch);
     }
+
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=395611&r1=395610&r2=395611&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu Apr 20 07:58:50 2006
@@ -1057,14 +1057,17 @@
      *             if the message selector is invalid.
      * @since 1.1
      */
-    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
-            throws JMSException {
+    public TopicSubscriber createDurableSubscriber(Topic topic,String name,String messageSelector,boolean noLocal)
+                    throws JMSException{
         checkClosed();
         connection.checkClientIDWasManuallySpecified();
-        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
-        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation
-                .transformDestination(topic), name, messageSelector, prefetchPolicy.getDurableTopicPrefetch(),
-                prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
+        ActiveMQPrefetchPolicy prefetchPolicy=this.connection.getPrefetchPolicy();
+        int prefetch=isAutoAcknowledge()&&connection.isOptimizedMessageDispatch()?prefetchPolicy
+                        .getOptimizeDurableTopicPrefetch():prefetchPolicy.getDurableTopicPrefetch();
+        int maxPrendingLimit=prefetchPolicy.getMaximumPendingMessageLimit();
+        return new ActiveMQTopicSubscriber(this,getNextConsumerId(),ActiveMQMessageTransformation
+                        .transformDestination(topic),name,messageSelector,prefetch,maxPrendingLimit,noLocal,false,
+                        asyncDispatch);
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java?rev=395611&r1=395610&r2=395611&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java Thu Apr 20 07:58:50 2006
@@ -32,7 +32,7 @@
  */
 final class DataManager{
     private static final Log log=LogFactory.getLog(DataManager.class);
-    protected static long MAX_FILE_LENGTH=1024*1024*16;
+    protected static long MAX_FILE_LENGTH=1024*1024*32;
     private final File dir;
     private final String prefix;
     private StoreDataReader reader;

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java?rev=395611&r1=395610&r2=395611&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java Thu Apr 20 07:58:50 2006
@@ -281,7 +281,7 @@
             session.createTextMessage("Second Message")
         };
 
-        // lets consume any outstanding messages from previous test runs
+        // lets consume any outstanding messages from prev test runs
         while (consumer.receive(1000) != null) {
         }
         session.commit();
@@ -306,7 +306,7 @@
         assertEquals(outbound[1], message);
         session.rollback();
 
-        // Consume again.. the previous message should
+        // Consume again.. the prev message should
         // get redelivered.
         message = consumer.receive(5000);
         assertNotNull("Should have re-received the message again!", message);
@@ -329,7 +329,7 @@
             session.createTextMessage("Second Message")
         };
 
-        // lets consume any outstanding messages from previous test runs
+        // lets consume any outstanding messages from prev test runs
         while (consumer.receive(1000) != null) {
         }
         session.commit();
@@ -351,7 +351,7 @@
         assertEquals(outbound[1], message);
         session.rollback();
 
-        // Consume again.. the previous message should
+        // Consume again.. the prev message should
         // get redelivered.
         message = consumer.receive(5000);
         assertNotNull("Should have re-received the first message again!", message);
@@ -445,7 +445,7 @@
             session.createTextMessage("Second Message")
         };
 
-        // lets consume any outstanding messages from previous test runs
+        // lets consume any outstanding messages from prev test runs
         while (consumer.receiveNoWait() != null) {
         }
 
@@ -529,7 +529,7 @@
     protected void reconnect() throws JMSException {
         
         if (connection != null) {
-            // Close the previous connection.
+            // Close the prev connection.
             connection.close();
         }
         session = null;
@@ -562,6 +562,7 @@
         prefetchPolicy.setQueuePrefetch(1);
         prefetchPolicy.setTopicPrefetch(1);
         prefetchPolicy.setDurableTopicPrefetch(1);
+        prefetchPolicy.setOptimizeDurableTopicPrefetch(1);
     }
 
     public void testMessageListener() throws Exception {

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java?rev=395611&r1=395610&r2=395611&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java Thu Apr 20 07:58:50 2006
@@ -130,6 +130,7 @@
         activeMQConnection.getPrefetchPolicy().setQueuePrefetch(prefetchValue);
         activeMQConnection.getPrefetchPolicy().setDurableTopicPrefetch(prefetchValue);
         activeMQConnection.getPrefetchPolicy().setQueueBrowserPrefetch(prefetchValue);
+        activeMQConnection.getPrefetchPolicy().setOptimizeDurableTopicPrefetch(prefetchValue);
     }
 
     public void tearDown() throws Exception {



Mime
View raw message