qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arnaudsi...@apache.org
Subject svn commit: r562737 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient: impl/ jms/ jms/message/
Date Sat, 04 Aug 2007 16:33:05 GMT
Author: arnaudsimon
Date: Sat Aug  4 09:32:57 2007
New Revision: 562737

URL: http://svn.apache.org/viewvc?view=rev&rev=562737
Log: (empty)

Added:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidMessageListener.java   (with props)
Removed:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java
Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java?view=diff&rev=562737&r1=562736&r2=562737
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java Sat Aug  4 09:32:57 2007
@@ -89,6 +89,27 @@
     public void messageRelease(Range... range) throws QpidException
     {
         //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+
+    public void messageFlowMode(String destination, short mode)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void messageFlow(String destination, short unit, long value)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public boolean messageFlush(String destination)
+    {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void messageStop(String destination)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
     }// -----------------------------------------------
     //            Local transaction methods
     //  ----------------------------------------------

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java?view=diff&rev=562737&r1=562736&r2=562737
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java Sat Aug  4 09:32:57 2007
@@ -225,7 +225,14 @@
             // start all the sessions
             for (SessionImpl session : _sessions)
             {
-                session.start();
+                try
+                {
+                    session.start();
+                }
+                catch (Exception e)
+                {
+                   throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+                }
             }
             _started = true;
         }
@@ -249,7 +256,14 @@
             // stop all the sessions
             for (SessionImpl session : _sessions)
             {
-                session.stop();
+                try
+                {
+                    session.stop();
+                }
+                catch (Exception e)
+                {
+                   throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+                }
             }
             _started = false;
         }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java?view=diff&rev=562737&r1=562736&r2=562737
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java Sat Aug  4 09:32:57 2007
@@ -22,22 +22,29 @@
 import javax.jms.JMSException;
 
 /**
- *Helper class for handling exceptions
+ * Helper class for handling exceptions
  */
 public class ExceptionHelper
 {
     static public JMSException convertQpidExceptionToJMSException(Exception exception)
     {
         JMSException jmsException = null;
-        if (exception instanceof QpidException)
+        if (!(exception instanceof JMSException))
         {
-            jmsException = new JMSException(exception.getMessage(), ((QpidException) exception).getErrorCode());
+            if (exception instanceof QpidException)
+            {
+                jmsException = new JMSException(exception.getMessage(), ((QpidException) exception).getErrorCode());
+            }
+            else
+            {
+                jmsException = new JMSException(exception.getMessage());
+            }
+            jmsException.setLinkedException(exception);
         }
         else
         {
-            jmsException = new JMSException(exception.getMessage());
+            jmsException = (JMSException) exception;
         }
-        jmsException.setLinkedException(exception);
         return jmsException;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java?view=diff&rev=562737&r1=562736&r2=562737
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java Sat Aug  4 09:32:57 2007
@@ -32,12 +32,12 @@
     /**
      * Used for debugging.
      */
-    private static final Logger _logger = LoggerFactory.getLogger(MessageActor.class);
+    protected static final Logger _logger = LoggerFactory.getLogger(MessageActor.class);
 
     /**
      * Indicates whether this MessageActor is closed.
      */
-    private boolean _isClosed = false;
+    protected boolean _isClosed = false;
 
     /**
      * This messageActor's session
@@ -49,6 +49,10 @@
      */
     DestinationImpl _destination;
 
+    /**
+     * Indicates that this actor is stopped
+     */
+    protected boolean _isStopped;
 
     /**
      * The ID of this actor for the session.
@@ -59,9 +63,9 @@
 
     //TODO define the parameters
 
-     protected MessageActor()
+    protected MessageActor()
     {
-        
+
     }
 
     protected MessageActor(SessionImpl session, DestinationImpl destination)
@@ -87,7 +91,30 @@
     }
 
     //-- protected methods
-     /**
+
+    /**
+     * Stop this message actor
+     *
+     * @throws Exception If the consumer cannot be stopped due to some internal error.
+     */
+    protected void stop() throws Exception
+    {
+        _isStopped = true;
+    }
+
+    /**
+     * Start this message Actor
+     *
+     * @throws Exception If the consumer cannot be started due to some internal error.
+     */
+    protected void start() throws Exception
+    {
+
+        _isStopped = false;
+
+    }
+
+    /**
      * Check if this MessageActor is not closed.
      * <p> If the MessageActor is closed, throw a javax.jms.IllegalStateException.
      * <p> The method is not synchronized, since MessageProducers can only be used by a single thread.
@@ -118,38 +145,32 @@
     {
         if (!_isClosed)
         {
-            // close the underlying qpid resource
-           /* try
+            try
             {
-            	// Arnaud I can't see where this var is initialized
-            	// I assume it's the session
-                //_qpidResource.close();
-            	
-            	
+                // cancel this destination
+                getSession().getQpidSession().messageCancel(getMessageActorID());
             }
             catch (QpidException e)
             {
                 throw ExceptionHelper.convertQpidExceptionToJMSException(e);
-            }*/
-        	
-        	_session.close(); //is this correct ?
+            }
             _isClosed = true;
         }
     }
 
     /**
-     * Get the associated session object. 
+     * Get the associated session object.
      *
      * @return This Actor's Session.
      */
-    protected  SessionImpl getSession()
+    protected SessionImpl getSession()
     {
         return _session;
     }
 
     /**
      * Get the ID of this actor within its session.
-     *  
+     *
      * @return This actor ID.
      */
     protected String getMessageActorID()

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java?view=diff&rev=562737&r1=562736&r2=562737
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java Sat Aug  4 09:32:57 2007
@@ -19,16 +19,21 @@
 
 //import org.apache.qpid.nclient.api.MessageReceiver;
 
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Message;
+import org.apache.qpid.nclient.jms.message.QpidMessage;
+import org.apache.qpid.nclient.impl.MessagePartListenerAdapter;
+import org.apache.qpid.nclient.MessagePartListener;
+import org.apache.qpidity.Range;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Option;
+
+import javax.jms.*;
 
 /**
  * Implementation of JMS message consumer
  */
 public class MessageConsumerImpl extends MessageActor implements MessageConsumer
 {
+    public static final short MESSAGE_FLOW_MODE = 0; // we use message flow mode
 
     /**
      * This MessageConsumer's messageselector.
@@ -53,9 +58,20 @@
     private MessageListener _messageListener;
 
     /**
-     * A warpper around the JSM message listener 
+     * The synchronous message just delivered
      */
-    private MessageListenerWrapper _messageListenerWrapper;
+    private QpidMessage _incomingMessage;
+
+    /**
+     * A lock on the synchronous message
+     */
+    private final Object _incomingMessageLock = new Object();
+
+    /**
+     * Indicates that this consumer is receiving a synch message
+     */
+    private boolean _isReceiving = false;
+   
 
     //----- Constructors
     /**
@@ -67,25 +83,43 @@
      * @param noLocal          If true inhibits the delivery of messages published by its own connection.
      * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber.
      *                         If this value is null, a non-durable subscription is created.
-     * @throws JMSException If the MessageProducerImpl cannot be created due to some internal error.
+     * @throws Exception If the MessageConsumerImpl cannot be created due to some internal error.
      */
     protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector,
-                                  boolean noLocal, String subscriptionName) throws JMSException
+                                  boolean noLocal, String subscriptionName) throws Exception
     {
         super(session, destination);
         _messageSelector = messageSelector;
         _noLocal = noLocal;
         _subscriptionName = subscriptionName;
-        /*try
+        _isStopped = getSession().isStopped();
+        if (destination instanceof Queue)
         {
-            // TODO define the relevant options 
-            _qpidReceiver = _session.getQpidSession().createReceiver(destination.getName(), Option.DURABLE);
-            _qpidResource = _qpidReceiver;
+            // this is a queue we expect that this queue exists
+            // let's create a message part assembler
+            /**
+             * A Qpid message listener that pushes messages to this consumer session when this consumer is
+             * asynchronous or directly to this consumer when it is synchronously accessed.
+             */
+            MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this));
+            // we use the default options:  EXCLUSIVE = false, PRE-ACQUIRE and CONFIRM = off
+            if (_noLocal)
+            {
+                getSession().getQpidSession()
+                        .messageSubscribe(destination.getName(), getMessageActorID(), messageAssembler, null,
+                                          Option.NO_LOCAL);
+            }
+            else
+            {
+                getSession().getQpidSession()
+                        .messageSubscribe(destination.getName(), getMessageActorID(), messageAssembler, null);
+            }
         }
-        catch (QpidException e)
+        else
         {
-            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
-        }*/
+            // this is a topic we need to create a temporary queue for this consumer
+            // unless this is a durable subscriber
+        }
     }
     //----- Message consumer API
 
@@ -129,20 +163,12 @@
      * @param messageListener The listener to which the messages are to be delivered
      * @throws JMSException If setting the message listener fails due to some internal error.
      */
-    public void setMessageListener(MessageListener messageListener) throws JMSException
+    public synchronized void setMessageListener(MessageListener messageListener) throws JMSException
     {
+        // this method is synchronized as onMessage also accesses _messageListener
+        // onMessage, getMessageListener and this method are the only synchronized methods
         checkNotClosed();
         _messageListener = messageListener;
-        if( messageListener == null )
-        {
-
-          _messageListenerWrapper = null;
-        }
-        else
-        {
-            _messageListenerWrapper = new MessageListenerWrapper(this);          
-              //TODO      _qpidReceiver.setAsynchronous(_messageListenerWrapper);
-        }
     }
 
     /**
@@ -160,7 +186,8 @@
 
     /**
      * Receive the next message that arrives within the specified timeout interval.
-     * <p> This call blocks until a message arrives, the timeout expires, or this message consumer is closed.
+     * <p> This call blocks until a message arrives, the timeout expires, or this message consumer
+     * is closed.
      * <p> A timeout of zero never expires, and the call blocks indefinitely.
      * <p> A timeout less than 0 throws a JMSException.
      *
@@ -170,9 +197,20 @@
      */
     public Message receive(long timeout) throws JMSException
     {
-        Message message = null;
-        // todo convert this message into a JMS one: _qpidReceiver.receive(-1);
-        return message;
+        if (timeout < 0)
+        {
+            throw new JMSException("Invalid timeout value: " + timeout);
+        }
+        Message result;
+        try
+        {
+            result = internalReceive(timeout);
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return result;
     }
 
     /**
@@ -183,44 +221,221 @@
      */
     public Message receiveNoWait() throws JMSException
     {
-        return receive(-1);
+        Message result;
+        try
+        {
+            result = internalReceive(-1);
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return result;
     }
 
-
     // not public methods
+
+    /**
+     * Receive a synchronous message
+     * <p> This call blocks until a message arrives, the timeout expires, or this message consumer
+     * is closed.
+     * <p> A timeout of zero never expires, and the call blocks indefinitely (unless this message consumer
+     * is closed)
+     * <p> A timeout less than 0 returns the next message or null if one is not available.
+     *
+     * @param timeout The timeout value (in milliseconds)
+     * @return the next message or null if one is not available.
+     * @throws Exception If receiving the next message fails due to some internal error.
+     */
+    private Message internalReceive(long timeout) throws Exception
+    {
+        checkNotClosed();
+        if (_messageListener != null)
+        {
+            throw new javax.jms.IllegalStateException("A listener has already been set.");
+        }
+
+        Message result = null;
+        synchronized (_incomingMessageLock)
+        {
+            // This tells the delivery thread to deliver the message to this consumer,
+            // as it can happen that a message is delivered after a receive operation has returned.
+            _isReceiving = true;
+            boolean received = false;
+            if (!_isStopped)
+            {
+                // if this consumer is stopped then this will be called when starting
+                getSession().getQpidSession().messageFlow(getMessageActorID(), MESSAGE_FLOW_MODE, 1);
+                received = getSession().getQpidSession().messageFlush(getMessageActorID());
+            }
+            if (!received && timeout < 0)
+            {
+                // this is a no-wait receive and we haven't received a message, so we must return immediately
+                result = null;
+            }
+            else
+            {
+                while (_incomingMessage == null && !_isClosed)
+                {
+                    try
+                    {
+                        _incomingMessageLock.wait(timeout);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // do nothing
+                    }
+                }
+                if (_incomingMessage != null)
+                {
+                    result = _incomingMessage.getJMSMessage();
+                    // tell the session that a message is inprocess
+                    getSession().preProcessMessage(_incomingMessage);
+                    // tell the session to acknowledge this message (if required)
+                    getSession().acknowledgeMessage(_incomingMessage);
+                }
+                _incomingMessage = null;
+                // We now release any message received for this consumer
+                _isReceiving = false;
+            }
+        }
+        return result;
+    }
+
     /**
-     * Stop the delivery of messages to this receiver.
+     * Stop the delivery of messages to this consumer.
      * <p>For asynchronous receiver, this operation blocks until the message listener
      * finishes processing the current message,
      *
-     * @throws JMSException If the consumer cannot be stopped due to some internal error.
+     * @throws Exception If the consumer cannot be stopped due to some internal error.
      */
-    void stop() throws JMSException
+    protected void stop() throws Exception
     {
-        /*try
+        getSession().getQpidSession().messageStop(getMessageActorID());
+        _isStopped = true;
+    }
+
+    /**
+     * Start the delivery of messages to this consumer.
+     *
+     * @throws Exception If the consumer cannot be started due to some internal error.
+     */
+    protected void start() throws Exception
+    {
+        synchronized (_incomingMessageLock)
         {
-            _qpidReceiver.stop();
+            _isStopped = false;
+            if (_isReceiving)
+            {
+                // there is a synch call waiting for a message to be delivered
+                // so tell the broker to deliver a message
+                getSession().getQpidSession().messageFlow(getMessageActorID(), MESSAGE_FLOW_MODE, 1);
+                getSession().getQpidSession().messageFlush(getMessageActorID());
+            }
         }
-        catch (QpidException e)
+    }
+
+    /**
+     * Deliver a message to this consumer.
+     *
+     * @param message The message delivered to this consumer.
+     */
+    protected synchronized void onMessage(QpidMessage message)
+    {
+        try
         {
-            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
-            }*/
+            // if this consumer is synchronous then set the current message and
+            // notify the waiting thread
+            if (_messageListener == null)
+            {
+                synchronized (_incomingMessageLock)
+                {
+                    if (_isReceiving)
+                    {
+                        _incomingMessage = message;
+                        _incomingMessageLock.notify();
+                    }
+                    else
+                    {
+                        // this message has been received after a receive has returned
+                        // we need to release it
+                        releaseMessage(message);
+                    }
+                }
+            }
+            else
+            {
+                // This is an asynchronous message
+                // tell the session that a message is in process
+                getSession().preProcessMessage(message);
+                // If the session is transacted we need to ack the message first
+                // This is because a message is associated with its tx only when acked
+                if (getSession().getTransacted())
+                {
+                    getSession().acknowledgeMessage(message);
+                }
+                // The JMS spec says:
+                /* The result of a listener throwing a RuntimeException depends on the session's
+                * acknowledgment mode.
+                * --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
+                * will be immediately redelivered. The number of times a JMS provider will
+                * redeliver the same message before giving up is provider-dependent.
+                * --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
+                * --- Transacted Session - the next message for the listener is delivered.
+                *
+                * The number of times we try redelivering the message is 0
+                **/
+                try
+                {
+                    _messageListener.onMessage(message.getJMSMessage());
+                }
+                catch (RuntimeException re)
+                {
+                    // do nothing as this message will not be redelivered
+                }
+                // If the session has been recovered we then need to redeliver this message
+                if (getSession().isInRecovery())
+                {
+                    releaseMessage(message);
+                }
+                else if (!getSession().getTransacted())
+                {
+                    // Tell the jms Session to ack this message if required
+                    getSession().acknowledgeMessage(message);
+                }
+            }
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e.getMessage());
+        }
     }
 
     /**
-     * Start the delivery of messages to this consumer.
+     * Release a message
      *
-     * @throws JMSException If the consumer cannot be started due to some internal error.
+     * @param message The message to be released
+     * @throws JMSException If the message cannot be released due to some internal error.
      */
-    void start() throws JMSException
+    private void releaseMessage(QpidMessage message) throws JMSException
     {
-        /*try
+        Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+        try
         {
-            _qpidReceiver.start();
+            getSession().getQpidSession().messageRelease(range);
         }
         catch (QpidException e)
         {
-            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
-            }*/
+            // notify the Exception listener
+            if (getSession().getConnection().getExceptionListener() != null)
+            {
+                getSession().getConnection().getExceptionListener()
+                        .onException(ExceptionHelper.convertQpidExceptionToJMSException(e));
+            }
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Excpetion when releasing message " + message, e);
+            }
+        }
     }
 }

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidMessageListener.java?view=auto&rev=562737
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidMessageListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidMessageListener.java Sat Aug  4 09:32:57 2007
@@ -0,0 +1,82 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient.jms;
+
+import org.apache.qpid.nclient.MessageListener;
+import org.apache.qpid.nclient.jms.message.QpidMessage;
+import org.apache.qpidity.api.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p> When asynchronous, upon receipt of a message this listener delegates the dispatching to its session.
+ * This guarantees that asynchronous messages are processed sequentially within their session.
+ * <p> When used synchronously, messages are dispatched to the receiver itself.
+ */
+public class QpidMessageListener implements MessageListener
+{
+    /**
+     * Used for debugging.
+     */
+    private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
+
+    /**
+     * This message listener consumer
+     */
+    MessageConsumerImpl _consumer = null;
+
+    //---- constructor
+    /**
+     * Create a message listener wrapper for a given consumer
+     *
+     * @param consumer The consumer of this listener
+     */
+    public QpidMessageListener(MessageConsumerImpl consumer)
+    {
+        _consumer = consumer;
+    }
+
+    //---- org.apache.qpid.nclient.MessagePartListener API
+    /**
+     * Deliver a message to the listener.
+     *
+     * @param message The message delivered to the listener.
+     */
+    public void onMessage(Message message)
+    {
+        try
+        {
+            //convert this message into a JMS one
+            QpidMessage jmsMessage = null; // todo
+            // if consumer is asynchronous then send this message to its session.
+            if( _consumer.getMessageListener() != null )
+            {
+                _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage);
+            }
+            else
+            {
+                // deliver this message to the consumer itself
+                _consumer.onMessage(jmsMessage);
+            }
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidMessageListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java?view=diff&rev=562737&r1=562736&r2=562737
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java Sat Aug  4 09:32:57 2007
@@ -33,9 +33,9 @@
      * @param session         The session from which the QueueReceiverImpl is instantiated.
      * @param queue           The default queue for this QueueReceiverImpl.
      * @param messageSelector the message selector for this QueueReceiverImpl.  
-     * @throws JMSException If the QueueReceiverImpl cannot be created due to some internal error.
+     * @throws Exception If the QueueReceiverImpl cannot be created due to some internal error.
      */
-    protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector) throws JMSException
+    protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector) throws Exception
     {
         super(session, (DestinationImpl) queue, messageSelector, false, null);
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java?view=diff&rev=562737&r1=562736&r2=562737
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java Sat Aug  4 09:32:57 2007
@@ -123,7 +123,16 @@
     {
         checkNotClosed();
         checkDestination(queue);
-        return new QueueReceiverImpl(this, queue, messageSelector);
+        QueueReceiver receiver;
+        try
+        {
+            receiver =  new QueueReceiverImpl(this, queue, messageSelector);
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return receiver;
     }
 
     /**

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java?view=diff&rev=562737&r1=562736&r2=562737
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java Sat Aug  4 09:32:57 2007
@@ -21,6 +21,7 @@
 import org.slf4j.LoggerFactory;
 import org.apache.qpid.nclient.jms.message.*;
 import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Range;
 
 import javax.jms.*;
 import javax.jms.IllegalStateException;
@@ -28,9 +29,9 @@
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import java.io.Serializable;
-import java.util.Vector;
 import java.util.LinkedList;
 import java.util.HashMap;
+import java.util.ArrayList;
 
 /**
  * Implementation of the JMS Session interface
@@ -43,43 +44,41 @@
     private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
 
     /**
-     * A queue for incoming messages including synch and asych messages.
+     * A queue for incoming asynch messages.
      */
-    private LinkedList<QpidMessage> _incomingAsynchronousMessages = new LinkedList<QpidMessage>();
+    private final LinkedList<IncomingMessage> _incomingAsynchronousMessages = new LinkedList<IncomingMessage>();
 
-    //--- Session thread locking
+    //--- MessageDispatcherThread and Session locking
     /**
-     * indicates that the sessionThread has stopped
+     * indicates that the MessageDispatcherThread has stopped
      */
     private boolean _hasStopped = false;
 
     /**
-     * lock for the sessionThread to wait until the session is stopped
+     * lock for the MessageDispatcherThread to wait until the session is stopped
      */
-    private Object _stoppingLock = new Object();
+    private final Object _stoppingLock = new Object();
 
     /**
-     * lock for the stopper thread to wait on when the sessionThread is stopping
+     * lock for the stopper thread to wait on when the MessageDispatcherThread is stopping
      */
-    private Object _stoppingJoin = new Object();
+    private final Object _stoppingJoin = new Object();
 
     /**
      * thread to dispatch messages to async consumers
      */
     private MessageDispatcherThread _messageDispatcherThread = null;
-
+    //----END
 
     /**
      * The messageActors of this session.
      */
-    private HashMap<String, MessageActor> _messageActors = new HashMap<String, MessageActor>();
+    private final HashMap<String, MessageActor> _messageActors = new HashMap<String, MessageActor>();
 
     /**
      * All the not yet acknoledged messages
-     * We use a vector as access to this list has to be synchronised
-     * This is because messages are acked from messagelistner threads
      */
-    private Vector<QpidMessage> _unacknowledgedMessages = new Vector<QpidMessage>();
+    private final ArrayList<QpidMessage> _unacknowledgedMessages = new ArrayList<QpidMessage>();
 
     /**
      * Indicates whether this session is closed.
@@ -156,10 +155,8 @@
         {
             throw ExceptionHelper.convertQpidExceptionToJMSException(e);
         }
-        // Create and start a MessageDispatcherThread
-        // This thread is dispatching messages to the async consumers
-        _messageDispatcherThread = new MessageDispatcherThread();
-        _messageDispatcherThread.start();
+        // init the message dispatcher.
+        initMessageDispatcherThread();
     }
 
     //--- javax.jms.Session API
@@ -379,10 +376,9 @@
                 // that will stop the sessionThread
                 if (_isStopped)
                 {
-                    start();
+                    startDispatchThread();
                 }
-
-                //stop the sessionThread
+                //notify the sessionThread
                 synchronized (_incomingAsynchronousMessages)
                 {
                     _incomingAsynchronousMessages.notifyAll();
@@ -401,8 +397,9 @@
             // from now all the session methods will throw a IllegalStateException
             _isClosed = true;
             // close all the actors
-            closeAllActors();
+            closeAllMessageActors();
             _messageActors.clear();
+            // We may have a thread trying to add a message
             synchronized (_incomingAsynchronousMessages)
             {
                 _incomingAsynchronousMessages.clear();
@@ -417,7 +414,6 @@
             {
                 throw ExceptionHelper.convertQpidExceptionToJMSException(e);
             }
-
         }
     }
 
@@ -451,8 +447,17 @@
         // release all unack messages
         for (QpidMessage message : _unacknowledgedMessages)
         {
-            // release all those messages
-            //Todo: message.getQpidMEssage.release();
+            // release this message
+            Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+            try
+            {
+                getQpidSession().messageRelease(range);
+            }
+            catch (QpidException e)
+            {
+                throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+            }
+            // TODO We can be a little bit cleverer and build a set of ranges
         }
     }
 
@@ -559,12 +564,20 @@
      * @throws InvalidDestinationException If an invalid destination is specified.
      * @throws InvalidSelectorException    If the message selector is invalid.
      */
-    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws
-                                                                                                            JMSException
+    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
+            throws JMSException
     {
         checkNotClosed();
         checkDestination(destination);
-        MessageConsumerImpl consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null);
+        MessageConsumerImpl consumer = null;
+        try
+        {
+            consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null);
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
         // register this actor with the session
         _messageActors.put(consumer.getMessageActorID(), consumer);
         return consumer;
@@ -647,12 +660,21 @@
      * @throws InvalidDestinationException If an invalid topic is specified.
      * @throws InvalidSelectorException    If the message selector is invalid.
      */
-    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector,
-                                                   boolean noLocal) throws JMSException
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
+            throws JMSException
     {
         checkNotClosed();
         checkDestination(topic);
-        TopicSubscriberImpl subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, _connection.getClientID() + ":" + name);
+        TopicSubscriberImpl subscriber;
+        try
+        {
+            subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal,
+                                                 _connection.getClientID() + ":" + name);
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
         _messageActors.put(subscriber.getMessageActorID(), subscriber);
         return subscriber;
     }
@@ -739,43 +761,71 @@
      * Remove a message actor form this session
      * <p> This method is called when an actor is independently closed.
      *
-     * @param actor The closed actor.
+     * @param messageActor The closed actor.
+     */
+    protected void closeMessageActor(MessageActor messageActor)
+    {
+        _messageActors.remove(messageActor.getMessageActorID());
+    }
+
+    /**
+     * Indicates whether this session is stopped.
+     *
+     * @return True if this session is stopped, false otherwise.
      */
-    protected void closeMessageActor(MessageActor actor)
+    protected boolean isStopped()
     {
-        _messageActors.remove(actor);
+        return _isStopped;
     }
 
     /**
      * Start the flow of message to this session.
      *
-     * @throws JMSException If starting the session fails due to some communication error.
+     * @throws Exception If starting the session fails due to some communication error.
      */
-    protected void start() throws JMSException
+    protected synchronized void start() throws Exception
     {
         if (_isStopped)
         {
-            synchronized (_stoppingLock)
+            // start all the MessageActors
+            for (MessageActor messageActor : _messageActors.values())
             {
-                _isStopped = false;
-                _stoppingLock.notify();
-            }
-            synchronized (_stoppingJoin)
-            {
-                _hasStopped = false;
+                messageActor.start();
             }
+            startDispatchThread();
+        }
+    }
+
+    /**
+     * Restart delivery of asynch messages
+     */
+    private void startDispatchThread()
+    {
+        synchronized (_stoppingLock)
+        {
+            _isStopped = false;
+            _stoppingLock.notify();
+        }
+        synchronized (_stoppingJoin)
+        {
+            _hasStopped = false;
         }
     }
 
     /**
      * Stop the flow of message to this session.
      *
-     * @throws JMSException If stopping the session fails due to some communication error.
+     * @throws Exception If stopping the session fails due to some communication error.
      */
-    protected void stop() throws JMSException
+    protected synchronized void stop() throws Exception
     {
         if (!_isClosing && !_isStopped)
         {
+            // stop all the MessageActors
+            for (MessageActor messageActor : _messageActors.values())
+            {
+                messageActor.stop();
+            }
             synchronized (_incomingAsynchronousMessages)
             {
                 _isStopped = true;
@@ -811,6 +861,21 @@
     }
 
     /**
+     * Dispatch this message to this session's asynchronous consumers
+     *
+     * @param consumerID The consumer ID.
+     * @param message    The message to be dispatched.
+     */
+    public void dispatchMessage(String consumerID, QpidMessage message)
+    {
+        synchronized (_incomingAsynchronousMessages)
+        {
+            _incomingAsynchronousMessages.addLast(new IncomingMessage(consumerID, message));
+            _incomingAsynchronousMessages.notifyAll();
+        }
+    }
+
+    /**
      * Indicate whether this session is recovering .
      *
      * @return true if this session is recovering.
@@ -850,7 +915,8 @@
     {
         if (dest == null)
         {
-            throw new javax.jms.InvalidDestinationException("Invalid destination specified: " + dest, "Invalid destination");
+            throw new javax.jms.InvalidDestinationException("Invalid destination specified: " + dest,
+                                                            "Invalid destination");
         }
     }
 
@@ -869,14 +935,25 @@
         {
             // messages will be acknowldeged by the client application.
             // store this message for acknowledging it afterward
-            _unacknowledgedMessages.add(message);
+            synchronized (_unacknowledgedMessages)
+            {
+                _unacknowledgedMessages.add(message);
+            }
         }
         else
         {
             // acknowledge this message
-            // TODO: message.acknowledgeQpidMessge();
+            Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+            try
+            {
+                getQpidSession().messageAcknowledge(range);
+            }
+            catch (QpidException e)
+            {
+                throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+            }
         }
-        //TODO: Implement DUPS OK heuristic
+        //tobedone: Implement DUPS OK heuristic
     }
 
     /**
@@ -895,13 +972,25 @@
         checkNotClosed();
         if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
         {
-            for (QpidMessage message : _unacknowledgedMessages)
+            synchronized (_unacknowledgedMessages)
             {
-                // acknowledge this message
-                // TODO: message.acknowledgeQpidMessge();
+                for (QpidMessage message : _unacknowledgedMessages)
+                {
+                    // acknowledge this message
+                    Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+                    try
+                    {
+                        getQpidSession().messageAcknowledge(range);
+                    }
+                    catch (QpidException e)
+                    {
+                        throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+                    }
+                    // TODO We can be a little bit cleverer and build a set of ranges
+                }
+                //empty the list of unack messages
+                _unacknowledgedMessages.clear();
             }
-            //empty the list of unack messages
-            _unacknowledgedMessages.clear();
         }
         //else there is no effect
     }
@@ -916,13 +1005,23 @@
         return _qpidSession;
     }
 
+    /**
+     * Get this session's connection
+     *
+     * @return This session's connection
+     */
+    protected ConnectionImpl getConnection()
+    {
+        return _connection;
+    }
+
     //------ Private Methods
     /**
      * Close the producer and the consumers of this session
      *
      * @throws JMSException If one of the MessaeActor cannot be closed due to some internal error.
      */
-    private void closeAllActors() throws JMSException
+    private void closeAllMessageActors() throws JMSException
     {
         for (MessageActor messageActor : _messageActors.values())
         {
@@ -930,9 +1029,66 @@
         }
     }
 
+    /**
+     * create and start the MessageDispatcherThread.
+     */
+    private synchronized void initMessageDispatcherThread()
+    {
+        // Create and start a MessageDispatcherThread
+        // This thread is dispatching messages to the async consumers
+        _messageDispatcherThread = new MessageDispatcherThread();
+        _messageDispatcherThread.start();
+    }
+
     //------ Inner classes
 
     /**
+     * Convenient class for storing incoming messages associated with a consumer ID.
+     * <p> Those messages are enqueued in _incomingAsynchronousMessages
+     */
+    private class IncomingMessage
+    {
+        // The consumer ID
+        private String _consumerId;
+        // The message
+        private QpidMessage _message;
+
+        //-- constructor
+        /**
+         * Create a new incoming message
+         *
+         * @param consumerId The consumer ID
+         * @param message    The message to be delivered
+         */
+        IncomingMessage(String consumerId, QpidMessage message)
+        {
+            _consumerId = consumerId;
+            _message = message;
+        }
+
+        // Getters
+        /**
+         * Get the consumer ID
+         *
+         * @return The consumer ID for this message
+         */
+        public String getConsumerId()
+        {
+            return _consumerId;
+        }
+
+        /**
+         * Get the message.
+         *
+         * @return The message.
+         */
+        public QpidMessage getMessage()
+        {
+            return _message;
+        }
+    }
+
+    /**
      * A MessageDispatcherThread is attached to every SessionImpl.
      * <p/>
      * This thread is responsible for removing messages from m_incomingMessages and
@@ -957,9 +1113,8 @@
          */
         public void run()
         {
-            QpidMessage message = null;
-
-            // deliver messages to consumers until the stop flag is set.
+            IncomingMessage message = null;
+            // deliver messages to asynchronous consumers until the stop flag is set.
             do
             {
                 // When this session is not closing and and stopped
@@ -1012,19 +1167,19 @@
                     MessageConsumerImpl mc;
                     synchronized (_messageActors)
                     {
-                        mc = null; // todo _messageActors.get(message.consumerID);
+                        mc = (MessageConsumerImpl) _messageActors.get(message.getConsumerId());
                     }
-                    boolean consumed = false;
                     if (mc != null)
                     {
                         try
                         {
-                            // todo call onMessage
+                            mc.onMessage(message.getMessage());
                         }
                         catch (RuntimeException t)
                         {
                             // the JMS specification tells us to flag that to the client!
-                            _logger.error("Warning! Asynchronous message consumer" + mc + " from session " + this + " has thrown a RunTimeException " + t);
+                            _logger.error(
+                                    "Warning! Asynchronous message consumer" + mc + " from session " + this + " has thrown a RunTimeException " + t);
                         }
                     }
                 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java?view=diff&rev=562737&r1=562736&r2=562737
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java Sat Aug  4 09:32:57 2007
@@ -139,6 +139,15 @@
     {
         checkNotClosed();
         checkDestination(topic);
-        return new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null);
-    }       
+        TopicSubscriber topicSubscriber;
+        try
+        {
+            topicSubscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null);
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        return topicSubscriber;
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java?view=diff&rev=562737&r1=562736&r2=562737
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java Sat Aug  4 09:32:57 2007
@@ -36,10 +36,10 @@
      * @param noLocal          If true inhibits the delivery of messages published by its own connection.
      * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber.
      *                         If this value is null, a non-durable subscription is created.
-     * @throws javax.jms.JMSException If the TopicSubscriberImpl cannot be created due to internal error.
+     * @throws Exception If the TopicSubscriberImpl cannot be created due to internal error.
      */
     protected TopicSubscriberImpl(SessionImpl session, Topic topic, String messageSelector, boolean noLocal,
-                                  String subscriptionName) throws JMSException
+                                  String subscriptionName) throws Exception
     {
         super(session, (DestinationImpl) topic, messageSelector, noLocal, subscriptionName);
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java?view=diff&rev=562737&r1=562736&r2=562737
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java Sat Aug  4 09:32:57 2007
@@ -21,6 +21,7 @@
 package  org.apache.qpid.nclient.jms.message;
 
 import javax.jms.JMSException;
+import javax.jms.Message;
 
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.framing.ContentHeaderProperties;
@@ -132,4 +133,18 @@
     {
         return getPropertyHeaders().getTimestamp(propertyName);
     }
+
+    public Message getJMSMessage()
+    {
+        // todo
+        return null;
+    }
+
+    public Long getMessageID()
+    {
+        //todo
+        return new Long(1);
+    }
 }
+
+



Mime
View raw message