Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 18611 invoked from network); 8 Mar 2007 18:44:28 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 8 Mar 2007 18:44:28 -0000 Received: (qmail 33061 invoked by uid 500); 8 Mar 2007 18:44:36 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 33041 invoked by uid 500); 8 Mar 2007 18:44:36 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 33032 invoked by uid 99); 8 Mar 2007 18:44:36 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Mar 2007 10:44:36 -0800 X-ASF-Spam-Status: No, hits=-98.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Mar 2007 10:44:26 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id 2E01C1A983A; Thu, 8 Mar 2007 10:44:06 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r516139 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQConnection.java ActiveMQConnectionFactory.java ActiveMQMessageProducer.java ActiveMQSession.java Date: Thu, 08 Mar 2007 18:44:04 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070308184406.2E01C1A983A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Thu Mar 8 10:44:03 2007 New Revision: 516139 URL: http://svn.apache.org/viewvc?view=rev&rev=516139 Log: Adding the client side bits needed to implement producer flow control using a window. Currently disabled since the server side bits still need implementing. Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?view=diff&rev=516139&r1=516138&r2=516139 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Mar 8 10:44:03 2007 @@ -76,6 +76,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.Response; @@ -156,6 +157,7 @@ // Maps ConsumerIds to ActiveMQConsumer objects private final ConcurrentHashMap dispatchers = new ConcurrentHashMap(); + private final ConcurrentHashMap producers = new ConcurrentHashMap(); private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); private final SessionId connectionSessionId; private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); @@ -168,6 +170,7 @@ private final CountDownLatch brokerInfoReceived = new CountDownLatch(1); private BrokerInfo brokerInfo; private IOException firstFailureError; + private int producerWindowSize=ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE; // Assume that protocol is the latest. Change to the actual protocol // version when a WireFormatInfo is received. @@ -1515,6 +1518,14 @@ Transport getTransport() { return transport; } + + public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) { + producers.put(producerId, producer); + } + public void removeProducer(ProducerId producerId) { + producers.remove(producerId); + } + public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) { dispatchers.put(consumerId, dispatcher); @@ -1546,6 +1557,12 @@ } dispatcher.dispatch(md); } + } else if (command.getDataStructureType() == ProducerAck.DATA_STRUCTURE_TYPE ) { + ProducerAck pa = (ProducerAck) command; + ActiveMQMessageProducer producer = producers.get(pa.getProducerId()); + if( producer!=null ) { + producer.onProducerAck(pa); + } } else if ( command.isBrokerInfo() ) { this.brokerInfo = (BrokerInfo)command; brokerInfoReceived.countDown(); @@ -2006,4 +2023,16 @@ public int getProtocolVersion() { return protocolVersion.get(); } + + + public int getProducerWindowSize() { + return producerWindowSize; + } + + + public void setProducerWindowSize(int producerWindowSize) { + this.producerWindowSize = producerWindowSize; + } + + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?view=diff&rev=516139&r1=516138&r2=516139 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Thu Mar 8 10:44:03 2007 @@ -17,6 +17,24 @@ */ package org.apache.activemq; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.QueueConnection; +import javax.jms.QueueConnectionFactory; +import javax.jms.TopicConnection; +import javax.jms.TopicConnectionFactory; +import javax.naming.Context; + +import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.jndi.JNDIBaseStorable; import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.management.StatsCapable; @@ -28,23 +46,6 @@ import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport.CompositeData; -import org.apache.activemq.blob.BlobTransferPolicy; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.QueueConnection; -import javax.jms.QueueConnectionFactory; -import javax.jms.TopicConnection; -import javax.jms.TopicConnectionFactory; -import javax.naming.Context; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; /** * A ConnectionFactory is an an Administered object, and is used for creating @@ -60,6 +61,7 @@ public static final String DEFAULT_BROKER_URL = "tcp://localhost:61616"; public static final String DEFAULT_USER = null; public static final String DEFAULT_PASSWORD = null; + public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0; private IdGenerator clientIdGenerator; private String clientIDPrefix; @@ -90,6 +92,7 @@ private boolean alwaysSyncSend; private boolean useSyncSend=false; private boolean watchTopicAdvisories=true; + private int producerWindowSize=DEFAULT_PRODUCER_WINDOW_SIZE; static protected final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { public Thread newThread(Runnable run) { @@ -263,7 +266,7 @@ connection.setTransformer(getTransformer()); connection.setBlobTransferPolicy(getBlobTransferPolicy().copy()); connection.setWatchTopicAdvisories(watchTopicAdvisories); - + connection.setProducerWindowSize(producerWindowSize); transport.start(); if( clientID !=null ) @@ -590,7 +593,7 @@ props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge())); props.setProperty("statsEnabled",Boolean.toString(isStatsEnabled())); props.setProperty("alwaysSyncSend",Boolean.toString(isAlwaysSyncSend())); - + props.setProperty("producerWindowSize", Integer.toString(producerWindowSize)); } public boolean isUseCompression() { @@ -749,4 +752,12 @@ public void setStatsEnabled(boolean statsEnabled){ this.factoryStats.setEnabled(statsEnabled); } + + synchronized public int getProducerWindowSize() { + return producerWindowSize; + } + + synchronized public void setProducerWindowSize(int producerWindowSize) { + this.producerWindowSize = producerWindowSize; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?view=diff&rev=516139&r1=516138&r2=516139 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java Thu Mar 8 10:44:03 2007 @@ -27,12 +27,16 @@ import javax.jms.MessageProducer; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.management.JMSProducerStatsImpl; import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsImpl; +import org.apache.activemq.memory.UsageManager; +import org.apache.activemq.util.IntrospectionSupport; +import java.util.HashMap; import java.util.concurrent.atomic.AtomicLong; /** @@ -83,12 +87,25 @@ private long defaultTimeToLive; private long startTime; private MessageTransformer transformer; + private UsageManager producerWindow; protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination) throws JMSException { this.session = session; this.info = new ProducerInfo(producerId); + this.info.setWindowSize(session.connection.getProducerWindowSize()); + if (destination!=null && destination.getOptions() != null) { + HashMap options = new HashMap(destination.getOptions()); + IntrospectionSupport.setProperties(this.info, options, "producer."); + } this.info.setDestination(destination); + + // Enable producer window flow control if protocol > 3 and the window size > 0 + if( session.connection.getProtocolVersion()>=3 && this.info.getWindowSize()>0 ) { + producerWindow = new UsageManager("Producer Window: "+producerId); + producerWindow.setLimit(this.info.getWindowSize()); + } + this.disableMessageID = false; this.disableMessageTimestamp = session.connection.isDisableTimeStampsByDefault(); this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE; @@ -470,7 +487,21 @@ message = transformedMessage; } } - this.session.send(this, dest, message, deliveryMode, priority, timeToLive); + + if( producerWindow!=null ) { + try { + producerWindow.waitForSpace(); + } catch (InterruptedException e) { + throw new JMSException("Send aborted due to thread interrupt."); + } + } + + int size = this.session.send(this, dest, message, deliveryMode, priority, timeToLive); + + if( producerWindow!=null ) { + producerWindow.increaseUsage(size); + } + stats.onMessage(); } @@ -524,5 +555,11 @@ public String toString() { return "ActiveMQMessageProducer { value=" +info.getProducerId()+" }"; } + + public void onProducerAck(ProducerAck pa) { + if( this.producerWindow!=null ) { + this.producerWindow.decreaseUsage(pa.getSize()); + } + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=516139&r1=516138&r2=516139 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu Mar 8 10:44:03 2007 @@ -1478,6 +1478,7 @@ */ protected void addProducer(ActiveMQMessageProducer producer) throws JMSException { this.producers.add(producer); + this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer); } /** @@ -1488,6 +1489,7 @@ * @throws JMSException */ protected void removeProducer(ActiveMQMessageProducer producer) { + this.connection.removeProducer(producer.getProducerInfo().getProducerId()); this.producers.remove(producer); } @@ -1546,7 +1548,7 @@ * message expiration. * @throws JMSException */ - protected void send(ActiveMQMessageProducer producer, + protected int send(ActiveMQMessageProducer producer, ActiveMQDestination destination,Message message,int deliveryMode, int priority,long timeToLive) throws JMSException{ checkClosed(); @@ -1599,6 +1601,12 @@ }else{ this.connection.syncSendPacket(msg); } + + // Since we defer lots of the marshaling till we hit the wire, this might not + // provide and accurate size. We may change over to doing more aggressive marshaling, + // to get more accurate sizes.. this is more important once users start using producer window + // flow control. + return msg.getSize(); } }