Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 53890 invoked from network); 4 Jul 2006 09:55:03 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 4 Jul 2006 09:55:03 -0000 Received: (qmail 62163 invoked by uid 500); 4 Jul 2006 09:55:03 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 62144 invoked by uid 500); 4 Jul 2006 09:55:03 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 62135 invoked by uid 99); 4 Jul 2006 09:55:03 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Jul 2006 02:55:02 -0700 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Jul 2006 02:55:02 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id D0FCA1A983A; Tue, 4 Jul 2006 02:54:41 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r418966 - in /incubator/activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/ src/test/java/org/apache/activemq/jndi/ Date: Tue, 04 Jul 2006 09:54:40 -0000 To: activemq-commits@geronimo.apache.org From: jstrachan@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20060704095441.D0FCA1A983A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: jstrachan Date: Tue Jul 4 02:54:38 2006 New Revision: 418966 URL: http://svn.apache.org/viewvc?rev=418966&view=rev Log: fix for AMQ-792 to allow the async dispatch of messages to consumers to be easily configured & properly documented the javadoc. For more detail see http://activemq.org/site/consumer-dispatch-async.html Modified: incubator/activemq/trunk/activemq-core/ (props changed) incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/jndi/ObjectFactoryTest.java Propchange: incubator/activemq/trunk/activemq-core/ ------------------------------------------------------------------------------ --- svn:ignore (original) +++ svn:ignore Tue Jul 4 02:54:38 2006 @@ -9,7 +9,6 @@ bin junit*.properties *.iml -test +ActiveMQConnections.dot load.db sdbStoreTest.db -eclipse-classes Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=418966&r1=418965&r2=418966&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Tue Jul 4 02:54:38 2006 @@ -122,7 +122,7 @@ private boolean copyMessageOnSend = true; private boolean useCompression = false; private boolean objectMessageSerializationDefered = false; - protected boolean asyncDispatch = false; + protected boolean dispatchAsync = false; protected boolean alwaysSessionAsync=true; private boolean useAsyncSend = false; private boolean optimizeAcknowledge = false; @@ -274,7 +274,7 @@ ||acknowledgeMode==Session.CLIENT_ACKNOWLEDGE; return new ActiveMQSession(this,getNextSessionId(),(transacted?Session.SESSION_TRANSACTED :(acknowledgeMode==Session.SESSION_TRANSACTED?Session.AUTO_ACKNOWLEDGE:acknowledgeMode)), - asyncDispatch,alwaysSessionAsync); + dispatchAsync,alwaysSessionAsync); } /** @@ -674,7 +674,7 @@ info.setSubcriptionName(subscriptionName); info.setSelector(messageSelector); info.setPrefetchSize(maxMessages); - info.setDispatchAsync(asyncDispatch); + info.setDispatchAsync(dispatchAsync); // Allows the options on the destination to configure the consumerInfo if( info.getDestination().getOptions()!=null ) { @@ -727,8 +727,9 @@ } /** - * @param prefetchPolicy - * The prefetchPolicy to set. + * Sets the prefetch + * policy for consumers created by this connection. */ public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { this.prefetchPolicy = prefetchPolicy; @@ -1031,7 +1032,7 @@ info.setSelector(messageSelector); info.setPrefetchSize(maxMessages); info.setNoLocal(noLocal); - info.setDispatchAsync(asyncDispatch); + info.setDispatchAsync(dispatchAsync); // Allows the options on the destination to configure the consumerInfo if( info.getDestination().getOptions()!=null ) { @@ -1358,10 +1359,16 @@ /** - * @param alwaysSessionAsync The alwaysSessionAsync to set. + * If this flag is set then a separate thread is not used for dispatching + * messages for each Session in the Connection. However, a separate thread + * is always used if there is more than one session, or the session isn't in + * auto acknowledge or duplicates ok mode + * + * @param alwaysSessionAsync + * The alwaysSessionAsync to set. */ - public void setAlwaysSessionAsync(boolean alwaysSessionAsync){ - this.alwaysSessionAsync=alwaysSessionAsync; + public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { + this.alwaysSessionAsync = alwaysSessionAsync; } /** @@ -1603,12 +1610,28 @@ } - public boolean isAsyncDispatch() { - return asyncDispatch; + public boolean isDispatchAsync() { + return dispatchAsync; } - public void setAsyncDispatch(boolean asyncDispatch) { - this.asyncDispatch = asyncDispatch; + /** + * Enables or disables the default setting of whether or not consumers have + * their messages dispatched + * synchronously or asynchronously by the broker. + * + * For non-durable topics for example we typically dispatch synchronously by + * default to minimize context switches which boost performance. However + * sometimes its better to go slower to ensure that a single blocked + * consumer socket does not block delivery to other consumers. + * + * @param asyncDispatch + * If true then consumers created on this connection will default + * to having their messages dispatched asynchronously. The + * default value is false. + */ + public void setDispatchAsync(boolean asyncDispatch) { + this.dispatchAsync = asyncDispatch; } public boolean isObjectMessageSerializationDefered() { Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=418966&r1=418965&r2=418966&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Tue Jul 4 02:54:38 2006 @@ -74,7 +74,7 @@ private boolean copyMessageOnSend = true; private boolean useCompression = false; private boolean objectMessageSerializationDefered = false; - protected boolean asyncDispatch = false; + protected boolean dispatchAsync = false; protected boolean alwaysSessionAsync=true; private boolean useAsyncSend = false; private boolean optimizeAcknowledge = false; @@ -227,7 +227,7 @@ connection.setCopyMessageOnSend(isCopyMessageOnSend()); connection.setUseCompression(isUseCompression()); connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered()); - connection.setAsyncDispatch(isAsyncDispatch()); + connection.setDispatchAsync(isDispatchAsync()); connection.setUseAsyncSend(isUseAsyncSend()); connection.setAlwaysSessionAsync(isAlwaysSessionAsync()); connection.setOptimizeAcknowledge(isOptimizeAcknowledge()); @@ -337,6 +337,11 @@ return prefetchPolicy; } + /** + * Sets the prefetch + * policy for consumers created by this connection. + */ public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { this.prefetchPolicy = prefetchPolicy; } @@ -419,7 +424,7 @@ } public void populateProperties(Properties props) { - props.setProperty("asyncDispatch", Boolean.toString(isAsyncDispatch())); + props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync())); if (getBrokerURL() != null) { props.setProperty(Context.PROVIDER_URL, getBrokerURL()); @@ -472,12 +477,28 @@ this.objectMessageSerializationDefered = objectMessageSerializationDefered; } - public boolean isAsyncDispatch() { - return asyncDispatch; + public boolean isDispatchAsync() { + return dispatchAsync; } - public void setAsyncDispatch(boolean asyncDispatch) { - this.asyncDispatch = asyncDispatch; + /** + * Enables or disables the default setting of whether or not consumers have + * their messages dispatched + * synchronously or asynchronously by the broker. + * + * For non-durable topics for example we typically dispatch synchronously by + * default to minimize context switches which boost performance. However + * sometimes its better to go slower to ensure that a single blocked + * consumer socket does not block delivery to other consumers. + * + * @param asyncDispatch + * If true then consumers created on this connection will default + * to having their messages dispatched asynchronously. The + * default value is false. + */ + public void setDispatchAsync(boolean asyncDispatch) { + this.dispatchAsync = asyncDispatch; } /** Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java?rev=418966&r1=418965&r2=418966&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java Tue Jul 4 02:54:38 2006 @@ -75,6 +75,6 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { checkClosedOrFailed(); ensureConnectionInfoSent(); - return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, asyncDispatch); + return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, dispatchAsync); } } Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/jndi/ObjectFactoryTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/jndi/ObjectFactoryTest.java?rev=418966&r1=418965&r2=418966&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/jndi/ObjectFactoryTest.java (original) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/jndi/ObjectFactoryTest.java Tue Jul 4 02:54:38 2006 @@ -27,7 +27,7 @@ public void testConnectionFactory() throws Exception { // Create sample connection factory ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); - factory.setAsyncDispatch(false); + factory.setDispatchAsync(false); factory.setBrokerURL("vm://test"); factory.setClientID("test"); factory.setCopyMessageOnSend(false); @@ -53,7 +53,7 @@ temp = (ActiveMQConnectionFactory)refFactory.getObjectInstance(ref, null, null, null); // Check settings - assertEquals(factory.isAsyncDispatch(), temp.isAsyncDispatch()); + assertEquals(factory.isDispatchAsync(), temp.isDispatchAsync()); assertEquals(factory.getBrokerURL(), temp.getBrokerURL()); assertEquals(factory.getClientID(), temp.getClientID()); assertEquals(factory.isCopyMessageOnSend(), temp.isCopyMessageOnSend());