Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 83965 invoked from network); 13 May 2009 16:44:18 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 13 May 2009 16:44:18 -0000 Received: (qmail 6770 invoked by uid 500); 13 May 2009 16:44:18 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 6719 invoked by uid 500); 13 May 2009 16:44:18 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 6710 invoked by uid 99); 13 May 2009 16:44:18 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 May 2009 16:44:18 +0000 X-ASF-Spam-Status: No, hits=-1998.5 required=10.0 tests=ALL_TRUSTED,WEIRD_PORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 May 2009 16:44:14 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id BA1FF2388872; Wed, 13 May 2009 16:43:52 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r774421 - in /activemq/trunk: ./ activemq-core/src/main/java/org/apache/activemq/broker/ activemq-optional/ activemq-optional/src/main/java/org/apache/activemq/transport/http/ activemq-optional/src/test/java/org/apache/activemq/transport/ht... Date: Wed, 13 May 2009 16:43:41 -0000 To: commits@activemq.apache.org From: dejanb@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090513164352.BA1FF2388872@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dejanb Date: Wed May 13 16:43:31 2009 New Revision: 774421 URL: http://svn.apache.org/viewvc?rev=774421&view=rev Log: partial fix for https://issues.apache.org/activemq/browse/AMQ-2238 - http transport improvements Added: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java activemq/trunk/activemq-optional/pom.xml activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpTransportBrokerTest.java activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java activemq/trunk/pom.xml Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=774421&r1=774420&r2=774421&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Wed May 13 16:43:31 2009 @@ -217,7 +217,6 @@ } } }; - startThread.setPriority(4); startThread.start(); } catch (Exception e) { String remoteHost = transport.getRemoteAddress(); Modified: activemq/trunk/activemq-optional/pom.xml URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/pom.xml?rev=774421&r1=774420&r2=774421&view=diff ============================================================================== --- activemq/trunk/activemq-optional/pom.xml (original) +++ activemq/trunk/activemq-optional/pom.xml Wed May 13 16:43:31 2009 @@ -159,10 +159,6 @@ maven-surefire-plugin - - **/http/* - **/https/* - Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java?rev=774421&r1=774420&r2=774421&view=diff ============================================================================== --- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java (original) +++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java Wed May 13 16:43:31 2009 @@ -21,6 +21,7 @@ import java.io.InterruptedIOException; import java.net.URI; +import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.util.TextWireFormat; import org.apache.activemq.util.ByteArrayInputStream; @@ -29,10 +30,15 @@ import org.apache.activemq.util.ServiceStopper; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpMethod; +import org.apache.commons.httpclient.HttpMethodRetryHandler; import org.apache.commons.httpclient.HttpStatus; +import org.apache.commons.httpclient.NoHttpResponseException; import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.methods.HeadMethod; +import org.apache.commons.httpclient.methods.InputStreamRequestEntity; import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.httpclient.params.HttpClientParams; +import org.apache.commons.httpclient.params.HttpMethodParams; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,7 +60,8 @@ private final String clientID = CLIENT_ID_GENERATOR.generateId(); private boolean trace; - + private GetMethod httpMethod; + public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) { super(wireFormat, remoteUrl); } @@ -68,23 +75,30 @@ if (isStopped()) { throw new IOException("stopped."); } - PostMethod httpMethod = new PostMethod(getRemoteUrl().toString()); configureMethod(httpMethod); String data = getTextWireFormat().marshalText(command); byte[] bytes = data.getBytes("UTF-8"); - httpMethod.setRequestBody(new ByteArrayInputStream(bytes)); + InputStreamRequestEntity entity = new InputStreamRequestEntity(new ByteArrayInputStream(bytes)); + httpMethod.setRequestEntity(entity); try { HttpClient client = getSendHttpClient(); - client.setTimeout(MAX_CLIENT_TIMEOUT); + HttpClientParams params = new HttpClientParams(); + params.setSoTimeout(MAX_CLIENT_TIMEOUT); + client.setParams(params); int answer = client.executeMethod(httpMethod); if (answer != HttpStatus.SC_OK) { throw new IOException("Failed to post command: " + command + " as response was: " + answer); } - - // checkSession(httpMethod); + if (command instanceof ShutdownInfo) { + try { + stop(); + } catch (Exception e) { + LOG.warn("Error trying to stop HTTP client: "+ e, e); + } + } } catch (IOException e) { throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e); } finally { @@ -105,7 +119,7 @@ while (!isStopped() && !isStopping()) { - GetMethod httpMethod = new GetMethod(remoteUrl.toString()); + httpMethod = new GetMethod(remoteUrl.toString()); configureMethod(httpMethod); try { @@ -124,7 +138,6 @@ break; } } else { - // checkSession(httpMethod); DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream()); Object command = (Object)getTextWireFormat().unmarshal(stream); if (command == null) { @@ -137,7 +150,6 @@ onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e)); break; } finally { - httpMethod.getResponseBody(); httpMethod.releaseConnection(); } } @@ -187,6 +199,7 @@ } protected void doStop(ServiceStopper stopper) throws Exception { + httpMethod.abort(); } protected HttpClient createHttpClient() { @@ -209,16 +222,4 @@ this.trace = trace; } - // protected void checkSession(HttpMethod client) { - // Header header = client.getRequestHeader("Set-Cookie"); - // if (header != null) { - // String set_cookie = header.getValue(); - // - // if (set_cookie != null && set_cookie.startsWith("JSESSIONID=")) { - // String[] bits = set_cookie.split("[=;]"); - // sessionID = bits[1]; - // } - // } - // } - } Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java?rev=774421&r1=774420&r2=774421&view=diff ============================================================================== --- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java (original) +++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java Wed May 13 16:43:31 2009 @@ -175,12 +175,20 @@ answer = createTransportChannel(); clients.put(clientID, answer); listener.onAccept(answer); + //wait for the transport to connect + while (!answer.isConnected()) { + try { + Thread.sleep(100); + } catch (InterruptedException ignore) { + } + } return answer; } } protected BlockingQueueTransport createTransportChannel() { - return new BlockingQueueTransport(new ArrayBlockingQueue(10)); + // return new BlockingQueueTransport(new LinkedBlockingQueue()); + return new BlockingQueueTransport(new ArrayBlockingQueue(10)); } protected TextWireFormat createWireFormat() { Added: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java?rev=774421&view=auto ============================================================================== --- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java (added) +++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java Wed May 13 16:43:31 2009 @@ -0,0 +1,63 @@ +package org.apache.activemq.transport.http; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; + +public class HttpClientReconnectTest extends TestCase { + + BrokerService broker; + ActiveMQConnectionFactory factory; + + protected void setUp() throws Exception { + broker = new BrokerService(); + broker.addConnector("http://localhost:61666?trace=true"); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.deleteAllMessages(); + broker.start(); + factory = new ActiveMQConnectionFactory("http://localhost:61666?trace=true"); + } + + protected void tearDown() throws Exception { + broker.stop(); + } + + public void testReconnectClient() throws Exception { + for (int i = 0; i < 100; i++) { + sendAndReceiveMessage(i); + } + } + + private void sendAndReceiveMessage(int i) throws Exception { + Connection conn = factory.createConnection(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn.start(); + Destination dest = new ActiveMQQueue("test"); + MessageProducer producer = sess.createProducer(dest); + MessageConsumer consumer = sess.createConsumer(dest); + String messageText = "test " + i; + try { + producer.send(sess.createTextMessage(messageText)); + TextMessage msg = (TextMessage)consumer.receive(1000); + assertEquals(messageText, msg.getText()); + } finally { + producer.close(); + consumer.close(); + conn.close(); + sess.close(); + } + } + + + +} Modified: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpTransportBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpTransportBrokerTest.java?rev=774421&r1=774420&r2=774421&view=diff ============================================================================== --- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpTransportBrokerTest.java (original) +++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpTransportBrokerTest.java Wed May 13 16:43:31 2009 @@ -16,8 +16,13 @@ */ package org.apache.activemq.transport.http; +import java.net.URI; + import junit.framework.Test; import junit.textui.TestRunner; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.transport.TransportBrokerTestSupport; public class HttpTransportBrokerTest extends TransportBrokerTestSupport { @@ -29,12 +34,19 @@ protected void setUp() throws Exception { maxWait = 2000; super.setUp(); + Thread.sleep(500); } + + protected BrokerService createBroker() throws Exception { + BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false&useJmx=false")); + connector = broker.addConnector(getBindLocation()); + return broker; + } - protected void tearDown() throws Exception { + protected void tearDown() throws Exception { super.tearDown(); // Give the jetty server enough time to shutdown before starting another one - Thread.sleep(300); + Thread.sleep(500); } public static Test suite() { Modified: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java?rev=774421&r1=774420&r2=774421&view=diff ============================================================================== --- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java (original) +++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java Wed May 13 16:43:31 2009 @@ -36,7 +36,7 @@ //System.setProperty("javax.net.debug", "ssl,handshake,data,trustmanager"); super.setUp(); - Thread.sleep(5000); + Thread.sleep(500); } public static Test suite() { Modified: activemq/trunk/pom.xml URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=774421&r1=774420&r2=774421&view=diff ============================================================================== --- activemq/trunk/pom.xml (original) +++ activemq/trunk/pom.xml Wed May 13 16:43:31 2009 @@ -48,7 +48,7 @@ 3.2.1 1.2.0 1.2.2 - 2.0.1 + 3.1 1.1 1.4 1.0