Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 96940 invoked from network); 4 Sep 2008 13:35:01 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 4 Sep 2008 13:35:01 -0000 Received: (qmail 14665 invoked by uid 500); 4 Sep 2008 13:34:59 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 14614 invoked by uid 500); 4 Sep 2008 13:34:59 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 14602 invoked by uid 99); 4 Sep 2008 13:34:59 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Sep 2008 06:34:59 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Sep 2008 13:34:09 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 69BF023889FF; Thu, 4 Sep 2008 06:34:40 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r692009 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: transport/tcp/TcpTransportServer.java util/ServiceListener.java util/ServiceSupport.java Date: Thu, 04 Sep 2008 13:34:39 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080904133440.69BF023889FF@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Thu Sep 4 06:34:39 2008 New Revision: 692009 URL: http://svn.apache.org/viewvc?rev=692009&view=rev Log: Fix for https://issues.apache.org/activemq/browse/AMQ-1928 Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceListener.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=692009&r1=692008&r2=692009&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Thu Sep 4 06:34:39 2008 @@ -33,6 +33,7 @@ import javax.net.ServerSocketFactory; +import org.apache.activemq.Service; import org.apache.activemq.ThreadPriorities; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.openwire.OpenWireFormatFactory; @@ -41,7 +42,9 @@ import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServerThreadSupport; import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceListener; import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ServiceSupport; import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormatFactory; import org.apache.commons.logging.Log; @@ -54,7 +57,7 @@ * @version $Revision: 1.1 $ */ -public class TcpTransportServer extends TransportServerThreadSupport { +public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{ private static final Log LOG = LogFactory.getLog(TcpTransportServer.class); protected ServerSocket serverSocket; @@ -64,7 +67,7 @@ protected long maxInactivityDuration = 30000; protected int minmumWireFormatVersion; protected boolean useQueueForAccept=true; - + /** * trace=true -> the Transport stack where this TcpTransport * object will be, will have a TransportLogger layer @@ -104,6 +107,11 @@ protected final ServerSocketFactory serverSocketFactory; protected BlockingQueue socketQueue = new LinkedBlockingQueue(); protected Thread socketHandlerThread; + /** + * The maximum number of sockets allowed for this server + */ + protected int maximumConnections = Integer.MAX_VALUE; + protected int currentTransportCount=0; public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { super(location); @@ -287,7 +295,7 @@ * @return * @throws IOException */ - protected Transport createTransport(Socket socket, WireFormat format) throws IOException { + protected Transport createTransport(Socket socket, WireFormat format) throws IOException { return new TcpTransport(format, socket); } @@ -360,8 +368,11 @@ this.transportOptions = transportOptions; } - protected void handleSocket(Socket socket) { + protected final void handleSocket(Socket socket) { try { + if (this.currentTransportCount >= this.maximumConnections) { + + }else { HashMap options = new HashMap(); options.put("maxInactivityDuration", Long .valueOf(maxInactivityDuration)); @@ -380,9 +391,13 @@ options.putAll(transportOptions); WireFormat format = wireFormatFactory.createWireFormat(); Transport transport = createTransport(socket, format); + if (transport instanceof ServiceSupport) { + ((ServiceSupport) transport).addServiceListener(this); + } Transport configuredTransport = transportFactory.serverConfigure( transport, format, options); getAcceptListener().onAccept(configuredTransport); + } } catch (SocketTimeoutException ste) { // expect this to happen } catch (Exception e) { @@ -393,6 +408,7 @@ onAcceptError(e); } } + } public int getSoTimeout() { @@ -418,4 +434,27 @@ public void setConnectionTimeout(int connectionTimeout) { this.connectionTimeout = connectionTimeout; } + + /** + * @return the maximumConnections + */ + public int getMaximumConnections() { + return maximumConnections; + } + + /** + * @param maximumConnections the maximumConnections to set + */ + public void setMaximumConnections(int maximumConnections) { + this.maximumConnections = maximumConnections; + } + + + public void started(Service service) { + this.currentTransportCount++; + } + + public void stopped(Service service) { + this.currentTransportCount--; + } } \ No newline at end of file Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceListener.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceListener.java?rev=692009&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceListener.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceListener.java Thu Sep 4 06:34:39 2008 @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import org.apache.activemq.Service; + +/** + * A listener for service start, stop events + * + * @version $Revision: 1.1 $ + */ +public interface ServiceListener{ + + public void started(Service service); + + public void stopped(Service service); +} \ No newline at end of file Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceListener.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java?rev=692009&r1=692008&r2=692009&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java Thu Sep 4 06:34:39 2008 @@ -16,6 +16,8 @@ */ package org.apache.activemq.util; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.Service; @@ -34,6 +36,7 @@ private AtomicBoolean started = new AtomicBoolean(false); private AtomicBoolean stopping = new AtomicBoolean(false); private AtomicBoolean stopped = new AtomicBoolean(false); + private ListserviceListeners = new CopyOnWriteArrayList(); public static void dispose(Service service) { try { @@ -52,6 +55,9 @@ } finally { started.set(success); } + for(ServiceListener l:this.serviceListeners) { + l.started(this); + } } } @@ -67,6 +73,9 @@ stopped.set(true); started.set(false); stopping.set(false); + for(ServiceListener l:this.serviceListeners) { + l.stopped(this); + } stopper.throwFirstException(); } } @@ -91,6 +100,14 @@ public boolean isStopped() { return stopped.get(); } + + public void addServiceListener(ServiceListener l) { + this.serviceListeners.add(l); + } + + public void removeServiceListener(ServiceListener l) { + this.serviceListeners.remove(l); + } protected abstract void doStop(ServiceStopper stopper) throws Exception;