Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 33901 invoked from network); 15 Mar 2007 14:26:02 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 15 Mar 2007 14:26:02 -0000 Received: (qmail 37929 invoked by uid 500); 15 Mar 2007 14:26:11 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 37896 invoked by uid 500); 15 Mar 2007 14:26:11 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 37885 invoked by uid 99); 15 Mar 2007 14:26:11 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Mar 2007 07:26:10 -0700 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Mar 2007 07:26:02 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id CD1B01A9838; Thu, 15 Mar 2007 07:25:41 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r518638 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java test/java/org/apache/activemq/ProducerFlowControlTest.java Date: Thu, 15 Mar 2007 14:25:41 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070315142541.CD1B01A9838@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Thu Mar 15 07:25:40 2007 New Revision: 518638 URL: http://svn.apache.org/viewvc?view=rev&rev=518638 Log: Added test case that makes use of producer window flow control. 
So now even async sends can be flow controlled so that an individual publisher can be stopped without stopping the entire connection. Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=518638&r1=518637&r2=518638 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Mar 15 07:25:40 2007 @@ -364,24 +364,28 @@ synchronized( messagesWaitingForSpace ) { messagesWaitingForSpace.add(new Runnable() { public void run() { + + // While waiting for space to free up... the message may have expired. 
+ if(message.isExpired()){ + if (log.isDebugEnabled()) { + log.debug("Expired message: " + message); + } + + if( !message.isResponseRequired() ) { + ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); + context.getConnection().dispatchAsync(ack); + } + return; + } + + try { doMessageSend(producerExchange, message); - if( message.isResponseRequired() ) { - Response response = new Response(); - response.setCorrelationId(message.getCommandId()); - context.getConnection().dispatchAsync(response); - } else { - ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); - context.getConnection().dispatchAsync(ack); - } } catch (Exception e) { if( message.isResponseRequired() ) { ExceptionResponse response = new ExceptionResponse(e); response.setCorrelationId(message.getCommandId()); context.getConnection().dispatchAsync(response); - } else { - ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); - context.getConnection().dispatchAsync(ack); } } } @@ -391,8 +395,7 @@ if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask) ) { // so call it directly here. 
sendMessagesWaitingForSpaceTask.run(); - } - + } context.setDontSendReponse(true); return; } @@ -412,10 +415,6 @@ if (log.isDebugEnabled()) { log.debug("Expired message: " + message); } - if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) { - ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); - context.getConnection().dispatchAsync(ack); - } return; } } @@ -431,7 +430,10 @@ if(store!=null&&message.isPersistent()){ store.addMessage(context,message); } - + if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) { + ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); + context.getConnection().dispatchAsync(ack); + } if(context.isInTransaction()){ // If this is a transacted message.. increase the usage now so that a big TX does not blow up // our memory. This increment is decremented once the tx finishes.. @@ -445,10 +447,6 @@ // TODO: remove message from store. 
if (log.isDebugEnabled()) { log.debug("Expired message: " + message); - } - if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) { - ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); - context.getConnection().dispatchAsync(ack); } return; } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java?view=diff&rev=518638&r1=518637&r2=518638 ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java Thu Mar 15 07:25:40 2007 @@ -30,6 +30,36 @@ private TransportConnector connector; private ActiveMQConnection connection; + public void test2ndPubisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception { + ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory(); + factory.setProducerWindowSize(1024*64); + connection = (ActiveMQConnection) factory.createConnection(); + connections.add(connection); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queueB); + + // Test sending to Queue A + // 1 few sends should not block until the producer window is used up. + fillQueue(queueA); + + // Test sending to Queue B it should not block since the connection should not be blocked. 
+ CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1"); + assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) ); + + TextMessage msg = (TextMessage) consumer.receive(); + assertEquals("Message 1", msg.getText()); + msg.acknowledge(); + + pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2"); + assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) ); + + msg = (TextMessage) consumer.receive(); + assertEquals("Message 2", msg.getText()); + msg.acknowledge(); + } + public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception { ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory(); factory.setAlwaysSyncSend(true);