activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r820713 - in /activemq/trunk/activemq-core: pom.xml src/main/java/org/apache/activemq/broker/ConnectionContext.java src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java src/main/java/org/apache/activemq/broker/region/Queue.java
Date Thu, 01 Oct 2009 16:46:38 GMT
Author: chirino
Date: Thu Oct  1 16:46:37 2009
New Revision: 820713

URL: http://svn.apache.org/viewvc?rev=820713&view=rev
Log:
AMQ-2435: NullPointer Exception Occurs when using producer flow control

 Now, when producer-window-based flow control kicks in, we copy the context, since it
 will be changed while the message send request is waiting for space on the queue.


Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=820713&r1=820712&r2=820713&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Thu Oct  1 16:46:37 2009
@@ -599,8 +599,8 @@
           <phase>process-classes</phase>
             <configuration>
               <namespace>http://activemq.apache.org/schema/core</namespace>
-              <schema>${basedir}/target/classes/activemq.xsd</schema>
-              <outputDir>${basedir}/target/classes</outputDir>
+              <schema>${basedir}/target/generated-sources/xbean/activemq.xsd</schema>
+              <outputDir>${basedir}/target/generated-sources/xbean</outputDir>
               <generateSpringSchemasFile>false</generateSpringSchemasFile>
               <excludedClasses>org.apache.activemq.broker.jmx.AnnotatedMBean,org.apache.activemq.broker.jmx.DestinationViewMBean</excludedClasses>
             </configuration>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?rev=820713&r1=820712&r2=820713&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java Thu Oct  1 16:46:37 2009
@@ -73,6 +73,32 @@
         setUserName(info.getUserName());
         setConnectionId(info.getConnectionId());
     }
+    
+    public ConnectionContext copy() {
+        ConnectionContext rc = new ConnectionContext(this.messageEvaluationContext);
+        rc.connection = this.connection;
+        rc.connector = this.connector;
+        rc.broker = this.broker;
+        rc.inRecoveryMode = this.inRecoveryMode;
+        rc.transaction = this.transaction;
+        rc.transactions = this.transactions;
+        rc.securityContext = this.securityContext;
+        rc.connectionId = this.connectionId;
+        rc.clientId = this.clientId;
+        rc.userName = this.userName;
+        rc.haAware = this.haAware;
+        rc.wireFormatInfo = this.wireFormatInfo;
+        rc.longTermStoreContext = this.longTermStoreContext;
+        rc.producerFlowControl = this.producerFlowControl;
+        rc.messageAuthorizationPolicy = this.messageAuthorizationPolicy;
+        rc.networkConnection = this.networkConnection;
+        rc.faultTolerant = this.faultTolerant;
+        rc.stopping.set(this.stopping.get());
+        rc.dontSendReponse = this.dontSendReponse;
+        rc.clientMaster = this.clientMaster;
+        return rc;
+    }
+
 
     public SecurityContext getSecurityContext() {
         return securityContext;
@@ -293,4 +319,5 @@
     public void setFaultTolerant(boolean faultTolerant) {
         this.faultTolerant = faultTolerant;
     }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?rev=820713&r1=820712&r2=820713&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java Thu Oct  1 16:46:37 2009
@@ -36,6 +36,17 @@
     public ProducerBrokerExchange() {
     }
 
+    public ProducerBrokerExchange copy() {
+        ProducerBrokerExchange rc = new ProducerBrokerExchange();
+        rc.connectionContext = connectionContext.copy();
+        rc.regionDestination = regionDestination;
+        rc.region = region;
+        rc.producerState = producerState;
+        rc.mutable = mutable;
+        return rc;
+    }
+
+    
     /**
      * @return the connectionContext
      */
@@ -105,4 +116,5 @@
     public void setProducerState(ProducerState producerState) {
         this.producerState = producerState;
     }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=820713&r1=820712&r2=820713&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Oct  1 16:46:37 2009
@@ -428,6 +428,9 @@
                 // a sync message or
                 // if it is using a producer window
                 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
+                    // copy the exchange state since the context will be modified while we are waiting
+                    // for space.
+                    final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();

                     synchronized (messagesWaitingForSpace) {
                         messagesWaitingForSpace.add(new Runnable() {
                             public void run() {
@@ -439,7 +442,7 @@
                                         broker.messageExpired(context, message);
                                         destinationStatistics.getExpired().increment();
                                     } else {
-                                        doMessageSend(producerExchange, message);
+                                        doMessageSend(producerExchangeCopy, message);
                                     }
     
                                     if (sendProducerAck) {



Mime
View raw message