Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 52147 invoked from network); 23 Jul 2007 18:03:15 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 23 Jul 2007 18:03:15 -0000 Received: (qmail 46009 invoked by uid 500); 23 Jul 2007 18:03:16 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 45984 invoked by uid 500); 23 Jul 2007 18:03:16 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 45975 invoked by uid 99); 23 Jul 2007 18:03:16 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Jul 2007 11:03:16 -0700 X-ASF-Spam-Status: No, hits=-98.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Jul 2007 11:03:13 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 8ED081A981A; Mon, 23 Jul 2007 11:02:53 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r558814 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/transport/vm/ test/java/org/apache/activemq/ test/java/org/apache/activemq/broker/ test/java/... Date: Mon, 23 Jul 2007 18:02:46 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070723180253.8ED081A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Mon Jul 23 11:02:41 2007 New Revision: 558814 URL: http://svn.apache.org/viewvc?view=rev&rev=558814 Log: https://issues.apache.org/activemq/browse/AMQ-1337 - Broker should finish accepting connection in an async thread. Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?view=diff&rev=558814&r1=558813&r2=558814 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Mon Jul 23 11:02:41 2007 @@ -1443,10 +1443,16 @@ * Returns the broker name if one is available or null if one is not available yet. */ public String getBrokerName() { - if (brokerInfo == null) { - return null; - } - return brokerInfo.getBrokerName(); + try { + brokerInfoReceived.await(5,TimeUnit.SECONDS); + if (brokerInfo == null) { + return null; + } + return brokerInfo.getBrokerName(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } } /** Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?view=diff&rev=558814&r1=558813&r2=558814 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Mon Jul 23 11:02:41 2007 @@ -143,10 +143,23 @@ this.server = server; this.brokerInfo.setBrokerURL(server.getConnectURI().toString()); this.server.setAcceptListener(new TransportAcceptListener() { - public void onAccept(Transport transport) { + public void onAccept(final Transport transport) { try { - Connection connection = createConnection(transport); - connection.start(); + // Starting the connection could block due to + // wireformat negociation, so start it in an async thread. + Thread startThread = new Thread("ActiveMQ Transport Initiator: "+transport.getRemoteAddress()) { + public void run() { + try { + Connection connection = createConnection(transport); + connection.start(); + } catch (Exception e) { + ServiceSupport.dispose(transport); + onAcceptError(e); + } + } + }; + startThread.setPriority(4); + startThread.start(); } catch (Exception e) { String remoteHost = transport.getRemoteAddress(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?view=diff&rev=558814&r1=558813&r2=558814 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Mon Jul 23 11:02:41 2007 @@ -21,8 +21,8 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; + import org.apache.activemq.command.Command; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; @@ -52,10 +52,11 @@ protected boolean marshal; protected boolean network; protected boolean async=true; - protected AtomicBoolean started=new AtomicBoolean(); protected int asyncQueueDepth=2000; protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList()); protected LinkedBlockingQueue messageQueue=null; + protected boolean started; + protected final Object startMutex = new Object(); protected final URI location; protected final long id; private TaskRunner taskRunner; @@ -96,11 +97,15 @@ } protected void syncOneWay(Object command){ - final TransportListener tl=peer.transportListener; - prePeerSetQueue=peer.prePeerSetQueue; - if(tl==null){ - prePeerSetQueue.add(command); - }else{ + TransportListener tl=null; + synchronized(peer.startMutex){ + if( peer.started ) { + tl = peer.transportListener; + } else if(!peer.disposed) { + peer.prePeerSetQueue.add(command); + } + } + if( tl!=null ) { tl.onCommand(command); } } @@ -147,30 +152,33 @@ } public void start() throws Exception{ - if(started.compareAndSet(false,true)){ - if(transportListener==null) - throw new IOException("TransportListener not set."); - if(!async){ - for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){ - Command command=(Command)iter.next(); - transportListener.onCommand(command); - iter.remove(); - } - }else{ - peer.wakeup(); - wakeup(); - } + if(transportListener==null) + throw new IOException("TransportListener not set."); + synchronized(startMutex) { + if( !prePeerSetQueue.isEmpty() ) { + for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){ + Command command=(Command)iter.next(); + transportListener.onCommand(command); + } + prePeerSetQueue.clear(); + } + started = true; + if( isAsync() ) { + peer.wakeup(); + wakeup(); + } } } public void stop() throws Exception{ - if(started.compareAndSet(true,false)){ + synchronized(startMutex) { if(!disposed){ + started=false; disposed=true; - } - if(taskRunner!=null){ - taskRunner.shutdown(1000); - taskRunner=null; + if(taskRunner!=null){ + taskRunner.shutdown(1000); + taskRunner=null; + } } } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java?view=diff&rev=558814&r1=558813&r2=558814 ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java Mon Jul 23 11:02:41 2007 @@ -36,23 +36,34 @@ public class ActiveMQConnectionFactoryTest extends CombinationTestSupport { - public void testUseURIToSetUseClientIDPrefixOnConnectionFactory() throws URISyntaxException, JMSException { + private ActiveMQConnection connection; + private BrokerService broker; + + public void testUseURIToSetUseClientIDPrefixOnConnectionFactory() throws URISyntaxException, JMSException { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?jms.clientIDPrefix=Cheese"); assertEquals("Cheese", cf.getClientIDPrefix()); - ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); - try { - connection.start(); + connection = (ActiveMQConnection) cf.createConnection(); + connection.start(); - String clientID = connection.getClientID(); - log.info("Got client ID: " + clientID); + String clientID = connection.getClientID(); + log.info("Got client ID: " + clientID); - assertTrue("should start with Cheese! but was: " + clientID, clientID.startsWith("Cheese")); - } - finally { - connection.close(); - } + assertTrue("should start with Cheese! but was: " + clientID, clientID.startsWith("Cheese")); } + + protected void tearDown() throws Exception { + // Try our best to close any previously opend connection. + try { + connection.close(); + } catch (Throwable ignore) { + } + // Try our best to stop any previously started broker. + try { + broker.stop(); + } catch (Throwable ignore) { + } + } public void testUseURIToSetOptionsOnConnectionFactory() throws URISyntaxException, JMSException { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?jms.useAsyncSend=true"); @@ -88,26 +99,27 @@ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); // Make sure the broker is not created until the connection is instantiated. assertNull( BrokerRegistry.getInstance().lookup("localhost") ); - Connection connection = cf.createConnection(); + connection = (ActiveMQConnection) cf.createConnection(); // This should create the connection. assertNotNull(connection); // Verify the broker was created. assertNotNull( BrokerRegistry.getInstance().lookup("localhost") ); + connection.close(); + // Verify the broker was destroyed. assertNull( BrokerRegistry.getInstance().lookup("localhost") ); } public void testGetBrokerName() throws URISyntaxException, JMSException { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); - ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); + connection = (ActiveMQConnection) cf.createConnection(); connection.start(); String brokerName = connection.getBrokerName(); log.info("Got broker name: " + brokerName); assertNotNull("No broker name available!", brokerName); - connection.close(); } public void testCreateTcpConnectionUsingAllocatedPort() throws Exception { @@ -143,7 +155,7 @@ protected void assertCreateConnection(String uri) throws Exception { // Start up a broker with a tcp connector. - BrokerService broker = new BrokerService(); + broker = new BrokerService(); broker.setPersistent(false); TransportConnector connector = broker.addConnector(uri); broker.start(); @@ -162,9 +174,8 @@ // This should create the connection. ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectURI); - Connection connection = cf.createConnection(); + connection = (ActiveMQConnection) cf.createConnection(); assertNotNull(connection); - connection.close(); broker.stop(); } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java?view=diff&rev=558814&r1=558813&r2=558814 ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java Mon Jul 23 11:02:41 2007 @@ -17,6 +17,9 @@ */ package org.apache.activemq.broker; +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; + import javax.jms.DeliveryMode; import javax.jms.JMSException; @@ -34,9 +37,6 @@ import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.SessionInfo; -import java.util.ArrayList; -import java.util.concurrent.TimeUnit; - public class BrokerTest extends BrokerTestSupport { public ActiveMQDestination destination; @@ -45,6 +45,61 @@ public byte destinationType; public boolean durableConsumer; + public void initCombosForTestQueueOnlyOnceDeliveryWith2Consumers() { + addCombinationValues( "deliveryMode", new Object[]{ + Integer.valueOf(DeliveryMode.NON_PERSISTENT), + Integer.valueOf(DeliveryMode.PERSISTENT)} ); + } + public void testQueueOnlyOnceDeliveryWith2Consumers() throws Exception { + + ActiveMQDestination destination = new ActiveMQQueue("TEST"); + + // Setup a first connection + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + ProducerInfo producerInfo = createProducerInfo(sessionInfo1); + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.send(producerInfo); + + ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); + consumerInfo1.setPrefetchSize(1); + connection1.send(consumerInfo1); + + // Setup a second connection + StubConnection connection2 = createConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); + consumerInfo2.setPrefetchSize(1); + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + connection2.send(consumerInfo2); + + // Send the messages + connection1.send(createMessage(producerInfo, destination, deliveryMode)); + connection1.send(createMessage(producerInfo, destination, deliveryMode)); + connection1.send(createMessage(producerInfo, destination, deliveryMode)); + connection1.send(createMessage(producerInfo, destination, deliveryMode)); + + for( int i=0; i < 2 ; i++ ) { + Message m1 = receiveMessage(connection1); + Message m2 = receiveMessage(connection2); + + assertNotNull("m1 is null for index: " + i, m1); + assertNotNull("m2 is null for index: " + i, m2); + + assertNotSame(m1.getMessageId(), m2.getMessageId()); + connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); + connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE)); + } + + assertNoMessagesLeft(connection1); + assertNoMessagesLeft(connection2); + } + + public void initCombosForTestQueuBrowserWith2Consumers() { addCombinationValues( "deliveryMode", new Object[]{ Integer.valueOf(DeliveryMode.NON_PERSISTENT), @@ -1339,61 +1394,7 @@ assertNoMessagesLeft(connection); } - - public void initCombosForTestQueueOnlyOnceDeliveryWith2Consumers() { - addCombinationValues( "deliveryMode", new Object[]{ - Integer.valueOf(DeliveryMode.NON_PERSISTENT), - Integer.valueOf(DeliveryMode.PERSISTENT)} ); - } - public void testQueueOnlyOnceDeliveryWith2Consumers() throws Exception { - - ActiveMQDestination destination = new ActiveMQQueue("TEST"); - - // Setup a first connection - StubConnection connection1 = createConnection(); - ConnectionInfo connectionInfo1 = createConnectionInfo(); - SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); - ProducerInfo producerInfo = createProducerInfo(sessionInfo1); - connection1.send(connectionInfo1); - connection1.send(sessionInfo1); - connection1.send(producerInfo); - - ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); - consumerInfo1.setPrefetchSize(1); - connection1.send(consumerInfo1); - - // Setup a second connection - StubConnection connection2 = createConnection(); - ConnectionInfo connectionInfo2 = createConnectionInfo(); - SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); - ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); - consumerInfo2.setPrefetchSize(1); - connection2.send(connectionInfo2); - connection2.send(sessionInfo2); - connection2.send(consumerInfo2); - - // Send the messages - connection1.send(createMessage(producerInfo, destination, deliveryMode)); - connection1.send(createMessage(producerInfo, destination, deliveryMode)); - connection1.send(createMessage(producerInfo, destination, deliveryMode)); - connection1.send(createMessage(producerInfo, destination, deliveryMode)); - - for( int i=0; i < 2 ; i++ ) { - Message m1 = receiveMessage(connection1); - Message m2 = receiveMessage(connection2); - - assertNotNull("m1 is null for index: " + i, m1); - assertNotNull("m2 is null for index: " + i, m2); - - assertNotSame(m1.getMessageId(), m2.getMessageId()); - connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); - connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE)); - } - - assertNoMessagesLeft(connection1); - assertNoMessagesLeft(connection2); - } public void initCombosForTestQueueSendThenAddConsumer() { addCombinationValues( "deliveryMode", new Object[]{ Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java?view=diff&rev=558814&r1=558813&r2=558814 ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java Mon Jul 23 11:02:41 2007 @@ -117,6 +117,14 @@ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination); connection2.send(consumerInfo); + // Give demand forwarding bridge a chance to finish forwarding the subscriptions. + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + ie.printStackTrace(); + } + + // Send the message to the local boker. connection1.request(createMessage(producerInfo, destination, deliveryMode)); // Make sure the message was delivered via the remote. @@ -129,14 +137,7 @@ config.setBrokerName("local"); config.setDispatchAsync(false); bridge = new DemandForwardingBridge(config,createTransport(), createRemoteTransport()); - bridge.start(); - - // PATCH: Give demand forwarding bridge a chance to finish setting up - try { - Thread.sleep(1000); - } catch (InterruptedException ie) { - ie.printStackTrace(); - } + bridge.start(); } protected void tearDown() throws Exception { Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java?view=diff&rev=558814&r1=558813&r2=558814 ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java Mon Jul 23 11:02:41 2007 @@ -68,6 +68,13 @@ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination); connection2.send(consumerInfo); Thread.sleep(1000); + // Give forwarding bridge a chance to finish setting up + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + ie.printStackTrace(); + } + // Send the message to the local boker. connection1.send(createMessage(producerInfo, destination, deliveryMode)); @@ -82,14 +89,7 @@ bridge = new ForwardingBridge(createTransport(), createRemoteTransport()); bridge.setClientId("local-remote-bridge"); bridge.setDispatchAsync(false); - bridge.start(); - - // PATCH: Give forwarding bridge a chance to finish setting up - try { - Thread.sleep(1000); - } catch (InterruptedException ie) { - ie.printStackTrace(); - } + bridge.start(); } protected void tearDown() throws Exception {