qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject svn commit: r447994 [22/46] - in /incubator/qpid/trunk/qpid: ./ cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/ cpp/common/concurrent/ cpp/common/concur...
Date Tue, 19 Sep 2006 22:07:25 GMT
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,110 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.log4j.Logger;
+
+/**
+ * A MINA filter that monitors the numbers of messages pending to be sent by MINA. It outputs a message
+ * when a threshold has been exceeded, and has a frequency configuration so that messages are not output
+ * too often.
+ *
+ */
+public class ProtocolBufferMonitorFilter extends IoFilterAdapter
+{
+    private static final Logger _logger = Logger.getLogger(ProtocolBufferMonitorFilter.class);
+
+    public static long DEFAULT_FREQUENCY = 5000;
+
+    public static int DEFAULT_THRESHOLD = 3000;
+
+    private int  _bufferedMessages = 0;
+
+    private int _threshold;
+
+    private long _lastMessageOutputTime;
+
+    private long _outputFrequencyInMillis;
+
+    public ProtocolBufferMonitorFilter()
+    {
+        _threshold = DEFAULT_THRESHOLD;
+        _outputFrequencyInMillis = DEFAULT_FREQUENCY;
+    }
+
+    public ProtocolBufferMonitorFilter(int threshold, long frequency)
+    {
+        _threshold = threshold;
+        _outputFrequencyInMillis = frequency;
+    }
+
+    public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception
+    {
+        _bufferedMessages++;
+        if (_bufferedMessages > _threshold)
+        {
+            long now = System.currentTimeMillis();
+            if ((now - _lastMessageOutputTime) > _outputFrequencyInMillis)
+            {
+                _logger.warn("Protocol message buffer exceeded threshold of " + _threshold + ". Current backlog: " +
+                             _bufferedMessages);
+                _lastMessageOutputTime = now;
+            }
+        }
+
+        nextFilter.messageReceived(session, message);
+    }
+
+    public void messageSent( NextFilter nextFilter, IoSession session, Object message ) throws Exception
+    {
+        _bufferedMessages--;
+        nextFilter.messageSent(session, message);
+    }
+
+    public int getBufferedMessages()
+    {
+        return _bufferedMessages;
+    }
+
+    public int getThreshold()
+    {
+        return _threshold;
+    }
+
+    public void setThreshold(int threshold)
+    {
+        _threshold = threshold;
+    }
+
+    public long getOutputFrequencyInMillis()
+    {
+        return _outputFrequencyInMillis;
+    }
+
+    public void setOutputFrequencyInMillis(long outputFrequencyInMillis)
+    {
+        _outputFrequencyInMillis = outputFrequencyInMillis;
+    }
+
+    public long getLastMessageOutputTime()
+    {
+        return _lastMessageOutputTime;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/AMQCallbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/AMQCallbackHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/AMQCallbackHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/AMQCallbackHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,27 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.security;
+
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+
+import javax.security.auth.callback.CallbackHandler;
+
+public interface AMQCallbackHandler extends CallbackHandler
+{
+    void initialise(AMQProtocolSession protocolSession);    
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/AMQCallbackHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/CallbackHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/CallbackHandlerRegistry.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/CallbackHandlerRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/CallbackHandlerRegistry.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,154 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.security;
+
+import org.apache.log4j.Logger;
+
+import java.io.*;
+import java.util.*;
+
+public class CallbackHandlerRegistry
+{
+    private static final String FILE_PROPERTY = "amq.callbackhandler.properties";
+
+    private static final Logger _logger = Logger.getLogger(CallbackHandlerRegistry.class);
+
+    private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry();
+
+    private Map _mechanismToHandlerClassMap = new HashMap();
+
+    private String _mechanisms;
+
+    public static CallbackHandlerRegistry getInstance()
+    {
+        return _instance;        
+    }
+
+    public Class getCallbackHandlerClass(String mechanism)
+    {
+        return (Class) _mechanismToHandlerClassMap.get(mechanism);
+    }
+
+    public String getMechanisms()
+    {
+        return _mechanisms;
+    }
+
+    private CallbackHandlerRegistry()
+    {
+        // first we register any Sasl client factories
+        DynamicSaslRegistrar.registerSaslProviders();
+
+        InputStream is = openPropertiesInputStream();
+        try
+        {
+            Properties props = new Properties();
+            props.load(is);
+            parseProperties(props);
+            _logger.info("Available SASL mechanisms: " + _mechanisms);
+        }
+        catch (IOException e)
+        {
+            _logger.error("Error reading properties: " + e, e);
+        }
+        finally
+        {
+            if (is != null)
+            {
+                try
+                {
+                    is.close();
+
+                }
+                catch (IOException e)
+                {
+                    _logger.error("Unable to close properties stream: " + e, e);
+                }
+            }
+        }
+    }
+
+    private InputStream openPropertiesInputStream()
+    {
+        String filename = System.getProperty(FILE_PROPERTY);
+        boolean useDefault = true;
+        InputStream is = null;
+        if (filename != null)
+        {
+            try
+            {
+                is = new BufferedInputStream(new FileInputStream(new File(filename)));
+                useDefault = false;
+            }
+            catch (FileNotFoundException e)
+            {
+                _logger.error("Unable to read from file " + filename + ": " + e, e);
+            }
+        }
+
+        if (useDefault)
+        {
+            is = CallbackHandlerRegistry.class.getResourceAsStream("CallbackHandlerRegistry.properties");
+        }
+
+        return is;
+    }
+
+    private void parseProperties(Properties props)
+    {
+        Enumeration e = props.propertyNames();
+        while (e.hasMoreElements())
+        {
+            String propertyName = (String) e.nextElement();
+            int period = propertyName.indexOf(".");
+            if (period < 0)
+            {
+                _logger.warn("Unable to parse property " + propertyName + " when configuring SASL providers");
+                continue;
+            }
+            String mechanism = propertyName.substring(period + 1);
+            String className = props.getProperty(propertyName);
+            Class clazz = null;
+            try
+            {
+                clazz = Class.forName(className);
+                if (!AMQCallbackHandler.class.isAssignableFrom(clazz))
+                {
+                    _logger.warn("SASL provider " + clazz + " does not implement " + AMQCallbackHandler.class +
+                                 ". Skipping");
+                    continue;
+                }
+                _mechanismToHandlerClassMap.put(mechanism, clazz);
+                if (_mechanisms == null)
+                {
+                    _mechanisms = mechanism;
+                }
+                else
+                {
+                    // one time cost
+                    _mechanisms = _mechanisms + " " + mechanism;
+                }
+            }
+            catch (ClassNotFoundException ex)
+            {
+                _logger.warn("Unable to load class " + className + ". Skipping that SASL provider");
+                continue;
+            }
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/CallbackHandlerRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/CallbackHandlerRegistry.properties?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/CallbackHandlerRegistry.properties (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/CallbackHandlerRegistry.properties Tue Sep 19 15:06:50 2006
@@ -0,0 +1,2 @@
+CallbackHandler.CRAM-MD5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+CallbackHandler.PLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
\ No newline at end of file

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/DynamicSaslRegistrar.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/DynamicSaslRegistrar.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/DynamicSaslRegistrar.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/DynamicSaslRegistrar.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,125 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.security;
+
+import org.apache.log4j.Logger;
+
+import javax.security.sasl.SaslClientFactory;
+import java.io.*;
+import java.util.Properties;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.TreeMap;
+import java.security.Security;
+
+public class DynamicSaslRegistrar
+{
+    private static final String FILE_PROPERTY = "amq.dynamicsaslregistrar.properties";
+
+    private static final Logger _logger = Logger.getLogger(DynamicSaslRegistrar.class);
+
+    public static void registerSaslProviders()
+    {
+        InputStream is = openPropertiesInputStream();
+        try
+        {
+            Properties props = new Properties();
+            props.load(is);
+            Map<String, Class<? extends SaslClientFactory>> factories = parseProperties(props);
+            if (factories.size() > 0)
+            {
+                Security.addProvider(new JCAProvider(factories));
+                _logger.debug("Dynamic SASL provider added as a security provider");
+            }
+        }
+        catch (IOException e)
+        {
+            _logger.error("Error reading properties: " + e, e);
+        }
+        finally
+        {
+            if (is != null)
+            {
+                try
+                {
+                    is.close();
+
+                }
+                catch (IOException e)
+                {
+                    _logger.error("Unable to close properties stream: " + e, e);
+                }
+            }
+        }
+    }
+
+    private static InputStream openPropertiesInputStream()
+    {
+        String filename = System.getProperty(FILE_PROPERTY);
+        boolean useDefault = true;
+        InputStream is = null;
+        if (filename != null)
+        {
+            try
+            {
+                is = new BufferedInputStream(new FileInputStream(new File(filename)));
+                useDefault = false;
+            }
+            catch (FileNotFoundException e)
+            {
+                _logger.error("Unable to read from file " + filename + ": " + e, e);
+            }
+        }
+
+        if (useDefault)
+        {
+            is = CallbackHandlerRegistry.class.getResourceAsStream("DynamicSaslRegistrar.properties");
+        }
+
+        return is;
+    }
+
+    private static Map<String, Class<? extends SaslClientFactory>> parseProperties(Properties props)
+    {
+        Enumeration e = props.propertyNames();
+        TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister =
+                new TreeMap<String, Class<? extends SaslClientFactory>>();
+        while (e.hasMoreElements())
+        {
+            String mechanism = (String) e.nextElement();
+            String className = props.getProperty(mechanism);
+            try
+            {
+                Class<?> clazz = Class.forName(className);
+                if (!(SaslClientFactory.class.isAssignableFrom(clazz)))
+                {
+                    _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping");
+                    continue;
+                }
+                factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz);
+            }
+            catch (Exception ex)
+            {
+                _logger.error("Error instantiating SaslClientFactory calss " + className  + " - skipping");
+            }
+        }
+        return factoriesToRegister;
+    }
+
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/DynamicSaslRegistrar.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/DynamicSaslRegistrar.properties
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/DynamicSaslRegistrar.properties?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/DynamicSaslRegistrar.properties (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/DynamicSaslRegistrar.properties Tue Sep 19 15:06:50 2006
@@ -0,0 +1 @@
+AMQPLAIN=org.apache.qpid.client.security.amqplain.AmqPlainSaslClientFactory
\ No newline at end of file

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/DynamicSaslRegistrar.properties
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/JCAProvider.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/JCAProvider.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/JCAProvider.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/JCAProvider.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.security;
+
+import javax.security.sasl.SaslClientFactory;
+import java.security.Provider;
+import java.security.Security;
+import java.util.Map;
+
+public class JCAProvider extends Provider
+{
+    public JCAProvider(Map<String, Class<? extends SaslClientFactory>> providerMap)
+    {
+        super("AMQSASLProvider", 1.0, "A JCA provider that registers all " +
+              "AMQ SASL providers that want to be registered");
+        register(providerMap);
+        Security.addProvider(this);
+    }
+
+    private void register(Map<String, Class<? extends SaslClientFactory>> providerMap)
+    {
+        for (Map.Entry<String, Class<? extends SaslClientFactory>> me :
+             providerMap.entrySet())
+        {
+            put("SaslClientFactory." + me.getKey(), me.getValue().getName());
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/JCAProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.security;
+
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+
+import javax.security.auth.callback.*;
+import java.io.IOException;
+
+public class UsernamePasswordCallbackHandler implements AMQCallbackHandler
+{
+    private AMQProtocolSession _protocolSession;
+
+    public void initialise(AMQProtocolSession protocolSession)
+    {
+        _protocolSession = protocolSession;
+    }
+
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
+    {
+        for (int i = 0; i < callbacks.length; i++)
+        {
+            Callback cb = callbacks[i];
+            if (cb instanceof NameCallback)
+            {
+                ((NameCallback)cb).setName(_protocolSession.getUsername());
+            }
+            else if (cb instanceof PasswordCallback)
+            {
+                ((PasswordCallback)cb).setPassword(_protocolSession.getPassword().toCharArray());
+            }
+            else
+            {
+                throw new UnsupportedCallbackException(cb);
+            }
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,101 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.security.amqplain;
+
+import org.apache.qpid.framing.FieldTable;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.Callback;
+
+/**
+ * Implements the "AMQPlain" authentication protocol that uses FieldTables to send username and pwd.
+ *
+ */
+public class AmqPlainSaslClient implements SaslClient
+{
+    /**
+     *  The name of this mechanism
+     */
+    public static final String MECHANISM = "AMQPLAIN";
+
+    private CallbackHandler _cbh;
+
+    public AmqPlainSaslClient(CallbackHandler cbh)
+    {
+        _cbh = cbh;
+    }
+
+    public String getMechanismName()
+    {
+        return "AMQPLAIN";
+    }
+
+    public boolean hasInitialResponse()
+    {
+        return true;
+    }
+
+    public byte[] evaluateChallenge(byte[] challenge) throws SaslException
+    {
+        // we do not care about the prompt or the default name
+        NameCallback nameCallback = new NameCallback("prompt", "defaultName");
+        PasswordCallback pwdCallback = new PasswordCallback("prompt", false);
+        Callback[] callbacks = new Callback[]{nameCallback, pwdCallback};
+        try
+        {
+            _cbh.handle(callbacks);
+        }
+        catch (Exception e)
+        {
+            throw new SaslException("Error handling SASL callbacks: " + e, e);
+        }
+        FieldTable table = new FieldTable();
+        table.put("LOGIN", nameCallback.getName());
+        table.put("PASSWORD", pwdCallback.getPassword());
+        return table.getDataAsBytes();
+    }
+
+    public boolean isComplete()
+    {
+        return true;
+    }
+
+    public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException
+    {
+        throw new SaslException("Not supported");
+    }
+
+    public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException
+    {
+        throw new SaslException("Not supported");
+    }
+
+    public Object getNegotiatedProperty(String propName)
+    {
+        return null;
+    }
+
+    public void dispose() throws SaslException
+    {
+        _cbh = null;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/amqplain/AmqPlainSaslClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/amqplain/AmqPlainSaslClientFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/amqplain/AmqPlainSaslClientFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/amqplain/AmqPlainSaslClientFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/amqplain/AmqPlainSaslClientFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.security.amqplain;
+
+import javax.security.sasl.SaslClientFactory;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.Sasl;
+import javax.security.auth.callback.CallbackHandler;
+import java.util.Map;
+
+public class AmqPlainSaslClientFactory implements SaslClientFactory
+{
+    public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map props, CallbackHandler cbh) throws SaslException
+    {
+        for (int i = 0; i < mechanisms.length; i++)
+        {
+            if (mechanisms[i].equals(AmqPlainSaslClient.MECHANISM))
+            {
+                if (cbh == null)
+                {
+                    throw new SaslException("CallbackHandler must not be null");
+                }
+                return new AmqPlainSaslClient(cbh);
+            }
+        }
+        return null;
+    }
+
+    public String[] getMechanismNames(Map props)
+    {
+        if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+            props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+            props.containsKey(Sasl.POLICY_NOACTIVE))
+        {
+            // returned array must be non null according to interface documentation
+            return new String[0];
+        }
+        else
+        {
+            return new String[]{AmqPlainSaslClient.MECHANISM};
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/security/amqplain/AmqPlainSaslClientFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQState.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQState.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQState.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQState.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.state;
+
+/**
+ * States used in the AMQ protocol. Used by the finite state machine to determine
+ * valid responses.
+ */
+public class AMQState
+{
+    private final int _id;
+
+    private final String _name;
+
+    private AMQState(int id, String name)
+    {
+        _id = id;
+        _name = name;
+    }
+
+    public String toString()
+    {
+        return "AMQState: id = " + _id + " name: " + _name;
+    }
+
+    public static final AMQState CONNECTION_NOT_STARTED = new AMQState(1, "CONNECTION_NOT_STARTED");
+    
+    public static final AMQState CONNECTION_NOT_TUNED = new AMQState(2, "CONNECTION_NOT_TUNED");
+    
+    public static final AMQState CONNECTION_NOT_OPENED = new AMQState(3, "CONNECTION_NOT_OPENED");        
+
+    public static final AMQState CONNECTION_OPEN = new AMQState(4, "CONNECTION_OPEN");
+
+    public static final AMQState CONNECTION_CLOSING = new AMQState(5, "CONNECTION_CLOSING");
+    
+    public static final AMQState CONNECTION_CLOSED = new AMQState(6, "CONNECTION_CLOSED");
+    
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQState.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQStateChangedEvent.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQStateChangedEvent.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQStateChangedEvent.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQStateChangedEvent.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.state;
+
+/**
+ * An event that is fired when the protocol state has changed.
+ * 
+ */
+public class AMQStateChangedEvent
+{
+    private final AMQState _oldState;
+
+    private final AMQState _newState;
+
+    public AMQStateChangedEvent(AMQState oldState, AMQState newState)
+    {
+        _oldState = oldState;
+        _newState = newState;
+    }
+
+    public AMQState getOldState()
+    {
+        return _oldState;
+    }
+
+    public AMQState getNewState()
+    {
+        return _newState;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQStateChangedEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQStateListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQStateListener.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQStateListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQStateListener.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.state;
+
+public interface AMQStateListener
+{
+    void stateChanged(AMQStateChangedEvent evt);
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQStateListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQStateManager.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQStateManager.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQStateManager.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,224 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.state;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.handler.*;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQMethodListener;
+import org.apache.qpid.framing.*;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * The state manager is responsible for managing the state of the protocol session.
+ * <p/>
+ * For each AMQProtocolHandler there is a separate state manager.
+ */
+public class AMQStateManager implements AMQMethodListener
+{
+    private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
+
+    /**
+     * The current state
+     */
+    private AMQState _currentState;
+
+    /**
+     * Maps from an AMQState instance to a Map from Class to StateTransitionHandler.
+     * The class must be a subclass of AMQFrame.
+     */
+    private final Map _state2HandlersMap = new HashMap();
+
+    private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
+
+    public AMQStateManager()
+    {
+        this(AMQState.CONNECTION_NOT_STARTED, true);
+    }
+
+    protected AMQStateManager(AMQState state, boolean register)
+    {
+        _currentState = state;
+        if(register)
+        {
+            registerListeners();
+        }
+    }
+
+    protected void registerListeners()
+    {
+        Map frame2handlerMap = new HashMap();
+
+        // we need to register a map for the null (i.e. all state) handlers otherwise you get
+        // a stack overflow in the handler searching code when you present it with a frame for which
+        // no handlers are registered
+        //
+        _state2HandlersMap.put(null, frame2handlerMap);
+
+        frame2handlerMap = new HashMap();
+        frame2handlerMap.put(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance());
+        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
+        _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap);
+
+        frame2handlerMap = new HashMap();
+        frame2handlerMap.put(ConnectionTuneBody.class, ConnectionTuneMethodHandler.getInstance());
+        frame2handlerMap.put(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance());
+        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
+        _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap);
+
+        frame2handlerMap = new HashMap();
+        frame2handlerMap.put(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance());
+        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
+        _state2HandlersMap.put(AMQState.CONNECTION_NOT_OPENED, frame2handlerMap);
+
+        //
+        // ConnectionOpen handlers
+        //
+        frame2handlerMap = new HashMap();
+        frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandler.getInstance());
+        frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance());
+        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
+        frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
+        frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
+        frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
+        _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
+    }
+
+    public AMQState getCurrentState()
+    {
+        return _currentState;
+    }
+
+    public void changeState(AMQState newState) throws AMQException
+    {
+        _logger.debug("State changing to " + newState + " from old state " + _currentState);
+        final AMQState oldState = _currentState;
+        _currentState = newState;
+
+        synchronized (_stateListeners)
+        {
+            final Iterator it = _stateListeners.iterator();
+            while (it.hasNext())
+            {
+                final StateListener l = (StateListener) it.next();
+                l.stateChanged(oldState, newState);
+            }
+        }
+    }
+
+    public void error(Exception e)
+    {
+        _logger.debug("State manager receive error notification: " + e);
+        synchronized (_stateListeners)
+        {
+            final Iterator it = _stateListeners.iterator();
+            while (it.hasNext())
+            {
+                final StateListener l = (StateListener) it.next();
+                l.error(e);
+            }
+        }
+    }
+
+    public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+    {
+        StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
+        if (handler != null)
+        {
+            handler.methodReceived(this, evt);
+            return true;
+        }
+        return false;
+    }
+
+    protected StateAwareMethodListener findStateTransitionHandler(AMQState currentState,
+                                                                  AMQMethodBody frame)
+            throws IllegalStateTransitionException
+    {
+        final Class clazz = frame.getClass();
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Looking for state transition handler for frame " + clazz);
+        }
+        final Map classToHandlerMap = (Map) _state2HandlersMap.get(currentState);
+
+        if (classToHandlerMap == null)
+        {
+            // if no specialised per state handler is registered look for a
+            // handler registered for "all" states
+            return findStateTransitionHandler(null, frame);
+        }
+        final StateAwareMethodListener handler = (StateAwareMethodListener) classToHandlerMap.get(clazz);
+        if (handler == null)
+        {
+            if (currentState == null)
+            {
+                _logger.debug("No state transition handler defined for receiving frame " + frame);
+                return null;
+            }
+            else
+            {
+                // if no specialised per state handler is registered look for a
+                // handler registered for "all" states
+                return findStateTransitionHandler(null, frame);
+            }
+        }
+        else
+        {
+            return handler;
+        }
+    }
+
+    public void addStateListener(StateListener listener)
+    {
+        _logger.debug("Adding state listener");
+        _stateListeners.add(listener);
+    }
+
+    public void removeStateListener(StateListener listener)
+    {
+        _stateListeners.remove(listener);
+    }
+
+    public void attainState(AMQState s) throws AMQException
+    {
+        boolean needToWait = false;
+        StateWaiter sw = null;
+        synchronized (_stateListeners)
+        {
+            if (_currentState != s)
+            {
+                _logger.debug("Adding state wait to reach state " + s);
+                sw = new StateWaiter(s);
+                addStateListener(sw);
+                // we use a boolean since we must release the lock before starting to wait
+                needToWait = true;
+            }
+        }
+        if (needToWait)
+        {
+            sw.waituntilStateHasChanged();
+        }
+        // at this point the state will have changed.
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/AMQStateManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/IllegalStateTransitionException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/IllegalStateTransitionException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/IllegalStateTransitionException.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/IllegalStateTransitionException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.state;
+
+import org.apache.qpid.AMQException;
+
+public class IllegalStateTransitionException extends AMQException
+{
+    private AMQState _originalState;
+
+    private Class _frame;
+
+    public IllegalStateTransitionException(AMQState originalState, Class frame)
+    {
+        super("No valid state transition defined for receiving frame " + frame +
+              " from state " + originalState);
+        _originalState = originalState;
+        _frame = frame;
+    }
+
+    public AMQState getOriginalState()
+    {
+        return _originalState;
+    }
+
+    public Class getFrameClass()
+    {
+        return _frame;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/IllegalStateTransitionException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/StateAwareMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/StateAwareMethodListener.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/StateAwareMethodListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/StateAwareMethodListener.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,31 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.state;
+
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+
+/**
+ * A frame listener that is informed of the protocl state when invoked and has
+ * the opportunity to update state.
+ *
+ */
+public interface StateAwareMethodListener
+{
+    void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException;
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/StateAwareMethodListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/StateListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/StateListener.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/StateListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/StateListener.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,27 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.state;
+
+import org.apache.qpid.AMQException;
+
+public interface StateListener
+{
+    void stateChanged(AMQState oldState, AMQState newState) throws AMQException;
+
+    void error(Throwable t);
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/StateListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/StateWaiter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/StateWaiter.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/StateWaiter.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/StateWaiter.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,114 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.state;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
+/**
+ * Waits for a particular state to be reached.
+ *
+ */
+public class StateWaiter implements StateListener
+{
+    private static final Logger _logger = Logger.getLogger(StateWaiter.class);
+
+    private final AMQState _state;
+
+    private volatile boolean _newStateAchieved;
+
+    private volatile Throwable _throwable;
+
+    private final Object _monitor = new Object();
+
+    public StateWaiter(AMQState state)
+    {
+        _state = state;
+    }
+
+    public void waituntilStateHasChanged() throws AMQException
+    {
+        synchronized (_monitor)
+        {
+            //
+            // The guard is required in case we are woken up by a spurious
+            // notify().
+            //
+            while (!_newStateAchieved && _throwable == null)
+            {
+                try
+                {
+                    _logger.debug("State " + _state + " not achieved so waiting...");
+                    _monitor.wait();
+                }
+                catch (InterruptedException e)
+                {
+                    _logger.debug("Interrupted exception caught while waiting: " + e, e);
+                }
+            }
+        }
+
+        if (_throwable != null)
+        {
+            _logger.debug("Throwable reached state waiter: " + _throwable);
+            if (_throwable instanceof AMQException)
+            {
+                throw (AMQException) _throwable;
+            }
+            else
+            {
+                throw new AMQException("Error: "  + _throwable, _throwable); // FIXME: this will wrap FailoverException in throwable which will prevent it being caught.
+            }
+        }
+    }
+
+    public void stateChanged(AMQState oldState, AMQState newState)
+    {
+        synchronized (_monitor)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("stateChanged called");
+            }
+            if (_state == newState)
+            {
+                _newStateAchieved = true;
+
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("New state reached so notifying monitor");
+                }
+                _monitor.notifyAll();
+            }
+        }
+    }
+
+    public void error(Throwable t)
+    {
+        synchronized (_monitor)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("exceptionThrown called");
+            }
+
+            _throwable = t;
+            _monitor.notifyAll();
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/StateWaiter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.state.listener;
+
+import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.AMQException;
+
+public class SpecificMethodFrameListener extends BlockingMethodFrameListener
+{
+    private final Class _expectedClass;
+
+    public SpecificMethodFrameListener(int channelId, Class expectedClass)
+    {
+        super(channelId);
+        _expectedClass = expectedClass;
+    }
+
+    public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException
+    {
+        return _expectedClass.isInstance(frame);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/ITransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/ITransportConnection.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/ITransportConnection.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/ITransportConnection.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.transport;
+
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.jms.BrokerDetails;
+
+import java.io.IOException;
+
+public interface ITransportConnection
+{
+    void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail)
+            throws IOException;
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/ITransportConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,96 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.transport;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+public class SocketTransportConnection implements ITransportConnection
+{
+    private static final Logger _logger = Logger.getLogger(SocketTransportConnection.class);
+    private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+    private SocketConnectorFactory _socketConnectorFactory;
+
+    static interface SocketConnectorFactory {
+        IoConnector newSocketConnector();
+    }
+
+    public SocketTransportConnection(SocketConnectorFactory socketConnectorFactory)
+    {
+        _socketConnectorFactory = socketConnectorFactory;
+    }
+
+    public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail)
+            throws IOException
+    {
+        ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
+
+        // the MINA default is currently to use the pooled allocator although this may change in future
+        // once more testing of the performance of the simple allocator has been done
+        if (!Boolean.getBoolean("amqj.enablePooledAllocator"))
+        {
+            ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+        }
+
+        final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector();
+        SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
+
+        // if we do not use our own thread model we get the MINA default which is to use
+        // its own leader-follower model
+        boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
+        if (readWriteThreading)
+        {
+            cfg.setThreadModel(new ReadWriteThreadModel());
+        }
+
+        SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+        scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true")));
+        scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE));
+        _logger.info("send-buffer-size = " + scfg.getSendBufferSize());
+        scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE));
+        _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize());
+        final InetSocketAddress address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
+        protocolHandler.setUseSSL(brokerDetail.useSSL());
+        _logger.info("Attempting connection to " + address);
+        ConnectFuture future = ioConnector.connect(address, protocolHandler);
+
+        // wait for connection to complete
+        if (future.join(brokerDetail.getTimeout()))
+        {
+            // we call getSession which throws an IOException if there has been an error connecting
+            future.getSession();
+        }
+        else
+        {
+            throw new IOException("Timeout waiting for connection.");
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/TransportConnection.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/TransportConnection.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/TransportConnection.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,71 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.transport;
+
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+
+/**
+ * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up
+ * the underlying connector, which currently always uses TCP/IP sockets. It creates the
+ * "protocol handler" which deals with MINA protocol events.
+ *
+ * Could be extended in future to support different transport types by turning this into concrete class/interface
+ * combo.
+ */
+public class TransportConnection
+{
+    private static ITransportConnection _instance;
+
+    static
+    {
+        if (Boolean.getBoolean("amqj.useBlockingIo"))
+        {
+            _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() {
+                public IoConnector newSocketConnector() {
+                    return new org.apache.qpid.bio.SocketConnector(); // blocking connector
+                }
+            });
+        }
+        else
+        {
+            _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() {
+                public IoConnector newSocketConnector() {
+                    SocketConnector result = new SocketConnector(); // non-blocking connector
+
+                    // Don't have the connector's worker thread wait around for other connections (we only use
+                    // one SocketConnector per connection at the moment anyway). This allows short-running
+                    // clients (like unit tests) to complete quickly.
+                    result.setWorkerTimeout(0L);
+
+                    return result;
+                }
+            });
+        }
+    }
+
+    public static void setInstance(ITransportConnection transport)
+    {
+        _instance = transport;
+    }
+
+    public static ITransportConnection getInstance()
+    {
+        return _instance;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/transport/TransportConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,87 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.util;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A blocking queue that emits events above a user specified threshold allowing
+ * the caller to take action (e.g. flow control) to try to prevent the queue
+ * growing (much) further. The underlying queue itself is not bounded therefore
+ * the caller is not obliged to react to the events.
+ * <p/>
+ * This implementation is <b>only</b> safe where we have a single thread adding
+ * items and a single (different) thread removing items.
+ *
+ */
+public class FlowControllingBlockingQueue
+{
+    /**
+     * This queue is bounded and is used to store messages before being dispatched to the consumer
+     */
+    private final BlockingQueue _queue = new LinkedBlockingQueue();
+
+    private final int _flowControlThreshold;
+
+    private final ThresholdListener _listener;
+
+    /**
+     * We require a separate count so we can track whether we have reached the
+     * threshold
+     */
+    private int _count;
+
+    public interface ThresholdListener
+    {
+        void aboveThreshold(int currentValue);
+
+        void underThreshold(int currentValue);
+    }
+
+    public FlowControllingBlockingQueue(int threshold, ThresholdListener listener)
+    {
+        _flowControlThreshold = threshold;
+        _listener = listener;
+    }
+
+    public Object take() throws InterruptedException
+    {
+        Object o = _queue.take();
+        synchronized (_listener)
+        {
+            if (--_count == (_flowControlThreshold - 1))
+            {
+                _listener.underThreshold(_count);
+            }
+        }
+        return o;
+    }
+
+    public void add(Object o)
+    {
+        _queue.add(o);
+        synchronized (_listener)
+        {
+            if (++_count == _flowControlThreshold)
+            {
+                _listener.aboveThreshold(_count);
+            }
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/BrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/BrokerDetails.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/BrokerDetails.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/BrokerDetails.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.jms;
+
+public interface BrokerDetails
+{
+
+    /*
+     * Known URL Options
+     * @see ConnectionURL
+    */
+    public static final String OPTIONS_RETRY = "retries";
+    public static final String OPTIONS_SSL = ConnectionURL.OPTIONS_SSL;
+    public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
+    public static final int DEFAULT_PORT = 5672;
+    public static final String DEFAULT_TRANSPORT = "tcp";
+
+    public static final String URL_FORMAT_EXAMPLE =
+            "<transport>://<hostname>[:<port Default=\""+DEFAULT_PORT+"\">][?<option>='<value>'[,<option>='<value>']]";
+
+    public static final long DEFAULT_CONNECT_TIMEOUT = 30000L;
+
+    String getHost();
+    void setHost(String host);
+
+    int getPort();
+    void setPort(int port);
+
+    String getTransport();
+    void setTransport(String transport);
+
+    boolean useSSL();
+    void useSSL(boolean ssl);
+
+    String getOption(String key);
+    void setOption(String key,String value);
+
+    long getTimeout();
+    void setTimeout(long timeout);
+
+    String toString();
+
+    boolean equals(Object o);
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/BrokerDetails.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/ChannelLimitReachedException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/ChannelLimitReachedException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/ChannelLimitReachedException.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/ChannelLimitReachedException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.ResourceAllocationException;
+
+/**
+ * Indicates that the maximum number of sessions per connection limit has been reached.
+ */
+public class ChannelLimitReachedException extends ResourceAllocationException
+{
+    private static final String ERROR_CODE = "1";
+
+    private long _limit;
+
+    public ChannelLimitReachedException(long limit)
+    {
+        super("Unable to create session since maximum number of sessions per connection is " +
+              limit + ". Either close one or more sessions or increase the " +
+              "maximum number of sessions per connection (or contact your AMQP administrator.", ERROR_CODE);
+        _limit = limit;
+    }
+
+    public long getLimit()
+    {
+        return _limit;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/ChannelLimitReachedException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/Connection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/Connection.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/Connection.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/Connection.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.*;
+
+
+public interface Connection extends javax.jms.Connection
+{
+    /**
+     * @return the maximum number of sessions supported by this Connection
+     */
+    long getMaximumChannelCount();
+
+    void setConnectionListener(ConnectionListener listener);
+
+    /**
+     * Get the connection listener that has been registered with this connection, if any
+     * @return the listener or null if none has been set
+     */
+    ConnectionListener getConnectionListener();
+
+    /**
+     * Create a session specifying the prefetch limit of messages.
+     * @param transacted
+     * @param acknowledgeMode
+     * @param prefetch the maximum number of messages to buffer in the client. This
+     * applies as a total across all consumers
+     * @return
+     * @throws JMSException
+     */
+    org.apache.qpid.jms.Session createSession(boolean transacted, int acknowledgeMode,
+                                          int prefetch) throws JMSException;
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/Connection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/ConnectionListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/ConnectionListener.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/ConnectionListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/ConnectionListener.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.jms;
+
+public interface ConnectionListener
+{
+    /**
+     * Called when bytes have been transmitted to the server
+     * @param count the number of bytes sent in total since the connection was opened
+     */
+    void bytesSent(long count);
+
+    /**
+     * Called when some bytes have been received on a connection
+     * @param count the number of bytes received in total since the connection was opened
+     */
+    void bytesReceived(long count);
+
+    /**
+     * Called after the infrastructure has detected that failover is required but before attempting failover.
+     * @param redirect true if the broker requested redirect. false if failover is occurring due to a connection error.
+     * @return true to continue failing over, false to veto failover and raise a connection exception
+     */
+    boolean preFailover(boolean redirect);
+
+    /**
+     * Called after connection has been made to another broker after failover has been started but before
+     * any resubscription has been done.
+     * @return true to continue with resubscription, false to prevent automatic resubscription. This is useful in
+     * cases where the application wants to handle resubscription. Note that in the latter case all sessions, producers
+     * and consumers are invalidated.
+     */
+    boolean preResubscribe();
+
+    /**
+     * Called once failover has completed successfully. This is called irrespective of whether the client has
+     * vetoed automatic resubscription.
+     */
+    void failoverComplete();
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/ConnectionListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/ConnectionURL.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/ConnectionURL.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/ConnectionURL.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,69 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.jms;
+
+import java.util.List;
+
+/**
+ Connection URL format
+ amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\';vm://:3/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''"
+ Options are of course optional except for requiring a single broker in the broker list.
+ The option seperator is defined to be either '&' or ','
+  */
+public interface ConnectionURL
+{
+    public static final String AMQ_PROTOCOL = "amqp";
+    public static final String OPTIONS_BROKERLIST = "brokerlist";
+    public static final String OPTIONS_FAILOVER = "failover";
+    public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
+    public static final String OPTIONS_SSL = "ssl";
+
+    String getURL();
+
+    String getFailoverMethod();
+
+    String getFailoverOption(String key);
+
+    int getBrokerCount();
+
+    BrokerDetails getBrokerDetails(int index);
+
+    void addBrokerDetails(BrokerDetails broker);
+
+    List<BrokerDetails> getAllBrokerDetails();
+
+    String getClientName();
+
+    void setClientName(String clientName);
+
+    String getUsername();
+
+    void setUsername(String username);
+
+    String getPassword();
+
+    void setPassword(String password);
+
+    String getVirtualHost();
+
+    void setVirtualHost(String virtualHost);
+
+    String getOption(String key);
+
+    void setOption(String key, String value);
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/jms/ConnectionURL.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message