qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1695267 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/model/port/ broker-core/src/test/java/org/apache/qpid/server/model/port/ broker-plugins/management-http...
Date Tue, 11 Aug 2015 11:38:10 GMT
Author: kwall
Date: Tue Aug 11 11:38:09 2015
New Revision: 1695267

URL: http://svn.apache.org/r1695267
Log:
QPID-6683: [Java Broker] Give HTTP management ports separate individually configurable thread
pools

* Acceptor threads no longer defaults to a value derived from the number of cores.

Work by Lorenz Quack <quack.lorenz@gmail.com> and Keith Wall <kwall@apache.org>

Added:
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/HttpPortImplTest.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java
    qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
    qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementConfiguration.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java?rev=1695267&r1=1695266&r2=1695267&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
Tue Aug 11 11:38:09 2015
@@ -286,6 +286,44 @@ abstract class AttributeValueConverter<T
             }
         }
     };
+
+    static final AttributeValueConverter<Double> DOUBLE_CONVERTER = new AttributeValueConverter<Double>()
+    {
+
+        @Override
+        public Double convert(final Object value, final ConfiguredObject object)
+        {
+            if(value instanceof Double)
+            {
+                return (Double) value;
+            }
+            else if(value instanceof Number)
+            {
+                return ((Number) value).doubleValue();
+            }
+            else if(value instanceof String)
+            {
+                String interpolated = AbstractConfiguredObject.interpolate(object, (String)
value);
+                try
+                {
+                    return Double.valueOf(interpolated);
+                }
+                catch(NumberFormatException e)
+                {
+                    throw new IllegalArgumentException("Cannot convert string '" + interpolated
+ "' to a Double",e);
+                }
+            }
+            else if(value == null)
+            {
+                return null;
+            }
+            else
+            {
+                throw new IllegalArgumentException("Cannot convert type " + value.getClass()
+ " to a Double");
+            }
+        }
+    };
+
     static final AttributeValueConverter<Boolean> BOOLEAN_CONVERTER = new AttributeValueConverter<Boolean>()
     {
 
@@ -465,6 +503,10 @@ abstract class AttributeValueConverter<T
         {
             return (AttributeValueConverter<X>) LONG_CONVERTER;
         }
+        else if(type == Double.class)
+        {
+            return (AttributeValueConverter<X>) DOUBLE_CONVERTER;
+        }
         else if(type == Boolean.class)
         {
             return (AttributeValueConverter<X>) BOOLEAN_CONVERTER;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java?rev=1695267&r1=1695266&r2=1695267&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java
Tue Aug 11 11:38:09 2015
@@ -24,6 +24,7 @@ import java.util.Set;
 
 import org.apache.qpid.server.model.AuthenticationProvider;
 import org.apache.qpid.server.model.ManagedAttribute;
+import org.apache.qpid.server.model.ManagedContextDefault;
 import org.apache.qpid.server.model.ManagedObject;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
@@ -32,16 +33,19 @@ import org.apache.qpid.server.model.Trus
 @ManagedObject( category = false, type = "HTTP")
 public interface HttpPort<X extends HttpPort<X>> extends ClientAuthCapablePort<X>
 {
-    String DEFAULT_AMQP_NEED_CLIENT_AUTH = "false";
-    String DEFAULT_AMQP_WANT_CLIENT_AUTH = "false";
+    String THREAD_POOL_MINIMUM = "threadPoolMinimum";
+    String THREAD_POOL_MAXIMUM = "threadPoolMaximum";
+
+    String DEFAULT_HTTP_NEED_CLIENT_AUTH = "false";
+    String DEFAULT_HTTP_WANT_CLIENT_AUTH = "false";
 
     @ManagedAttribute(defaultValue = "*")
     String getBindingAddress();
 
-    @ManagedAttribute( defaultValue = DEFAULT_AMQP_NEED_CLIENT_AUTH )
+    @ManagedAttribute( defaultValue = DEFAULT_HTTP_NEED_CLIENT_AUTH)
     boolean getNeedClientAuth();
 
-    @ManagedAttribute( defaultValue = DEFAULT_AMQP_WANT_CLIENT_AUTH )
+    @ManagedAttribute( defaultValue = DEFAULT_HTTP_WANT_CLIENT_AUTH)
     boolean getWantClientAuth();
 
     @ManagedAttribute
@@ -59,4 +63,31 @@ public interface HttpPort<X extends Http
     Set<Protocol> getProtocols();
 
     void setPortManager(PortManager manager);
+
+    String PORT_HTTP_THREAD_POOL_MAXIMUM = "port.http.threadPool.maximum";
+    @SuppressWarnings("unused")
+    @ManagedContextDefault( name = PORT_HTTP_THREAD_POOL_MAXIMUM )
+    long DEFAULT_PORT_HTTP_THREAD_POOL_MAXIMUM = 24;
+
+    @ManagedAttribute( defaultValue = "${" + PORT_HTTP_THREAD_POOL_MAXIMUM + "}")
+    int getThreadPoolMaximum();
+
+    String PORT_HTTP_THREAD_POOL_MINIMUM = "port.http.threadPool.minimum";
+    @SuppressWarnings("unused")
+    @ManagedContextDefault( name = PORT_HTTP_THREAD_POOL_MINIMUM )
+    long DEFAULT_PORT_HTTP_THREAD_POOL_MINIMUM = 8;
+
+    @ManagedAttribute( defaultValue = "${" + PORT_HTTP_THREAD_POOL_MINIMUM + "}")
+    int getThreadPoolMinimum();
+
+    String PORT_HTTP_ADDITIONAL_INTERNAL_THREADS = "port.http.additionalInternalThreads";
+    @SuppressWarnings("unused")
+    @ManagedContextDefault( name = PORT_HTTP_ADDITIONAL_INTERNAL_THREADS )
+    long DEFAULT_PORT_HTTP_ADDITIONAL_INTERNAL_THREADS = 5;
+
+    String PORT_HTTP_MAXIMUM_QUEUED_REQUESTS = "port.http.maximumQueuedRequests";
+    @SuppressWarnings("unused")
+    @ManagedContextDefault( name = PORT_HTTP_MAXIMUM_QUEUED_REQUESTS )
+    long DEFAULT_PORT_HTTP_MAXIMUM_QUEUED_REQUESTS = 1000;
+
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java?rev=1695267&r1=1695266&r2=1695267&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java
Tue Aug 11 11:38:09 2015
@@ -21,9 +21,11 @@
 package org.apache.qpid.server.model.port;
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.ManagedAttributeField;
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
 import org.apache.qpid.server.model.State;
@@ -36,6 +38,12 @@ public class HttpPortImpl extends Abstra
     @ManagedAttributeField
     private String _bindingAddress;
 
+    @ManagedAttributeField
+    private int _threadPoolMaximum;
+
+    @ManagedAttributeField
+    private int _threadPoolMinimum;
+
     @ManagedObjectFactoryConstructor
     public HttpPortImpl(final Map<String, Object> attributes,
                         final Broker<?> broker)
@@ -56,6 +64,18 @@ public class HttpPortImpl extends Abstra
     }
 
     @Override
+    public int getThreadPoolMaximum()
+    {
+        return _threadPoolMaximum;
+    }
+
+    @Override
+    public int getThreadPoolMinimum()
+    {
+        return _threadPoolMinimum;
+    }
+
+    @Override
     protected State onActivate()
     {
         if(_portManager != null && _portManager.isActivationAllowed(this))
@@ -78,5 +98,58 @@ public class HttpPortImpl extends Abstra
             throw new IllegalConfigurationException(String.format("Cannot bind to port %d
and binding address '%s'. Port is already is use.",
                     getPort(), bindingAddress == null || "".equals(bindingAddress) ? "*"
: bindingAddress));
         }
+
+        if (_threadPoolMaximum < 1)
+        {
+            throw new IllegalConfigurationException(String.format("Thread pool maximum %d
is too small. Must be greater than zero.", _threadPoolMaximum));
+        }
+        if (_threadPoolMinimum < 1)
+        {
+            throw new IllegalConfigurationException(String.format("Thread pool minimum %d
is too small. Must be greater than zero.", _threadPoolMinimum));
+        }
+        if (_threadPoolMinimum > _threadPoolMaximum)
+        {
+            throw new IllegalConfigurationException(String.format("Thread pool minimum %d
cannot be greater than thread pool maximum %d.", _threadPoolMinimum, _threadPoolMaximum));
+        }
+
+        final double additionalInternalThreads = getContextValue(Integer.class, HttpPort.PORT_HTTP_ADDITIONAL_INTERNAL_THREADS);
+        if (additionalInternalThreads < 1)
+        {
+            throw new IllegalConfigurationException(String.format("Number of additional internal
threads %d is too small. Must be greater than zero.", additionalInternalThreads));
+        }
+
+        final double maximumQueuedRequests = getContextValue(Integer.class, HttpPort.PORT_HTTP_MAXIMUM_QUEUED_REQUESTS);
+        if (maximumQueuedRequests < 1)
+        {
+            throw new IllegalConfigurationException(String.format("Number of additional internal
threads %d is too small. Must be greater than zero.", maximumQueuedRequests));
+        }
+    }
+
+    @Override
+    protected void validateChange(final ConfiguredObject<?> proxyForValidation, final
Set<String> changedAttributes)
+    {
+        super.validateChange(proxyForValidation, changedAttributes);
+        HttpPort changed = (HttpPort) proxyForValidation;
+        if (changedAttributes.contains(HttpPort.THREAD_POOL_MAXIMUM))
+        {
+            if (changed.getThreadPoolMaximum() < 1)
+            {
+                throw new IllegalConfigurationException(String.format("Thread pool maximum
%d is too small. Must be greater than zero.", getThreadPoolMaximum()));
+            }
+        }
+        if (changedAttributes.contains(HttpPort.THREAD_POOL_MINIMUM))
+        {
+            if (changed.getThreadPoolMaximum() < 1)
+            {
+                throw new IllegalConfigurationException(String.format("Thread pool minimum
%d is too small. Must be greater than zero.", getThreadPoolMinimum()));
+            }
+        }
+        if (changedAttributes.contains(HttpPort.THREAD_POOL_MAXIMUM) || changedAttributes.contains(HttpPort.THREAD_POOL_MINIMUM))
+        {
+            if (changed.getThreadPoolMinimum() > changed.getThreadPoolMaximum())
+            {
+                throw new IllegalConfigurationException(String.format("Thread pool minimum
%d cannot be greater than thread pool maximum %d.", changed.getThreadPoolMinimum(), changed.getThreadPoolMaximum()));
+            }
+        }
     }
 }

Added: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/HttpPortImplTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/HttpPortImplTest.java?rev=1695267&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/HttpPortImplTest.java
(added)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/port/HttpPortImplTest.java
Tue Aug 11 11:38:09 2015
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.model.port;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class HttpPortImplTest extends QpidTestCase
+{
+    private static final String AUTHENTICATION_PROVIDER_NAME = "test";
+
+    private TaskExecutor _taskExecutor;
+    private Broker _broker;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
+        Model model = BrokerModel.getInstance();
+
+        _broker = mock(Broker.class);
+        when(_broker.getTaskExecutor()).thenReturn(_taskExecutor);
+        when(_broker.getChildExecutor()).thenReturn(_taskExecutor);
+        when(_broker.getModel()).thenReturn(model);
+        when(_broker.getEventLogger()).thenReturn(new EventLogger());
+        when(_broker.getCategoryClass()).thenReturn(Broker.class);
+        when(_broker.getSecurityManager()).thenReturn(new SecurityManager(_broker, false));
+
+        AuthenticationProvider<?> provider = mock(AuthenticationProvider.class);
+        when(provider.getName()).thenReturn(AUTHENTICATION_PROVIDER_NAME);
+        when(provider.getParent(Broker.class)).thenReturn(_broker);
+        when(provider.getMechanisms()).thenReturn(Arrays.asList("PLAIN"));
+        when(_broker.getChildren(AuthenticationProvider.class)).thenReturn(Collections.<AuthenticationProvider>singleton(
+                provider));
+        when(_broker.getChildByName(AuthenticationProvider.class, AUTHENTICATION_PROVIDER_NAME)).thenReturn(provider);
+
+    }
+
+    public void testCreateWithIllegalThreadPoolValues() throws Exception
+    {
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(HttpPort.PORT, 10000);
+        attributes.put(HttpPort.NAME, getTestName());
+        attributes.put(HttpPort.THREAD_POOL_MINIMUM, 51);
+        attributes.put(HttpPort.THREAD_POOL_MAXIMUM, 50);
+        attributes.put(HttpPort.AUTHENTICATION_PROVIDER, AUTHENTICATION_PROVIDER_NAME);
+
+
+        HttpPortImpl port = new HttpPortImpl(attributes, _broker);
+        try
+        {
+            port.create();
+            fail("Creation should fail due to validation check");
+        }
+        catch (IllegalConfigurationException e)
+        {
+            // PASS
+        }
+    }
+
+    public void testChangeWithIllegalThreadPoolValues() throws Exception
+    {
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(HttpPort.PORT, 10000);
+        attributes.put(HttpPort.NAME, getTestName());
+        attributes.put(HttpPort.AUTHENTICATION_PROVIDER, AUTHENTICATION_PROVIDER_NAME);
+
+
+        HttpPortImpl port = new HttpPortImpl(attributes, _broker);
+        port.create();
+
+        final Map<String, Object> updates = new HashMap<>();
+        updates.put(HttpPort.THREAD_POOL_MINIMUM, 51);
+        updates.put(HttpPort.THREAD_POOL_MAXIMUM, 50);
+        try
+        {
+            port.setAttributes(updates);
+            fail("Change should fail due to validation check");
+        }
+        catch (IllegalConfigurationException e)
+        {
+            // PASS
+        }
+    }
+
+}

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java?rev=1695267&r1=1695266&r2=1695267&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
(original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
Tue Aug 11 11:38:09 2015
@@ -28,7 +28,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -57,6 +56,7 @@ import org.eclipse.jetty.servlet.Servlet
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jetty.util.thread.ThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -144,13 +144,12 @@ public class HttpManagement extends Abst
     {
         getBroker().getEventLogger().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME));
 
-        Collection<Port<?>> httpPorts = getHttpPorts(getBroker().getPorts());
-        Map<Port<?>, Connector> connectors = new HashMap<>();
-        _server = createServer(httpPorts, connectors);
+        Collection<HttpPort<?>> httpPorts = getHttpPorts(getBroker().getPorts());
+        _server = createServer(httpPorts);
         try
         {
             _server.start();
-            logOperationalListenMessages(httpPorts, connectors);
+            logOperationalListenMessages(httpPorts);
         }
         catch (Exception e)
         {
@@ -186,7 +185,7 @@ public class HttpManagement extends Abst
         return _sessionTimeout;
     }
 
-    private Server createServer(Collection<Port<?>> ports, final Map<Port<?>,
Connector> connectors)
+    private Server createServer(Collection<HttpPort<?>> ports)
     {
         if (_logger.isInfoEnabled())
         {
@@ -195,75 +194,20 @@ public class HttpManagement extends Abst
         _allowPortActivation = true;
 
         Server server = new Server();
-
-        QueuedThreadPool threadPool = new QueuedThreadPool();
-        threadPool.setName("HttpManagement");
-        threadPool.setMaxQueued(getContextValue(Integer.class, JETTY_THREAD_POOL_MAX_QUEUED));
-        threadPool.setMaxThreads(getContextValue(Integer.class, JETTY_THREAD_POOL_MAX_THREADS));
-        threadPool.setMinThreads(getContextValue(Integer.class, JETTY_THREAD_POOL_MIN_THREADS));
-
-        server.setThreadPool(threadPool);
+        // All connectors will have their own thread pool, so we expect the server to need
none.
+        server.setThreadPool(new ZeroSizedThreadPool());
 
         int lastPort = -1;
-        for (Port<?> port : ports)
+        for (HttpPort<?> port : ports)
         {
-            if(port instanceof HttpPort)
+            if (!State.ACTIVE.equals(port.getDesiredState()))
             {
-
-                if (!State.ACTIVE.equals(port.getDesiredState()))
-                {
-                    continue;
-                }
-                ((HttpPort<?>)port).setPortManager(this);
-
-                if(port.getState() != State.ACTIVE)
-                {
-
-                    // TODO - RG - probably does nothing
-                    port.startAsync();
-                }
-                Connector connector = null;
-
-                Collection<Transport> transports = port.getTransports();
-                if (!transports.contains(Transport.SSL))
-                {
-                    final Port thePort = port;
-                    connector = new SelectChannelConnector()
-                                {
-                                    @Override
-                                    public void customize(final EndPoint endpoint, final
Request request) throws IOException
-                                    {
-                                        super.customize(endpoint, request);
-                                        request.setAttribute(PORT_SERVLET_ATTRIBUTE, thePort);
-                                    }
-                                };
-                }
-                else if (transports.contains(Transport.SSL))
-                {
-                    connector = createSslConnector((HttpPort<?>) port);
-                }
-                else
-                {
-                    throw new IllegalArgumentException("Unexpected transport on port "
-                                                       + port.getName()
-                                                       + ":"
-                                                       + transports);
-                }
-                lastPort = port.getPort();
-                String bindingAddress = ((HttpPort)port).getBindingAddress();
-                if (bindingAddress != null && !bindingAddress.trim().equals("") &&
!bindingAddress.trim().equals("*"))
-                {
-                    connector.setHost(bindingAddress.trim());
-                }
-                connector.setPort(port.getPort());
-                server.addConnector(connector);
-                connectors.put(port, connector);
-            }
-            else
-            {
-                throw new IllegalArgumentException("Http management can only be added to
an Http port");
+                continue;
             }
 
+            SelectChannelConnector connector = createConnector(port);
+            server.addConnector(connector);
+            lastPort = port.getPort();
         }
 
         _allowPortActivation = false;
@@ -344,9 +288,72 @@ public class HttpManagement extends Abst
         return server;
     }
 
-    private Connector createSslConnector(final HttpPort<?> port)
+    private SelectChannelConnector createConnector(final HttpPort<?> port)
     {
-        final Connector connector;
+        port.setPortManager(this);
+
+        if(port.getState() != State.ACTIVE)
+        {
+            // TODO - RG - probably does nothing
+            port.startAsync();
+        }
+        SelectChannelConnector connector = null;
+
+        Collection<Transport> transports = port.getTransports();
+        if (!transports.contains(Transport.SSL))
+        {
+            final Port thePort = port;
+            connector = new SelectChannelConnector()
+                        {
+                            @Override
+                            public void customize(final EndPoint endpoint, final Request
request) throws IOException
+                            {
+                                super.customize(endpoint, request);
+                                request.setAttribute(PORT_SERVLET_ATTRIBUTE, thePort);
+                            }
+                        };
+        }
+        else if (transports.contains(Transport.SSL))
+        {
+            connector = createSslConnector(port);
+        }
+        else
+        {
+            throw new IllegalArgumentException("Unexpected transport on port "
+                                               + port.getName()
+                                               + ":"
+                                               + transports);
+        }
+        String bindingAddress = port.getBindingAddress();
+        if (bindingAddress != null && !bindingAddress.trim().equals("") &&
!bindingAddress.trim().equals("*"))
+        {
+            connector.setHost(bindingAddress.trim());
+        }
+        connector.setPort(port.getPort());
+
+
+        QueuedThreadPool threadPool = new QueuedThreadPool();
+        threadPool.setName("HttpManagement-" + port.getName());
+
+        int additionalInternalThreads = port.getContextValue(Integer.class,
+                                                             HttpPort.PORT_HTTP_ADDITIONAL_INTERNAL_THREADS);
+        int maximumQueueRequests = port.getContextValue(Integer.class, HttpPort.PORT_HTTP_MAXIMUM_QUEUED_REQUESTS);
+
+        int threadPoolMaximum = port.getThreadPoolMaximum();
+        int threadPoolMinimum = port.getThreadPoolMinimum();
+
+        threadPool.setMaxQueued(maximumQueueRequests);
+        threadPool.setMaxThreads(threadPoolMaximum + additionalInternalThreads);
+        threadPool.setMinThreads(threadPoolMinimum + additionalInternalThreads);
+
+        connector.setAcceptors(Math.max(1, threadPoolMaximum / 2));
+        connector.setThreadPool(threadPool);
+        return connector;
+    }
+
+    private SelectChannelConnector createSslConnector(final HttpPort<?> port)
+    {
+        final SelectChannelConnector connector;
         KeyStore keyStore = port.getKeyStore();
         Collection<TrustStore> trustStores = port.getTrustStores();
         if (keyStore == null)
@@ -530,14 +537,14 @@ public class HttpManagement extends Abst
         return "v"+String.valueOf(BrokerModel.MODEL_MAJOR_VERSION);
     }
 
-    private void logOperationalListenMessages(Collection<Port<?>> ports, final
Map<Port<?>, Connector> connectors)
+    private void logOperationalListenMessages(Collection<HttpPort<?>> ports)
     {
         for (Port port : ports)
         {
             Set<Transport> transports = port.getTransports();
             for (Transport transport: transports)
             {
-                getBroker().getEventLogger().message(ManagementConsoleMessages.LISTENING(Protocol.HTTP.name(),
transport.name(), connectors.get(port).getLocalPort()));
+                getBroker().getEventLogger().message(ManagementConsoleMessages.LISTENING(Protocol.HTTP.name(),
transport.name(), port.getPort()));
             }
         }
     }
@@ -552,14 +559,14 @@ public class HttpManagement extends Abst
     }
 
 
-    private Collection<Port<?>> getHttpPorts(Collection<Port<?>>
ports)
+    private Collection<HttpPort<?>> getHttpPorts(Collection<Port<?>>
ports)
     {
-        Collection<Port<?>> httpPorts = new HashSet<>();
+        Collection<HttpPort<?>> httpPorts = new HashSet<>();
         for (Port<?> port : ports)
         {
             if (port.getState() != State.ERRORED && port.getProtocols().contains(Protocol.HTTP))
             {
-                httpPorts.add(port);
+                httpPorts.add((HttpPort<?>) port);
             }
         }
         return httpPorts;
@@ -631,4 +638,35 @@ public class HttpManagement extends Abst
         }
     }
 
+    private static class ZeroSizedThreadPool implements ThreadPool
+    {
+        @Override
+        public boolean dispatch(final Runnable job)
+        {
+            throw new IllegalStateException("Job unexpectedly dispatched to server thread
pool. Cannot dispatch");
+        }
+
+        @Override
+        public void join() throws InterruptedException
+        {
+        }
+
+        @Override
+        public int getThreads()
+        {
+            return 0;
+        }
+
+        @Override
+        public int getIdleThreads()
+        {
+            return 0;
+        }
+
+        @Override
+        public boolean isLowOnThreads()
+        {
+            return false;
+        }
+    }
 }

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementConfiguration.java?rev=1695267&r1=1695266&r2=1695267&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementConfiguration.java
(original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementConfiguration.java
Tue Aug 11 11:38:09 2015
@@ -61,20 +61,5 @@ public interface HttpManagementConfigura
     static final long DEFAULT_MAX_UPLOAD_SIZE = 100 * 1024;
 
 
-    String JETTY_THREAD_POOL_MAX_QUEUED = "jetty.threadPool.maxQueued";
-    @ManagedContextDefault( name = JETTY_THREAD_POOL_MAX_QUEUED)
-    static final long DEFAULT_JETTY_THREAD_POOL_MAX_QUEUED = 1000;
-
-    String JETTY_THREAD_POOL_MAX_THREADS = "jetty.threadPool.maxThreads";
-    @ManagedContextDefault( name = JETTY_THREAD_POOL_MAX_THREADS)
-    static final long DEFAULT_JETTY_THREAD_POOL_MAX_THREADS = 50;
-
-
-    String JETTY_THREAD_POOL_MIN_THREADS = "jetty.threadPool.minThreads";
-    @ManagedContextDefault( name = JETTY_THREAD_POOL_MIN_THREADS)
-    static final long DEFAULT_JETTY_THREAD_POOL_MIN_THREADS = 5;
-
-
-
     AuthenticationProvider getAuthenticationProvider(HttpServletRequest request);
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message