Author: chirino Date: Thu Oct 1 16:46:37 2009 New Revision: 820713 URL: http://svn.apache.org/viewvc?rev=820713&view=rev Log: AMQ-2435: NullPointer Exception Occurs when using producer flow control When producer window based flow control kicks in now, we copy the context since it will be changed while the message send request is waiting for space on the queue. Modified: activemq/trunk/activemq-core/pom.xml activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Modified: activemq/trunk/activemq-core/pom.xml URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=820713&r1=820712&r2=820713&view=diff ============================================================================== --- activemq/trunk/activemq-core/pom.xml (original) +++ activemq/trunk/activemq-core/pom.xml Thu Oct 1 16:46:37 2009 @@ -599,8 +599,8 @@ process-classes http://activemq.apache.org/schema/core - ${basedir}/target/classes/activemq.xsd - ${basedir}/target/classes + ${basedir}/target/generated-sources/xbean/activemq.xsd + ${basedir}/target/generated-sources/xbean false org.apache.activemq.broker.jmx.AnnotatedMBean,org.apache.activemq.broker.jmx.DestinationViewMBean Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?rev=820713&r1=820712&r2=820713&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java Thu Oct 1 16:46:37 2009 @@ -73,6 +73,32 @@ setUserName(info.getUserName()); setConnectionId(info.getConnectionId()); } + + public ConnectionContext copy() { + ConnectionContext rc = new ConnectionContext(this.messageEvaluationContext); + rc.connection = this.connection; + rc.connector = this.connector; + rc.broker = this.broker; + rc.inRecoveryMode = this.inRecoveryMode; + rc.transaction = this.transaction; + rc.transactions = this.transactions; + rc.securityContext = this.securityContext; + rc.connectionId = this.connectionId; + rc.clientId = this.clientId; + rc.userName = this.userName; + rc.haAware = this.haAware; + rc.wireFormatInfo = this.wireFormatInfo; + rc.longTermStoreContext = this.longTermStoreContext; + rc.producerFlowControl = this.producerFlowControl; + rc.messageAuthorizationPolicy = this.messageAuthorizationPolicy; + rc.networkConnection = this.networkConnection; + rc.faultTolerant = this.faultTolerant; + rc.stopping.set(this.stopping.get()); + rc.dontSendReponse = this.dontSendReponse; + rc.clientMaster = this.clientMaster; + return rc; + } + public SecurityContext getSecurityContext() { return securityContext; @@ -293,4 +319,5 @@ public void setFaultTolerant(boolean faultTolerant) { this.faultTolerant = faultTolerant; } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?rev=820713&r1=820712&r2=820713&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java Thu Oct 1 16:46:37 2009 @@ -36,6 +36,17 @@ public ProducerBrokerExchange() { } + public ProducerBrokerExchange copy() { + ProducerBrokerExchange rc = new ProducerBrokerExchange(); + rc.connectionContext = connectionContext.copy(); + rc.regionDestination = regionDestination; + rc.region = region; + rc.producerState = producerState; + rc.mutable = mutable; + return rc; + } + + /** * @return the connectionContext */ @@ -105,4 +116,5 @@ public void setProducerState(ProducerState producerState) { this.producerState = producerState; } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=820713&r1=820712&r2=820713&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Oct 1 16:46:37 2009 @@ -428,6 +428,9 @@ // a sync message or // if it is using a producer window if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { + // copy the exchange state since the context will be modified while we are waiting + // for space. + final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy(); synchronized (messagesWaitingForSpace) { messagesWaitingForSpace.add(new Runnable() { public void run() { @@ -439,7 +442,7 @@ broker.messageExpired(context, message); destinationStatistics.getExpired().increment(); } else { - doMessageSend(producerExchange, message); + doMessageSend(producerExchangeCopy, message); } if (sendProducerAck) {