activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r774421 - in /activemq/trunk: ./ activemq-core/src/main/java/org/apache/activemq/broker/ activemq-optional/ activemq-optional/src/main/java/org/apache/activemq/transport/http/ activemq-optional/src/test/java/org/apache/activemq/transport/ht...
Date Wed, 13 May 2009 16:43:41 GMT
Author: dejanb
Date: Wed May 13 16:43:31 2009
New Revision: 774421

URL: http://svn.apache.org/viewvc?rev=774421&view=rev
Log:
partial fix for https://issues.apache.org/activemq/browse/AMQ-2238 - http transport improvements

Added:
    activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/trunk/activemq-optional/pom.xml
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
    activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpTransportBrokerTest.java
    activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java
    activemq/trunk/pom.xml

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=774421&r1=774420&r2=774421&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Wed May 13 16:43:31 2009
@@ -217,7 +217,6 @@
                             }
                         }
                     };
-                    startThread.setPriority(4);
                     startThread.start();
                 } catch (Exception e) {
                     String remoteHost = transport.getRemoteAddress();

Modified: activemq/trunk/activemq-optional/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/pom.xml?rev=774421&r1=774420&r2=774421&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/pom.xml (original)
+++ activemq/trunk/activemq-optional/pom.xml Wed May 13 16:43:31 2009
@@ -159,10 +159,6 @@
       <plugin>
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
-          <excludes>
-            <exclude>**/http/*</exclude>
-            <exclude>**/https/*</exclude>
-          </excludes>
         </configuration>
       </plugin>
 

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java?rev=774421&r1=774420&r2=774421&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java (original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java Wed May 13 16:43:31 2009
@@ -21,6 +21,7 @@
 import java.io.InterruptedIOException;
 import java.net.URI;
 
+import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.util.TextWireFormat;
 import org.apache.activemq.util.ByteArrayInputStream;
@@ -29,10 +30,15 @@
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.HttpMethod;
+import org.apache.commons.httpclient.HttpMethodRetryHandler;
 import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.NoHttpResponseException;
 import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.commons.httpclient.methods.HeadMethod;
+import org.apache.commons.httpclient.methods.InputStreamRequestEntity;
 import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.params.HttpClientParams;
+import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -54,7 +60,8 @@
 
     private final String clientID = CLIENT_ID_GENERATOR.generateId();
     private boolean trace;
-
+    private GetMethod httpMethod;
+    
     public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
         super(wireFormat, remoteUrl);
     }
@@ -68,23 +75,30 @@
         if (isStopped()) {
             throw new IOException("stopped.");
         }
-
         PostMethod httpMethod = new PostMethod(getRemoteUrl().toString());
         configureMethod(httpMethod);
         String data = getTextWireFormat().marshalText(command);
         byte[] bytes = data.getBytes("UTF-8");
-        httpMethod.setRequestBody(new ByteArrayInputStream(bytes));
+        InputStreamRequestEntity entity = new InputStreamRequestEntity(new ByteArrayInputStream(bytes));
+        httpMethod.setRequestEntity(entity);
 
         try {
 
             HttpClient client = getSendHttpClient();
-            client.setTimeout(MAX_CLIENT_TIMEOUT);
+            HttpClientParams params = new HttpClientParams();
+            params.setSoTimeout(MAX_CLIENT_TIMEOUT);
+            client.setParams(params);
             int answer = client.executeMethod(httpMethod);
             if (answer != HttpStatus.SC_OK) {
                 throw new IOException("Failed to post command: " + command + " as response was: " + answer);
             }
-
-            // checkSession(httpMethod);
+            if (command instanceof ShutdownInfo) {
+            	try {
+            		stop();
+            	} catch (Exception e) {
+            		LOG.warn("Error trying to stop HTTP client: "+ e, e);
+            	}
+            }
         } catch (IOException e) {
             throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
         } finally {
@@ -105,7 +119,7 @@
 
         while (!isStopped() && !isStopping()) {
 
-            GetMethod httpMethod = new GetMethod(remoteUrl.toString());
+            httpMethod = new GetMethod(remoteUrl.toString());
             configureMethod(httpMethod);
 
             try {
@@ -124,7 +138,6 @@
                         break;
                     }
                 } else {
-                    // checkSession(httpMethod);
                     DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream());
                     Object command = (Object)getTextWireFormat().unmarshal(stream);
                     if (command == null) {
@@ -137,7 +150,6 @@
                 onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e));
                 break;
             } finally {
-                httpMethod.getResponseBody();
                 httpMethod.releaseConnection();
             }
         }
@@ -187,6 +199,7 @@
     }
 
     protected void doStop(ServiceStopper stopper) throws Exception {
+    	httpMethod.abort();
     }
 
     protected HttpClient createHttpClient() {
@@ -209,16 +222,4 @@
         this.trace = trace;
     }
 
-    // protected void checkSession(HttpMethod client) {
-    // Header header = client.getRequestHeader("Set-Cookie");
-    // if (header != null) {
-    // String set_cookie = header.getValue();
-    //
-    // if (set_cookie != null && set_cookie.startsWith("JSESSIONID=")) {
-    // String[] bits = set_cookie.split("[=;]");
-    // sessionID = bits[1];
-    // }
-    // }
-    // }
-
 }

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java?rev=774421&r1=774420&r2=774421&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java (original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java Wed May 13 16:43:31 2009
@@ -175,12 +175,20 @@
             answer = createTransportChannel();
             clients.put(clientID, answer);
             listener.onAccept(answer);
+            //wait for the transport to connect
+            while (!answer.isConnected()) {
+            	try {
+            		Thread.sleep(100);
+            	} catch (InterruptedException ignore) {
+            	}
+            }
             return answer;
         }
     }
 
     protected BlockingQueueTransport createTransportChannel() {
-        return new BlockingQueueTransport(new ArrayBlockingQueue(10));
+       // return new BlockingQueueTransport(new LinkedBlockingQueue<Object>());
+    	 return new BlockingQueueTransport(new ArrayBlockingQueue<Object>(10));
     }
 
     protected TextWireFormat createWireFormat() {

Added: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java?rev=774421&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java (added)
+++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java Wed May 13 16:43:31 2009
@@ -0,0 +1,63 @@
+package org.apache.activemq.transport.http;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class HttpClientReconnectTest extends TestCase {
+	
+	BrokerService broker;
+	ActiveMQConnectionFactory factory;
+
+	protected void setUp() throws Exception {
+		broker = new BrokerService();
+		broker.addConnector("http://localhost:61666?trace=true");
+		broker.setPersistent(false);
+		broker.setUseJmx(false);
+		broker.deleteAllMessages();
+		broker.start();
+		factory = new ActiveMQConnectionFactory("http://localhost:61666?trace=true");
+	}
+
+	protected void tearDown() throws Exception {
+		broker.stop();
+	}
+	
+	public void testReconnectClient() throws Exception {
+		for (int i = 0; i < 100; i++) {
+			sendAndReceiveMessage(i);
+		}
+	}
+	
+	private void sendAndReceiveMessage(int i) throws Exception {
+		Connection conn = factory.createConnection();
+		Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+		conn.start();
+		Destination dest = new ActiveMQQueue("test");
+		MessageProducer producer = sess.createProducer(dest);
+		MessageConsumer consumer = sess.createConsumer(dest);
+		String messageText = "test " + i;
+		try {
+			producer.send(sess.createTextMessage(messageText));
+			TextMessage msg = (TextMessage)consumer.receive(1000);
+			assertEquals(messageText, msg.getText());
+		} finally {
+			producer.close();
+			consumer.close();
+			conn.close();
+			sess.close();
+		}
+	}
+	
+	
+
+}

Modified: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpTransportBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpTransportBrokerTest.java?rev=774421&r1=774420&r2=774421&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpTransportBrokerTest.java (original)
+++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpTransportBrokerTest.java Wed May 13 16:43:31 2009
@@ -16,8 +16,13 @@
  */
 package org.apache.activemq.transport.http;
 
+import java.net.URI;
+
 import junit.framework.Test;
 import junit.textui.TestRunner;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.transport.TransportBrokerTestSupport;
 
 public class HttpTransportBrokerTest extends TransportBrokerTestSupport {
@@ -29,12 +34,19 @@
     protected void setUp() throws Exception {
         maxWait = 2000;
         super.setUp();
+        Thread.sleep(500);
     }
+    
+	protected BrokerService createBroker() throws Exception {
+		BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false&useJmx=false"));
+		connector = broker.addConnector(getBindLocation());
+		return broker;
+	}
 
-    protected void tearDown() throws Exception {
+	protected void tearDown() throws Exception {
         super.tearDown();
         // Give the jetty server enough time to shutdown before starting another one
-        Thread.sleep(300);
+        Thread.sleep(500);
     }
 
     public static Test suite() {

Modified: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java?rev=774421&r1=774420&r2=774421&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java (original)
+++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java Wed May 13 16:43:31 2009
@@ -36,7 +36,7 @@
         //System.setProperty("javax.net.debug", "ssl,handshake,data,trustmanager");
         super.setUp();
 
-        Thread.sleep(5000);
+        Thread.sleep(500);
     }
 
     public static Test suite() {

Modified: activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=774421&r1=774420&r2=774421&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Wed May 13 16:43:31 2009
@@ -48,7 +48,7 @@
     <commons-collections-version>3.2.1</commons-collections-version>
     <openjpa-version>1.2.0</openjpa-version>
     <commons-dbcp-version>1.2.2</commons-dbcp-version>
-    <commons-httpclient-version>2.0.1</commons-httpclient-version>
+    <commons-httpclient-version>3.1</commons-httpclient-version>
     <commons-logging-version>1.1</commons-logging-version>
     <commons-pool-version>1.4</commons-pool-version>
     <commons-primitives-version>1.0</commons-primitives-version>



Mime
View raw message