Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 71A8C180B9 for ; Wed, 17 Jun 2015 16:10:27 +0000 (UTC) Received: (qmail 57197 invoked by uid 500); 17 Jun 2015 16:10:27 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 57170 invoked by uid 500); 17 Jun 2015 16:10:27 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 57160 invoked by uid 99); 17 Jun 2015 16:10:27 -0000 Received: from eris.apache.org (HELO hades.apache.org) (140.211.11.105) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 17 Jun 2015 16:10:27 +0000 Received: from hades.apache.org (localhost [127.0.0.1]) by hades.apache.org (ASF Mail Server at hades.apache.org) with ESMTP id 28EABAC00B4 for ; Wed, 17 Jun 2015 16:10:27 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1686063 - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/model/adapter/ broker-core/src/ma... 
Date: Wed, 17 Jun 2015 16:10:26 -0000 To: commits@qpid.apache.org From: rgodfrey@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20150617161027.28EABAC00B4@hades.apache.org> Author: rgodfrey Date: Wed Jun 17 16:10:25 2015 New Revision: 1686063 URL: http://svn.apache.org/r1686063 Log: QPID-6597 : [Java Broker] Move Connection to be a child of Port rather than VirtualHost (work by Rob Godfrey & Lorenz Quack) Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectOperation.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ManagedOperation.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js qpid/java/trunk/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/ConnectionRestTest.java qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java (original) +++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java Wed Jun 17 16:10:25 2015 @@ -55,10 +55,7 @@ import org.apache.qpid.server.stats.Stat import 
org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.DtxRegistry; -import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; -import org.apache.qpid.server.virtualhost.HouseKeepingTask; -import org.apache.qpid.server.virtualhost.RequiredExchangeException; -import org.apache.qpid.server.virtualhost.VirtualHostPrincipal; +import org.apache.qpid.server.virtualhost.*; /** Object that represents the VirtualHost whilst the VirtualHostNode is in the replica role. The @@ -300,12 +297,18 @@ public class BDBHAReplicaVirtualHostImpl } @Override - public Collection getConnections() + public Collection> getConnections() { return Collections.emptyList(); } @Override + public Connection getConnection(String name) + { + return null; + } + + @Override public AMQQueue getQueue(final String name) { return null; @@ -561,4 +564,15 @@ public class BDBHAReplicaVirtualHostImpl + " does not permit this operation."); } + @Override + public void addConnectionAssociationListener(VirtualHostConnectionListener listener) + { + throwUnsupportedForReplica(); + } + + @Override + public void removeConnectionAssociationListener(VirtualHostConnectionListener listener) + { + throwUnsupportedForReplica(); + } } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java Wed Jun 17 16:10:25 2015 @@ -48,6 +48,7 @@ public final class BrokerModel extends M * Renamed FileTrustStore/FileKeyStore.path => FileTrustStore/FileKeyStore.storeUrl * 
3.1 Add BrokerLogger as a child of Broker * Replace the defaultVirtualHost (at Broker) with defaultVirtualHostNode flag (at VHN) + * Make Connections children of Ports instead of VHosts */ public static final int MODEL_MAJOR_VERSION = 3; public static final int MODEL_MINOR_VERSION = 1; @@ -88,13 +89,13 @@ public final class BrokerModel extends M addRelationship(VirtualHostNode.class, RemoteReplicationNode.class); addRelationship(VirtualHost.class, VirtualHostLogger.class); - addRelationship(VirtualHost.class, Connection.class); addRelationship(VirtualHost.class, Exchange.class); addRelationship(VirtualHost.class, Queue.class); addRelationship(VirtualHostLogger.class, VirtualHostLoggerFilter.class); addRelationship(Port.class, VirtualHostAlias.class); + addRelationship(Port.class, Connection.class); addRelationship(AuthenticationProvider.class, User.class); addRelationship(AuthenticationProvider.class, PreferencesProvider.class); Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectOperation.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectOperation.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectOperation.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectOperation.java Wed Jun 17 16:10:25 2015 @@ -23,6 +23,7 @@ package org.apache.qpid.server.model; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.lang.reflect.Type; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -203,4 +204,14 @@ public class ConfiguredObjectOperation, Q extends Queue, E extends Exchange > extends ConfiguredObject @@ -164,8 +165,11 
@@ public interface VirtualHost getExchangeTypeNames(); - Collection getConnections(); + @ManagedOperation(nonModifying = true) + Collection> getConnections(); + @ManagedOperation(nonModifying = true) + Connection getConnection(@Param(name="name") String name); void start(); @@ -206,4 +210,6 @@ public interface VirtualHost getVirtualHost() { return _underlyingConnection.getVirtualHost(); Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java Wed Jun 17 16:10:25 2015 @@ -229,20 +229,7 @@ abstract public class AbstractPort getConnections() { - return null; - } - - @Override - public Collection getChildren(Class clazz) - { - if(clazz == Connection.class) - { - return (Collection) getConnections(); - } - else - { - return super.getChildren(clazz); - } + return getChildren(Connection.class); } @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED ) Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original) +++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Wed Jun 17 16:10:25 2015 @@ -210,8 +210,7 @@ public class NonBlockingConnection imple public boolean doWork() { _protocolEngine.clearWork(); - final boolean closed = _closed.get(); - if (!closed) + if (!_closed.get()) { try { @@ -249,7 +248,9 @@ public class NonBlockingConnection imple } } } - else + + final boolean closed = _closed.get(); + if (closed) { shutdown(); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Wed Jun 17 16:10:25 2015 @@ -36,17 +36,14 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; +import com.google.common.base.Function; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -56,7 +53,6 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.exchange.ExchangeDefaults; import 
org.apache.qpid.pool.SuppressingInheritedAccessControlContextThreadFactory; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.exchange.DefaultDestination; @@ -108,6 +104,7 @@ public abstract class AbstractVirtualHos private final Set> _connections = newSetFromMap(new ConcurrentHashMap, Boolean>()); + private final Set _connectionAssociationListeners = new CopyOnWriteArraySet<>(); private static enum BlockingType { STORE, FILESYSTEM }; @@ -463,9 +460,23 @@ public abstract class AbstractVirtualHos return _messageStoreLogSubject; } - public Collection getConnections() + @Override + public Collection> getConnections() + { + return _connections; + } + + @Override + public Connection getConnection(String name) { - return getChildren(Connection.class); + for (Connection connection : _connections) + { + if (connection.getName().equals(name)) + { + return connection; + } + } + return null; } @Override @@ -499,10 +510,6 @@ public abstract class AbstractVirtualHos { throw new UnsupportedOperationException(); } - else if(childClass == Connection.class) - { - throw new UnsupportedOperationException(); - } else if(childClass == VirtualHostLogger.class) { return getObjectFactory().createAsync(childClass, attributes, this); @@ -858,14 +865,13 @@ public abstract class AbstractVirtualHos { setState(State.UNAVAILABLE); _virtualHostLoggersToClose = new ArrayList(getChildren(VirtualHostLogger.class)); - return super.beforeClose(); + //Stop Connections + return closeConnections(); } @Override protected void onClose() { - //Stop Connections - closeConnections("VirtualHost is closing"); _dtxRegistry.close(); closeMessageStore(); shutdownHouseKeeping(); @@ -880,7 +886,7 @@ public abstract class AbstractVirtualHos } - public void closeConnections(final String replyText) + public ListenableFuture closeConnections() { if 
(_logger.isDebugEnabled()) { @@ -891,6 +897,7 @@ public abstract class AbstractVirtualHos conn.getUnderlyingConnection().stop(); } + List> connectionCloseFutures = new ArrayList<>(); while (!_connections.isEmpty()) { Iterator> itr = _connections.iterator(); @@ -899,7 +906,7 @@ public abstract class AbstractVirtualHos Connection connection = itr.next(); try { - connection.getUnderlyingConnection().closeAsync(AMQConstant.CONNECTION_FORCED, replyText); + connectionCloseFutures.add(connection.closeAsync()); } catch (Exception e) { @@ -911,6 +918,15 @@ public abstract class AbstractVirtualHos } } } + ListenableFuture> combinedFuture = Futures.allAsList(connectionCloseFutures); + return Futures.transform(combinedFuture, new Function, Void>() + { + @Override + public Void apply(List voids) + { + return null; + } + }); } private void closeMessageStore() @@ -1402,35 +1418,31 @@ public abstract class AbstractVirtualHos @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED ) protected ListenableFuture doStop() { - final SettableFuture returnVal = SettableFuture.create(); final List loggers = new ArrayList<>(getChildren(VirtualHostLogger.class)); - closeChildren().addListener( - new Runnable() - { - @Override - public void run() - { - try - { - shutdownHouseKeeping(); - if(_networkConnectionScheduler != null) - { - _networkConnectionScheduler.close(); - _networkConnectionScheduler = null; - } - closeMessageStore(); - setState(State.STOPPED); - - stopLogging(loggers); - } - finally - { - returnVal.set(null); - } - } - }, getTaskExecutor().getExecutor() - ); - return returnVal; + return doAfter(closeConnections(), new Callable>() + { + @Override + public ListenableFuture call() throws Exception + { + return closeChildren(); + } + }).then(new Runnable() + { + @Override + public void run() + { + shutdownHouseKeeping(); + if (_networkConnectionScheduler != null) + { + _networkConnectionScheduler.close(); + 
_networkConnectionScheduler = null; + } + closeMessageStore(); + setState(State.STOPPED); + + stopLogging(loggers); + } + }); } private void stopLogging(Collection loggers) @@ -1677,8 +1689,6 @@ public abstract class AbstractVirtualHos @Override public void registerConnection(final Connection connection) { - childAdded(connection); - _connections.add(connection); AMQConnectionModel underlyingConnection = connection.getUnderlyingConnection(); @@ -1689,12 +1699,22 @@ public abstract class AbstractVirtualHos underlyingConnection.setScheduler(_networkConnectionScheduler); + for (VirtualHostConnectionListener listener : _connectionAssociationListeners) + { + listener.connectionAssociated(connection); + } } @Override public void deregisterConnection(final Connection connection) { - _connections.remove(connection); + if (_connections.remove(connection)) + { + for (VirtualHostConnectionListener listener : _connectionAssociationListeners) + { + listener.connectionRemoved(connection); + } + } } @@ -1796,6 +1816,19 @@ public abstract class AbstractVirtualHos scheduleHouseKeepingTask(getHousekeepingCheckPeriod(), _fileSystemSpaceChecker); } } + + @Override + public void addConnectionAssociationListener(VirtualHostConnectionListener listener) + { + _connectionAssociationListeners.add(listener); + } + + @Override + public void removeConnectionAssociationListener(VirtualHostConnectionListener listener) + { + _connectionAssociationListeners.remove(listener); + } + private static class ChildCounter { private final AtomicInteger _count = new AtomicInteger(); Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java Wed Jun 17 16:10:25 2015 @@ -56,10 +56,7 @@ import org.apache.qpid.server.stats.Stat import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.DtxRegistry; -import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; -import org.apache.qpid.server.virtualhost.HouseKeepingTask; -import org.apache.qpid.server.virtualhost.RequiredExchangeException; -import org.apache.qpid.server.virtualhost.VirtualHostPrincipal; +import org.apache.qpid.server.virtualhost.*; @ManagedObject( category = false, type = RedirectingVirtualHostImpl.TYPE, register = false ) class RedirectingVirtualHostImpl @@ -301,12 +298,18 @@ class RedirectingVirtualHostImpl } @Override - public Collection getConnections() + public Collection> getConnections() { return Collections.emptyList(); } @Override + public Connection getConnection(String name) + { + return null; + } + + @Override public AMQQueue getQueue(final String name) { return null; @@ -563,5 +566,15 @@ class RedirectingVirtualHostImpl + " does not permit this operation."); } + @Override + public void addConnectionAssociationListener(VirtualHostConnectionListener listener) + { + throwUnsupportedForRedirector(); + } + @Override + public void removeConnectionAssociationListener(VirtualHostConnectionListener listener) + { + throwUnsupportedForRedirector(); + } } Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Wed Jun 17 16:10:25 2015 @@ -42,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import com.google.common.util.concurrent.Futures; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; @@ -137,7 +138,7 @@ public class VirtualHostTest extends Qpi assertEquals("Unexpected name", virtualHostName, virtualHost.getName()); assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); - verify(_configStore).update(eq(true),matchesRecord(virtualHost.getId(), virtualHost.getType())); + verify(_configStore).update(eq(true), matchesRecord(virtualHost.getId(), virtualHost.getType())); } public void testDeleteVirtualHost() @@ -230,23 +231,23 @@ public class VirtualHostTest extends Qpi assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); AMQConnectionModel connection = createMockProtocolConnection(virtualHost); - assertEquals("Unexpected number of connections before connection registered", 0, virtualHost.getChildren(Connection.class).size()); + assertEquals("Unexpected number of connections before connection registered", 0, virtualHost.getConnectionCount()); - ConnectionAdapter modelConnection = new ConnectionAdapter(connection); - modelConnection.create(); + Connection modelConnection = mock(Connection.class); + when(modelConnection.getUnderlyingConnection()).thenReturn(connection); + when(modelConnection.closeAsync()).thenReturn(Futures.immediateFuture(null)); virtualHost.registerConnection(modelConnection); - assertEquals("Unexpected number of connections after connection registered", 1, virtualHost.getChildren( - Connection.class).size()); + assertEquals("Unexpected number of connections after connection registered", 1, virtualHost.getConnectionCount()); 
virtualHost.stop(); assertEquals("Unexpected state", State.STOPPED, virtualHost.getState()); assertEquals("Unexpected number of connections after virtualhost stopped", 0, - virtualHost.getChildren(Connection.class).size()); + virtualHost.getConnectionCount()); - verify(connection).closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + verify(modelConnection).closeAsync(); } public void testDeleteVirtualHost_ClosesConnections() @@ -258,23 +259,23 @@ public class VirtualHostTest extends Qpi AMQConnectionModel connection = createMockProtocolConnection(virtualHost); assertEquals("Unexpected number of connections before connection registered", - 0, - virtualHost.getChildren(Connection.class).size()); + 0, + virtualHost.getConnectionCount()); - ConnectionAdapter modelConnection = new ConnectionAdapter(connection); - modelConnection.create(); + Connection modelConnection = mock(Connection.class); + when(modelConnection.getUnderlyingConnection()).thenReturn(connection); virtualHost.registerConnection(modelConnection); - assertEquals("Unexpected number of connections after connection registered", 1, virtualHost.getChildren(Connection.class).size()); + assertEquals("Unexpected number of connections after connection registered", 1, virtualHost.getConnectionCount()); virtualHost.delete(); assertEquals("Unexpected state", State.DELETED, virtualHost.getState()); assertEquals("Unexpected number of connections after virtualhost deleted", - 0, - virtualHost.getChildren(Connection.class).size()); + 0, + virtualHost.getConnectionCount()); - verify(connection).closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + verify(modelConnection).closeAsync(); } public void testCreateDurableQueue() Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Wed Jun 17 16:10:25 2015 @@ -132,6 +132,8 @@ public class ServerConnection extends Co _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId()); _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId()); _dataReceived = new StatisticsCounter("data-received-" + getConnectionId()); + _adapter = new ConnectionAdapter(this); + _adapter.create(); } public Object getReference() @@ -167,23 +169,20 @@ public class ServerConnection extends Co if (state == State.OPEN) { getEventLogger().message(ConnectionMessages.OPEN(getClientId(), - "0-10", - getClientVersion(), - getClientProduct(), - true, - true, - true, - true)); + "0-10", + getClientVersion(), + getClientProduct(), + true, + true, + true, + true)); - _adapter = new ConnectionAdapter(this); - _adapter.create(); _adapter.virtualHostAssociated(); - } if (state == State.CLOSE_RCVD || state == State.CLOSED || state == State.CLOSING) { - if(_adapter != null) + if(_virtualHost != null) { _virtualHost.deregisterConnection(_adapter); } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1686063&r1=1686062&r2=1686063&view=diff 
============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Wed Jun 17 16:10:25 2015 @@ -367,7 +367,7 @@ public class ServerConnectionDelegate ex final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal(); final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName(); - final Iterator connections = + final Iterator> connections = ((ServerConnection)conn).getVirtualHost().getConnections().iterator(); while(connections.hasNext()) { Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Wed Jun 17 16:10:25 2015 @@ -25,7 +25,12 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.List; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.configuration.updater.TaskExecutorImpl; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; 
import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.util.BrokerTestHelper; @@ -72,8 +77,7 @@ public class ServerSessionTest extends Q final Broker broker = mock(Broker.class); when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l); - AmqpPort amqpPort = mock(AmqpPort.class); - when(amqpPort.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE); + AmqpPort amqpPort = createMockPort(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE); ServerConnection connection = new ServerConnection(1, broker, amqpPort, Transport.TCP); final ProtocolEngine_0_10 protocolEngine = mock(ProtocolEngine_0_10.class); @@ -98,8 +102,8 @@ public class ServerSessionTest extends Q final Broker broker = mock(Broker.class); when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l); - AmqpPort port = mock(AmqpPort.class); - when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(1024); + AmqpPort port = createMockPort(1024); + ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP); final ProtocolEngine_0_10 protocolEngine = mock(ProtocolEngine_0_10.class); connection.setProtocolEngine(protocolEngine); @@ -136,5 +140,17 @@ public class ServerSessionTest extends Q assertTrue("Methods invoked when not expecting any", invokedMethods.isEmpty()); } + public AmqpPort createMockPort(int maxMessageSize) + { + AmqpPort port = mock(AmqpPort.class); + when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(maxMessageSize); + TaskExecutor childExecutor = new TaskExecutorImpl(); + childExecutor.start(); + when(port.getChildExecutor()).thenReturn(childExecutor); + when(port.getCategoryClass()).thenReturn(Port.class); + when(port.getModel()).thenReturn(BrokerModel.getInstance()); + return port; + } + } Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Wed Jun 17 16:10:25 2015 @@ -260,6 +260,9 @@ public class AMQProtocolEngine implement _messagesReceived = new StatisticsCounter("messages-received-" + _connectionId); _dataReceived = new StatisticsCounter("data-received-" + _connectionId); _creationTime = System.currentTimeMillis(); + + _adapter = new ConnectionAdapter(this); + _adapter.create(); } @@ -1058,8 +1061,6 @@ public class AMQProtocolEngine implement public void setVirtualHost(VirtualHostImpl virtualHost) { _virtualHost = virtualHost; - _adapter = new ConnectionAdapter(this); - _adapter.create(); _adapter.virtualHostAssociated(); _messageCompressionThreshold = virtualHost.getContextValue(Integer.class, Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java (original) +++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java Wed Jun 17 16:10:25 2015 @@ -29,7 +29,10 @@ import java.util.Map; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.properties.ConnectionStartProperties; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.util.BrokerTestHelper; @@ -40,7 +43,7 @@ import org.apache.qpid.transport.network public class AMQProtocolEngineTest extends QpidTestCase { private Broker _broker; - private AmqpPort _port; + private AmqpPort _port; private NetworkConnection _network; private Transport _transport; @@ -54,6 +57,11 @@ public class AMQProtocolEngineTest exten _port = mock(AmqpPort.class); when(_port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE); + TaskExecutor childExecutor = _broker.getChildExecutor(); + when(_port.getChildExecutor()).thenReturn(childExecutor); + when(_port.getCategoryClass()).thenReturn(Port.class); + when(_port.getModel()).thenReturn(BrokerModel.getInstance()); + _network = mock(NetworkConnection.class); _transport = Transport.TCP; } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original) +++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Wed Jun 17 16:10:25 2015 @@ -134,6 +134,8 @@ public class Connection_1_0 implements C _dataDeliveryStatistics = new StatisticsCounter("data-delivered-" + getConnectionId()); _messageReceiptStatistics = new StatisticsCounter("messages-received-" + getConnectionId()); _dataReceiptStatistics = new StatisticsCounter("data-received-" + getConnectionId()); + _adapter = new ConnectionAdapter(this); + _adapter.create(); } public Object getReference() @@ -172,10 +174,7 @@ public class Connection_1_0 implements C } else { - _adapter = new ConnectionAdapter(this); - _adapter.create(); _adapter.virtualHostAssociated(); - } } } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java Wed Jun 17 16:10:25 2015 @@ -40,7 +40,7 @@ import java.util.UUID; import javax.security.auth.Subject; -import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.*; import org.apache.qpid.server.virtualhost.VirtualHostPrincipal; import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; @@ -55,11 +55,6 @@ import org.apache.qpid.amqp_1_0.type.sec import org.apache.qpid.amqp_1_0.type.transport.Open; import org.apache.qpid.server.configuration.updater.TaskExecutor; import 
org.apache.qpid.server.configuration.updater.TaskExecutorImpl; -import org.apache.qpid.server.model.AuthenticationProvider; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.BrokerModel; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; @@ -78,7 +73,7 @@ public class ProtocolEngine_1_0_0Test ex private ProtocolEngine_1_0_0 _protocolEngine_1_0_0; private NetworkConnection _networkConnection; private Broker _broker; - private AmqpPort _port; + private AmqpPort _port; private SubjectCreator _subjectCreator; private AuthenticationProvider _authenticationProvider; private List _sentBuffers; @@ -99,6 +94,9 @@ public class ProtocolEngine_1_0_0Test ex when(_broker.getTaskExecutor()).thenReturn(taskExecutor); when(_broker.getId()).thenReturn(UUID.randomUUID()); _port = mock(AmqpPort.class); + when(_port.getChildExecutor()).thenReturn(taskExecutor); + when(_port.getCategoryClass()).thenReturn(Port.class); + when(_port.getModel()).thenReturn(BrokerModel.getInstance()); _subjectCreator = mock(SubjectCreator.class); _authenticationProvider = mock(AuthenticationProvider.class); when(_port.getAuthenticationProvider()).thenReturn(_authenticationProvider); Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java (original) +++ 
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java Wed Jun 17 16:10:25 2015 @@ -17,6 +17,9 @@ package org.apache.qpid.server.management.plugin.servlet.rest; import java.io.IOException; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.lang.reflect.TypeVariable; import java.security.AccessControlException; import java.util.ArrayList; import java.util.Arrays; @@ -41,6 +44,7 @@ import org.apache.qpid.server.model.Conf import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.IntegrityViolationException; import org.apache.qpid.server.model.TypedContent; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.QueueExistsException; import org.codehaus.jackson.map.ObjectMapper; @@ -85,6 +89,8 @@ public class RestServlet extends Abstrac EXTRACT_INITIAL_CONFIG_PARAM, INHERITED_ACTUALS_PARAM, CONTENT_DISPOSITION_ATTACHMENT_FILENAME_PARAM)); + public static final int DEFAULT_DEPTH = 1; + public static final int DEFAULT_OVERSIZE = 120; private Class[] _hierarchy; @@ -332,61 +338,69 @@ public class RestServlet extends Abstrac @Override protected void doGetWithSubjectAndActor(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { - // TODO - sort special params, everything else should act as a filter - String attachmentFilename = request.getParameter(CONTENT_DISPOSITION_ATTACHMENT_FILENAME_PARAM); - boolean extractInitialConfig = getBooleanParameterFromRequest(request, EXTRACT_INITIAL_CONFIG_PARAM); - - if (attachmentFilename != null) + String[] pathInfoElements = getPathInfoElements(request); + if (pathInfoElements != null && pathInfoElements.length == _hierarchy.length + 1) { - setContentDispositionHeaderIfNecessary(response, attachmentFilename); + 
doOperation(request, response); } + else + { + // TODO - sort special params, everything else should act as a filter + String attachmentFilename = request.getParameter(CONTENT_DISPOSITION_ATTACHMENT_FILENAME_PARAM); + boolean extractInitialConfig = getBooleanParameterFromRequest(request, EXTRACT_INITIAL_CONFIG_PARAM); - Collection> allObjects = getObjects(request); + if (attachmentFilename != null) + { + setContentDispositionHeaderIfNecessary(response, attachmentFilename); + } - if (allObjects.isEmpty() && isSingleObjectRequest(request) ) - { - sendJsonErrorResponse(request, response, HttpServletResponse.SC_NOT_FOUND, "Not Found"); - return; - } + Collection> allObjects = getObjects(request); - int depth; - boolean actuals; - boolean includeSystemContext; - boolean inheritedActuals; - int oversizeThreshold; - - if(extractInitialConfig) - { - depth = Integer.MAX_VALUE; - oversizeThreshold = Integer.MAX_VALUE; - actuals = true; - includeSystemContext = false; - inheritedActuals = false; - } - else - { - depth = getIntParameterFromRequest(request, DEPTH_PARAM, 1); - oversizeThreshold = getIntParameterFromRequest(request, OVERSIZE_PARAM, 120); - actuals = getBooleanParameterFromRequest(request, ACTUALS_PARAM); - includeSystemContext = getBooleanParameterFromRequest(request, INCLUDE_SYS_CONTEXT_PARAM); - inheritedActuals = getBooleanParameterFromRequest(request, INHERITED_ACTUALS_PARAM); - } + if (allObjects.isEmpty() && isSingleObjectRequest(request)) + { + sendJsonErrorResponse(request, response, HttpServletResponse.SC_NOT_FOUND, "Not Found"); + return; + } - List> output = new ArrayList<>(); - for(ConfiguredObject configuredObject : allObjects) - { + int depth; + boolean actuals; + boolean includeSystemContext; + boolean inheritedActuals; + int oversizeThreshold; + + if (extractInitialConfig) + { + depth = Integer.MAX_VALUE; + oversizeThreshold = Integer.MAX_VALUE; + actuals = true; + includeSystemContext = false; + inheritedActuals = false; + } + else + { + depth = 
getIntParameterFromRequest(request, DEPTH_PARAM, DEFAULT_DEPTH); + oversizeThreshold = getIntParameterFromRequest(request, OVERSIZE_PARAM, DEFAULT_OVERSIZE); + actuals = getBooleanParameterFromRequest(request, ACTUALS_PARAM); + includeSystemContext = getBooleanParameterFromRequest(request, INCLUDE_SYS_CONTEXT_PARAM); + inheritedActuals = getBooleanParameterFromRequest(request, INHERITED_ACTUALS_PARAM); + } - output.add(_objectConverter.convertObjectToMap(configuredObject, getConfiguredClass(), - depth, actuals, inheritedActuals, includeSystemContext, extractInitialConfig, oversizeThreshold, request.isSecure())); - } + List> output = new ArrayList<>(); + for (ConfiguredObject configuredObject : allObjects) + { + + output.add(_objectConverter.convertObjectToMap(configuredObject, getConfiguredClass(), + depth, actuals, inheritedActuals, includeSystemContext, extractInitialConfig, oversizeThreshold, request.isSecure())); + } - boolean sendCachingHeaders = attachmentFilename == null; - sendJsonResponse(extractInitialConfig && output.size() == 1 ? output.get(0) : output, - request, - response, - HttpServletResponse.SC_OK, - sendCachingHeaders); + boolean sendCachingHeaders = attachmentFilename == null; + sendJsonResponse(extractInitialConfig && output.size() == 1 ? 
output.get(0) : output, + request, + response, + HttpServletResponse.SC_OK, + sendCachingHeaders); + } } private boolean isSingleObjectRequest(HttpServletRequest request) @@ -433,7 +447,6 @@ public class RestServlet extends Abstrac response.setContentType("application/json"); List names = getParentNamesFromServletPath(request); - Map providedObject = getRequestProvidedObject(request); boolean isFullObjectURL = names.size() == _hierarchy.length; boolean isPostToFullURL = isFullObjectURL && "POST".equalsIgnoreCase(request.getMethod()); final String[] pathInfoElements = getPathInfoElements(request); @@ -442,6 +455,7 @@ public class RestServlet extends Abstrac { if(!isOperation) { + Map providedObject = getRequestProvidedObject(request); if (names.isEmpty() && _hierarchy.length == 0) { getBroker().setAttributes(providedObject); @@ -493,7 +507,7 @@ public class RestServlet extends Abstrac } else { - doOperation(request, response, names, providedObject, pathInfoElements); + doOperation(request, response); } } catch (RuntimeException e) @@ -504,12 +518,12 @@ public class RestServlet extends Abstrac } private void doOperation(final HttpServletRequest request, - final HttpServletResponse response, - final List names, - final Map providedObject, - final String[] pathInfoElements) throws IOException + final HttpServletResponse response) throws IOException, ServletException { ConfiguredObject subject; + final List names = getParentNamesFromServletPath(request); + final String[] pathInfoElements = getPathInfoElements(request); + if (names.isEmpty() && _hierarchy.length == 0) { subject = getBroker(); @@ -544,6 +558,10 @@ public class RestServlet extends Abstrac final Map> availableOperations = getBroker().getModel().getTypeRegistry().getOperations(subject.getClass()); ConfiguredObjectOperation operation = availableOperations.get(operationName); + Map operationArguments; + + + String requestMethod = request.getMethod(); if (operation == null) { sendJsonErrorResponse(request, @@ 
-552,7 +570,40 @@ public class RestServlet extends Abstrac "No such operation: " + operationName); return; } - Object returnVal = operation.perform(subject, providedObject); + else + { + if (requestMethod.equals("GET")) + { + if (operation.isNonModifying()) + { + operationArguments = getOperationArgumentsAsMap(request); + } + else + { + response.addHeader("Allow", "POST"); + sendJsonErrorResponse(request, + response, + HttpServletResponse.SC_METHOD_NOT_ALLOWED, + "Operation " + operationName + " modifies the object so you must use POST."); + return; + } + + } + else if (requestMethod.equals("POST")) + { + operationArguments = getRequestProvidedObject(request); + } + else + { + response.addHeader("Allow", (operation.isNonModifying() ? "POST, GET" : "POST")); + sendJsonErrorResponse(request, + response, + HttpServletResponse.SC_METHOD_NOT_ALLOWED, + "Operation " + operationName + " does not support the " + requestMethod + " requestMethod."); + return; + } + } + Object returnVal = operation.perform(subject, operationArguments); if(returnVal instanceof TypedContent) { TypedContent typedContent = (TypedContent)returnVal; @@ -563,11 +614,82 @@ public class RestServlet extends Abstrac } else { + if (ConfiguredObject.class.isAssignableFrom(operation.getReturnType())) + { + returnVal = _objectConverter.convertObjectToMap((ConfiguredObject) returnVal, operation.getReturnType(), DEFAULT_DEPTH, + false, false, false, false, DEFAULT_OVERSIZE, request.isSecure()); + } + else if (returnsCollectionOfConfiguredObjects(operation)) + { + List> output = new ArrayList<>(); + for (Object configuredObject : (Collection)returnVal) + { + output.add(_objectConverter.convertObjectToMap((ConfiguredObject) configuredObject, + getCollectionMemberType((ParameterizedType) operation.getGenericReturnType()), + DEFAULT_DEPTH, false, false, false, false, DEFAULT_OVERSIZE, request.isSecure())); + } + returnVal = output; + } sendJsonResponse(returnVal, request, response); } } + private boolean 
returnsCollectionOfConfiguredObjects(ConfiguredObjectOperation operation) + { + return Collection.class.isAssignableFrom(operation.getReturnType()) + && operation.getGenericReturnType() instanceof ParameterizedType + && ConfiguredObject.class.isAssignableFrom(getCollectionMemberType((ParameterizedType) operation.getGenericReturnType())); + } + + private Class getCollectionMemberType(ParameterizedType collectionType) + { + return getRawType((collectionType).getActualTypeArguments()[0]); + } + + private static Class getRawType(Type t) + { + if(t instanceof Class) + { + return (Class)t; + } + else if(t instanceof ParameterizedType) + { + return (Class)((ParameterizedType)t).getRawType(); + } + else if(t instanceof TypeVariable) + { + Type[] bounds = ((TypeVariable)t).getBounds(); + if(bounds.length == 1) + { + return getRawType(bounds[0]); + } + } + throw new ServerScopedRuntimeException("Unable to process type when constructing configuration model: " + t); + } + + private Map getOperationArgumentsAsMap(HttpServletRequest request) + { + Map providedObject; + providedObject = new HashMap<>(); + for (Map.Entry entry : request.getParameterMap().entrySet()) + { + String[] value = entry.getValue(); + if (value != null) + { + if(value.length > 1) + { + providedObject.put(entry.getKey(), Arrays.asList(value)); + } + else + { + providedObject.put(entry.getKey(), value[0]); + } + } + } + return providedObject; + } + private List findAllObjectParents(List names) { Collection[] objects = new Collection[_hierarchy.length]; Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- 
qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js (original) +++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/VirtualHost.js Wed Jun 17 16:10:25 2015 @@ -292,7 +292,12 @@ define(["dojo/parser", var idx = evt.rowIndex, theItem = this.getItem(idx); var connectionName = obj.dataStore.getValue(theItem,"name"); - controller.show("connection", connectionName, vhost, theItem.id); + // mock the connection's parent port because we don't have access to it from here + var port = { name: obj.dataStore.getValue(theItem,"port"), + type: "port", + parent: vhost.parent.parent }; + + controller.show("connection", connectionName, port, theItem.id); }); } ); @@ -323,23 +328,36 @@ define(["dojo/parser", this.management.load(this.modelObj) .then(function(data) { thisObj.vhostData = data[0] || {name: thisObj.modelObj.name,statistics:{messagesIn:0,bytesIn:0,messagesOut:0,bytesOut:0}}; - - if (callback) - { - callback(); - } - - try - { - thisObj._update(); - } - catch(e) - { - if (console && console.error) - { - console.error(e); - } - } + thisObj.management.get({url: thisObj.management.objectToURL(thisObj.modelObj) + "/getConnections" }) + .then(function(data){ + thisObj.vhostData["connections"] = data; + + if (callback) + { + callback(); + } + + try + { + thisObj._update(); + } + catch(e) + { + if (console && console.error) + { + console.error(e); + } + } + + }, + function(error) + { + util.tabErrorHandler(error, { updater:thisObj, + contentPane: thisObj.tabObject.contentPane, + tabContainer: thisObj.tabObject.controller.tabContainer, + name: thisObj.modelObj.name, + category: "Virtual Host" }); + }); }, function(error) { Modified: qpid/java/trunk/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java (original) +++ qpid/java/trunk/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java Wed Jun 17 16:10:25 2015 @@ -29,6 +29,7 @@ import java.util.Map; import javax.management.JMException; import javax.management.ObjectName; +import org.apache.qpid.server.virtualhost.VirtualHostConnectionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +45,7 @@ import org.apache.qpid.server.model.Stat import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.virtualhost.ManagedVirtualHost; -public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost, ConfigurationChangeListener +public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost, ConfigurationChangeListener, VirtualHostConnectionListener { private static final Logger LOGGER = LoggerFactory.getLogger(VirtualHostMBean.class); @@ -59,6 +60,7 @@ public class VirtualHostMBean extends AM super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE, registry); _virtualHost = virtualHost; virtualHost.addChangeListener(this); + virtualHost.addConnectionAssociationListener(this); initQueues(); initExchanges(); @@ -164,12 +166,6 @@ public class VirtualHostMBean extends AM _children.put(child, exchangeMBean); } - else if(child instanceof Connection) - { - ConnectionMBean connectionMBean = new ConnectionMBean((Connection)child, this); - _children.put(child, connectionMBean); - - } else { LOGGER.debug("Unsupported child : " + child.getName() + " type : " + child.getClass()); @@ -276,4 +272,27 @@ public class 
VirtualHostMBean extends AM { } + + @Override + public void connectionAssociated(Connection connection) + { + synchronized (_children) + { + try + { + ConnectionMBean connectionMBean = new ConnectionMBean(connection, this); + _children.put(connection, connectionMBean); + } + catch(Exception e) + { + LOGGER.error("Exception while creating mbean for " + connection.getClass().getSimpleName() + " " + connection.getName(), e); + } + } + } + + @Override + public void connectionRemoved(Connection connection) + { + childRemoved(_virtualHost, connection); + } } Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java (original) +++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java Wed Jun 17 16:10:25 2015 @@ -304,6 +304,24 @@ public class RestTestHelper return response.get(0); } + public List<Map<String, Object>> postDataToPathAndGetList(String path, Map<String, Object> data) throws IOException + { + HttpURLConnection connection = openManagementConnection(path, "POST"); + connection.connect(); + writeJsonRequest(connection, data); + List<Map<String, Object>> response = readJsonResponseAsList(connection); + return response; + } + + public Map<String, Object> postDataToPathAndGetObject(String path, Map<String, Object> data) throws IOException + { + HttpURLConnection connection = openManagementConnection(path, "POST"); + connection.connect(); + writeJsonRequest(connection, data); + Map<String, Object> response = readJsonResponseAsMap(connection); + return response; + } + public List<Map<String, Object>> getJsonAsList(String path) throws IOException { HttpURLConnection connection = openManagementConnection(path, "GET"); Modified: 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java (original) +++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java Wed Jun 17 16:10:25 2015 @@ -34,10 +34,8 @@ import java.util.EnumSet; import java.util.Iterator; import java.util.Set; -import org.apache.qpid.server.model.AuthenticationProvider; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.model.*; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory; @@ -161,12 +159,15 @@ public class MultiVersionProtocolEngineF AuthenticationProvider authProvider = mock(AuthenticationProvider.class); when(authProvider.getSubjectCreator(false)).thenReturn(subjectCreator); - AmqpPort port = mock(AmqpPort.class); + AmqpPort port = mock(AmqpPort.class); when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true); when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE); when(port.getAuthenticationProvider()).thenReturn(authProvider); - + TaskExecutor childExecutor = _broker.getChildExecutor(); + when(port.getChildExecutor()).thenReturn(childExecutor); + when(port.getCategoryClass()).thenReturn(Port.class); + 
when(port.getModel()).thenReturn(BrokerModel.getInstance()); when(port.getContextValue(eq(Long.class), eq(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))).thenReturn(10000l); MultiVersionProtocolEngineFactory factory = Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/ConnectionRestTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/ConnectionRestTest.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/ConnectionRestTest.java (original) +++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/ConnectionRestTest.java Wed Jun 17 16:10:25 2015 @@ -21,6 +21,7 @@ package org.apache.qpid.systest.rest; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -38,6 +39,7 @@ import org.apache.qpid.server.model.Conf import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.Session; import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNodeCreator; +import org.apache.qpid.test.utils.TestBrokerConfiguration; public class ConnectionRestTest extends QpidRestTestCase { @@ -105,7 +107,7 @@ public class ConnectionRestTest extends public void testGetVirtualHostConnections() throws Exception { - List> connections = getRestTestHelper().getJsonAsList("connection/test/test"); + List> connections = getRestTestHelper().getJsonAsList("virtualhost/test/test/getConnections"); assertEquals("Unexpected number of connections", 1, connections.size()); Asserts.assertConnection(connections.get(0), (AMQConnection) _connection); } @@ -115,23 +117,27 @@ public class ConnectionRestTest extends // get connection name String connectionName = getConnectionName(); - Map connectionDetails = getRestTestHelper().getJsonAsSingletonList("connection/test/test/" - + 
getRestTestHelper().encodeAsUTF(connectionName)); - assertConnection(connectionDetails); + Map<String, Object> connectionDetailsFromPost = getRestTestHelper().postDataToPathAndGetObject("virtualhost/test/test/getConnection", + Collections.singletonMap("name", (Object) connectionName)); + assertConnection(connectionDetailsFromPost); + + Map<String, Object> connectionDetailsFromGet = getRestTestHelper().getJsonAsMap("virtualhost/test/test/getConnection?name=" + + getRestTestHelper().encodeAsUTF(connectionName)); + assertConnection(connectionDetailsFromGet); } public void testDeleteConnection() throws Exception { - // get connection name String connectionName = getConnectionName(); + String portName = TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT; - List<Map<String, Object>> connections = getRestTestHelper().getJsonAsList("connection/test/test"); + List<Map<String, Object>> connections = getRestTestHelper().getJsonAsList("connection/" + portName); assertEquals("Unexpected number of connections before deletion", 1, connections.size()); - String connectionUrl = "connection/test/test/" + getRestTestHelper().encodeAsUTF(connectionName); + String connectionUrl = "connection/" + portName + "/" + getRestTestHelper().encodeAsUTF(connectionName); getRestTestHelper().submitRequest(connectionUrl, "DELETE", HttpServletResponse.SC_OK); - connections = getRestTestHelper().getJsonAsList("connection/test/test"); + connections = getRestTestHelper().getJsonAsList("connection/" + portName); assertEquals("Unexpected number of connections before deletion", 0, connections.size()); try @@ -152,19 +158,21 @@ public class ConnectionRestTest extends assertSession(sessions.get(0), (AMQSession) _session); } - public void testGetVirtualHostSessions() throws Exception + public void testGetPortSessions() throws Exception { - List<Map<String, Object>> sessions = getRestTestHelper().getJsonAsList("session/test/test"); + String portName = TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT; + + List<Map<String, Object>> sessions = getRestTestHelper().getJsonAsList("session/" + portName); assertEquals("Unexpected number 
of sessions", 1, sessions.size()); assertSession(sessions.get(0), (AMQSession) _session); } public void testGetConnectionSessions() throws Exception { - // get connection name String connectionName = getConnectionName(); + String portName = TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT; - List<Map<String, Object>> sessions = getRestTestHelper().getJsonAsList("session/test/test/" + List<Map<String, Object>> sessions = getRestTestHelper().getJsonAsList("session/" + portName + "/" + getRestTestHelper().encodeAsUTF(connectionName)); assertEquals("Unexpected number of sessions", 1, sessions.size()); assertSession(sessions.get(0), (AMQSession) _session); @@ -172,10 +180,10 @@ public class ConnectionRestTest extends public void testGetSessionByName() throws Exception { - // get connection name String connectionName = getConnectionName(); + String portName = TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT; - List<Map<String, Object>> sessions = getRestTestHelper().getJsonAsList("session/test/test/" + List<Map<String, Object>> sessions = getRestTestHelper().getJsonAsList("session/" + portName + "/" + getRestTestHelper().encodeAsUTF(connectionName) + "/" + ((AMQSession) _session).getChannelId()); assertEquals("Unexpected number of sessions", 1, sessions.size()); assertSession(sessions.get(0), (AMQSession) _session); @@ -189,8 +197,9 @@ public class ConnectionRestTest extends // session left open ((AMQSession)_session).sync(); String connectionName = getConnectionName(); + String portName = TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT; - List<Map<String, Object>> sessions = getRestTestHelper().getJsonAsList("session/test/test/" + List<Map<String, Object>> sessions = getRestTestHelper().getJsonAsList("session/" + portName + "/" + getRestTestHelper().encodeAsUTF(connectionName) + "/" + ((AMQSession) _session).getChannelId()); assertEquals("Unexpected number of sessions", 1, sessions.size()); @@ -274,10 +283,7 @@ public class ConnectionRestTest extends private String getConnectionName() throws IOException { - Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("virtualhost/test/test"); - 
@SuppressWarnings("unchecked") - List<Map<String, Object>> connections = (List<Map<String, Object>>) hostDetails - .get(VirtualHostRestTest.VIRTUALHOST_CONNECTIONS_ATTRIBUTE); + List<Map<String, Object>> connections = getRestTestHelper().getJsonAsList("virtualhost/test/test/getConnections"); assertEquals("Unexpected number of connections", 1, connections.size()); Map connection = connections.get(0); String connectionName = (String) connection.get(Connection.NAME); Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java?rev=1686063&r1=1686062&r2=1686063&view=diff ============================================================================== --- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java (original) +++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java Wed Jun 17 16:10:25 2015 @@ -52,7 +52,6 @@ public class VirtualHostRestTest extends { private static final String VIRTUALHOST_EXCHANGES_ATTRIBUTE = "exchanges"; public static final String VIRTUALHOST_QUEUES_ATTRIBUTE = "queues"; - public static final String VIRTUALHOST_CONNECTIONS_ATTRIBUTE = "connections"; public static final String EMPTY_VIRTUALHOSTNODE_NAME = "emptyVHN"; @@ -113,8 +112,7 @@ public class VirtualHostRestTest extends assertEquals("Unexpected value of queue attribute " + Queue.DURABLE, Boolean.TRUE, queue.get(Queue.DURABLE)); @SuppressWarnings("unchecked") - List<Map<String, Object>> connections = (List<Map<String, Object>>) hostDetails - .get(VIRTUALHOST_CONNECTIONS_ATTRIBUTE); + List<Map<String, Object>> connections = getRestTestHelper().getJsonAsList("virtualhost/test/test/getConnections"); assertEquals("Unexpected number of connections", 1, connections.size()); Asserts.assertConnection(connections.get(0), _connection); } @@ -680,7 +678,6 @@ public class VirtualHostRestTest extends Asserts.assertDurableExchange("amq.match", "headers", 
restTestHelper.find(Exchange.NAME, "amq.match", exchanges)); assertNull("Unexpected queues", hostDetails.get(VIRTUALHOST_QUEUES_ATTRIBUTE)); - assertNull("Unexpected connections", hostDetails.get(VIRTUALHOST_CONNECTIONS_ATTRIBUTE)); } private void assertActualAndDesireStates(final String restUrl, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org