activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r985201 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/FutureResponse.java main/java/org/apache/activemq/transport/RequestTimedOutIOException.java test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
Date Fri, 13 Aug 2010 13:55:33 GMT
Author: gtully
Date: Fri Aug 13 13:55:32 2010
New Revision: 985201

URL: http://svn.apache.org/viewvc?rev=985201&view=rev
Log:
resolve: https://issues.apache.org/activemq/browse/AMQ-2867, addition to https://issues.apache.org/activemq/browse/AMQ-2507
- if no response received and timeout > 0 a RequestTimedOutIOException is thrown on the
client side

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java?rev=985201&r1=985200&r2=985201&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
Fri Aug 13 13:55:32 2010
@@ -49,7 +49,11 @@ public class FutureResponse {
 
     public Response getResult(int timeout) throws IOException {
         try {
-            return responseSlot.poll(timeout, TimeUnit.MILLISECONDS);
+            Response result = responseSlot.poll(timeout, TimeUnit.MILLISECONDS);
+            if (result == null && timeout > 0) {
+                throw new RequestTimedOutIOException();
+            }
+            return result;
         } catch (InterruptedException e) {
             throw new InterruptedIOException("Interrupted.");
         }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java?rev=985201&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
Fri Aug 13 13:55:32 2010
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+
+/**
+ * thrown when the timeout specified on a request expires before
+ * a reply or response is received
+ */
+public class RequestTimedOutIOException extends IOException {
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/RequestTimedOutIOException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java?rev=985201&r1=985200&r2=985201&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
Fri Aug 13 13:55:32 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq.bugs;
 
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.DeliveryMode;
@@ -29,6 +30,7 @@ import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.transport.RequestTimedOutIOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -50,7 +52,7 @@ public class JmsTimeoutTest extends Embe
 	        final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
 	        final ActiveMQDestination queue = createDestination("testqueue");
 	        
-	        // we should not take longer than 5 seconds to return from send
+	        // we should not take longer than 10 seconds to return from send
 	        cx.setSendTimeout(10000);
 	        	
 	        Runnable r = new Runnable() {
@@ -64,18 +66,61 @@ public class JmsTimeoutTest extends Embe
 	                    TextMessage message = session.createTextMessage(createMessageText());
 	                    for(int count=0; count<messageCount; count++){
 	                    	producer.send(message);
-	                    	// Currently after the timeout producer just
-	                    	// returns but there is no way to know that
-	                    	// the send timed out
 	                    }	  
 	                    LOG.info("Done sending..");
-	                } catch (JMSException e) {
-	                    e.printStackTrace();
-	                    if (e instanceof ResourceAllocationException) {
+                    } catch (JMSException e) {
+                        if (e.getCause() instanceof RequestTimedOutIOException) {
 	                        exceptionCount.incrementAndGet();
+                        } else {
+                            e.printStackTrace();
+                        }
+	                    return;
+	                }
+
+	            }
+	        };
+	        cx.start();
+	        Thread producerThread = new Thread(r);
+	        producerThread.start();
+	        producerThread.join(30000);
+	        cx.close();
+	        // We should have a few timeout exceptions as memory store will fill up
+	        assertTrue("No exception from the broker", exceptionCount.get() > 0);
+	    }
+
+
+        /**
+	     * Test the case where the broker is blocked due to a memory limit
+	     * with a fail timeout
+	     * @throws Exception
+	     */
+	    public void testBlockedProducerUsageSendFailTimeout() throws Exception {
+	        final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
+	        final ActiveMQDestination queue = createDestination("testqueue");
+
+            broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
+	        Runnable r = new Runnable() {
+	            public void run() {
+	                try {
+	                	LOG.info("Sender thread starting");
+	                    Session session = cx.createSession(false, 1);
+	                    MessageProducer producer = session.createProducer(queue);
+	                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+	                    TextMessage message = session.createTextMessage(createMessageText());
+	                    for(int count=0; count<messageCount; count++){
+	                    	producer.send(message);
 	                    }
+	                    LOG.info("Done sending..");
+                    } catch (JMSException e) {
+                        if (e instanceof ResourceAllocationException || e.getCause() instanceof
RequestTimedOutIOException) {
+	                        exceptionCount.incrementAndGet();
+                        } else {
+                            e.printStackTrace();
+                        }
 	                    return;
 	                }
+
 	            }
 	        };
 	        cx.start();
@@ -88,11 +133,12 @@ public class JmsTimeoutTest extends Embe
 	    }
 
 	    protected void setUp() throws Exception {
+            exceptionCount.set(0);
 	        bindAddress = "tcp://localhost:61616";
 	        broker = createBroker();
 	        broker.setDeleteAllMessagesOnStartup(true);
 	        broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
-	        broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
+
 	        super.setUp();
 	    }
 



Mime
View raw message