activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1300727 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ test/java/org/apache/activemq/
Date Wed, 14 Mar 2012 21:32:18 GMT
Author: chirino
Date: Wed Mar 14 21:32:18 2012
New Revision: 1300727

URL: http://svn.apache.org/viewvc?rev=1300727&view=rev
Log:
Fixes AMQ-3769: Support non-blocking sends that use an async callback, which is notified
once the send has been received by the broker

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AsyncCallback.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1300727&r1=1300726&r2=1300727&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Wed Mar 14 21:32:18 2012
@@ -24,13 +24,7 @@ import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -91,6 +85,8 @@ import org.apache.activemq.management.St
 import org.apache.activemq.state.CommandVisitorAdapter;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.transport.failover.FailoverTransport;
@@ -1288,6 +1284,63 @@ public class ActiveMQConnection implemen
      * @return
      * @throws JMSException
      */
+    public void syncSendPacket(Command command, final AsyncCallback onComplete) throws JMSException
{
+        if(onComplete==null) {
+            syncSendPacket(command);
+        } else {
+            if (isClosed()) {
+                throw new ConnectionClosedException();
+            }
+            try {
+                this.transport.asyncRequest(command, new ResponseCallback() {
+                    @Override
+                    public void onCompletion(FutureResponse resp) {
+                        Response response;
+                        Throwable exception = null;
+                        try {
+                            response = resp.getResult();
+                            if (response.isException()) {
+                                ExceptionResponse er = (ExceptionResponse)response;
+                                exception = er.getException();
+                            }
+                        } catch (Exception e) {
+                            exception = e;
+                        }
+                        if(exception!=null) {
+                            if ( exception instanceof JMSException) {
+                                onComplete.onException((JMSException) exception);
+                            } else {
+                                if (isClosed()||closing.get()) {
+                                    LOG.debug("Received an exception but connection is closing");
+                                }
+                                JMSException jmsEx = null;
+                                try {
+                                    jmsEx = JMSExceptionSupport.create(exception);
+                                } catch(Throwable e) {
+                                    LOG.error("Caught an exception trying to create a JMSException
for " +exception,e);
+                                }
+                                //dispose of transport for security exceptions
+                                if (exception instanceof SecurityException){
+                                    Transport t = transport;
+                                    if (null != t){
+                                        ServiceSupport.dispose(t);
+                                    }
+                                }
+                                if (jmsEx !=null) {
+                                    onComplete.onException(jmsEx);
+                                }
+                            }
+                        } else {
+                            onComplete.onSuccess();
+                        }
+                    }
+                });
+            } catch (IOException e) {
+                throw JMSExceptionSupport.create(e);
+            }
+        }
+    }
+
     public Response syncSendPacket(Command command) throws JMSException {
         if (isClosed()) {
             throw new ConnectionClosedException();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?rev=1300727&r1=1300726&r2=1300727&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
Wed Mar 14 21:32:18 2012
@@ -209,6 +209,36 @@ public class ActiveMQMessageProducer ext
      * @since 1.1
      */
     public void send(Destination destination, Message message, int deliveryMode, int priority,
long timeToLive) throws JMSException {
+        this.send(destination, message, deliveryMode, priority, timeToLive, null);
+    }
+
+    public void send(Message message, AsyncCallback onComplete) throws JMSException {
+        this.send(this.getDestination(),
+                  message,
+                  this.defaultDeliveryMode,
+                  this.defaultPriority,
+                  this.defaultTimeToLive, onComplete);
+    }
+
+    public void send(Destination destination, Message message, AsyncCallback onComplete)
throws JMSException {
+        this.send(destination,
+                  message,
+                  this.defaultDeliveryMode,
+                  this.defaultPriority,
+                  this.defaultTimeToLive,
+                  onComplete);
+    }
+
+    public void send(Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback
onComplete) throws JMSException {
+        this.send(this.getDestination(),
+                  message,
+                  deliveryMode,
+                  priority,
+                  timeToLive,
+                  onComplete);
+    }
+
+    public void send(Destination destination, Message message, int deliveryMode, int priority,
long timeToLive, AsyncCallback onComplete) throws JMSException {
         checkClosed();
         if (destination == null) {
             if (info.getDestination() == null) {
@@ -244,7 +274,7 @@ public class ActiveMQMessageProducer ext
             }
         }
 
-        this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout);
+        this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,
sendTimeout, onComplete);
 
         stats.onMessage();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=1300727&r1=1300726&r2=1300727&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Wed
Mar 14 21:32:18 2012
@@ -1702,6 +1702,7 @@ public class ActiveMQSession implements 
     /**
      * Sends the message for dispatch by the broker.
      *
+     *
      * @param producer - message producer.
      * @param destination - message destination.
      * @param message - message to be sent.
@@ -1709,10 +1710,11 @@ public class ActiveMQSession implements 
      * @param priority - message priority.
      * @param timeToLive - message expiration.
      * @param producerWindow
+     * @param onComplete
      * @throws JMSException
      */
     protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination,
Message message, int deliveryMode, int priority, long timeToLive,
-                        MemoryUsage producerWindow, int sendTimeout) throws JMSException
{
+                        MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete)
throws JMSException {
 
         checkClosed();
         if (destination.isTemporary() && connection.isDeleted(destination)) {
@@ -1763,7 +1765,7 @@ public class ActiveMQSession implements 
             if (LOG.isTraceEnabled()) {
                 LOG.trace(getSessionId() + " sending message: " + msg);
             }
-            if (sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend()
&& (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
+            if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired()
&& !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend()
|| txid != null)) {
                 this.connection.asyncSendPacket(msg);
                 if (producerWindow != null) {
                     // Since we defer lots of the marshaling till we hit the
@@ -1777,10 +1779,10 @@ public class ActiveMQSession implements 
                     producerWindow.increaseUsage(size);
                 }
             } else {
-                if (sendTimeout > 0) {
+                if (sendTimeout > 0 && onComplete==null) {
                     this.connection.syncSendPacket(msg,sendTimeout);
                 }else {
-                    this.connection.syncSendPacket(msg);
+                    this.connection.syncSendPacket(msg, onComplete);
                 }
             }
 

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AsyncCallback.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AsyncCallback.java?rev=1300727&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AsyncCallback.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AsyncCallback.java Wed
Mar 14 21:32:18 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.ExceptionListener;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface AsyncCallback extends ExceptionListener {
+    public void onSuccess();
+}

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java?rev=1300727&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java
Wed Mar 14 21:32:18 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.*;
+import javax.jms.Message;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * 
+ */
+public class JmsSendWithAsyncCallbackTest extends TestSupport {
+
+    private Connection connection;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        super.tearDown();
+    }
+    
+
+    public void testAsyncCallbackIsFaster() throws JMSException, InterruptedException {
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getName());
+
+        // Set up a consumer to drain messages.
+        MessageConsumer consumer = session.createConsumer(queue);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+            }
+        });
+
+        // warmup...
+        for(int i=0; i < 10; i++) {
+            benchmarkNonCallbackRate();
+            benchmarkCallbackRate();
+        }
+
+        double callbackRate = benchmarkCallbackRate();
+        double nonCallbackRate = benchmarkNonCallbackRate();
+
+        System.out.println(String.format("AsyncCallback Send rate: %,.2f m/s", callbackRate));
+        System.out.println(String.format("NonAsyncCallback Send rate: %,.2f m/s", nonCallbackRate));
+
+        // The async-callback style has to be more than 1.5x faster than the blocking style.
+        assertTrue( callbackRate/nonCallbackRate > 1.5 );
+    }
+
+    private double benchmarkNonCallbackRate() throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getName());
+        int count = 1000;
+        ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < count; i++) {
+            producer.send(session.createTextMessage("Hello"));
+        }
+        return 1000.0 * count / (System.currentTimeMillis() - start);
+    }
+
+    private double benchmarkCallbackRate() throws JMSException, InterruptedException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getName());
+        int count = 1000;
+        final CountDownLatch messagesSent = new CountDownLatch(count);
+        ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < count; i++) {
+            producer.send(session.createTextMessage("Hello"), new AsyncCallback() {
+                @Override
+                public void onSuccess() {
+                    messagesSent.countDown();
+                }
+
+                @Override
+                public void onException(JMSException exception) {
+                    exception.printStackTrace();
+                }
+            });
+        }
+        messagesSent.await();
+        return 1000.0 * count / (System.currentTimeMillis() - start);
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message