activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r393294 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQConnection.java ActiveMQConnectionFactory.java ActiveMQMessageConsumer.java ActiveMQSession.java ActiveMQSessionExecutor.java
Date Tue, 11 Apr 2006 19:09:15 GMT
Author: rajdavies
Date: Tue Apr 11 12:09:11 2006
New Revision: 393294

URL: http://svn.apache.org/viewcvs?rev=393294&view=rev
Log:
extra performance tuning parameters

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=393294&r1=393293&r2=393294&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java	Tue Apr 11 12:09:11 2006
@@ -116,8 +116,10 @@
     private boolean copyMessageOnSend = true;
     private boolean useCompression = false;
     private boolean objectMessageSerializationDefered = false;
-    protected boolean asyncDispatch = true;
+    protected boolean asyncDispatch = false;
+    protected boolean alwaysSessionAsync=true;
     private boolean useAsyncSend = false;
+    private boolean optimizeAcknowledge = true;
     private boolean useRetroactiveConsumer;
     private int closeTimeout = 15000;
     
@@ -247,15 +249,18 @@
      * @see Session#DUPS_OK_ACKNOWLEDGE
      * @since 1.1
      */
-    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
{
+    public Session createSession(boolean transacted,int acknowledgeMode) throws JMSException{
         checkClosedOrFailed();
         ensureConnectionInfoSent();
-        return new ActiveMQSession(this, getNextSessionId(), (transacted ? Session.SESSION_TRANSACTED
-                : (acknowledgeMode == Session.SESSION_TRANSACTED ? Session.AUTO_ACKNOWLEDGE
: acknowledgeMode)), asyncDispatch);
+        boolean doSessionAsync=alwaysSessionAsync||sessions.size()>0||transacted
+                        ||acknowledgeMode==Session.CLIENT_ACKNOWLEDGE;
+        return new ActiveMQSession(this,getNextSessionId(),(transacted?Session.SESSION_TRANSACTED
+                        :(acknowledgeMode==Session.SESSION_TRANSACTED?Session.AUTO_ACKNOWLEDGE:acknowledgeMode)),
+                        asyncDispatch,alwaysSessionAsync);
     }
 
     /**
-     * @return
+     * @return sessionId
      */
     protected SessionId getNextSessionId() {
         return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
@@ -1325,6 +1330,37 @@
     public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
         this.redeliveryPolicy = redeliveryPolicy;
     }
+    
+    /**
+     * @return Returns the alwaysSessionAsync.
+     */
+    public boolean isAlwaysSessionAsync(){
+        return alwaysSessionAsync;
+    }
+
+
+    /**
+     * @param alwaysSessionAsync The alwaysSessionAsync to set.
+     */
+    public void setAlwaysSessionAsync(boolean alwaysSessionAsync){
+        this.alwaysSessionAsync=alwaysSessionAsync;
+    }
+
+    /**
+     * @return Returns the optimizeAcknowledge.
+     */
+    public boolean isOptimizeAcknowledge(){
+        return optimizeAcknowledge;
+    }
+
+
+    /**
+     * @param optimizeAcknowledge The optimizeAcknowledge to set.
+     */
+    public void setOptimizeAcknowledge(boolean optimizeAcknowledge){
+        this.optimizeAcknowledge=optimizeAcknowledge;
+    }
+
 
     private void waitForBrokerInfo() throws JMSException {
         try {
@@ -1516,7 +1552,7 @@
     }
 
     public void setAsyncDispatch(boolean asyncDispatch) {
-        this.asyncDispatch = asyncDispatch;
+        //this.asyncDispatch = asyncDispatch;
     }
 
     public boolean isObjectMessageSerializationDefered() {
@@ -1702,4 +1738,7 @@
     public String toString() {
         return "ActiveMQConnection {id="+info.getConnectionId()+",clientId="+info.getClientId()+",started="+started.get()+"}";
     }
+
+
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=393294&r1=393293&r2=393294&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java	Tue Apr 11 12:09:11 2006
@@ -79,8 +79,10 @@
     private boolean copyMessageOnSend = true;
     private boolean useCompression = false;
     private boolean objectMessageSerializationDefered = false;
-    protected boolean asyncDispatch = true;
+    protected boolean asyncDispatch = false;
+    protected boolean alwaysSessionAsync=true;
     private boolean useAsyncSend = false;
+    private boolean optimizeAcknowledge = true;
     private int closeTimeout = 15000;
     private boolean useRetroactiveConsumer;
 
@@ -233,6 +235,8 @@
             connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
             connection.setAsyncDispatch(isAsyncDispatch());
             connection.setUseAsyncSend(isUseAsyncSend());
+            connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
+            connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
             connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
             connection.setRedeliveryPolicy(getRedeliveryPolicy());
             
@@ -417,6 +421,9 @@
         props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
         props.setProperty("userName", getUserName());
         props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
+        props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
+        props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
+        
     }
 
     public boolean isOnSendPrepareMessageBody() {
@@ -463,5 +470,33 @@
      */
     public void setCloseTimeout(int closeTimeout){
         this.closeTimeout=closeTimeout;
+    }
+
+    /**
+     * @return Returns the alwaysSessionAsync.
+     */
+    public boolean isAlwaysSessionAsync(){
+        return alwaysSessionAsync;
+    }
+
+    /**
+     * @param alwaysSessionAsync The alwaysSessionAsync to set.
+     */
+    public void setAlwaysSessionAsync(boolean alwaysSessionAsync){
+        this.alwaysSessionAsync=alwaysSessionAsync;
+    }
+
+    /**
+     * @return Returns the optimizeAcknowledge.
+     */
+    public boolean isOptimizeAcknowledge(){
+        return optimizeAcknowledge;
+    }
+
+    /**
+     * @param optimizeAcknowledge The optimizeAcknowledge to set.
+     */
+    public void setOptimizeAcknowledge(boolean optimizeAcknowledge){
+        this.optimizeAcknowledge=optimizeAcknowledge;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=393294&r1=393293&r2=393294&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java	Tue Apr 11 12:09:11 2006
@@ -100,7 +100,7 @@
     private int additionalWindowSize = 0;
     private int rollbackCounter = 0;
     private long redeliveryDelay = 0;
-
+    private int ackCounter = 0;
     private MessageListener messageListener;
     private JMSConsumerStatsImpl stats;
 
@@ -111,6 +111,7 @@
     private MessageAvailableListener availableListener;
 
     private RedeliveryPolicy redeliveryPolicy;
+    private boolean optimizeAcknowledge;
 
     /**
      * Create a MessageConsumer
@@ -188,7 +189,9 @@
             this.session.removeConsumer(this);
             throw e;
         }
-
+        this.optimizeAcknowledge=session.connection.isOptimizeAcknowledge()&&!info.isDurable()
+                        &&!info.getDestination().isQueue()
+                        &&session.isAutoAcknowledge();
         if (session.connection.isStarted())
             start();
     }
@@ -540,27 +543,36 @@
             deliveredMessages.addFirst(md);
     }
 
-    private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws
JMSException {
-        if (unconsumedMessages.isClosed())
+    private void afterMessageIsConsumed(MessageDispatch md,boolean messageExpired) throws
JMSException{
+        if(unconsumedMessages.isClosed())
             return;
-
-        if (messageExpired) {
-            ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
-        } else {
+        if(messageExpired){
+            ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
+        }else{
             stats.onMessage();
-            if (session.isTransacted()) {
-                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
-            } else if (session.isAutoAcknowledge()) {
-                if (!deliveredMessages.isEmpty()) {
-                    MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
-                    session.asyncSendPacket(ack);
-                    deliveredMessages.clear();
+            if(session.isTransacted()){
+                ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
+            }else if(session.isAutoAcknowledge()){
+                if(!deliveredMessages.isEmpty()){
+                    if(this.optimizeAcknowledge){
+                        ackCounter++;
+                        if(ackCounter>=(info.getPrefetchSize()*.75)){
+                            MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,ackCounter);
+                            session.asyncSendPacket(ack);
+                            ackCounter=0;
+                            deliveredMessages.clear();
+                        }
+                    }else{
+                        MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
+                        session.asyncSendPacket(ack);
+                        deliveredMessages.clear();
+                    }
                 }
-            } else if (session.isDupsOkAcknowledge()) {
-                ackLater(md, MessageAck.STANDARD_ACK_TYPE);
-            } else if (session.isClientAcknowledge()) {
-                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
-            } else {
+            }else if(session.isDupsOkAcknowledge()){
+                ackLater(md,MessageAck.STANDARD_ACK_TYPE);
+            }else if(session.isClientAcknowledge()){
+                ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
+            }else{
                 throw new IllegalStateException("Invalid session state.");
             }
         }
@@ -645,63 +657,59 @@
         redeliveryDelay = 0;
     }
 
-    public void rollback() throws JMSException {
-        synchronized (unconsumedMessages.getMutex()) {
-            if (deliveredMessages.isEmpty())
+    public void rollback() throws JMSException{
+        synchronized(unconsumedMessages.getMutex()){
+            if(optimizeAcknowledge){
+               
+                // remove messages read but not acked at the broker yet through optimizeAcknowledge
 
+                for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
+                    deliveredMessages.removeLast();
+                }
+            }
+            if(deliveredMessages.isEmpty())
                 return;
-
             rollbackCounter++;
-            if (rollbackCounter > redeliveryPolicy.getMaximumRedeliveries()) {
-                
+            if(rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){
                 // We need to NACK the messages so that they get sent to the
                 // DLQ.
-
                 // Acknowledge the last message.
-                MessageDispatch lastMd = (MessageDispatch) deliveredMessages.get(0);
-                MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
+                MessageDispatch lastMd=(MessageDispatch) deliveredMessages.get(0);
+                MessageAck ack=new MessageAck(lastMd,MessageAck.POSION_ACK_TYPE,deliveredMessages.size());
                 session.asyncSendPacket(ack);
-
                 // Adjust the window size.
-                additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
-                rollbackCounter = 0;
-                redeliveryDelay = 0;
-                
-            } else {
-
+                additionalWindowSize=Math.max(0,additionalWindowSize-deliveredMessages.size());
+                rollbackCounter=0;
+                redeliveryDelay=0;
+            }else{
                 // stop the delivery of messages.
                 unconsumedMessages.stop();
-
                 // Start up the delivery again a little later.
-                if (redeliveryDelay == 0) {
-                    redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
-                } else {
-                    if (redeliveryPolicy.isUseExponentialBackOff())
-                        redeliveryDelay *= redeliveryPolicy.getBackOffMultiplier();
+                if(redeliveryDelay==0){
+                    redeliveryDelay=redeliveryPolicy.getInitialRedeliveryDelay();
+                }else{
+                    if(redeliveryPolicy.isUseExponentialBackOff())
+                        redeliveryDelay*=redeliveryPolicy.getBackOffMultiplier();
                 }
-
-                Scheduler.executeAfterDelay(new Runnable() {
-                    public void run() {
-                        try {
-                            if (started.get())
+                Scheduler.executeAfterDelay(new Runnable(){
+                    public void run(){
+                        try{
+                            if(started.get())
                                 start();
-                        } catch (JMSException e) {
+                        }catch(JMSException e){
                             session.connection.onAsyncException(e);
                         }
                     }
-                }, redeliveryDelay);
-                
-                for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
-                    MessageDispatch md = (MessageDispatch) iter.next();
+                },redeliveryDelay);
+                for(Iterator iter=deliveredMessages.iterator();iter.hasNext();){
+                    MessageDispatch md=(MessageDispatch) iter.next();
                     md.getMessage().onMessageRolledBack();
                     unconsumedMessages.enqueueFirst(md);
                 }
             }
-
-            deliveredCounter -= deliveredMessages.size();
+            deliveredCounter-=deliveredMessages.size();
             deliveredMessages.clear();
         }
-
-        if (messageListener != null) {
+        if(messageListener!=null){
             session.redispatch(unconsumedMessages);
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=393294&r1=393293&r2=393294&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java	Tue Apr 11 12:09:11 2006
@@ -72,6 +72,7 @@
 import org.apache.activemq.management.StatsCapable;
 import org.apache.activemq.management.StatsImpl;
 import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.util.Callback;
 import org.apache.activemq.util.LongSequenceGenerator;
@@ -196,24 +197,29 @@
 
     protected boolean closed;
     protected boolean asyncDispatch;
+    protected boolean sessionAsyncDispatch;
+    protected TaskRunner taskRunner;
 
     /**
      * Construct the Session
      * 
      * @param connection
+     * @param sessionId 
      * @param acknowledgeMode
      *            n.b if transacted - the acknowledgeMode ==
      *            Session.SESSION_TRANSACTED
+     * @param asyncDispatch 
+     * @param sessionAsyncDispatch 
      * @throws JMSException
      *             on internal error
      */
-    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode,
boolean asyncDispatch)
+    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode,
boolean asyncDispatch,boolean sessionAsyncDispatch)
             throws JMSException {
 
         this.connection = connection;
         this.acknowledgementMode = acknowledgeMode;
         this.asyncDispatch=asyncDispatch;
-        
+        this.sessionAsyncDispatch = sessionAsyncDispatch;
         this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
         setTransactionContext(new TransactionContext(connection));
         connection.addSession(this);
@@ -224,6 +230,10 @@
             start();
 
     }
+    
+    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode,
boolean asyncDispatch)throws JMSException {
+        this(connection,sessionId,acknowledgeMode,asyncDispatch,true);
+    }
 
     /**
      * Sets the transaction context of the session.
@@ -1663,6 +1673,20 @@
     public void setAsyncDispatch(boolean asyncDispatch) {
         this.asyncDispatch = asyncDispatch;
     }
+    
+    /**
+     * @return Returns the sessionAsyncDispatch.
+     */
+    public boolean isSessionAsyncDispatch(){
+        return sessionAsyncDispatch;
+    }
+
+    /**
+     * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
+     */
+    public void setSessionAsyncDispatch(boolean sessionAsyncDispatch){
+        this.sessionAsyncDispatch=sessionAsyncDispatch;
+    }
 
 	public List getUnconsumedMessages() {
 		return executor.getUnconsumedMessages();
@@ -1683,5 +1707,7 @@
             }
         }
     }
+
+    
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=393294&r1=393293&r2=393294&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java	Tue Apr 11 12:09:11 2006
@@ -52,7 +52,7 @@
     
 
     void execute(MessageDispatch message) throws InterruptedException {
-        if (!session.isAsyncDispatch() && !dispatchedBySessionPool){
+        if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool){
             dispatch(message);
         }else {
             messageQueue.enqueue(message);



Mime
View raw message