qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arnaudsi...@apache.org
Subject svn commit: r577253 [2/7] - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity: nclient/ nclient/impl/ nclient/util/ njms/ njms/message/
Date Wed, 19 Sep 2007 11:36:26 GMT
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessageListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessageListener.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessageListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessageListener.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.nclient.util;
+
+import org.apache.qpidity.api.Message;
+
+/**
+ * A listener that is handed incoming messages.
+ */
+public interface MessageListener
+{
+    /**
+     * Called to process an incoming message.
+     *
+     * @param message The incoming message.
+     */
+    void onMessage(Message message);
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessageListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,59 @@
+package org.apache.qpidity.nclient.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.Header;
+import org.apache.qpidity.nclient.MessagePartListener;
+
+/**
+ * This is a simple message assembler that adapts a {@link MessageListener}
+ * to the {@link MessagePartListener} callbacks.
+ * <p>
+ * The header and data parts of a transfer are collected in a
+ * {@link ByteBufferMessage}; the <code>onMessage</code> method of the adaptee
+ * is called when {@link #messageReceived()} reports that all message data
+ * has been read.
+ * <p>
+ * This is a good convenience utility for handling small messages.
+ */
+public class MessagePartListenerAdapter implements MessagePartListener
+{
+    /** The listener that is handed each assembled message. */
+    MessageListener _adaptee;
+
+    /** The message being assembled; replaced on every call to messageTransfer. */
+    ByteBufferMessage _currentMsg;
+
+    /**
+     * Create an adapter that forwards assembled messages to the given listener.
+     *
+     * @param listener The listener to call when a message is complete.
+     */
+    public MessagePartListenerAdapter(MessageListener listener)
+    {
+        _adaptee = listener;
+    }
+
+    /**
+     * Start assembling a new message.
+     *
+     * @param transferId The transfer id passed to the new message.
+     */
+    public void messageTransfer(long transferId)
+    {
+        _currentMsg = new ByteBufferMessage(transferId);
+    }
+
+    /**
+     * Append a chunk of data to the message being assembled.
+     *
+     * @param src The data to append.
+     * @throws IllegalStateException If the data cannot be appended to the current message.
+     */
+    public void data(ByteBuffer src)
+    {
+        try
+        {
+            _currentMsg.appendData(src);
+        }
+        catch (IOException e)
+        {
+            // The original author did not expect an IOException here because the
+            // message is backed by a ByteBuffer. Should it ever happen, swallowing
+            // it would deliver a truncated message, so report it with its cause.
+            throw new IllegalStateException("Unable to append data to the current message", e);
+        }
+    }
+
+    /**
+     * Copy the delivery and message properties found in the header
+     * onto the message being assembled.
+     *
+     * @param header The header of the message being transferred.
+     */
+    public void messageHeader(Header header)
+    {
+        _currentMsg.setDeliveryProperties(header.get(DeliveryProperties.class));
+        _currentMsg.setMessageProperties(header.get(MessageProperties.class));
+    }
+
+    /**
+     * Hand the assembled message over to the adaptee.
+     */
+    public void messageReceived()
+    {
+        _adaptee.onMessage(_currentMsg);
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ReadOnlyMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ReadOnlyMessage.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ReadOnlyMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ReadOnlyMessage.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,38 @@
+package org.apache.qpidity.nclient.util;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+/**
+ * Base class for messages that reject every attempt to modify their data.
+ * <p>
+ * The append and clear operations always fail with an
+ * UnsupportedOperationException. The property fields are package-private
+ * and have no setters here; they are exposed through the two getters.
+ */
+public abstract class ReadOnlyMessage implements Message
+{
+    /** Reason reported when an append is attempted. */
+    private static final String APPEND_REJECTED = "This Message is read only after the initial source";
+
+    /** Reason reported when a clear is attempted. */
+    private static final String CLEAR_REJECTED = "This Message is read only after the initial source, cannot clear data";
+
+    MessageProperties _messageProperties;
+    DeliveryProperties _deliveryProperties;
+
+    /**
+     * @return The message properties held by this message.
+     */
+    public MessageProperties getMessageProperties()
+    {
+        return _messageProperties;
+    }
+
+    /**
+     * @return The delivery properties held by this message.
+     */
+    public DeliveryProperties getDeliveryProperties()
+    {
+        return _deliveryProperties;
+    }
+
+    /**
+     * Always fails: data cannot be appended to a read only message.
+     *
+     * @param src Ignored.
+     * @throws UnsupportedOperationException Always.
+     */
+    public void appendData(byte[] src)
+    {
+        throw new UnsupportedOperationException(APPEND_REJECTED);
+    }
+
+    /**
+     * Always fails: data cannot be appended to a read only message.
+     *
+     * @param src Ignored.
+     * @throws UnsupportedOperationException Always.
+     */
+    public void appendData(ByteBuffer src)
+    {
+        throw new UnsupportedOperationException(APPEND_REJECTED);
+    }
+
+    /**
+     * Always fails: the data of a read only message cannot be cleared.
+     *
+     * @throws UnsupportedOperationException Always.
+     */
+    public void clearData()
+    {
+        throw new UnsupportedOperationException(CLEAR_REJECTED);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ReadOnlyMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,59 @@
+package org.apache.qpidity.nclient.util;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+/**
+ * A read only message whose data is pulled from a {@link SocketChannel},
+ * one chunk per call to {@link #readData()}.
+ */
+public class StreamingMessage extends ReadOnlyMessage implements Message
+{
+    SocketChannel _socChannel;
+    private int _chunkSize;
+    private ByteBuffer _readBuf;
+
+    /**
+     * Create a message that reads its data from the given channel.
+     *
+     * @param in                 The channel the message data is read from.
+     * @param chunkSize          The capacity, in bytes, of the buffer used for each read.
+     * @param deliveryProperties The delivery properties of this message.
+     * @param messageProperties  The message properties of this message.
+     * @throws IOException Declared for callers; nothing in this constructor raises it.
+     */
+    public StreamingMessage(SocketChannel in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException
+    {
+        _messageProperties = messageProperties;
+        _deliveryProperties = deliveryProperties;
+
+        _socChannel = in;
+        _chunkSize = chunkSize;
+        _readBuf = ByteBuffer.allocate(_chunkSize);
+    }
+
+    /**
+     * Not supported by this implementation.
+     *
+     * @param target Ignored.
+     * @throws UnsupportedOperationException Always.
+     */
+    public void readData(byte[] target) throws IOException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Read the next chunk of data from the underlying channel.
+     * <p>
+     * The returned buffer is positioned at the first byte read and limited to
+     * the number of bytes read. It shares its content with an internal buffer
+     * that is overwritten by the next call, so it must be consumed before
+     * this method is called again.
+     *
+     * @return A buffer holding the bytes read by this call (possibly none).
+     * @throws EOFException If the channel is closed, not connected, or has reached end of stream.
+     * @throws IOException  If reading from the channel fails.
+     */
+    public ByteBuffer readData() throws IOException
+    {
+        if (!(_socChannel.isConnected() && _socChannel.isOpen()))
+        {
+            throw new EOFException("The underlying socket/channel has closed");
+        }
+
+        _readBuf.clear();
+        if (_socChannel.read(_readBuf) < 0)
+        {
+            // read() returns -1 once the peer has finished sending; report it
+            // instead of handing back a buffer that holds no new data.
+            throw new EOFException("The underlying socket/channel has reached end of stream");
+        }
+
+        // Switch from filling to draining: without the flip the duplicate would
+        // expose the unused tail of the buffer rather than the bytes just read.
+        _readBuf.flip();
+        return _readBuf.duplicate();
+    }
+
+    /**
+     * This message is used by an application user to
+     * provide data to the client library using pull style
+     * semantics. Since the message is not transferred yet, it
+     * does not have a transfer id. Hence this method is not
+     * applicable to this implementation.
+     *
+     * @throws UnsupportedOperationException Always.
+     */
+    public long getMessageTransferId()
+    {
+        throw new UnsupportedOperationException();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionFactoryImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionFactoryImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionFactoryImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,521 @@
+package org.apache.qpidity.njms;
+
+import javax.jms.*;
+import javax.naming.*;
+import javax.naming.spi.ObjectFactory;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.BrokerDetails;
+import org.apache.qpidity.url.QpidURLImpl;
+import org.apache.qpidity.url.QpidURL;
+import org.apache.qpidity.url.BindingURLImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Hashtable;
+import java.net.MalformedURLException;
+
+/**
+ * Implements all the JMS connection factories.
+ * <p> In all the implementations in our code base,
+ * when we create a Reference we pass in <code>ConnectionFactoryImpl</code> as the
+ * factory for creating the objects. This is the factory (or
+ * {@link ObjectFactory}) that is used to turn the description into a real object.
+ * <p>In our construction of the Reference the last parameter is null;
+ * we could put a URL to a jar that contains our {@link ObjectFactory} so that
+ * any of our objects stored in JNDI can be recreated without even having
+ * the classes locally. As it is, the <code>ConnectionFactoryImpl</code> must be on the
+ * classpath when you do a lookup in a JNDI context, else you'll get a
+ * ClassNotFoundException.
+ */
+public class ConnectionFactoryImpl implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
+                                              XATopicConnectionFactory, XAQueueConnectionFactory, XAConnectionFactory,
+                                              ObjectFactory, Referenceable
+{
+    /**
+     * this ConnectionFactoryImpl's logger
+     */
+    private static final Logger _logger = LoggerFactory.getLogger(ConnectionFactoryImpl.class);
+
+    /**
+     * The broker host name.
+     */
+    private String _host;
+    /**
+     * The port on which the broker is listening for connection.
+     */
+    private int _port;
+    /**
+     * The default user name used for user identification.
+     */
+    private String _defaultUsername;
+    /**
+     * The default password used for user identification.
+     */
+    private String _defaultPassword;
+    /**
+     * The virtual host on which the broker is deployed.
+     */
+    private String _virtualHost;
+    /**
+     * The URL used to build this factory; null when the factory was built
+     * from explicit host and port values.
+     */
+    private QpidURL _qpidURL;
+
+    /**
+     * Create a connection factory from an already parsed URL.
+     * <p> The connection settings are taken from the first broker listed in the URL,
+     * exactly as for the constructor that takes the URL as a string. This is the
+     * constructor used by {@link #getObjectInstance} when a factory is rebuilt from JNDI.
+     *
+     * @param url The URL describing the broker; when null no connection setting is initialised.
+     */
+    public ConnectionFactoryImpl(QpidURL url)
+    {
+        _qpidURL = url;
+        if (url != null)
+        {
+            initFromFirstBroker();
+        }
+    }
+
+    /**
+     * Create a connection factory from a URL given as a string.
+     *
+     * @param url The URL describing the broker.
+     * @throws MalformedURLException If the URL cannot be parsed.
+     */
+    public ConnectionFactoryImpl(String url) throws MalformedURLException
+    {
+        _qpidURL = new QpidURLImpl(url);
+        initFromFirstBroker();
+    }
+
+    /**
+     * Create a connection Factory
+     * <p> A factory built this way has no URL, so {@link #getReference()} cannot
+     * produce a Reference for it.
+     *
+     * @param host            The broker host name.
+     * @param port            The port on which the broker is listening for connection.
+     * @param virtualHost     The virtual host on which the broker is deployed.
+     * @param defaultUsername The user name used for user identification.
+     * @param defaultPassword The password used for user identification.
+     */
+    public ConnectionFactoryImpl(String host, int port, String virtualHost, String defaultUsername,
+                                 String defaultPassword)
+    {
+        _host = host;
+        _port = port;
+        _defaultUsername = defaultUsername;
+        _defaultPassword = defaultPassword;
+        _virtualHost = virtualHost;
+    }
+
+    /**
+     * Copy the connection settings of the first broker listed in the URL of this factory.
+     */
+    private void initFromFirstBroker()
+    {
+        BrokerDetails bd = _qpidURL.getAllBrokerDetails().get(0);
+        _host = bd.getHost();
+        _port = bd.getPort();
+        _defaultUsername = bd.getUserName();
+        _defaultPassword = bd.getPassword();
+        _virtualHost = bd.getVirtualHost();
+    }
+
+    /**
+     * Log, at debug level, the exception that made the creation of a connection fail.
+     *
+     * @param e The exception raised while creating the connection.
+     */
+    private static void logConnectionFailure(QpidException e)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("PRoblem when creating connection", e);
+        }
+    }
+
+    //-- Interface ConnectionFactory
+
+    /**
+     * Creates a connection with the default user identity.
+     * <p> The connection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @return A newly created connection.
+     * @throws JMSException         If creating the connection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public Connection createConnection() throws JMSException
+    {
+        try
+        {
+            return new ConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+        }
+        catch (QpidException e)
+        {
+            logConnectionFailure(e);
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Creates a connection with the specified user identity.
+     * <p> The connection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @param username the caller's user name
+     * @param password the caller's password
+     * @return A newly created connection.
+     * @throws JMSException         If creating the connection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public Connection createConnection(String username, String password) throws JMSException
+    {
+        try
+        {
+            return new ConnectionImpl(_host, _port, _virtualHost, username, password);
+        }
+        catch (QpidException e)
+        {
+            logConnectionFailure(e);
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    // ----------------------------------------
+    // Support for JMS 1.0 classes
+    // ----------------------------------------
+    //--- Interface QueueConnection
+    /**
+     * Creates a queueConnection with the default user identity.
+     * <p> The queueConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @return A newly created queueConnection
+     * @throws JMSException         If creating the queueConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public QueueConnection createQueueConnection() throws JMSException
+    {
+        try
+        {
+            return new QueueConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+        }
+        catch (QpidException e)
+        {
+            logConnectionFailure(e);
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Creates a queueConnection with the specified user identity.
+     * <p> The queueConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @param username the caller's user name
+     * @param password the caller's password
+     * @return A newly created queueConnection.
+     * @throws JMSException         If creating the queueConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public QueueConnection createQueueConnection(String username, String password) throws JMSException
+    {
+        try
+        {
+            return new QueueConnectionImpl(_host, _port, _virtualHost, username, password);
+        }
+        catch (QpidException e)
+        {
+            logConnectionFailure(e);
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    //--- Interface TopicConnection
+    /**
+     * Creates a topicConnection with the default user identity.
+     * <p> The topicConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @return A newly created topicConnection
+     * @throws JMSException         If creating the topicConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public TopicConnection createTopicConnection() throws JMSException
+    {
+        try
+        {
+            return new TopicConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+        }
+        catch (QpidException e)
+        {
+            logConnectionFailure(e);
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Creates a topicConnection with the specified user identity.
+     * <p> The topicConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @param username the caller's user name
+     * @param password the caller's password
+     * @return A newly created topicConnection.
+     * @throws JMSException         If creating the topicConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public TopicConnection createTopicConnection(String username, String password) throws JMSException
+    {
+        try
+        {
+            return new TopicConnectionImpl(_host, _port, _virtualHost, username, password);
+        }
+        catch (QpidException e)
+        {
+            logConnectionFailure(e);
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    // ---------------------------------------------------------------------------------------------------
+    // the following methods are provided for XA compatibility
+    // ---------------------------------------------------------------------------------------------------
+
+    /**
+     * Creates a XAConnection with the default user identity.
+     * <p> The XAConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @return A newly created XAConnection
+     * @throws JMSException         If creating the XAConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public XAConnection createXAConnection() throws JMSException
+    {
+        try
+        {
+            return new XAConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+        }
+        catch (QpidException e)
+        {
+            logConnectionFailure(e);
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Creates a XAConnection with the specified user identity.
+     * <p> The XAConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @param username the caller's user name
+     * @param password the caller's password
+     * @return A newly created XAConnection.
+     * @throws JMSException         If creating the XAConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public XAConnection createXAConnection(String username, String password) throws JMSException
+    {
+        try
+        {
+            return new XAConnectionImpl(_host, _port, _virtualHost, username, password);
+        }
+        catch (QpidException e)
+        {
+            logConnectionFailure(e);
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+
+    /**
+     * Creates a XATopicConnection with the default user identity.
+     * <p> The XATopicConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @return A newly created XATopicConnection
+     * @throws JMSException         If creating the XATopicConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public XATopicConnection createXATopicConnection() throws JMSException
+    {
+        try
+        {
+            return new XATopicConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+        }
+        catch (QpidException e)
+        {
+            logConnectionFailure(e);
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Creates a XATopicConnection with the specified user identity.
+     * <p> The XATopicConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @param username the caller's user name
+     * @param password the caller's password
+     * @return A newly created XATopicConnection.
+     * @throws JMSException         If creating the XATopicConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public XATopicConnection createXATopicConnection(String username, String password) throws JMSException
+    {
+        try
+        {
+            return new XATopicConnectionImpl(_host, _port, _virtualHost, username, password);
+        }
+        catch (QpidException e)
+        {
+            logConnectionFailure(e);
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Creates a XAQueueConnection with the default user identity.
+     * <p> The XAQueueConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @return A newly created XAQueueConnection
+     * @throws JMSException         If creating the XAQueueConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public XAQueueConnection createXAQueueConnection() throws JMSException
+    {
+        try
+        {
+            return new XAQueueConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+        }
+        catch (QpidException e)
+        {
+            logConnectionFailure(e);
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Creates a XAQueueConnection with the specified user identity.
+     * <p> The XAQueueConnection is created in stopped mode. No messages
+     * will be delivered until the <code>Connection.start</code> method
+     * is explicitly called.
+     *
+     * @param username the caller's user name
+     * @param password the caller's password
+     * @return A newly created XAQueueConnection.
+     * @throws JMSException         If creating the XAQueueConnection fails due to some internal error.
+     * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+     */
+    public XAQueueConnection createXAQueueConnection(String username, String password) throws JMSException
+    {
+        try
+        {
+            return new XAQueueConnectionImpl(_host, _port, _virtualHost, username, password);
+        }
+        catch (QpidException e)
+        {
+            logConnectionFailure(e);
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    // ----------------------------------------
+    // Support for JNDI
+    // ----------------------------------------
+
+    /**
+     * Creates an object using the location or reference information
+     * specified.
+     * <p> Only References whose class name is that of <code>QueueImpl</code>,
+     * <code>TopicImpl</code>, <code>DestinationImpl</code> or
+     * <code>ConnectionFactoryImpl</code> are handled; the object is rebuilt from
+     * the URL stored in the address registered under that same class name.
+     *
+     * @param obj         The possibly null object containing location or reference
+     *                    information that can be used in creating an object.
+     * @param name        The name of this object relative to <code>nameCtx</code>,
+     *                    or null if no name is specified.
+     * @param nameCtx     The context relative to which the <code>name</code>
+     *                    parameter is specified, or null if <code>name</code> is
+     *                    relative to the default initial context.
+     * @param environment The possibly null environment that is used in
+     *                    creating the object.
+     * @return The object created; null if an object cannot be created.
+     * @throws Exception if this object factory encountered an exception
+     *                   while attempting to create an object, and no other object factories are
+     *                   to be tried.
+     */
+    public Object getObjectInstance(Object obj, Name name, Context nameCtx, Hashtable environment) throws Exception
+    {
+        if (!(obj instanceof Reference))
+        {
+            return null;
+        }
+        Reference ref = (Reference) obj;
+
+        // A Reference may carry no class name; such a reference cannot be one of ours.
+        String className = ref.getClassName();
+        if (className == null)
+        {
+            return null;
+        }
+
+        // Each supported type stores its URL in the address named after its own class.
+        RefAddr addr = ref.get(className);
+        if (addr == null)
+        {
+            return null;
+        }
+
+        if (className.equals(QueueImpl.class.getName()))
+        {
+            return new QueueImpl(new BindingURLImpl((String) addr.getContent()));
+        }
+        if (className.equals(TopicImpl.class.getName()))
+        {
+            return new TopicImpl(new BindingURLImpl((String) addr.getContent()));
+        }
+        if (className.equals(DestinationImpl.class.getName()))
+        {
+            return new DestinationImpl(new BindingURLImpl((String) addr.getContent()));
+        }
+        if (className.equals(ConnectionFactoryImpl.class.getName()))
+        {
+            return new ConnectionFactoryImpl(new QpidURLImpl((String) addr.getContent()));
+        }
+        return null;
+    }
+
+    //-- interface Reference
+    /**
+     * Retrieves the Reference of this object.
+     *
+     * @return The non-null Reference of this object.
+     * @throws NamingException If this factory was not built from a URL and so has nothing
+     *                         to store in the Reference.
+     */
+    public Reference getReference() throws NamingException
+    {
+        if (_qpidURL == null)
+        {
+            // Factories built from explicit host/port values have no URL to store.
+            throw new NamingException("Cannot create a Reference: this connection factory was not built from a URL");
+        }
+        String factoryName = ConnectionFactoryImpl.class.getName();
+        return new Reference(factoryName, new StringRefAddr(factoryName, _qpidURL.getURL()), factoryName, null);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionFactoryImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,503 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import java.util.Vector;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.url.QpidURL;
+import org.apache.qpidity.nclient.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implements javax.jms.Connection and provides the operations of javax.jms.QueueConnection and javax.jms.TopicConnection
+ */
+public class ConnectionImpl implements Connection
+{
+    /**
+     * This class's logger
+     */
+    private static final Logger _logger = LoggerFactory.getLogger(ConnectionImpl.class);
+
+    /**
+     * The sessions (SessionImpl instances) that have been created by this connection
+     */
+    protected final Vector<SessionImpl> _sessions = new Vector<SessionImpl>();
+
+    /**
+     * This is the clientID
+     */
+    private String _clientID;
+
+    /**
+     * The exception listener that gets informed when a serious problem is detected
+     */
+    private ExceptionListener _exceptionListener;
+
+    /**
+     * Whether this connection is started, i.e. whether messages are flowing to consumers.
+     * It has no meaning for message publication.
+     */
+    private boolean _started;
+
+    /**
+     * set to true if this Connection has been closed.
+     * <p/>
+     * A closed Connection cannot accept invocations to any of its methods with the exception
+     * of close(). All other methods should throw javax.jms.IllegalStateException if the
+     * Connection has been closed.
+     * <p/>
+     * A Connection is open after creation, but not started. Once it has been closed, a Connection
+     * cannot be reused any more.
+     */
+    private boolean _isClosed = false;
+
+
+    /**
+     * The Qpid connection instance that is mapped to this JMS connection
+     */
+    org.apache.qpidity.nclient.Connection _qpidConnection;
+
+    /**
+     * This is the exception listener for this qpid connection.
+     * The JMS exception listener is registered with this listener.
+     */
+    QpidExceptionListenerImpl _qpidExceptionListener;
+
+    //------ Constructors ---//
+    /**
+     * Create a connection.
+     *
+     * @param host        The broker host name.
+     * @param port        The port on which the broker is listening for connection.
+     * @param virtualHost The virtual host on which the broker is deployed.
+     * @param username    The user name used for user identification.
+     * @param password    The password used for user identification.
+     * @throws QpidException If creating a connection fails due to some internal error.
+     */
+    protected ConnectionImpl(String host, int port, String virtualHost, String username, String password)
+            throws QpidException
+    {
+        _qpidConnection = Client.createConnection();
+        _qpidConnection.connect(host, port, virtualHost, username, password);
+    }
+
+    /**
+     * Create a connection from a QpidURL
+     *
+     * @param qpidURL The url used to create this connection
+     * @throws QpidException If creating a connection fails due to some internal error.
+     */
+    protected ConnectionImpl(QpidURL qpidURL) throws QpidException
+    {
+        _qpidConnection = Client.createConnection();
+        _qpidConnection.connect(qpidURL);
+    }
+
+    //---- Interface javax.jms.Connection ---//
+    /**
+     * Creates a Session
+     *
+     * @param transacted      Indicates whether the session is transacted.
+     * @param acknowledgeMode ignored if the session is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>,
+     *                        <code>Session.CLIENT_ACKNOWLEDGE</code>, and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
+     * @return A newly created session
+     * @throws JMSException If the Connection object fails to create a session due to some internal error.
+     */
+    public synchronized Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
+    {
+        checkNotClosed();
+        try
+        {
+            final SessionImpl created = new SessionImpl(this, transacted, acknowledgeMode, false);
+            // keep track of the session so that start(), stop() and close() can reach it
+            _sessions.add(created);
+            return created;
+        }
+        catch (QpidException e)
+        {
+            // session construction failed: report it as a JMS exception
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Gets the client identifier for this connection.
+     * <P>It is either preconfigured as a JNDI property or assigned dynamically by the application
+     * by calling the <code>setClientID</code> method.
+     * <p/>
+     * TODO: Make sure that the client identifier can be set on the <CODE>ConnectionFactory</CODE>
+     *
+     * @return The unique client identifier.
+     * @throws JMSException If this connection is closed.
+     */
+    public String getClientID() throws JMSException
+    {
+        checkNotClosed();
+        return _clientID;
+    }
+
+    /**
+     * Sets the client identifier for this connection.
+     * <P>The preferred way to assign a JMS client's client identifier is for
+     * it to be configured in a client-specific <CODE>ConnectionFactory</CODE>
+     * object and transparently assigned to the <CODE>Connection</CODE> object
+     * it creates.
+     * <p> In Qpid it is not possible to change the client ID. If one is not specified
+     * upon connection construction, an id is generated automatically. Therefore
+     * we can always throw an exception.
+     * TODO: Make sure that the client identifier can be set on the <CODE>ConnectionFactory</CODE>
+     *
+     * @param clientID the unique client identifier
+     * @throws JMSException Always as clientID is always set at construction time.
+     */
+    public void setClientID(String clientID) throws JMSException
+    {
+        checkNotClosed();
+        throw new IllegalStateException("Client name cannot be changed after being set");
+    }
+
+    /**
+     * Gets the metadata for this connection.
+     *
+     * @return The connection metadata
+     * @throws JMSException If there is a problem getting the connection metadata for this connection.
+     * @see javax.jms.ConnectionMetaData
+     */
+    public ConnectionMetaData getMetaData() throws JMSException
+    {
+        checkNotClosed();
+        return ConnectionMetaDataImpl.getInstance();
+    }
+
+    /**
+     * Gets the <CODE>ExceptionListener</CODE> object for this connection.
+     *
+     * @return the <CODE>ExceptionListener</CODE> for this connection
+     * @throws JMSException In case of unforeseen problem
+     */
+    public synchronized ExceptionListener getExceptionListener() throws JMSException
+    {
+        checkNotClosed();
+        return _exceptionListener;
+    }
+
+    /**
+     * Sets an exception listener for this connection.
+     * <p/>
+     * <p> The JMS specification says:
+     * <P>If a JMS provider detects a serious problem with a connection, it
+     * informs the connection's <CODE>ExceptionListener</CODE>, if one has been
+     * registered. It does this by calling the listener's
+     * <CODE>onException</CODE> method, passing it a <CODE>JMSException</CODE>
+     * object describing the problem.
+     * <p/>
+     * <P>A connection serializes execution of its
+     * <CODE>ExceptionListener</CODE>.
+     * <p/>
+     * <P>A JMS provider should attempt to resolve connection problems
+     * itself before it notifies the client of them.
+     *
+     * @param exceptionListener The connection listener.
+     * @throws JMSException If the connection is closed.
+     */
+    public synchronized void setExceptionListener(ExceptionListener exceptionListener) throws JMSException
+    {
+        checkNotClosed();
+        _exceptionListener = exceptionListener;
+        // Nothing in this class assigns _qpidExceptionListener, so it may still be null here.
+        // Only forward the JMS listener when the Qpid-level listener exists, rather than
+        // failing the caller with a NullPointerException.
+        if (_qpidExceptionListener != null)
+        {
+            _qpidExceptionListener.setJMSExceptionListner(_exceptionListener);
+        }
+        else
+        {
+            _logger.warn("No Qpid exception listener is installed; the JMS exception listener was stored but not registered.");
+        }
+    }
+
+    /**
+     * Starts (or restarts) a connection's delivery of incoming messages.
+     * A call to start on a connection that has already been
+     * started is ignored.
+     *
+     * @throws JMSException In case of a problem due to some internal error.
+     */
+    public synchronized void start() throws JMSException
+    {
+        checkNotClosed();
+        if (_started)
+        {
+            // a start on an already started connection is ignored
+            return;
+        }
+        // ask every session of this connection to start
+        for (SessionImpl currentSession : _sessions)
+        {
+            try
+            {
+                currentSession.start();
+            }
+            catch (Exception startFailure)
+            {
+                // the started flag is left untouched when a session fails to start
+                throw ExceptionHelper.convertQpidExceptionToJMSException(startFailure);
+            }
+        }
+        _started = true;
+    }
+
+    /**
+     * Temporarily stops a connection's delivery of incoming messages.
+     * <p> The JMS specification says:
+     * <p> Delivery can be restarted using the connection's <CODE>start</CODE>
+     * method. When the connection is stopped, delivery to all the connection's message consumers is inhibited:
+     * synchronous receives block, and messages are not delivered to message listeners.
+     * <P>This call blocks until receives and/or message listeners in progress have completed.
+     *
+     * @throws JMSException In case of a problem due to some internal error.
+     */
+    public synchronized void stop() throws JMSException
+    {
+        checkNotClosed();
+        if (!_started)
+        {
+            // nothing to do: the connection is not started
+            return;
+        }
+        // ask every session of this connection to stop
+        for (SessionImpl currentSession : _sessions)
+        {
+            try
+            {
+                currentSession.stop();
+            }
+            catch (Exception stopFailure)
+            {
+                // the started flag is left untouched when a session fails to stop
+                throw ExceptionHelper.convertQpidExceptionToJMSException(stopFailure);
+            }
+        }
+        _started = false;
+    }
+
+    /**
+     * Closes the connection.
+     * <p/>
+     * <p> The JMS specification says:
+     * <P>Since a provider typically allocates significant resources outside
+     * the JVM on behalf of a connection, clients should close these resources
+     * when they are not needed. Relying on garbage collection to eventually
+     * reclaim these resources may not be timely enough.
+     * <P>There is no need to close the sessions, producers, and consumers of a closed connection.
+     * <P>Closing a connection causes all temporary destinations to be deleted.
+     * <P>When this method is invoked, it should not return until message
+     * processing has been shut down in an orderly fashion.
+     *
+     * @throws JMSException In case of a problem due to some internal error.
+     */
+    public synchronized void close() throws JMSException
+    {
+        // close() is the one method that must remain callable on a closed connection:
+        // closing an already closed connection is a no-op, so checkNotClosed() (which
+        // throws IllegalStateException once _isClosed is set) is deliberately not used here.
+        if (_isClosed)
+        {
+            return;
+        }
+        // mark the connection closed first so that every other method is rejected from now on
+        _isClosed = true;
+        _started = false;
+        // close all the sessions
+        for (SessionImpl session : _sessions)
+        {
+            session.close();
+        }
+        // close the underlying Qpid connection
+        try
+        {
+            _qpidConnection.close();
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Creates a connection consumer for this connection (optional operation).
+     * This is an expert facility for App server integration.
+     *
+     * @param destination     The destination to access.
+     * @param messageSelector Only messages with properties matching the message selector expression are delivered.
+     * @param sessionPool     The session pool to associate with this connection consumer.
+     * @param maxMessages     The maximum number of messages that can be assigned to a server session at one time.
+     * @return Null for the moment.
+     * @throws JMSException In case of a problem due to some internal error.
+     */
+    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
+                                                       ServerSessionPool sessionPool, int maxMessages)
+            throws JMSException
+    {
+        checkNotClosed();
+        return null;
+    }
+
+    /**
+     * Create a durable connection consumer for this connection (optional operation).
+     *
+     * @param topic            The topic to access.
+     * @param subscriptionName Durable subscription name.
+     * @param messageSelector  Only messages with properties matching the message selector expression are delivered.
+     * @param sessionPool      The server session pool to associate with this durable connection consumer.
+     * @param maxMessages      The maximum number of messages that can be assigned to a server session at one time.
+     * @return Null for the moment.
+     * @throws JMSException In case of a problem due to some internal error.
+     */
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
+                                                              String messageSelector, ServerSessionPool sessionPool,
+                                                              int maxMessages) throws JMSException
+    {
+        checkNotClosed();
+        return null;
+    }
+
+    //-------------- QueueConnection API
+
+    /**
+     * Create a QueueSession.
+     *
+     * @param transacted      Indicates whether the session is transacted.
+     * @param acknowledgeMode Indicates whether the consumer or the
+     *                        client will acknowledge any messages it receives; ignored if the session
+     *                        is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>,
+     *                        <code>Session.CLIENT_ACKNOWLEDGE</code> and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
+     * @return A QueueSession object.
+     * @throws JMSException If creating a QueueSession fails due to some internal error.
+     */
+    public synchronized QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
+    {
+        checkNotClosed();
+        try
+        {
+            final QueueSessionImpl created = new QueueSessionImpl(this, transacted, acknowledgeMode);
+            // register the new session with the sessions handled by this connection
+            _sessions.add(created);
+            return created;
+        }
+        catch (QpidException e)
+        {
+            // session construction failed: report it as a JMS exception
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Creates a connection consumer for this connection (optional operation).
+     * This is an expert facility for App server integration.
+     *
+     * @param queue           The queue to access.
+     * @param messageSelector Only messages with properties matching the message selector expression are delivered.
+     * @param sessionPool     The session pool to associate with this connection consumer.
+     * @param maxMessages     The maximum number of messages that can be assigned to a server session at one time.
+     * @return Null for the moment.
+     * @throws JMSException In case of a problem due to some internal error.
+     */
+    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
+                                                       ServerSessionPool sessionPool, int maxMessages)
+            throws JMSException
+    {
+        return createConnectionConsumer((Destination) queue, messageSelector, sessionPool, maxMessages);
+    }
+
+    //-------------- TopicConnection API
+    /**
+     * Create a TopicSession.
+     *
+     * @param transacted      Indicates whether the session is transacted
+     * @param acknowledgeMode Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>, <code>Session.CLIENT_ACKNOWLEDGE</code>, and
+     *                        <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
+     * @return a newly created topic session
+     * @throws JMSException  If creating the session fails due to some internal error.
+     */
+    public synchronized TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
+    {
+        checkNotClosed();
+        try
+        {
+            final TopicSessionImpl created = new TopicSessionImpl(this, transacted, acknowledgeMode);
+            // register the new session with this connection's sessions;
+            // this matters when the connection is closed
+            _sessions.add(created);
+            return created;
+        }
+        catch (QpidException e)
+        {
+            // session construction failed: report it as a JMS exception
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+
+    /**
+     * Creates a connection consumer for this connection (optional operation).
+     * This is an expert facility for App server integration.
+     *
+     * @param topic           The topic to access.
+     * @param messageSelector Only messages with properties matching the message selector expression are delivered.
+     * @param sessionPool     The session pool to associate with this connection consumer.
+     * @param maxMessages     The maximum number of messages that can be assigned to a server session at one time.
+     * @return Null for the moment.
+     * @throws JMSException In case of a problem due to some internal error.
+     */
+    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
+                                                       ServerSessionPool sessionPool, int maxMessages)
+            throws JMSException
+    {
+        return createConnectionConsumer((Destination) topic, messageSelector, sessionPool, maxMessages);
+    }
+
+    //-------------- protected and private methods
+    /**
+     * Validate that the Connection is not closed.
+     * <p/>
+     * If the Connection has been closed, throw a IllegalStateException. This behaviour is
+     * required by the JMS specification.
+     *
+     * @throws IllegalStateException If the connection is closed.
+     */
+    protected synchronized void checkNotClosed() throws IllegalStateException
+    {
+        if (!_isClosed)
+        {
+            // the connection is still open: nothing to report
+            return;
+        }
+        // the same text is logged (at debug level) and carried by the exception
+        final String closedMessage = "Connection has been closed. Cannot invoke any further operations.";
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug(closedMessage);
+        }
+        throw new javax.jms.IllegalStateException(closedMessage);
+    }
+
+    /**
+     * Provide access to the underlying qpid Connection.
+     *
+     * @return The Qpid connection underlying this JMS connection.
+     */
+    protected org.apache.qpidity.nclient.Connection getQpidConnection()
+    {
+        return _qpidConnection;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionMetaDataImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionMetaDataImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionMetaDataImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionMetaDataImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,165 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpid.common.QpidProperties;
+
+import javax.jms.ConnectionMetaData;
+import javax.jms.JMSException;
+import java.util.Enumeration;
+
+/**
+ * Implements javax.jms.ConnectionMetaData
+ * A ConnectionMetaDataImpl provides information describing the JMS <code>Connection</code>.
+ */
+public class ConnectionMetaDataImpl implements ConnectionMetaData
+{
+
+    /**
+     * A singleton instance.
+     */
+    static ConnectionMetaDataImpl _singleton = new ConnectionMetaDataImpl();
+
+    // ------------------------  The metadata
+    // JMS major version
+    private static final int JMS_MAJOR_VERSION = 1;
+    // JMS minor version
+    private static final int JMS_MINOR_VERSION = 1;
+    // JMS version
+    private static final String JMS_VERSION = "1.1";
+    // Provider name
+    private static final String PROVIDER_NAME = "Apache " + QpidProperties.getProductName();
+    // Provider major version
+    private static final int PROVIDER_MAJOR_VERSION = 0;
+    // Provider minor version
+    private static final int PROVIDER_MINOR_VERSION = 10;
+    // Provider version
+    private static final String PROVIDER_VERSION = QpidProperties.getProductName() + " (Client: [" + QpidProperties.getBuildVersion() + "]  ; Protocol: [ 0.10 ] )";
+
+    /**
+     * Prevent instantiation.
+     */
+    private ConnectionMetaDataImpl()
+    {
+    }
+
+    /**
+     * Get the singleton instance of ConnectionMetaDataImpl.
+     *
+     * @return the singleton instance of ConnectionMetaDataImpl.
+     */
+    public static ConnectionMetaDataImpl getInstance()
+    {
+        return _singleton;
+    }
+
+    //-- Connection MetaData API
+
+    /**
+     * Gets the JMS API version.
+     *
+     * @return the JMS API version
+     * @throws JMSException Never
+     */
+    public String getJMSVersion() throws JMSException
+    {
+        return JMS_VERSION;
+    }
+
+
+    /**
+     * Gets the JMS major version number.
+     *
+     * @return the JMS API major version number
+     * @throws JMSException Never
+     */
+    public int getJMSMajorVersion() throws JMSException
+    {
+        return JMS_MAJOR_VERSION;
+    }
+
+
+    /**
+     * Gets the JMS minor version number.
+     *
+     * @return the JMS API minor version number
+     * @throws JMSException Never
+     */
+    public int getJMSMinorVersion() throws JMSException
+    {
+        return JMS_MINOR_VERSION;
+    }
+
+
+    /**
+     * Gets Qpid name.
+     *
+     * @return Qpid name
+     * @throws JMSException Never
+     */
+    public String getJMSProviderName() throws JMSException
+    {
+        return PROVIDER_NAME;
+    }
+
+    /**
+     * Gets Qpid version.
+     *
+     * @return Qpid version
+     * @throws JMSException Never
+     */
+    public String getProviderVersion() throws JMSException
+    {
+        return PROVIDER_VERSION;
+        // TODO: We certainly can dynamically get the server version.
+    }
+
+    /**
+     * Gets Qpid major version number.
+     *
+     * @return Qpid major version number
+     * @throws JMSException Never
+     */
+    public int getProviderMajorVersion() throws JMSException
+    {
+        return PROVIDER_MAJOR_VERSION;
+    }
+
+    /**
+     * Gets Qpid minor version number.
+     *
+     * @return Qpid minor version number
+     * @throws JMSException Never
+     */
+    public int getProviderMinorVersion() throws JMSException
+    {
+        return PROVIDER_MINOR_VERSION;
+    }
+
+    /**
+     * Gets an enumeration of the JMSX property names.
+     *
+     * @return an Enumeration of JMSX property names
+     * @throws JMSException if cannot retrieve metadata due to some internal error.
+     */
+    public Enumeration getJMSXPropertyNames() throws JMSException
+    {
+        return CustomJMSXProperty.asEnumeration();
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionMetaDataImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/CustomJMSXProperty.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/CustomJMSXProperty.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/CustomJMSXProperty.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/CustomJMSXProperty.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,47 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import java.util.Enumeration;
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * The custom JMSX property names known to this client.
+ */
+public enum CustomJMSXProperty
+{
+    JMS_AMQP_NULL,
+    JMS_QPID_DESTTYPE,
+    JMSXGroupID,
+    JMSXGroupSeq;
+
+    /**
+     * Lazily built list of the constant names. The list is cached rather than an
+     * Enumeration because an Enumeration can be traversed only once: a cached one
+     * would be exhausted after its first consumer and appear empty to later callers.
+     */
+    private static ArrayList<String> _names;
+
+    /**
+     * Returns the names of all constants of this enum, in declaration order.
+     *
+     * @return a new Enumeration over the property names; every call returns an
+     *         independent enumeration positioned at the first name.
+     */
+    public static synchronized Enumeration asEnumeration()
+    {
+        if (_names == null)
+        {
+            CustomJMSXProperty[] properties = values();
+            ArrayList<String> nameList = new ArrayList<String>(properties.length);
+            for (CustomJMSXProperty property : properties)
+            {
+                nameList.add(property.toString());
+            }
+            _names = nameList;
+        }
+        // hand out a fresh enumeration so that one caller cannot drain it for the others
+        return Collections.enumeration(_names);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/CustomJMSXProperty.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/DestinationImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/DestinationImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/DestinationImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/DestinationImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,259 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.url.BindingURL;
+import org.apache.qpidity.url.BindingURLImpl;
+import org.apache.qpidity.url.URLSyntaxException;
+import org.apache.qpid.url.URLHelper;
+
+import javax.jms.Destination;
+import javax.naming.Reference;
+import javax.naming.NamingException;
+import javax.naming.StringRefAddr;
+import javax.naming.Referenceable;
+
+/**
+ * Implementation of the JMS Destination interface
+ */
+public class DestinationImpl implements Destination, Referenceable
+{
+    /**
+     * The destination's name
+     */
+    protected String _destinationName = null;
+
+    /**
+     * The exchange name
+     */
+    protected String _exchangeName;
+
+    /**
+     * The exchange class
+     */
+    protected String _exchangeType;
+
+    /**
+     * The queue name
+     */
+    protected String _queueName;
+
+    /**
+     * Indicate whether this destination is exclusive
+     */
+    protected boolean _isExclusive;
+
+    /**
+     * Indicates whether this destination is auto delete.
+     */
+    protected boolean _isAutoDelete;
+
+    /**
+     * Indicates whether this destination is durable
+     */
+    protected boolean _isDurable;
+
+    /**
+     * The routing key; both constructors initialise it with the queue name.
+     */
+    protected String _routingKey;
+
+    /**
+     * The binding URL used to create this destination. It is null when the
+     * destination was created from a name, until {@link #toURL()} builds it.
+     */
+    protected BindingURL _url;
+
+    //--- Constructor
+
+    /**
+     * Create a destination from a name that is used both as the queue name
+     * and as the routing key. The other attributes keep their default values.
+     *
+     * @param name The queue name / routing key.
+     * @throws QpidException Declared for subclasses; not thrown by this constructor itself.
+     */
+    protected DestinationImpl(String name) throws QpidException
+    {
+       _queueName = name;
+       _routingKey = name;
+    }
+
+    /**
+     * Create a destination from a binding URL
+     *
+     * @param binding The URL
+     * @throws QpidException If the URL is not valid
+     */
+    public DestinationImpl(BindingURL binding) throws QpidException
+    {
+        _exchangeName = binding.getExchangeName();
+        _exchangeType = binding.getExchangeClass();
+        _destinationName = binding.getDestinationName();
+        _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
+        _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
+        _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
+        _queueName = binding.getQueueName();
+        _routingKey = binding.getQueueName();
+        _url = binding;
+    }
+
+    //---- Getters and Setters
+    /**
+     * Overrides Object.toString();
+     *
+     * @return Stringified destination representation (the destination name, which may be null).
+     */
+    public String toString()
+    {
+        return _destinationName;
+    }
+
+    /**
+     * Get the destination name.
+     *
+     * @return The destination name
+     */
+    public String getDestinationName()
+    {
+        return _destinationName;
+    }
+
+
+    /**
+     * The exchange name
+     *
+     * @return The exchange name
+     */
+    public String getExchangeName()
+    {
+        return _exchangeName;
+    }
+
+    /**
+     * The exchange type.
+     *
+     * @return The exchange type.
+     */
+    public String getExchangeType()
+    {
+        return _exchangeType;
+    }
+
+    /**
+     * The queue name.
+     *
+     * @return The queue name.
+     */
+    public String getQpidQueueName()
+    {
+        return _queueName;
+    }
+
+    /**
+     * Indicates whether this destination is exclusive.
+     *
+     * @return true if this destination is exclusive.
+     */
+    public boolean isExclusive()
+    {
+        return _isExclusive;
+    }
+
+    /**
+     * Indicates whether this destination is AutoDelete.
+     *
+     * @return true if this destination is AutoDelete.
+     */
+    public boolean isAutoDelete()
+    {
+        return _isAutoDelete;
+    }
+
+    /**
+     * The routing key.
+     *
+     * @return The routing key.
+     */
+    public String getRoutingKey()
+    {
+        return _routingKey;
+    }
+
+    /**
+     * Indicates whether this destination is Durable.
+     *
+     * @return true if this destination is Durable.
+     */
+    public boolean isDurable()
+    {
+        return _isDurable;
+    }
+
+    //----- Interface Referenceable
+    /**
+     * Build a JNDI reference holding the URL of this destination.
+     *
+     * @return A reference whose address content is {@link #toURL()} and whose
+     *         factory class is ConnectionFactoryImpl.
+     * @throws NamingException Declared by the Referenceable interface.
+     */
+    public Reference getReference() throws NamingException
+    {
+        return new Reference(this.getClass().getName(), new StringRefAddr(this.getClass().getName(), toURL()),
+                             ConnectionFactoryImpl.class.getName(), // factory
+                             null);          // factory location
+    }
+
+    //--- URL handling
+
+    /**
+     * Get the URL used to create this destination.
+     * <p> When this destination was not created from a binding URL, the URL is
+     * built from the attributes of this destination the first time this method
+     * is called and then kept for subsequent calls.
+     *
+     * @return The URL used to create this destination
+     * @throws IllegalStateException If the URL built from the attributes of this
+     *                               destination cannot be parsed.
+     */
+    public String toURL()
+    {
+        if (_url == null)
+        {
+            StringBuilder sb = new StringBuilder();
+            sb.append(_exchangeType);
+            sb.append("://");
+            sb.append(_exchangeName);
+            sb.append('/');
+            if (_destinationName != null)
+            {
+                sb.append(_destinationName);
+            }
+            sb.append('/');
+            if (_queueName != null)
+            {
+                sb.append(_queueName);
+            }
+            sb.append('?');
+            // NOTE(review): the option names below come from org.apache.qpid.url.BindingURL whereas the
+            // constructor reads them from org.apache.qpidity.url.BindingURL - confirm both define the same values.
+            if (_isDurable)
+            {
+                sb.append(org.apache.qpid.url.BindingURL.OPTION_DURABLE);
+                sb.append("='true'");
+                sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+            }
+            if (_isExclusive)
+            {
+                sb.append(org.apache.qpid.url.BindingURL.OPTION_EXCLUSIVE);
+                sb.append("='true'");
+                sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+            }
+            if (_isAutoDelete)
+            {
+                sb.append(org.apache.qpid.url.BindingURL.OPTION_AUTODELETE);
+                sb.append("='true'");
+                sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+            }
+            // remove the last char: '?' if there are no options, the trailing separator if there are.
+            sb.deleteCharAt(sb.length() - 1);
+            try
+            {
+                _url = new BindingURLImpl(sb.toString());
+            }
+            catch (URLSyntaxException e)
+            {
+                // This should not happen. Swallowing it would leave _url null and end in an
+                // unexplained NullPointerException below, so report the offending URL instead.
+                throw new IllegalStateException("Cannot build a binding URL from: " + sb, e);
+            }
+        }
+        return _url.getURL();
+    }
+}
+

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/DestinationImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.JMSException;
+import javax.transaction.xa.XAException;
+
+/**
+ * Helper class for handling exceptions
+ */
+public class ExceptionHelper
+{
+    static public JMSException convertQpidExceptionToJMSException(Exception exception)
+    {
+        JMSException jmsException = null;
+        if (!(exception instanceof JMSException))
+        {
+            if (exception instanceof QpidException)
+            {
+                jmsException = new JMSException(exception.getMessage(), String.valueOf(((QpidException) exception).getErrorCode()));
+            }
+            else
+            {
+                jmsException = new JMSException(exception.getMessage());
+            }
+            jmsException.setLinkedException(exception);
+        }
+        else
+        {
+            jmsException = (JMSException) exception;
+        }
+        return jmsException;
+    }
+
+    static public XAException convertQpidExceptionToXAException(QpidException exception)
+    {
+        String qpidErrorCode = String.valueOf(exception.getErrorCode());
+        // todo map this error to an XA code
+        int xaCode = XAException.XAER_PROTO;
+        return new XAException(xaCode);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageActor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageActor.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageActor.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageActor.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,176 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+
+/**
+ * MessageActor is the superclass for the message producer and message consumer
+ * implementations (MessageProducerImpl and, presumably, MessageConsumerImpl).
+ */
+public abstract class MessageActor
+{
+    /**
+     * Used for debugging.
+     */
+    protected static final Logger _logger = LoggerFactory.getLogger(MessageActor.class);
+
+    /**
+     * Indicates whether this MessageActor is closed.
+     */
+    protected boolean _isClosed = false;
+
+    /**
+     * This messageActor's session
+     */
+    private SessionImpl _session;
+
+    /**
+     * The JMS destination this actor is set for.
+     */
+    DestinationImpl _destination;
+
+    /**
+     * Indicates that this actor is stopped
+     */
+    protected boolean _isStopped;
+
+    /**
+     * The ID of this actor for the session.
+     */
+    private String _messageActorID;
+
+    //-- Constructor
+
+    //TODO define the parameters
+
+    /**
+     * Create an actor that has an ID but neither a session nor a destination.
+     * <p> Such an actor is reported as closed by {@link #checkNotClosed()}.
+     *
+     * @param messageActorID The ID of this actor.
+     */
+    protected MessageActor(String messageActorID)
+    {
+        _messageActorID = messageActorID;
+    }
+
+    /**
+     * Create an actor for a given session and destination.
+     *
+     * @param session        The session this actor belongs to.
+     * @param destination    The destination this actor is set for.
+     * @param messageActorID The ID of this actor within its session.
+     */
+    protected MessageActor(SessionImpl session, DestinationImpl destination,String messageActorID)
+    {
+        _session = session;
+        _destination = destination;
+        _messageActorID = messageActorID;
+    }
+
+    //--- public methods (part of the njms public API)
+    /**
+     * Closes the MessageActor and deregister it from its session.
+     * <p> Calling this method on an already closed actor has no effect.
+     *
+     * @throws JMSException if the MessageActor cannot be closed due to some internal error.
+     */
+    public void close() throws JMSException
+    {
+        if (!_isClosed)
+        {
+            // closeMessageActor() already issues the messageCancel for this actor,
+            // it must not be sent a second time from here.
+            closeMessageActor();
+            //todo: We need to unset the qpid message listener
+            // notify the session that this message actor is closing
+            _session.closeMessageActor(this);
+        }
+    }
+
+    //-- protected methods
+
+    /**
+     * Stop this message actor
+     *
+     * @throws Exception If the actor cannot be stopped due to some internal error.
+     */
+    protected void stop() throws Exception
+    {
+        _isStopped = true;
+    }
+
+    /**
+     * Start this message Actor
+     *
+     * @throws Exception If the actor cannot be started due to some internal error.
+     */
+    protected void start() throws Exception
+    {
+
+        _isStopped = false;
+
+    }
+
+    /**
+     * Check if this MessageActor is not closed.
+     * <p> If the MessageActor is closed or has no session, throw a javax.jms.IllegalStateException.
+     * Otherwise the check is delegated to the session.
+     * <p> The method is not synchronized, since MessageProducers can only be used by a single thread.
+     *
+     * @throws IllegalStateException if the MessageActor is closed
+     */
+    protected void checkNotClosed() throws IllegalStateException
+    {
+        if (_isClosed || _session == null)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Actor " + this + " is already closed");
+            }
+            throw new IllegalStateException("Actor " + this + " is already closed");
+        }
+        _session.checkNotClosed();
+    }
+
+    /**
+     * Closes a MessageActor.
+     * <p> This method is invoked when the session is closing or when this
+     * messageActor is closing. It cancels this actor on the qpid session and
+     * marks the actor as closed; it does nothing if the actor is already closed.
+     *
+     * @throws JMSException If the MessageActor cannot be closed due to some internal error.
+     */
+    protected void closeMessageActor() throws JMSException
+    {
+        if (!_isClosed)
+        {
+            getSession().getQpidSession().messageCancel(getMessageActorID());
+            _isClosed = true;
+        }
+    }
+
+    /**
+     * Get the associated session object.
+     *
+     * @return This Actor's Session.
+     */
+    public SessionImpl getSession()
+    {
+        return _session;
+    }
+
+    /**
+     * Get the ID of this actor within its session.
+     *
+     * @return This actor ID.
+     */
+    protected String getMessageActorID()
+    {
+        return _messageActorID;
+    }
+
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageActor.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message