Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BBB5A974E for ; Fri, 21 Oct 2011 14:45:54 +0000 (UTC) Received: (qmail 46092 invoked by uid 500); 21 Oct 2011 14:45:54 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 46074 invoked by uid 500); 21 Oct 2011 14:45:54 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 46064 invoked by uid 99); 21 Oct 2011 14:45:54 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Oct 2011 14:45:54 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Oct 2011 14:45:15 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 1A0952388C6A for ; Fri, 21 Oct 2011 14:43:48 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1187375 [22/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf... Date: Fri, 21 Oct 2011 14:42:51 -0000 To: commits@qpid.apache.org From: shuston@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111021144348.1A0952388C6A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=1187375&r1=1187374&r2=1187375&view=diff ============================================================================== --- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original) +++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Fri Oct 21 14:42:12 2011 @@ -20,9 +20,6 @@ package org.apache.qpid.server.output.am * */ - -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.output.HeaderPropertiesConverter; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -38,11 +35,13 @@ import org.apache.qpid.AMQException; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + public class ProtocolOutputConverterImpl implements ProtocolOutputConverter { private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); - private static final ProtocolVersionMethodConverter - PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter(); public static Factory getInstanceFactory() @@ -121,15 +120,12 @@ public class ProtocolOutputConverterImpl int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead(); - final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity); + int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; - int writtenSize = 0; + int writtenSize = capacity; + AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity); - writtenSize += message.getContent(buf, writtenSize); - buf.flip(); - AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf); CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); @@ -137,15 +133,55 @@ public class ProtocolOutputConverterImpl while(writtenSize < bodySize) { - buf = java.nio.ByteBuffer.allocate(capacity); + capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize; + MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity); + writtenSize += capacity; - writtenSize += message.getContent(buf, writtenSize); - buf.flip(); - writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf))); + writeFrame(new AMQFrame(channelId, body)); } } } + private class MessageContentSourceBody implements AMQBody + { + public static final byte TYPE = 3; + private int _length; + private MessageContentSource _message; + private int _offset; + + public MessageContentSourceBody(MessageContentSource message, int offset, int length) + { + _message = message; + _offset = offset; + _length = length; + } + + public byte getFrameType() + { + return TYPE; + } + + public int getSize() + { + return _length; + } + + public void writePayload(DataOutputStream buffer) throws IOException + { + byte[] data = new byte[_length]; + + _message.getContent(ByteBuffer.wrap(data), _offset); + + buffer.write(data); + } + + public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException + { + throw new UnsupportedOperationException(); + } + } + + private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody) { @@ -221,7 +257,7 @@ public class ProtocolOutputConverterImpl return _underlyingBody.getSize(); } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { if(_underlyingBody == null) { @@ -346,7 +382,7 @@ public class ProtocolOutputConverterImpl return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); } @@ -374,7 +410,7 @@ public class ProtocolOutputConverterImpl return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); } Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java?rev=1187375&r1=1187374&r2=1187375&view=diff ============================================================================== --- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java (original) +++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java Fri Oct 21 14:42:12 2011 @@ -20,9 +20,6 @@ package org.apache.qpid.server.output.am * */ - -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.output.HeaderPropertiesConverter; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -33,17 +30,16 @@ import org.apache.qpid.server.message.Me import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.AMQException; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import java.io.DataOutputStream; +import java.io.IOException; + public class ProtocolOutputConverterImpl implements ProtocolOutputConverter { private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91); - private static final ProtocolVersionMethodConverter - PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter(); - public static Factory getInstanceFactory() { @@ -121,15 +117,11 @@ public class ProtocolOutputConverterImpl int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead(); - final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity); + int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; - int writtenSize = 0; + int writtenSize = capacity; - - writtenSize += message.getContent(buf, writtenSize); - buf.flip(); - AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf); + AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity); CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); @@ -137,15 +129,54 @@ public class ProtocolOutputConverterImpl while(writtenSize < bodySize) { - buf = java.nio.ByteBuffer.allocate(capacity); + capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize; + MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity); + writtenSize += capacity; - writtenSize += message.getContent(buf, writtenSize); - buf.flip(); - writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf))); + writeFrame(new AMQFrame(channelId, body)); } } } + private class MessageContentSourceBody implements AMQBody + { + public static final byte TYPE = 3; + private int _length; + private MessageContentSource _message; + private int _offset; + + public MessageContentSourceBody(MessageContentSource message, int offset, int length) + { + _message = message; + _offset = offset; + _length = length; + } + + public byte getFrameType() + { + return TYPE; + } + + public int getSize() + { + return _length; + } + + public void writePayload(DataOutputStream buffer) throws IOException + { + byte[] data = new byte[_length]; + + _message.getContent(java.nio.ByteBuffer.wrap(data), _offset); + + buffer.write(data); + } + + public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException + { + throw new UnsupportedOperationException(); + } + } + private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody) { @@ -221,7 +252,7 @@ public class ProtocolOutputConverterImpl return _underlyingBody.getSize(); } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { if(_underlyingBody == null) { @@ -346,7 +377,7 @@ public class ProtocolOutputConverterImpl return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); } @@ -374,7 +405,7 @@ public class ProtocolOutputConverterImpl return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); } Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/Plugin.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/Plugin.java?rev=1187375&r1=1187374&r2=1187375&view=diff ============================================================================== --- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/Plugin.java (original) +++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/Plugin.java Fri Oct 21 14:42:12 2011 @@ -27,5 +27,5 @@ public interface Plugin /** * Provide Configuration to this plugin */ - public void configure(ConfigurationPlugin config); + public void configure(ConfigurationPlugin config) throws ConfigurationException; } Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java?rev=1187375&r1=1187374&r2=1187375&view=diff ============================================================================== --- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java (original) +++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java Fri Oct 21 14:42:12 2011 @@ -18,39 +18,56 @@ */ package org.apache.qpid.server.plugins; -import static org.apache.felix.framework.util.FelixConstants.*; -import static org.apache.felix.main.AutoProcessor.*; +import static org.apache.felix.framework.util.FelixConstants.SYSTEMBUNDLE_ACTIVATORS_PROP; +import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_ACTION_PROPERY; +import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_DIR_PROPERY; +import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_INSTALL_VALUE; +import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_START_VALUE; +import static org.apache.felix.main.AutoProcessor.process; +import static org.osgi.framework.Constants.FRAMEWORK_STORAGE; +import static org.osgi.framework.Constants.FRAMEWORK_STORAGE_CLEAN; +import static org.osgi.framework.Constants.FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT; +import static org.osgi.framework.Constants.FRAMEWORK_SYSTEMPACKAGES; import java.io.File; +import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.commons.configuration.ConfigurationException; import org.apache.felix.framework.Felix; import org.apache.felix.framework.util.StringMap; import org.apache.log4j.Logger; import org.apache.qpid.common.Closeable; +import org.apache.qpid.common.QpidProperties; import org.apache.qpid.server.configuration.TopicConfiguration; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory; import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory; import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; import org.apache.qpid.server.exchange.ExchangeType; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SecurityPluginFactory; import org.apache.qpid.server.security.access.plugins.AllowAll; import org.apache.qpid.server.security.access.plugins.DenyAll; import org.apache.qpid.server.security.access.plugins.LegacyAccess; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; +import org.apache.qpid.server.security.auth.manager.AuthenticationManagerPluginFactory; +import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; import org.apache.qpid.server.virtualhost.plugins.SlowConsumerDetection; +import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; import org.apache.qpid.server.virtualhost.plugins.policies.TopicDeletePolicy; import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; +import org.apache.qpid.util.FileUtils; import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; import org.osgi.framework.BundleException; +import org.osgi.framework.Version; import org.osgi.framework.launch.Framework; import org.osgi.util.tracker.ServiceTracker; @@ -63,7 +80,6 @@ public class PluginManager implements Cl private static final Logger _logger = Logger.getLogger(PluginManager.class); private static final int FELIX_STOP_TIMEOUT = 30000; - private static final String QPID_VER_SUFFIX = "version=0.9,"; private Framework _felix; @@ -72,15 +88,61 @@ public class PluginManager implements Cl private ServiceTracker _configTracker = null; private ServiceTracker _virtualHostTracker = null; private ServiceTracker _policyTracker = null; + private ServiceTracker _authenticationManagerTracker = null; private Activator _activator; + private final List _trackers = new ArrayList(); private Map _securityPlugins = new HashMap(); private Map, ConfigurationPluginFactory> _configPlugins = new IdentityHashMap, ConfigurationPluginFactory>(); private Map _vhostPlugins = new HashMap(); private Map _policyPlugins = new HashMap(); + private Map> _authenticationManagerPlugins = new HashMap>(); - public PluginManager(String pluginPath, String cachePath) throws Exception + /** The default name of the OSGI system package list. */ + private static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/server/plugins/OsgiSystemPackages.properties"; + + /** The name of the override system property that holds the name of the OSGI system package list. */ + private static final String FILE_PROPERTY = "qpid.osgisystempackages.properties"; + + private static final String OSGI_SYSTEM_PACKAGES; + + static + { + final String filename = System.getProperty(FILE_PROPERTY); + final InputStream is = FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME, + PluginManager.class.getClassLoader()); + + try + { + Version qpidReleaseVersion; + try + { + qpidReleaseVersion = Version.parseVersion(QpidProperties.getReleaseVersion()); + } + catch (IllegalArgumentException iae) + { + qpidReleaseVersion = null; + } + + final Properties p = new Properties(); + p.load(is); + + final OsgiSystemPackageUtil osgiSystemPackageUtil = new OsgiSystemPackageUtil(qpidReleaseVersion, (Map)p); + + OSGI_SYSTEM_PACKAGES = osgiSystemPackageUtil.getFormattedSystemPackageString(); + + _logger.debug("List of OSGi system packages to be added: " + OSGI_SYSTEM_PACKAGES); + } + catch (IOException e) + { + _logger.error("Error reading OSGI system package list", e); + throw new ExceptionInInitializerError(e); + } + } + + + public PluginManager(String pluginPath, String cachePath, BundleContext bundleContext) throws Exception { // Store all non-OSGi plugins // A little gross that we have to add them here, but not all the plugins are OSGIfied @@ -97,7 +159,8 @@ public class PluginManager implements Cl LegacyAccess.LegacyAccessConfiguration.FACTORY, new SlowConsumerDetectionConfigurationFactory(), new SlowConsumerDetectionPolicyConfigurationFactory(), - new SlowConsumerDetectionQueueConfigurationFactory())) + new SlowConsumerDetectionQueueConfigurationFactory(), + PrincipalDatabaseAuthenticationManager.PrincipalDatabaseAuthenticationManagerConfiguration.FACTORY)) { _configPlugins.put(configFactory.getParentPaths(), configFactory); } @@ -112,125 +175,109 @@ public class PluginManager implements Cl _vhostPlugins.put(pluginFactory.getClass().getName(), pluginFactory); } - // Check the plugin directory path is set and exist - if (pluginPath == null) + for (AuthenticationManagerPluginFactory pluginFactory : Arrays.asList( + PrincipalDatabaseAuthenticationManager.FACTORY)) { - return; + _authenticationManagerPlugins.put(pluginFactory.getPluginName(), pluginFactory); } - File pluginDir = new File(pluginPath); - if (!pluginDir.exists()) - { - return; - } - - // Setup OSGi configuration propery map - StringMap configMap = new StringMap(false); - - // Add the bundle provided service interface package and the core OSGi - // packages to be exported from the class path via the system bundle. - configMap.put(FRAMEWORK_SYSTEMPACKAGES, - "org.osgi.framework; version=1.3.0," + - "org.osgi.service.packageadmin; version=1.2.0," + - "org.osgi.service.startlevel; version=1.0.0," + - "org.osgi.service.url; version=1.0.0," + - "org.osgi.util.tracker; version=1.0.0," + - "org.apache.qpid.junit.extensions.util; " + QPID_VER_SUFFIX + - "org.apache.qpid; " + QPID_VER_SUFFIX + - "org.apache.qpid.common; " + QPID_VER_SUFFIX + - "org.apache.qpid.exchange; " + QPID_VER_SUFFIX + - "org.apache.qpid.framing; " + QPID_VER_SUFFIX + - "org.apache.qpid.management.common.mbeans.annotations; " + QPID_VER_SUFFIX + - "org.apache.qpid.protocol; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.binding; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.configuration; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.configuration.plugins; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.configuration.management; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.exchange; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.logging; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.logging.actors; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.logging.subjects; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.management; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.persistent; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.plugins; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.protocol; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.queue; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.registry; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.security; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.security.access; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.security.access.plugins; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.virtualhost; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.virtualhost.plugins; " + QPID_VER_SUFFIX + - "org.apache.qpid.util; " + QPID_VER_SUFFIX + - "org.apache.commons.configuration; version=1.0.0," + - "org.apache.commons.lang; version=1.0.0," + - "org.apache.commons.lang.builder; version=1.0.0," + - "org.apache.commons.logging; version=1.0.0," + - "org.apache.log4j; version=1.2.12," + - "javax.management.openmbean; version=1.0.0," + - "javax.management; version=1.0.0" - ); - - // No automatic shutdown hook - configMap.put("felix.shutdown.hook", "false"); - - // Add system activator - List activators = new ArrayList(); - _activator = new Activator(); - activators.add(_activator); - configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators); - if (cachePath != null) + if(bundleContext == null) { - File cacheDir = new File(cachePath); - if (!cacheDir.exists() && cacheDir.canWrite()) + // Check the plugin directory path is set and exist + if (pluginPath == null) { - _logger.info("Creating plugin cache directory: " + cachePath); - cacheDir.mkdir(); + _logger.info("No plugin path specified, no plugins will be loaded."); + return; } - - // Set plugin cache directory and empty it - _logger.info("Cache bundles in directory " + cachePath); - configMap.put(FRAMEWORK_STORAGE, cachePath); - } - configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT); - - // Set directory with plugins to auto-deploy - _logger.info("Auto deploying bundles from directory " + pluginPath); - configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath); - configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE); - - // Start plugin manager and trackers - _felix = new Felix(configMap); - try - { - _logger.info("Starting plugin manager..."); - _felix.init(); - process(configMap, _felix.getBundleContext()); - _felix.start(); - _logger.info("Started plugin manager"); + File pluginDir = new File(pluginPath); + if (!pluginDir.exists()) + { + _logger.warn("Plugin dir : " + pluginDir + " does not exist."); + return; + } + + // Add the bundle provided service interface package and the core OSGi + // packages to be exported from the class path via the system bundle. + + // Setup OSGi configuration property map + final StringMap configMap = new StringMap(false); + configMap.put(FRAMEWORK_SYSTEMPACKAGES, OSGI_SYSTEM_PACKAGES); + + // No automatic shutdown hook + configMap.put("felix.shutdown.hook", "false"); + + // Add system activator + List activators = new ArrayList(); + _activator = new Activator(); + activators.add(_activator); + configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators); + + if (cachePath != null) + { + File cacheDir = new File(cachePath); + if (!cacheDir.exists() && cacheDir.canWrite()) + { + _logger.info("Creating plugin cache directory: " + cachePath); + cacheDir.mkdir(); + } + + // Set plugin cache directory and empty it + _logger.info("Cache bundles in directory " + cachePath); + configMap.put(FRAMEWORK_STORAGE, cachePath); + } + configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT); + + // Set directory with plugins to auto-deploy + _logger.info("Auto deploying bundles from directory " + pluginPath); + configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath); + configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE); + + // Start plugin manager + _felix = new Felix(configMap); + try + { + _logger.info("Starting plugin manager framework"); + _felix.init(); + process(configMap, _felix.getBundleContext()); + _felix.start(); + _logger.info("Started plugin manager framework"); + } + catch (BundleException e) + { + throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e); + } + + bundleContext = _activator.getContext(); } - catch (BundleException e) + else { - throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e); + _logger.info("Using the specified external BundleContext"); } - - // TODO save trackers in a map, keyed by class name - - _exchangeTracker = new ServiceTracker(_activator.getContext(), ExchangeType.class.getName(), null); + + _exchangeTracker = new ServiceTracker(bundleContext, ExchangeType.class.getName(), null); _exchangeTracker.open(); + _trackers.add(_exchangeTracker); - _securityTracker = new ServiceTracker(_activator.getContext(), SecurityPluginFactory.class.getName(), null); + _securityTracker = new ServiceTracker(bundleContext, SecurityPluginFactory.class.getName(), null); _securityTracker.open(); + _trackers.add(_securityTracker); - _configTracker = new ServiceTracker(_activator.getContext(), ConfigurationPluginFactory.class.getName(), null); + _configTracker = new ServiceTracker(bundleContext, ConfigurationPluginFactory.class.getName(), null); _configTracker.open(); + _trackers.add(_configTracker); - _virtualHostTracker = new ServiceTracker(_activator.getContext(), VirtualHostPluginFactory.class.getName(), null); + _virtualHostTracker = new ServiceTracker(bundleContext, VirtualHostPluginFactory.class.getName(), null); _virtualHostTracker.open(); + _trackers.add(_virtualHostTracker); - _policyTracker = new ServiceTracker(_activator.getContext(), SlowConsumerPolicyPluginFactory.class.getName(), null); + _policyTracker = new ServiceTracker(bundleContext, SlowConsumerPolicyPluginFactory.class.getName(), null); _policyTracker.open(); - + _trackers.add(_policyTracker); + + _authenticationManagerTracker = new ServiceTracker(bundleContext, AuthenticationManagerPluginFactory.class.getName(), null); + _authenticationManagerTracker.open(); + _trackers.add(_authenticationManagerTracker); + _logger.info("Opened service trackers"); } @@ -301,22 +348,26 @@ public class PluginManager implements Cl return getServices(_securityTracker, _securityPlugins); } + public Map> getAuthenticationManagerPlugins() + { + return getServices(_authenticationManagerTracker, _authenticationManagerPlugins); + } + public void close() { - if (_felix != null) + try { - try + // Close all bundle trackers + for(ServiceTracker tracker : _trackers) { - // Close all bundle trackers - _exchangeTracker.close(); - _securityTracker.close(); - _configTracker.close(); - _virtualHostTracker.close(); - _policyTracker.close(); + tracker.close(); } - finally + } + finally + { + if (_felix != null) { - _logger.info("Stopping plugin manager"); + _logger.info("Stopping plugin manager framework"); try { // FIXME should be stopAndWait() but hangs VM, need upgrade in felix @@ -335,7 +386,12 @@ public class PluginManager implements Cl { // Ignore } - _logger.info("Stopped plugin manager"); + _logger.info("Stopped plugin manager framework"); + } + else + { + _logger.info("Plugin manager was started with an external BundleContext, " + + "skipping remaining shutdown tasks"); } } } Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1187375&r1=1187374&r2=1187375&view=diff ============================================================================== --- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original) +++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Fri Oct 21 14:42:12 2011 @@ -20,14 +20,35 @@ */ package org.apache.qpid.server.protocol; -import org.apache.qpid.protocol.AMQConstant; +import java.util.List; +import java.util.UUID; + import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.stats.StatisticsGatherer; -public interface AMQConnectionModel +public interface AMQConnectionModel extends StatisticsGatherer { + /** + * get a unique id for this connection. + * + * @return a {@link UUID} representing the connection + */ + public UUID getId(); + + /** + * Close the underlying Connection + * + * @param cause + * @param message + * @throws org.apache.qpid.AMQException + */ + public void close(AMQConstant cause, String message) throws AMQException; /** * Close the given requested Session + * * @param session * @param cause * @param message @@ -36,4 +57,20 @@ public interface AMQConnectionModel public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException; public long getConnectionId(); + + /** + * Get a list of all sessions using this connection. + * + * @return a list of {@link AMQSessionModel}s + */ + public List getSessionModels(); + + /** + * Return a {@link LogSubject} for the connection. + */ + public LogSubject getLogSubject(); + + public String getUserName(); + + public boolean isSessionNameUnique(String name); } Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1187375&r1=1187374&r2=1187375&view=diff ============================================================================== --- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original) +++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Fri Oct 21 14:42:12 2011 @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.protocol; +import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -30,18 +32,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import javax.management.JMException; +import javax.security.auth.Subject; import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; -import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; @@ -66,12 +66,10 @@ import org.apache.qpid.framing.MethodDis import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.pool.Job; -import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; -import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; @@ -90,21 +88,22 @@ import org.apache.qpid.server.management import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.NetworkConnection; -public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig +public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); - private static final AtomicLong idGenerator = new AtomicLong(0); - // to save boxing the channelId and looking up in a map... cache in an array the low numbered // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; @@ -134,7 +133,7 @@ public class AMQProtocolEngine implement private Object _lastSent; protected volatile boolean _closed; - + // maximum number of channels this session should have private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(); @@ -146,47 +145,46 @@ public class AMQProtocolEngine implement private Map _closingChannelsList = new ConcurrentHashMap(); private ProtocolOutputConverter _protocolOutputConverter; - private Principal _authorizedID; + private Subject _authorizedSubject; private MethodDispatcher _dispatcher; private ProtocolSessionIdentifier _sessionIdentifier; - // Create a simple ID that increments for ever new Session - private final long _sessionID = idGenerator.getAndIncrement(); + private final long _sessionID; private AMQPConnectionActor _actor; private LogSubject _logSubject; - private NetworkDriver _networkDriver; - private long _lastIoTime; private long _writtenBytes; private long _readBytes; - private Job _readJob; - private Job _writeJob; - private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); private long _maxFrameSize; private final AtomicBoolean _closing = new AtomicBoolean(false); private final UUID _id; private final ConfigStore _configStore; private long _createTime = System.currentTimeMillis(); + private ApplicationRegistry _registry; + private boolean _statisticsEnabled = false; + private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + + private NetworkConnection _network; + private Sender _sender; + public ManagedObject getManagedObject() { return _managedObject; } - public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver) + public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId) { _stateManager = new AMQStateManager(virtualHostRegistry, this); - _networkDriver = driver; - _codecFactory = new AMQCodecFactory(true, this); - _poolReference.acquireExecutorService(); - _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); - _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); + + setNetworkConnection(network); + _sessionID = connectionId; _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); @@ -195,9 +193,21 @@ public class AMQProtocolEngine implement _configStore = virtualHostRegistry.getConfigStore(); _id = _configStore.createId(); - _actor.message(ConnectionMessages.OPEN(null, null, false, false)); + _registry = virtualHostRegistry.getApplicationRegistry(); + initialiseStatistics(); + } + + public void setNetworkConnection(NetworkConnection network) + { + setNetworkConnection(network, network.getSender()); + } + + public void setNetworkConnection(NetworkConnection network, Sender sender) + { + _network = network; + _sender = sender; } private AMQProtocolSessionMBean createMBean() throws JMException @@ -236,26 +246,18 @@ public class AMQProtocolEngine implement try { final ArrayList dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() + for (AMQDataBlock dataBlock : dataBlocks) { - public void run() + try { - // Decode buffer - - for (AMQDataBlock dataBlock : dataBlocks) - { - try - { - dataBlockReceived(dataBlock); - } - catch (Exception e) - { - _logger.error("Unexpected exception when processing datablock", e); - closeProtocolSession(); - } - } + dataBlockReceived(dataBlock); } - }); + catch (Exception e) + { + _logger.error("Unexpected exception when processing datablock", e); + closeProtocolSession(); + } + } } catch (Exception e) { @@ -333,6 +335,11 @@ public class AMQProtocolEngine implement closeChannel(channelId); throw e; } + catch (TransportException e) + { + closeChannel(channelId); + throw e; + } } finally { @@ -343,7 +350,7 @@ public class AMQProtocolEngine implement private void protocolInitiationReceived(ProtocolInitiation pi) { // this ensures the codec never checks for a PI message again - ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); + (_codecFactory.getDecoder()).setExpectProtocolInitiation(false); try { // Log incomming protocol negotiation request @@ -363,15 +370,49 @@ public class AMQProtocolEngine implement null, mechanisms.getBytes(), locales.getBytes()); - _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer()); + _sender.send(asByteBuffer(responseBody.generateFrame(0))); + _sender.flush(); } catch (AMQException e) { _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); - _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); + _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()))); + _sender.flush(); + } + } + + private ByteBuffer asByteBuffer(AMQDataBlock block) + { + final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize()); + + try + { + block.writePayload(new DataOutputStream(new OutputStream() + { + + + @Override + public void write(int b) throws IOException + { + buf.put((byte) b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + buf.put(b, off, len); + } + })); + } + catch (IOException e) + { + throw new RuntimeException(e); } + + buf.flip(); + return buf; } public void methodFrameReceived(int channelId, AMQMethodBody methodBody) @@ -426,19 +467,19 @@ public class AMQProtocolEngine implement AMQConstant.CHANNEL_ERROR.getName().toString()); _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, ce, false); + closeConnection(channelId, ce); } } catch (AMQConnectionException e) { _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, e, false); + closeConnection(channelId, e); } catch (AMQSecurityException e) { AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, ce, false); + closeConnection(channelId, ce); } } catch (Exception e) @@ -481,19 +522,14 @@ public class AMQProtocolEngine implement * * @param frame the frame to write */ - public void writeFrame(AMQDataBlock frame) + public synchronized void writeFrame(AMQDataBlock frame) { _lastSent = frame; - final ByteBuffer buf = frame.toNioByteBuffer(); + final ByteBuffer buf = asByteBuffer(frame); _lastIoTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); - Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable() - { - public void run() - { - _networkDriver.send(buf); - } - }); + _sender.send(buf); + _sender.flush(); } public AMQShortString getContextKey() @@ -683,8 +719,8 @@ public class AMQProtocolEngine implement { if (delay > 0) { - _networkDriver.setMaxWriteIdle(delay); - _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); + _network.setMaxWriteIdle(delay); + _network.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); } } @@ -725,7 +761,7 @@ public class AMQProtocolEngine implement } closeAllChannels(); - + getConfigStore().removeConfiguredObject(this); if (_managedObject != null) @@ -745,7 +781,6 @@ public class AMQProtocolEngine implement _closed = true; notifyAll(); } - _poolReference.releaseExecutorService(); CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE()); } } @@ -768,27 +803,32 @@ public class AMQProtocolEngine implement } } - public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException + public void closeConnection(int channelId, AMQConnectionException e) throws AMQException { - if (_logger.isInfoEnabled()) + try { - _logger.info("Closing connection due to: " + e); - } - - markChannelAwaitingCloseOk(channelId); - closeSession(); - _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(e.getCloseFrame(channelId)); + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e); + } - if (closeProtocolSession) + markChannelAwaitingCloseOk(channelId); + closeSession(); + _stateManager.changeState(AMQState.CONNECTION_CLOSING); + writeFrame(e.getCloseFrame(channelId)); + } + finally { closeProtocolSession(); } + + } public void closeProtocolSession() { - _networkDriver.close(); + _network.close(); + try { _stateManager.changeState(AMQState.CONNECTION_CLOSED); @@ -797,11 +837,15 @@ public class AMQProtocolEngine implement { _logger.info(e.getMessage()); } + catch (TransportException e) + { + _logger.info(e.getMessage()); + } } public String toString() { - return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")"); + return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")"); } public String dump() @@ -823,17 +867,11 @@ public class AMQProtocolEngine implement */ public String getLocalFQDN() { - SocketAddress address = _networkDriver.getLocalAddress(); - // we use the vmpipe address in some tests hence the need for this rather ugly test. The host - // information is used by SASL primary. + SocketAddress address = _network.getLocalAddress(); if (address instanceof InetSocketAddress) { return ((InetSocketAddress) address).getHostName(); } - else if (address instanceof VmPipeAddress) - { - return "vmpipe:" + ((VmPipeAddress) address).getPort(); - } else { throw new IllegalArgumentException("Unsupported socket address class: " + address); @@ -912,7 +950,7 @@ public class AMQProtocolEngine implement public Object getClientIdentifier() { - return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null; + return _network.getRemoteAddress(); } public VirtualHost getVirtualHost() @@ -925,7 +963,7 @@ public class AMQProtocolEngine implement _virtualHost = virtualHost; _virtualHost.getConnectionRegistry().registerConnection(this); - + _configStore.addConfiguredObject(this); try @@ -954,29 +992,33 @@ public class AMQProtocolEngine implement return _protocolOutputConverter; } - public void setAuthorizedID(Principal authorizedID) + public void setAuthorizedSubject(final Subject authorizedSubject) { - _authorizedID = authorizedID; + if (authorizedSubject == null) + { + throw new IllegalArgumentException("authorizedSubject cannot be null"); + } + _authorizedSubject = authorizedSubject; } - public Principal getAuthorizedID() + public Subject getAuthorizedSubject() { - return _authorizedID; + return _authorizedSubject; } - public Principal getPrincipal() + public Principal getAuthorizedPrincipal() { - return _authorizedID; + return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject); } public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } public MethodRegistry getMethodRegistry() @@ -999,6 +1041,10 @@ public class AMQProtocolEngine implement { _logger.error("Could not close protocol engine", e); } + catch (TransportException e) + { + _logger.error("Could not close protocol engine", e); + } } public void readerIdle() @@ -1006,14 +1052,9 @@ public class AMQProtocolEngine implement // Nothing } - public void setNetworkDriver(NetworkDriver driver) - { - _networkDriver = driver; - } - public void writerIdle() { - _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer()); + _sender.send(asByteBuffer(HeartbeatBody.FRAME)); } public void exception(Throwable throwable) @@ -1021,7 +1062,7 @@ public class AMQProtocolEngine implement if (throwable instanceof AMQProtocolHeaderException) { writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); - _networkDriver.close(); + _sender.close(); _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable); } @@ -1039,7 +1080,7 @@ public class AMQProtocolEngine implement writeFrame(closeBody.generateFrame(0)); - _networkDriver.close(); + _sender.close(); } } @@ -1078,19 +1119,6 @@ public class AMQProtocolEngine implement return (_clientVersion == null) ? null : _clientVersion.toString(); } - public void closeIfLingeringClosedChannels() - { - for (Entryid : _closingChannelsList.entrySet()) - { - if (id.getValue() + 30000 > System.currentTimeMillis()) - { - // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection - _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed"); - closeProtocolSession(); - } - } - } - public Boolean isIncoming() { return true; @@ -1108,7 +1136,7 @@ public class AMQProtocolEngine implement public String getAuthId() { - return getAuthorizedID().getName(); + return getAuthorizedPrincipal().getName(); } public Integer getRemotePID() @@ -1170,7 +1198,7 @@ public class AMQProtocolEngine implement { return false; } - + public void mgmtClose() { MethodRegistry methodRegistry = getMethodRegistry(); @@ -1263,7 +1291,6 @@ public class AMQProtocolEngine implement public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException { - closeChannel((Integer)session.getID()); MethodRegistry methodRegistry = getMethodRegistry(); @@ -1273,6 +1300,110 @@ public class AMQProtocolEngine implement new AMQShortString(message), 0,0); - writeFrame(responseBody.generateFrame((Integer)session.getID())); - } + writeFrame(responseBody.generateFrame((Integer)session.getID())); + } + + public void close(AMQConstant cause, String message) throws AMQException + { + closeConnection(0, new AMQConnectionException(cause, message, 0, 0, + getProtocolOutputConverter().getProtocolMajorVersion(), + getProtocolOutputConverter().getProtocolMinorVersion(), + (Throwable) null)); + } + + public List getSessionModels() + { + List sessions = new ArrayList(); + for (AMQChannel channel : getChannels()) + { + sessions.add((AMQSessionModel) channel); + } + return sessions; + } + + public LogSubject getLogSubject() + { + return _logSubject; + } + + public void registerMessageDelivered(long messageSize) + { + if (isStatisticsEnabled()) + { + _messagesDelivered.registerEvent(1L); + _dataDelivered.registerEvent(messageSize); + } + _virtualHost.registerMessageDelivered(messageSize); + } + + public void registerMessageReceived(long messageSize, long timestamp) + { + if (isStatisticsEnabled()) + { + _messagesReceived.registerEvent(1L, timestamp); + _dataReceived.registerEvent(messageSize, timestamp); + } + _virtualHost.registerMessageReceived(messageSize, timestamp); + } + + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; + } + + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messagesDelivered; + } + + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDelivered; + } + + public void resetStatistics() + { + _messagesDelivered.reset(); + _dataDelivered.reset(); + _messagesReceived.reset(); + _dataReceived.reset(); + } + + public void initialiseStatistics() + { + setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && + _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled()); + + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID()); + _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); + _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); + _dataReceived = new StatisticsCounter("data-received-" + getSessionID()); + } + + public boolean isStatisticsEnabled() + { + return _statisticsEnabled; + } + + public void setStatisticsEnabled(boolean enabled) + { + _statisticsEnabled = enabled; + } + + @Override + public boolean isSessionNameUnique(String name) + { + return true; + } + + @Override + public String getUserName() + { + return getAuthorizedPrincipal().getName(); + } } Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1187375&r1=1187374&r2=1187375&view=diff ============================================================================== --- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original) +++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri Oct 21 14:42:12 2011 @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol; +import javax.security.auth.Subject; import javax.security.sasl.SaslServer; import org.apache.qpid.AMQException; @@ -28,16 +29,15 @@ import org.apache.qpid.framing.*; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.security.PrincipalHolder; +import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.virtualhost.VirtualHost; -import java.security.Principal; import java.util.List; -public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder, AMQConnectionModel +public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel { long getSessionID(); @@ -163,8 +163,10 @@ public interface AMQProtocolSession exte /** This must be called when the session is _closed in order to free up any resources managed by the session. */ void closeSession() throws AMQException; + void closeProtocolSession(); + /** This must be called to close the session in order to free up any resources managed by the session. */ - void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException; + void closeConnection(int channelId, AMQConnectionException e) throws AMQException; /** @return a key that uniquely identifies this session */ @@ -205,7 +207,7 @@ public interface AMQProtocolSession exte public ProtocolOutputConverter getProtocolOutputConverter(); - void setAuthorizedID(Principal authorizedID); + void setAuthorizedSubject(Subject authorizedSubject); public java.net.SocketAddress getRemoteAddress(); @@ -231,7 +233,5 @@ public interface AMQProtocolSession exte List getChannels(); - void closeIfLingeringClosedChannels(); - void mgmtCloseChannel(int channelId); } Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=1187375&r1=1187374&r2=1187375&view=diff ============================================================================== --- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original) +++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Fri Oct 21 14:42:12 2011 @@ -37,25 +37,15 @@ */ package org.apache.qpid.server.protocol; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.management.common.mbeans.ManagedConnection; -import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; -import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.ManagementActor; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.ManagedObject; +import java.util.Date; +import java.util.List; import javax.management.JMException; import javax.management.MBeanException; import javax.management.MBeanNotificationInfo; import javax.management.NotCompliantMBeanException; import javax.management.Notification; +import javax.management.ObjectName; import javax.management.monitor.MonitorNotification; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; @@ -66,8 +56,20 @@ import javax.management.openmbean.Simple import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; -import java.util.Date; -import java.util.List; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.management.common.mbeans.ManagedConnection; +import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; +import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.ManagementActor; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.ManagedObject; /** * This MBean class implements the management interface. In order to make more attributes, operations and notifications @@ -94,8 +96,7 @@ public class AMQProtocolSessionMBean ext super(ManagedConnection.class, ManagedConnection.TYPE); _protocolSession = amqProtocolSession; String remote = getRemoteAddress(); - remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote; - _name = jmxEncode(new StringBuffer(remote), 0).toString(); + _name = "anonymous".equals(remote) ? (remote + hashCode()) : remote; init(); } @@ -130,7 +131,7 @@ public class AMQProtocolSessionMBean ext public String getAuthorizedId() { - return (_protocolSession.getPrincipal() != null ) ? _protocolSession.getPrincipal().getName() : null; + return (_protocolSession.getAuthorizedPrincipal() != null ) ? _protocolSession.getAuthorizedPrincipal().getName() : null; } public String getVersion() @@ -175,7 +176,7 @@ public class AMQProtocolSessionMBean ext public String getObjectInstanceName() { - return _name; + return ObjectName.quote(_name); } /** @@ -339,4 +340,78 @@ public class AMQProtocolSessionMBean ext _broadcaster.sendNotification(n); } -} // End of MBean class + public void resetStatistics() throws Exception + { + _protocolSession.resetStatistics(); + } + + public double getPeakMessageDeliveryRate() + { + return _protocolSession.getMessageDeliveryStatistics().getPeak(); + } + + public double getPeakDataDeliveryRate() + { + return _protocolSession.getDataDeliveryStatistics().getPeak(); + } + + public double getMessageDeliveryRate() + { + return _protocolSession.getMessageDeliveryStatistics().getRate(); + } + + public double getDataDeliveryRate() + { + return _protocolSession.getDataDeliveryStatistics().getRate(); + } + + public long getTotalMessagesDelivered() + { + return _protocolSession.getMessageDeliveryStatistics().getTotal(); + } + + public long getTotalDataDelivered() + { + return _protocolSession.getDataDeliveryStatistics().getTotal(); + } + + public double getPeakMessageReceiptRate() + { + return _protocolSession.getMessageReceiptStatistics().getPeak(); + } + + public double getPeakDataReceiptRate() + { + return _protocolSession.getDataReceiptStatistics().getPeak(); + } + + public double getMessageReceiptRate() + { + return _protocolSession.getMessageReceiptStatistics().getRate(); + } + + public double getDataReceiptRate() + { + return _protocolSession.getDataReceiptStatistics().getRate(); + } + + public long getTotalMessagesReceived() + { + return _protocolSession.getMessageReceiptStatistics().getTotal(); + } + + public long getTotalDataReceived() + { + return _protocolSession.getDataReceiptStatistics().getTotal(); + } + + public boolean isStatisticsEnabled() + { + return _protocolSession.isStatisticsEnabled(); + } + + public void setStatisticsEnabled(boolean enabled) + { + _protocolSession.setStatisticsEnabled(enabled); + } +} Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1187375&r1=1187374&r2=1187375&view=diff ============================================================================== --- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original) +++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Fri Oct 21 14:42:12 2011 @@ -20,15 +20,35 @@ */ package org.apache.qpid.server.protocol; +import org.apache.qpid.AMQException; import org.apache.qpid.server.logging.LogSubject; public interface AMQSessionModel { - Object getID(); + public Object getID(); - AMQConnectionModel getConnectionModel(); + public AMQConnectionModel getConnectionModel(); - String getClientID(); + public String getClientID(); + + public void close() throws AMQException; - LogSubject getLogSubject(); + public LogSubject getLogSubject(); + + /** + * This method is called from the housekeeping thread to check the status of + * transactions on this session and react appropriately. + * + * If a transaction is open for too long or idle for too long then a warning + * is logged or the connection is closed, depending on the configuration. An open + * transaction is one that has recent activity. The transaction age is counted + * from the time the transaction was started. An idle transaction is one that + * has had no activity, such as publishing or acknowledgeing messages. + * + * @param openWarn time in milliseconds before alerting on open transaction + * @param openClose time in milliseconds before closing connection with open transaction + * @param idleWarn time in milliseconds before alerting on idle transaction + * @param idleClose time in milliseconds before closing connection with idle transaction + */ + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException; } Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1187375&r1=1187374&r2=1187375&view=diff ============================================================================== --- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original) +++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Fri Oct 21 14:42:12 2011 @@ -22,45 +22,54 @@ package org.apache.qpid.server.protocol; import org.apache.log4j.Logger; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.transport.ServerConnection; import org.apache.qpid.transport.ConnectionDelegate; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.Set; -public class MultiVersionProtocolEngine implements ProtocolEngine +public class MultiVersionProtocolEngine implements ServerProtocolEngine { private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class); + private final long _id; - - private NetworkDriver _networkDriver; - private Set _supported; + private Set _supported; private String _fqdn; private IApplicationRegistry _appRegistry; + private NetworkConnection _network; + private Sender _sender; + + private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine(); - private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine(); + public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, + String fqdn, + Set supported, + NetworkConnection network, + long id) + { + this(appRegistry,fqdn,supported,id); + setNetworkConnection(network); + } public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, String fqdn, - Set supported, NetworkDriver networkDriver) + Set supported, + long id) { + _id = id; _appRegistry = appRegistry; _fqdn = fqdn; _supported = supported; - _networkDriver = networkDriver; - } - public void setNetworkDriver(NetworkDriver driver) - { - _delegate.setNetworkDriver(driver); } + public SocketAddress getRemoteAddress() { return _delegate.getRemoteAddress(); @@ -96,6 +105,7 @@ public class MultiVersionProtocolEngine _delegate.readerIdle(); } + public void received(ByteBuffer msg) { _delegate.received(msg); @@ -106,6 +116,11 @@ public class MultiVersionProtocolEngine _delegate.exception(t); } + public long getConnectionId() + { + return _delegate.getConnectionId(); + } + private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8; private static final byte[] AMQP_0_8_HEADER = @@ -130,7 +145,7 @@ public class MultiVersionProtocolEngine (byte) 9 }; -private static final byte[] AMQP_0_9_1_HEADER = + private static final byte[] AMQP_0_9_1_HEADER = new byte[] { (byte) 'A', (byte) 'M', (byte) 'Q', @@ -153,19 +168,31 @@ private static final byte[] AMQP_0_9_1_H (byte) 10 }; + public void setNetworkConnection(NetworkConnection networkConnection) + { + setNetworkConnection(networkConnection, networkConnection.getSender()); + } + + public void setNetworkConnection(NetworkConnection network, Sender sender) + { + _network = network; + _sender = sender; + } + + private static interface DelegateCreator { - VERSION getVersion(); + AmqpProtocolVersion getVersion(); byte[] getHeaderIdentifier(); - ProtocolEngine getProtocolEngine(); + ServerProtocolEngine getProtocolEngine(); } private DelegateCreator creator_0_8 = new DelegateCreator() { - public VERSION getVersion() + public AmqpProtocolVersion getVersion() { - return VERSION.v0_8; + return AmqpProtocolVersion.v0_8; } public byte[] getHeaderIdentifier() @@ -173,18 +200,18 @@ private static final byte[] AMQP_0_9_1_H return AMQP_0_8_HEADER; } - public ProtocolEngine getProtocolEngine() + public ServerProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id); } }; private DelegateCreator creator_0_9 = new DelegateCreator() { - public VERSION getVersion() + public AmqpProtocolVersion getVersion() { - return VERSION.v0_9; + return AmqpProtocolVersion.v0_9; } @@ -193,18 +220,18 @@ private static final byte[] AMQP_0_9_1_H return AMQP_0_9_HEADER; } - public ProtocolEngine getProtocolEngine() + public ServerProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id); } }; private DelegateCreator creator_0_9_1 = new DelegateCreator() { - public VERSION getVersion() + public AmqpProtocolVersion getVersion() { - return VERSION.v0_9_1; + return AmqpProtocolVersion.v0_9_1; } @@ -213,9 +240,9 @@ private static final byte[] AMQP_0_9_1_H return AMQP_0_9_1_HEADER; } - public ProtocolEngine getProtocolEngine() + public ServerProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id); } }; @@ -223,9 +250,9 @@ private static final byte[] AMQP_0_9_1_H private DelegateCreator creator_0_10 = new DelegateCreator() { - public VERSION getVersion() + public AmqpProtocolVersion getVersion() { - return VERSION.v0_10; + return AmqpProtocolVersion.v0_10; } @@ -234,15 +261,15 @@ private static final byte[] AMQP_0_9_1_H return AMQP_0_10_HEADER; } - public ProtocolEngine getProtocolEngine() + public ServerProtocolEngine getProtocolEngine() { final ConnectionDelegate connDelegate = new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn); - ServerConnection conn = new ServerConnection(); + ServerConnection conn = new ServerConnection(_id); conn.setConnectionDelegate(connDelegate); - return new ProtocolEngine_0_10( conn, _networkDriver, _appRegistry); + return new ProtocolEngine_0_10( conn, _network, _appRegistry); } }; @@ -250,21 +277,16 @@ private static final byte[] AMQP_0_9_1_H new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 }; - private class ClosedDelegateProtocolEngine implements ProtocolEngine + private class ClosedDelegateProtocolEngine implements ServerProtocolEngine { - public void setNetworkDriver(NetworkDriver driver) - { - _networkDriver = driver; - } - public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } public long getWrittenBytes() @@ -301,26 +323,30 @@ private static final byte[] AMQP_0_9_1_H { } - } - private class SelfDelegateProtocolEngine implements ProtocolEngine - { + public void setNetworkConnection(NetworkConnection network, Sender sender) + { - private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES); + } - public void setNetworkDriver(NetworkDriver driver) + public long getConnectionId() { - _networkDriver = driver; + return _id; } + } + + private class SelfDelegateProtocolEngine implements ServerProtocolEngine + { + private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES); public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } public long getWrittenBytes() @@ -355,7 +381,7 @@ private static final byte[] AMQP_0_9_1_H _header.get(headerBytes); - ProtocolEngine newDelegate = null; + ServerProtocolEngine newDelegate = null; byte[] newestSupported = null; for(int i = 0; newDelegate == null && i < _creators.length; i++) @@ -380,17 +406,20 @@ private static final byte[] AMQP_0_9_1_H // If no delegate is found then send back the most recent support protocol version id if(newDelegate == null) { - _networkDriver.send(ByteBuffer.wrap(newestSupported)); + _sender.send(ByteBuffer.wrap(newestSupported)); + _sender.flush(); _delegate = new ClosedDelegateProtocolEngine(); + + _network.close(); + } else { - newDelegate.setNetworkDriver(_networkDriver); - _delegate = newDelegate; _header.flip(); + _delegate.setNetworkConnection(_network, _sender); _delegate.received(_header); if(msg.hasRemaining()) { @@ -402,6 +431,11 @@ private static final byte[] AMQP_0_9_1_H } + public long getConnectionId() + { + return _id; + } + public void exception(Throwable t) { _logger.error("Error establishing session", t); @@ -421,5 +455,10 @@ private static final byte[] AMQP_0_9_1_H { } + + public void setNetworkConnection(NetworkConnection network, Sender sender) + { + + } } } Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1187375&r1=1187374&r2=1187375&view=diff ============================================================================== --- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original) +++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Fri Oct 21 14:42:12 2011 @@ -20,56 +20,38 @@ */ package org.apache.qpid.server.protocol; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; - -import java.util.Set; -import java.util.Arrays; -import java.util.HashSet; +import org.apache.qpid.transport.network.NetworkConnection; public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory { - ; - - - public enum VERSION { v0_8, v0_9, v0_9_1, v0_10 }; - - private static final Set ALL_VERSIONS = new HashSet(Arrays.asList(VERSION.values())); + private static final AtomicLong ID_GENERATOR = new AtomicLong(0); private final IApplicationRegistry _appRegistry; private final String _fqdn; - private final Set _supported; - + private final Set _supported; - public MultiVersionProtocolEngineFactory() + public MultiVersionProtocolEngineFactory(String fqdn, Set supportedVersions) { - this(1, "localhost", ALL_VERSIONS); - } - - public MultiVersionProtocolEngineFactory(String fqdn, Set versions) - { - this(1, fqdn, versions); + _appRegistry = ApplicationRegistry.getInstance(); + _fqdn = fqdn; + _supported = supportedVersions; } - - public MultiVersionProtocolEngineFactory(String fqdn) + public ServerProtocolEngine newProtocolEngine(NetworkConnection network) { - this(1, fqdn, ALL_VERSIONS); + return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement()); } - public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set supportedVersions) + public ServerProtocolEngine newProtocolEngine() { - _appRegistry = ApplicationRegistry.getInstance(instance); - _fqdn = fqdn; - _supported = supportedVersions; + return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, ID_GENERATOR.getAndIncrement()); } - - public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver) - { - return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver); - } } --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org