Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 91520F897 for ; Fri, 12 Apr 2013 23:03:13 +0000 (UTC) Received: (qmail 20373 invoked by uid 500); 12 Apr 2013 23:03:13 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 20309 invoked by uid 500); 12 Apr 2013 23:03:13 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 20296 invoked by uid 99); 12 Apr 2013 23:03:13 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Apr 2013 23:03:13 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Apr 2013 23:03:09 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 8087323889D5; Fri, 12 Apr 2013 23:02:48 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1467510 - in /activemq/trunk: activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java Date: Fri, 12 Apr 2013 23:02:48 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130412230248.8087323889D5@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Fri Apr 12 23:02:48 2013 New Revision: 1467510 URL: http://svn.apache.org/r1467510 Log: fix and test for: https://issues.apache.org/jira/browse/AMQ-4469 Rewrote the unit test as a JUnit 4 test and remove hard coded 61616 port dep. Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java (with props) Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=1467510&r1=1467509&r2=1467510&view=diff ============================================================================== --- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original) +++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Fri Apr 12 23:02:48 2013 @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.net.ServerSocketFactory; @@ -113,7 +114,7 @@ public class TcpTransportServer extends * The maximum number of sockets allowed for this server */ protected int maximumConnections = Integer.MAX_VALUE; - protected int currentTransportCount=0; + protected AtomicInteger currentTransportCount = new AtomicInteger(); public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { super(location); @@ -177,6 +178,7 @@ public class TcpTransportServer extends * * @param brokerInfo */ + @Override public void setBrokerInfo(BrokerInfo brokerInfo) { } @@ -267,6 +269,7 @@ public class TcpTransportServer extends /** * pull Sockets from the ServerSocket */ + @Override public void run() { while (!isStopped()) { Socket socket = null; @@ -312,6 +315,7 @@ public class TcpTransportServer extends /** * @return pretty print of this */ + @Override public String toString() { return "" + getBindLocation(); } @@ -337,9 +341,11 @@ public class TcpTransportServer extends return result; } + @Override protected void doStart() throws Exception { if(useQueueForAccept) { Runnable run = new Runnable() { + @Override public void run() { try { while (!isStopped() && !isStopping()) { @@ -355,9 +361,7 @@ public class TcpTransportServer extends onAcceptError(e); } } - } - }; socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), @@ -367,9 +371,9 @@ public class TcpTransportServer extends socketHandlerThread.start(); } super.doStart(); - } + @Override protected void doStop(ServiceStopper stopper) throws Exception { super.doStop(stopper); if (serverSocket != null) { @@ -377,13 +381,14 @@ public class TcpTransportServer extends } } + @Override public InetSocketAddress getSocketAddress() { return (InetSocketAddress)serverSocket.getLocalSocketAddress(); } protected final void handleSocket(Socket socket) { try { - if (this.currentTransportCount >= this.maximumConnections) { + if (this.currentTransportCount.get() >= this.maximumConnections) { throw new ExceededMaximumConnectionsException("Exceeded the maximum " + "number of allowed client connections. See the 'maximumConnections' " + "property on the TCP transport configuration URI in the ActiveMQ " + @@ -416,6 +421,7 @@ public class TcpTransportServer extends transportFactory.serverConfigure( transport, format, options); getAcceptListener().onAccept(configuredTransport); + currentTransportCount.incrementAndGet(); } } catch (SocketTimeoutException ste) { // expect this to happen @@ -427,7 +433,6 @@ public class TcpTransportServer extends onAcceptError(e); } } - } public int getSoTimeout() { @@ -468,12 +473,13 @@ public class TcpTransportServer extends this.maximumConnections = maximumConnections; } + @Override public void started(Service service) { - this.currentTransportCount++; } + @Override public void stopped(Service service) { - this.currentTransportCount--; + this.currentTransportCount.decrementAndGet(); } @Override Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java?rev=1467510&view=auto ============================================================================== --- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java (added) +++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java Fri Apr 12 23:02:48 2013 @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.transport.tcp.TcpTransportServer; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.jms.support.JmsUtils; + +public class AMQ4469Test { + + private static final int maxConnections = 100; + + private final ExecutorService executor = Executors.newCachedThreadPool(); + private String connectionUri; + private BrokerService service; + private TransportConnector connector; + + @Before + public void setUp() throws Exception { + service = new BrokerService(); + service.setPersistent(false); + service.setUseJmx(false); + connector = service.addConnector("tcp://0.0.0.0:0?maximumConnections="+maxConnections); + connectionUri = connector.getPublishableConnectString(); + service.start(); + service.waitUntilStarted(); + } + + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(connectionUri); + } + + @Test + public void testMaxConnectionControl() throws Exception { + final ConnectionFactory cf = createConnectionFactory(); + final CountDownLatch startupLatch = new CountDownLatch(1); + for(int i = 0; i < maxConnections + 20; i++) { + executor.submit(new Runnable() { + @Override + public void run() { + Connection conn = null; + try { + startupLatch.await(); + conn = cf.createConnection(); + conn.start(); + } catch (Exception e) { + e.printStackTrace(); + JmsUtils.closeConnection(conn); + } + } + }); + } + + TcpTransportServer transportServer = (TcpTransportServer)connector.getServer(); + // ensure the max connections is in effect + assertEquals(maxConnections, transportServer.getMaximumConnections()); + // No connections at first + assertEquals(0, connector.getConnections().size()); + // Release the latch to set up connections in parallel + startupLatch.countDown(); + TimeUnit.SECONDS.sleep(5); + + final TransportConnector connector = this.connector; + + // Expect the max connections is created + assertTrue("Expected: " + maxConnections + " found: " + connector.getConnections().size(), + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return connector.getConnections().size() == maxConnections; + } + }) + ); + } + + @After + public void tearDown() throws Exception { + executor.shutdown(); + + service.stop(); + service.waitUntilStopped(); + } +} Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java ------------------------------------------------------------------------------ svn:eol-style = native