qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject svn commit: r447994 [19/46] - in /incubator/qpid/trunk/qpid: ./ cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/ cpp/common/concurrent/ cpp/common/concur...
Date Tue, 19 Sep 2006 22:07:25 GMT
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,1149 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.failover.FailoverSupport;
+import org.apache.qpid.client.util.FlowControllingBlockingQueue;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.text.MessageFormat;
+
+public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
+{
+    private static final Logger _logger = Logger.getLogger(AMQSession.class);
+
+    public static final int DEFAULT_PREFETCH = 5000;
+
+    private AMQConnection _connection;
+
+    private boolean _transacted;
+
+    private int _acknowledgeMode;
+
+    private int _channelId;
+
+    private int _defaultPrefetch = DEFAULT_PREFETCH;
+
+    /**
+     * Used in the consume method. We generate the consume tag on the client so that we can use the nowait
+     * feature.
+     */
+    private int _nextTag = 1;
+
+    /**
+     * This queue is bounded and is used to store messages before being dispatched to the consumer
+     */
+    private final FlowControllingBlockingQueue _queue;
+
+    private Dispatcher _dispatcher;
+
+    private MessageFactoryRegistry _messageFactoryRegistry;
+
+    /**
+     * Set of all producers created by this session
+     */
+    private Map _producers = new ConcurrentHashMap();
+
+    /**
+     * Maps from consumer tag (String) to JMSMessageConsumer instance
+     */
+    private Map _consumers = new ConcurrentHashMap();
+
+    /**
+     * Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not
+     * need to be attached to a queue
+     */
+    protected static final boolean DEFAULT_IMMEDIATE = false;
+
+    /**
+     * Default value for mandatory flag used by producers created by this session is true, i.e. server will not silently
+     * drop messages where no queue is connected to the exchange for the message
+     */
+    protected static final boolean DEFAULT_MANDATORY = true;
+
+    /**
+     * The counter of the next producer id. This id is generated by the session and used only to allow the
+     * producer to identify itself to the session when deregistering itself.
+     *
+     * Access to this id does not need to be synchronized since, according to the JMS specification, only one
+     * thread of control is allowed to create producers for any given session instance.
+     */
+    private long _nextProducerId;
+
+    /**
+     * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
+     */
+    private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
+
+    /**
+     * Responsible for decoding a message fragment and passing it to the appropriate message consumer.
+     */
+    private class Dispatcher extends Thread
+    {
+        public Dispatcher()
+        {
+            super("Dispatcher-Channel-" + _channelId);
+        }
+
+        public void run()
+        {
+            UnprocessedMessage message;
+            _stopped.set(false);
+            try
+            {
+                while (!_stopped.get() && (message = (UnprocessedMessage)_queue.take()) != null)
+                {
+                    dispatchMessage(message);
+                }
+            }
+            catch(InterruptedException e)
+            {
+                ;
+            }
+
+            _logger.info("Dispatcher thread terminating for channel " + _channelId);
+        }
+
+        private void dispatchMessage(UnprocessedMessage message)
+        {
+            if (message.deliverBody != null)
+            {
+                final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag);
+
+                if (consumer == null)
+                {
+                    _logger.warn("Received a message from queue " + message.deliverBody.consumerTag + " without a handler - ignoring...");
+                }
+                else
+                {
+                    consumer.notifyMessage(message, _channelId);
+                }
+            }
+            else
+            {
+                try
+                {
+                    // Bounced message is processed here, away from the mina thread
+                    AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
+                                                                                              false,
+                                                                                              message.contentHeader,
+                                                                                              message.bodies);
+
+                    int errorCode = message.bounceBody.replyCode;
+                    String reason = message.bounceBody.replyText;
+                    _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+                    //Todo should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+                    if (errorCode == AMQConstant.NO_CONSUMERS.getCode())
+                    {
+                        _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
+                    }
+                    else
+                    {
+                        if (errorCode == AMQConstant.NO_ROUTE.getCode())
+                        {
+                            _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+                        }
+                        else
+                        {
+                            _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
+                        }
+                    }
+                }
+                catch (Exception e)
+                {
+                    _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
+                }
+            }
+        }
+
+        public void stopDispatcher()
+        {
+            _stopped.set(true);
+            interrupt();
+        }
+    }
+
+    /**
+     * Creates a session that uses the supplied message factory registry and the {@link #DEFAULT_PREFETCH}
+     * prefetch value. Delegates to the six-argument constructor.
+     */
+    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+               MessageFactoryRegistry messageFactoryRegistry)
+    {
+        this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, DEFAULT_PREFETCH);
+    }
+
+    /**
+     * Primary constructor; all other constructors delegate here.
+     *
+     * @param con the connection that owns this session
+     * @param channelId the channel id used in every frame this session creates
+     * @param transacted whether the session is transacted
+     * @param acknowledgeMode the requested acknowledge mode; ignored for a transacted session, which always
+     * uses SESSION_TRANSACTED
+     * @param messageFactoryRegistry the registry used to create messages
+     * @param defaultPrefetch the threshold passed to the session's flow controlling queue and the prefetch used
+     * by consumers created without an explicit prefetch
+     */
+    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+               MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch)
+    {
+        _connection = con;
+        _channelId = channelId;
+        _transacted = transacted;
+        _acknowledgeMode = transacted ? javax.jms.Session.SESSION_TRANSACTED : acknowledgeMode;
+        _messageFactoryRegistry = messageFactoryRegistry;
+        _defaultPrefetch = defaultPrefetch;
+
+        // The channel is only suspended/unsuspended on threshold crossings when running in NO_ACKNOWLEDGE mode.
+        final FlowControllingBlockingQueue.ThresholdListener flowListener =
+            new FlowControllingBlockingQueue.ThresholdListener()
+            {
+                public void aboveThreshold(int currentValue)
+                {
+                    if (_acknowledgeMode != NO_ACKNOWLEDGE)
+                    {
+                        return;
+                    }
+                    _logger.warn("Above threshold so suspending channel. Current value is " + currentValue);
+                    suspendChannel();
+                }
+
+                public void underThreshold(int currentValue)
+                {
+                    if (_acknowledgeMode != NO_ACKNOWLEDGE)
+                    {
+                        return;
+                    }
+                    _logger.warn("Below threshold so unsuspending channel. Current value is " + currentValue);
+                    unsuspendChannel();
+                }
+            };
+        _queue = new FlowControllingBlockingQueue(_defaultPrefetch, flowListener);
+    }
+
+    /**
+     * Creates a session with a new default message factory registry and the default prefetch.
+     */
+    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode)
+    {
+        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry());
+    }
+
+    /**
+     * Creates a session with a new default message factory registry and the given prefetch.
+     */
+    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetch)
+    {
+        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetch);
+    }
+
+    /**
+     * @return the connection passed to this session's constructor
+     */
+    AMQConnection getAMQConnection()
+    {
+        return _connection;
+    }
+
+    /**
+     * Creates a bytes message (content type "application/octet-stream") while holding the failover mutex.
+     *
+     * @return the new message
+     * @throws JMSException if the message factory registry fails; the underlying AMQException is available
+     * through getLinkedException()
+     */
+    public BytesMessage createBytesMessage() throws JMSException
+    {
+        synchronized (_connection.getFailoverMutex())
+        {
+            checkNotClosed();
+            try
+            {
+                return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream");
+            }
+            catch (AMQException e)
+            {
+                // keep the original failure reachable instead of only embedding its text in the message
+                JMSException jmse = new JMSException("Unable to create message: " + e);
+                jmse.setLinkedException(e);
+                throw jmse;
+            }
+        }
+    }
+
+    /**
+     * Creates a map message (content type "jms/map-message") while holding the failover mutex.
+     *
+     * @return the new message
+     * @throws JMSException if the message factory registry fails; the underlying AMQException is available
+     * through getLinkedException()
+     */
+    public MapMessage createMapMessage() throws JMSException
+    {
+        synchronized (_connection.getFailoverMutex())
+        {
+            checkNotClosed();
+            try
+            {
+                return (MapMessage) _messageFactoryRegistry.createMessage("jms/map-message");
+            }
+            catch (AMQException e)
+            {
+                // keep the original failure reachable instead of only embedding its text in the message
+                JMSException jmse = new JMSException("Unable to create message: " + e);
+                jmse.setLinkedException(e);
+                throw jmse;
+            }
+        }
+    }
+
+    /**
+     * Creates a generic message. The implementation returns a bytes message (content type
+     * "application/octet-stream"), created while holding the failover mutex.
+     *
+     * @return the new message
+     * @throws JMSException if the message factory registry fails; the underlying AMQException is available
+     * through getLinkedException()
+     */
+    public javax.jms.Message createMessage() throws JMSException
+    {
+        synchronized (_connection.getFailoverMutex())
+        {
+            checkNotClosed();
+            try
+            {
+                return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream");
+            }
+            catch (AMQException e)
+            {
+                // keep the original failure reachable instead of only embedding its text in the message
+                JMSException jmse = new JMSException("Unable to create message: " + e);
+                jmse.setLinkedException(e);
+                throw jmse;
+            }
+        }
+    }
+
+    /**
+     * Creates an empty object message (content type "application/java-object-stream") while holding the
+     * failover mutex.
+     *
+     * @return the new message
+     * @throws JMSException if the message factory registry fails; the underlying AMQException is available
+     * through getLinkedException()
+     */
+    public ObjectMessage createObjectMessage() throws JMSException
+    {
+        synchronized (_connection.getFailoverMutex())
+        {
+            checkNotClosed();
+            try
+            {
+                return (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream");
+            }
+            catch (AMQException e)
+            {
+                // keep the original failure reachable instead of only embedding its text in the message
+                JMSException jmse = new JMSException("Unable to create message: " + e);
+                jmse.setLinkedException(e);
+                throw jmse;
+            }
+        }
+    }
+
+    /**
+     * Creates an object message (content type "application/java-object-stream") and sets its payload, while
+     * holding the failover mutex.
+     *
+     * @param object the payload passed to setObject on the new message
+     * @return the new message
+     * @throws JMSException if the message factory registry fails (the underlying AMQException is available
+     * through getLinkedException()) or if setting the payload fails
+     */
+    public ObjectMessage createObjectMessage(Serializable object) throws JMSException
+    {
+        synchronized (_connection.getFailoverMutex())
+        {
+            checkNotClosed();
+            try
+            {
+                ObjectMessage msg = (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream");
+                msg.setObject(object);
+                return msg;
+            }
+            catch (AMQException e)
+            {
+                // keep the original failure reachable instead of only embedding its text in the message
+                JMSException jmse = new JMSException("Unable to create message: " + e);
+                jmse.setLinkedException(e);
+                throw jmse;
+            }
+        }
+    }
+
+    /**
+     * Stream messages are not supported: after the closed check this method always throws
+     * UnsupportedOperationException.
+     */
+    public StreamMessage createStreamMessage() throws JMSException
+    {
+        checkNotClosed();
+        throw new UnsupportedOperationException("Stream messages not supported");
+    }
+
+    /**
+     * Creates an empty text message (content type "text/plain") while holding the failover mutex.
+     *
+     * @return the new message
+     * @throws JMSException if the message factory registry fails; the underlying AMQException is available
+     * through getLinkedException()
+     */
+    public TextMessage createTextMessage() throws JMSException
+    {
+        synchronized (_connection.getFailoverMutex())
+        {
+            checkNotClosed();
+
+            try
+            {
+                return (TextMessage) _messageFactoryRegistry.createMessage("text/plain");
+            }
+            catch (AMQException e)
+            {
+                // keep the original failure reachable instead of only embedding its text in the message
+                JMSException jmse = new JMSException("Unable to create message: " + e);
+                jmse.setLinkedException(e);
+                throw jmse;
+            }
+        }
+    }
+
+    /**
+     * Creates a text message (content type "text/plain") and sets its text, while holding the failover mutex.
+     *
+     * @param text the text passed to setText on the new message
+     * @return the new message
+     * @throws JMSException if the message factory registry fails (the underlying AMQException is available
+     * through getLinkedException()) or if setting the text fails
+     */
+    public TextMessage createTextMessage(String text) throws JMSException
+    {
+        synchronized (_connection.getFailoverMutex())
+        {
+            checkNotClosed();
+            try
+            {
+                TextMessage msg = (TextMessage) _messageFactoryRegistry.createMessage("text/plain");
+                msg.setText(text);
+                return msg;
+            }
+            catch (AMQException e)
+            {
+                // keep the original failure reachable instead of only embedding its text in the message
+                JMSException jmse = new JMSException("Unable to create message: " + e);
+                jmse.setLinkedException(e);
+                throw jmse;
+            }
+        }
+    }
+
+    /**
+     * @return the transacted flag given at construction; the closed check runs first
+     */
+    public boolean getTransacted()
+    {
+        checkNotClosed();
+        return _transacted;
+    }
+
+    /**
+     * @return the acknowledge mode in effect; the constructor forces SESSION_TRANSACTED for a transacted
+     * session, otherwise this is the mode that was requested. The closed check runs first.
+     */
+    public int getAcknowledgeMode() throws JMSException
+    {
+        checkNotClosed();
+        return _acknowledgeMode;
+    }
+
+    /**
+     * Commits the current transaction. Every consumer first acknowledges up to the last message delivered to
+     * it, then a Tx.Commit frame is written and the call waits for the Tx.Commit-Ok reply.
+     *
+     * @throws JMSException if the commit fails; the underlying AMQException is set as the linked exception
+     */
+    public void commit() throws JMSException
+    {
+        checkTransacted();
+        try
+        {
+            // acks for everything delivered so far must go out before the commit itself
+            final Iterator consumers = _consumers.values().iterator();
+            while (consumers.hasNext())
+            {
+                final BasicMessageConsumer consumer = (BasicMessageConsumer) consumers.next();
+                consumer.acknowledgeLastDelivered();
+            }
+
+            // commits outstanding sends as well as the acknowledgements issued above
+            final AMQProtocolHandler handler = _connection.getProtocolHandler();
+            handler.syncWrite(TxCommitBody.createAMQFrame(_channelId), TxCommitOkBody.class);
+        }
+        catch (AMQException cause)
+        {
+            final JMSException failure = new JMSException("Failed to commit: " + cause.getMessage());
+            failure.setLinkedException(cause);
+            throw failure;
+        }
+    }
+
+    /**
+     * Rolls back the current transaction by writing a Tx.Rollback frame and waiting for the Tx.Rollback-Ok
+     * reply.
+     *
+     * @throws JMSException if the rollback fails; the underlying AMQException is available both as the linked
+     * exception (as for commit()) and as the cause
+     */
+    public void rollback() throws JMSException
+    {
+        checkTransacted();
+        try
+        {
+            _connection.getProtocolHandler().syncWrite(
+                    TxRollbackBody.createAMQFrame(_channelId), TxRollbackOkBody.class);
+        }
+        catch (AMQException e)
+        {
+            JMSException exception = new JMSException("Failed to rollback: " + e);
+            // JMS callers inspect getLinkedException(); commit() already sets it, so do the same here
+            exception.setLinkedException(e);
+            // preserve the previous behaviour of exposing the failure through getCause() as well
+            if (exception.getCause() == null)
+            {
+                exception.initCause(e);
+            }
+            throw exception;
+        }
+    }
+
+    /**
+     * Closes the session. While holding the failover mutex the session is flagged as closed, all producers and
+     * consumers are closed, and a Channel.Close frame is written and its Channel.Close-Ok reply awaited. The
+     * session is deregistered from the connection whether or not the channel close succeeds.
+     *
+     * @throws JMSException if closing the channel fails; the underlying AMQException is available through
+     * getLinkedException()
+     */
+    public void close() throws JMSException
+    {
+        // We must close down all producers and consumers in an orderly fashion. This is the only method
+        // that can be called from a different thread of control from the one controlling the session
+        synchronized (_connection.getFailoverMutex())
+        {
+            _closed.set(true);
+
+            // we pass null since this is not an error case
+            closeProducersAndConsumers(null);
+
+            try
+            {
+                _connection.getProtocolHandler().closeSession(this);
+                final AMQFrame frame = ChannelCloseBody.createAMQFrame(
+                        getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0);
+                _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
+                // When control resumes at this point, a reply will have been received that
+                // indicates the broker has closed the channel successfully
+
+            }
+            catch (AMQException e)
+            {
+                // keep the original failure reachable instead of only embedding its text in the message
+                JMSException jmse = new JMSException("Error closing session: " + e);
+                jmse.setLinkedException(e);
+                throw jmse;
+            }
+            finally
+            {
+                _connection.deregisterSession(_channelId);
+            }
+        }
+    }
+
+    /**
+     * Close all producers and consumers. This is called either in the error case or when closing the session
+     * normally. A JMSException from either step is logged and not rethrown, so a failure closing the producers
+     * does not prevent the consumers from being closed.
+     * @param amqe the exception, may be null to indicate no error has occurred
+     */
+    private void closeProducersAndConsumers(AMQException amqe)
+    {
+        try
+        {
+            closeProducers();
+        }
+        catch (JMSException e)
+        {
+            _logger.error("Error closing session: " + e, e);
+        }
+        try
+        {
+            closeConsumers(amqe);
+        }
+        catch (JMSException e)
+        {
+            _logger.error("Error closing session: " + e, e);
+        }
+    }
+
+    /**
+     * Called when the server initiates the closure of the session
+     * unilaterally. The session is flagged as closed, deregistered from the connection and its producers and
+     * consumers are closed, all while holding the failover mutex.
+     * @param e the exception that caused this session to be closed. Anything that is not an AMQException
+     * (including null) is wrapped in a new AMQException with the message "Closing session forcibly", so the
+     * consumers are always closed with a non-null error.
+     */
+    public void closed(Throwable e)
+    {
+        synchronized (_connection.getFailoverMutex())
+        {
+            // An AMQException has an error code and message already and will be passed in when closure occurs as a
+            // result of a channel close request
+            _closed.set(true);
+            AMQException amqe;
+            if (e instanceof AMQException)
+            {
+                amqe = (AMQException) e;
+            }
+            else
+            {
+                amqe = new AMQException(_logger, "Closing session forcibly", e);
+            }
+            _connection.deregisterSession(_channelId);
+            closeProducersAndConsumers(amqe);
+        }
+    }
+
+    /**
+     * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after
+     * failover when the client has vetoed resubscription.
+     *
+     * The session is flagged as closed, deregistered from the connection, and its producers and consumers are
+     * marked closed.
+     *
+     * The caller of this method must already hold the failover mutex.
+     */
+    void markClosed()
+    {
+        _closed.set(true);
+        _connection.deregisterSession(_channelId);
+        markClosedProducersAndConsumers();
+    }
+
+    /**
+     * Closes all producers and marks all consumers as closed. A JMSException from either step is logged and not
+     * rethrown, so a failure with the producers does not prevent the consumers from being processed.
+     */
+    private void markClosedProducersAndConsumers()
+    {
+        try
+        {
+            // no need for a markClosed* method in this case since there is no protocol traffic closing a producer
+            closeProducers();
+        }
+        catch (JMSException e)
+        {
+            _logger.error("Error closing session: " + e, e);
+        }
+        try
+        {
+            markClosedConsumers();
+        }
+        catch (JMSException e)
+        {
+            _logger.error("Error closing session: " + e, e);
+        }
+    }
+
+    /**
+     * Closes every message producer registered with this session, whether or not the closure results from
+     * an error. Errors cannot currently be propagated to message producers (this is a JMS limitation).
+     *
+     * @throws JMSException if closing a producer fails; producers after the failing one are not closed
+     */
+    private void closeProducers() throws JMSException
+    {
+        // Work on a snapshot: close() updates the _producers collection, so iterating it directly
+        // would mean modifying the collection that is being traversed.
+        final Object[] snapshot = _producers.values().toArray();
+        for (int i = 0; i < snapshot.length; i++)
+        {
+            ((BasicMessageProducer) snapshot[i]).close();
+        }
+        // every producer has deregistered itself by now, so the _producers map is empty
+    }
+
+    /**
+     * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error.
+      * @param error not null if this is a result of an error occurring at the connection level
+     */
+    private void closeConsumers(Throwable error) throws JMSException
+    {
+        if (_dispatcher != null)
+        {
+            _dispatcher.stopDispatcher();
+        }
+        // we need to clone the list of consumers since the close() method updates the _consumers collection
+        // which would result in a concurrent modification exception
+        final ArrayList clonedConsumers = new ArrayList(_consumers.values());
+
+        final Iterator it = clonedConsumers.iterator();
+        while (it.hasNext())
+        {
+            final BasicMessageConsumer con = (BasicMessageConsumer) it.next();
+            if (error != null)
+            {
+                con.notifyError(error);
+            }
+            else
+            {
+                con.close();
+            }
+        }
+        // at this point the _consumers map will be empty
+    }
+
+    /**
+     * Stops the dispatcher (if there is one) and then marks every message consumer registered with this
+     * session as closed.
+     *
+     * @throws JMSException if marking a consumer fails; consumers after the failing one are not processed
+     */
+    private void markClosedConsumers() throws JMSException
+    {
+        if (_dispatcher != null)
+        {
+            _dispatcher.stopDispatcher();
+        }
+
+        // Work on a snapshot: the consumers update the _consumers collection as they are closed, so
+        // iterating it directly would mean modifying the collection that is being traversed.
+        final Object[] snapshot = _consumers.values().toArray();
+        for (int i = 0; i < snapshot.length; i++)
+        {
+            ((BasicMessageConsumer) snapshot[i]).markClosed();
+        }
+        // at this point the _consumers map will be empty
+    }
+
+    /**
+     * Asks the broker to resend all unacknowledged messages for the session. The Basic.Recover frame is
+     * written without waiting for a reply.
+     * @throws JMSException if the session is transacted (see checkNotTransacted)
+     */
+    public void recover() throws JMSException
+    {
+        checkNotClosed();
+        checkNotTransacted(); // throws IllegalStateException if a transacted session
+
+        // NOTE(review): the boolean argument is presumably the requeue flag - confirm against BasicRecoverBody
+        _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
+    }
+
+    /**
+     * Session-level message listeners are not supported: after the closed check this method always throws
+     * UnsupportedOperationException.
+     */
+    public MessageListener getMessageListener() throws JMSException
+    {
+        checkNotClosed();
+        throw new java.lang.UnsupportedOperationException("MessageListener interface not supported");
+    }
+
+    /**
+     * Session-level message listeners are not supported: after the closed check this method always throws
+     * UnsupportedOperationException and the supplied listener is ignored.
+     */
+    public void setMessageListener(MessageListener listener) throws JMSException
+    {
+        checkNotClosed();
+        throw new java.lang.UnsupportedOperationException("MessageListener interface not supported");
+    }
+
+    /**
+     * Not supported; always throws UnsupportedOperationException.
+     */
+    public void run()
+    {
+        throw new java.lang.UnsupportedOperationException();
+    }
+
+    /**
+     * Creates a producer with explicit mandatory, immediate and waitUntilSent flags.
+     */
+    public MessageProducer createProducer(Destination destination, boolean mandatory,
+                                          boolean immediate, boolean waitUntilSent)
+            throws JMSException
+    {
+        return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
+    }
+
+    /**
+     * Creates a producer with explicit mandatory and immediate flags; waitUntilSent is false.
+     */
+    public MessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
+            throws JMSException
+    {
+        return createProducerImpl(destination, mandatory, immediate);
+    }
+
+    /**
+     * Creates a producer with an explicit immediate flag and the default mandatory flag
+     * ({@link #DEFAULT_MANDATORY}).
+     */
+    public MessageProducer createProducer(Destination destination, boolean immediate)
+            throws JMSException
+    {
+        return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
+    }
+
+    /**
+     * Creates a producer using the default mandatory ({@link #DEFAULT_MANDATORY}) and immediate
+     * ({@link #DEFAULT_IMMEDIATE}) flags.
+     */
+    public MessageProducer createProducer(Destination destination) throws JMSException
+    {
+        return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
+    }
+
+    /**
+     * Convenience overload that creates a producer with waitUntilSent set to false.
+     */
+    private org.apache.qpid.jms.MessageProducer createProducerImpl(Destination destination, boolean mandatory,
+                                                                   boolean immediate)
+            throws JMSException
+    {
+        return createProducerImpl(destination, mandatory, immediate, false);
+    }
+
+    /**
+     * Creates the producer inside a FailoverSupport operation executed against the connection.
+     *
+     * The destination is cast to AMQDestination, so a Destination of any other type results in a
+     * ClassCastException.
+     */
+    private org.apache.qpid.jms.MessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
+                                                                   final boolean immediate, final boolean waitUntilSent)
+            throws JMSException
+    {
+        return (org.apache.qpid.jms.MessageProducer) new FailoverSupport()
+        {
+            public Object operation()
+            {
+                checkNotClosed();
+
+                // note the argument order expected by BasicMessageProducer: immediate comes before mandatory
+                return new BasicMessageProducer(_connection, (AMQDestination)destination, _transacted, _channelId,
+                                                    AMQSession.this, _connection.getProtocolHandler(),
+                                                    getNextProducerId(), immediate, mandatory, waitUntilSent);
+            }
+        }.execute(_connection);
+    }
+
+    /**
+     * Creates a consumer with the session's default prefetch, noLocal and exclusive both false, and no
+     * selector.
+     */
+    public MessageConsumer createConsumer(Destination destination) throws JMSException
+    {
+        return createConsumer(destination, _defaultPrefetch, false, false, null);
+    }
+
+    /**
+     * Creates a consumer with the given selector, the session's default prefetch, and noLocal and exclusive
+     * both false.
+     */
+    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
+    {
+        return createConsumer(destination, _defaultPrefetch, false, false, messageSelector);
+    }
+
+    /**
+     * Creates a non-exclusive consumer with the given selector and noLocal flag, using the session's default
+     * prefetch.
+     */
+    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
+            throws JMSException
+    {
+        return createConsumer(destination, _defaultPrefetch, noLocal, false, messageSelector);
+    }
+
+    /**
+     * Creates a consumer with explicit prefetch, noLocal, exclusive and selector values and no raw selector
+     * table.
+     */
+    public MessageConsumer createConsumer(Destination destination,
+                                          int prefetch,
+                                          boolean noLocal,
+                                          boolean exclusive,
+                                          String selector) throws JMSException
+    {
+        return createConsumer(destination, prefetch, noLocal, exclusive, selector, null);
+    }
+
+    /**
+     * Creates a consumer with every option explicit, including a raw selector field table (may be null).
+     * Delegates to createConsumerImpl.
+     */
+    public MessageConsumer createConsumer(Destination destination,
+                                          int prefetch,
+                                          boolean noLocal,
+                                          boolean exclusive,
+                                          String selector,
+                                          FieldTable rawSelector) throws JMSException
+    {
+        return createConsumerImpl(destination, prefetch, noLocal, exclusive,
+                                  selector, rawSelector);
+    }
+
+    protected MessageConsumer createConsumerImpl(final Destination destination,
+                                                 final int prefetch,
+                                                 final boolean noLocal,
+                                                 final boolean exclusive,
+                                                 final String selector,
+                                                 final FieldTable rawSelector) throws JMSException
+    {
+        return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
+        {
+            public Object operation() throws JMSException
+            {
+                checkNotClosed();
+
+                AMQDestination amqd = (AMQDestination)destination;
+
+                final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
+                // TODO: construct the rawSelector from the selector string if rawSelector == null
+                final FieldTable ft = new FieldTable();
+                //if (rawSelector != null)
+                //    ft.put("headers", rawSelector.getDataAsBytes());
+                if (rawSelector != null)
+                {
+                    ft.putAll(rawSelector);
+                }
+                BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
+                                                                         _messageFactoryRegistry, AMQSession.this,
+                                                                         protocolHandler, ft, prefetch, exclusive,
+                                                                         _acknowledgeMode);
+
+                try
+                {
+                    registerConsumer(consumer);
+                }
+                catch (AMQException e)
+                {
+                    JMSException ex = new JMSException("Error registering consumer: " + e);
+                    ex.setLinkedException(e);
+                    throw ex;
+                }
+
+                return consumer;
+            }
+        }.execute(_connection);
+    }
+
+    /**
+     * Declares an exchange with the given name and type using the connection's protocol handler.
+     */
+    public void declareExchange(String name, String type)
+    {
+        declareExchange(name, type, _connection.getProtocolHandler());
+    }
+
+    /**
+     * Declares the exchange named by the destination, using the destination's exchange class as the type.
+     */
+    private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler)
+    {
+        declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
+    }
+
+    /**
+     * Builds an Exchange.Declare frame for this channel and writes it without waiting for a reply.
+     */
+    private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler)
+    {
+        // NOTE(review): the trailing 'true' is presumably the nowait flag, which would explain why no reply
+        // is awaited - confirm against ExchangeDeclareBody
+        AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, true, null);
+        protocolHandler.writeFrame(exchangeDeclare);
+    }
+
+    /**
+     * Declare the queue. If the destination requires a name, one is generated by the protocol handler and
+     * stored on the destination before the Queue.Declare frame is written. The frame is written without
+     * waiting for a reply.
+     * @param amqd the destination describing the queue (name, durable, exclusive and auto-delete settings)
+     * @param protocolHandler used to generate the queue name when required and to write the frame
+     * @return the queue name, which is the generated name when the destination required one
+     * @throws AMQException
+     */
+    private String declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
+    {
+        // For queues (but not topics) we generate the name in the client rather than the
+        // server. This allows the name to be reused on failover if required. In general,
+        // the destination indicates whether it wants a name generated or not.
+        if(amqd.isNameRequired())
+        {
+            amqd.setQueueName(protocolHandler.generateQueueName());
+        }
+
+        AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, 0, amqd.getQueueName(),
+                                                                false, amqd.isDurable(), amqd.isExclusive(),
+                                                                amqd.isAutoDelete(), true, null);
+
+        protocolHandler.writeFrame(queueDeclare);
+        return amqd.getQueueName();
+    }
+
+    /**
+     * Binds the named queue to the destination's exchange using the destination's routing key. The Queue.Bind
+     * frame is written without waiting for a reply.
+     * @param amqd supplies the exchange name and routing key
+     * @param queueName the queue to bind
+     * @param protocolHandler used to write the frame
+     * @param ft field table passed as the last argument of the bind frame
+     */
+    private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
+    {
+        AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, 0,
+                                                          queueName, amqd.getExchangeName(),
+                                                          amqd.getRoutingKey(), true, ft);
+
+        protocolHandler.writeFrame(queueBind);
+    }
+
+    /**
+     * Register to consume from the queue. The Basic.Consume frame is written without waiting for a reply.
+     * @param queueName the queue to consume from
+     * @param protocolHandler used to write the frame
+     * @param prefetch not used by this method
+     * @param noLocal passed through to the consume frame
+     * @param exclusive passed through to the consume frame
+     * @param acknowledgeMode compared with Session.NO_ACKNOWLEDGE to derive a flag on the consume frame
+     * @return the consumer tag, which is generated by this client from the session's tag counter (not by the
+     * broker)
+     */
+    private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetch,
+                                    boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException
+    {
+        //need to generate a consumer tag on the client so we can exploit the nowait flag
+        String tag = Integer.toString(_nextTag++);
+
+        AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
+                                                              queueName, tag, noLocal,
+                                                              acknowledgeMode == Session.NO_ACKNOWLEDGE,
+                                                              exclusive, true);
+
+        protocolHandler.writeFrame(jmsConsume);
+        return tag;
+    }
+
+    /**
+     * Creates a queue destination from a plain name or from a binding URL.
+     *
+     * @param queueName a simple queue name, or (when it contains a '/') a binding URL
+     * @return the queue destination
+     * @throws JMSException if the binding URL cannot be parsed; the syntax error is set as the linked exception
+     */
+    public Queue createQueue(String queueName) throws JMSException
+    {
+        // A name without any '/' is treated as a plain queue name rather than a binding URL.
+        final boolean plainName = queueName.indexOf('/') == -1;
+        if (plainName)
+        {
+            return new AMQQueue(queueName);
+        }
+
+        try
+        {
+            return new AMQQueue(new AMQBindingURL(queueName));
+        }
+        catch (URLSyntaxException syntaxError)
+        {
+            final JMSException failure = new JMSException(syntaxError.getReason());
+            failure.setLinkedException(syntaxError);
+            throw failure;
+        }
+    }
+
+    /**
+     * Creates a receiver for the given queue by delegating to {@code createConsumer(queue)} and casting
+     * the result to {@link QueueReceiver}.
+     */
+    public QueueReceiver createReceiver(Queue queue) throws JMSException
+    {
+        return (QueueReceiver) createConsumer(queue);
+    }
+
+    /**
+     * Creates a receiver for the given queue and message selector by delegating to
+     * {@code createConsumer(queue, messageSelector)} and casting the result to {@link QueueReceiver}.
+     */
+    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
+    {
+        return (QueueReceiver) createConsumer(queue, messageSelector);
+    }
+
+    /**
+     * Creates a sender for the given queue by delegating to {@code createProducer(queue)} and casting
+     * the result to {@link QueueSender}.
+     */
+    public QueueSender createSender(Queue queue) throws JMSException
+    {
+        return (QueueSender) createProducer(queue);
+    }
+
+    /**
+     * Creates a topic destination from a plain name or from a binding URL.
+     *
+     * @param topicName a simple topic name, or (when it contains a '/') a binding URL
+     * @return the topic destination
+     * @throws JMSException if the binding URL cannot be parsed; the syntax error is set as the linked exception
+     */
+    public Topic createTopic(String topicName) throws JMSException
+    {
+        // A name without any '/' is treated as a plain topic name rather than a binding URL.
+        final boolean plainName = topicName.indexOf('/') == -1;
+        if (plainName)
+        {
+            return new AMQTopic(topicName);
+        }
+
+        try
+        {
+            return new AMQTopic(new AMQBindingURL(topicName));
+        }
+        catch (URLSyntaxException syntaxError)
+        {
+            final JMSException failure = new JMSException(syntaxError.getReason());
+            failure.setLinkedException(syntaxError);
+            throw failure;
+        }
+    }
+
+    /**
+     * Creates a subscriber for the given topic by delegating to {@code createConsumer(topic)} and casting
+     * the result to {@link TopicSubscriber}.
+     */
+    public TopicSubscriber createSubscriber(Topic topic) throws JMSException
+    {
+        return (TopicSubscriber) createConsumer(topic);
+    }
+
+    /**
+     * Creates a subscriber for the given topic, selector and no-local flag by delegating to
+     * {@code createConsumer(topic, messageSelector, noLocal)} and casting the result to
+     * {@link TopicSubscriber}.
+     */
+    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
+    {
+        return (TopicSubscriber) createConsumer(topic, messageSelector, noLocal);
+    }
+
+    /**
+     * Creates a durable subscriber on the given topic.
+     * <p>
+     * A new {@code AMQTopic} is built from the supplied topic, the connection's client ID and the
+     * subscription name, and a consumer created on it is wrapped in a {@code TopicSubscriberAdaptor}.
+     * <p>
+     * Note: reuse of the same name with a different topic is not currently handled correctly. If a name
+     * is reused when creating a new subscriber with a different topic, selector or no-local flag, the
+     * subscriber will receive messages matching the old subscription AND the new one, whereas the spec
+     * states that the new subscription should replace the old one.
+     * TODO: fix it.
+     *
+     * @param topic the topic to subscribe to; must be an {@code AMQTopic}
+     * @param name the subscription name
+     */
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+    {
+        final AMQTopic subscriptionTopic = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
+        final BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(subscriptionTopic);
+        return new TopicSubscriberAdaptor(subscriptionTopic, consumer);
+    }
+
+    /**
+     * Creates a durable subscriber on the given topic with a message selector and no-local flag.
+     * <p>
+     * Note: reuse of the same name with a different topic is not currently handled correctly.
+     *
+     * @param topic the topic to subscribe to; must be an {@code AMQTopic}
+     * @param name the subscription name
+     * @param messageSelector passed through to {@code createConsumer}
+     * @param noLocal passed through to {@code createConsumer}
+     */
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
+            throws JMSException
+    {
+        final AMQTopic subscriptionTopic = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
+        return new TopicSubscriberAdaptor(
+                subscriptionTopic,
+                (BasicMessageConsumer) createConsumer(subscriptionTopic, messageSelector, noLocal));
+    }
+
+    /**
+     * Creates a publisher for the given topic by delegating to {@code createProducer(topic)} and casting
+     * the result to {@link TopicPublisher}.
+     */
+    public TopicPublisher createPublisher(Topic topic) throws JMSException
+    {
+        return (TopicPublisher) createProducer(topic);
+    }
+
+    /**
+     * Queue browsing is not supported by this session.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    public QueueBrowser createBrowser(Queue queue) throws JMSException
+    {
+        throw new UnsupportedOperationException("Queue browsing not supported");
+    }
+
+    /**
+     * Queue browsing is not supported by this session, with or without a selector.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
+    {
+        throw new UnsupportedOperationException("Queue browsing not supported");
+    }
+
+    /**
+     * Returns a new {@code AMQTemporaryQueue}. No frame is written here; only the destination object
+     * is created.
+     */
+    public TemporaryQueue createTemporaryQueue() throws JMSException
+    {
+        return new AMQTemporaryQueue();
+    }
+
+    /**
+     * Returns a new {@code AMQTemporaryTopic}. No frame is written here; only the destination object
+     * is created.
+     */
+    public TemporaryTopic createTemporaryTopic() throws JMSException
+    {
+        return new AMQTemporaryTopic();
+    }
+
+    /**
+     * Removes a durable subscription by writing a queue-delete frame for the queue named
+     * {@code <clientID>:<name>}.
+     *
+     * @param name the subscription name
+     */
+    public void unsubscribe(String name) throws JMSException
+    {
+        // send a queue.delete for the subscription
+        final String subscriptionQueue = _connection.getClientID() + ":" + name;
+        final AMQFrame deleteFrame =
+                QueueDeleteBody.createAMQFrame(_channelId, 0, subscriptionQueue, false, false, true);
+        final AMQProtocolHandler handler = _connection.getProtocolHandler();
+        handler.writeFrame(deleteFrame);
+    }
+
+    /**
+     * Guards operations that are only valid on a transacted session.
+     *
+     * @throws JMSException an IllegalStateException if {@code getTransacted()} reports false
+     */
+    private void checkTransacted() throws JMSException
+    {
+        if (getTransacted())
+        {
+            return;
+        }
+        throw new IllegalStateException("Session is not transacted");
+    }
+
+    /**
+     * Guards operations that are only valid on a non-transacted session.
+     *
+     * @throws JMSException an IllegalStateException if {@code getTransacted()} reports true
+     */
+    private void checkNotTransacted() throws JMSException
+    {
+        if (!getTransacted())
+        {
+            return;
+        }
+        throw new IllegalStateException("Session is transacted");
+    }
+
+    /**
+     * Accepts a message received from the transport and adds it to the session's queue.
+     * <p>
+     * Invoked (indirectly) by the MINA IO thread; the queue is the one read by the dispatcher.
+     *
+     * @param message the message that has been received
+     */
+    public void messageReceived(UnprocessedMessage message)
+    {
+        final boolean debugEnabled = _logger.isDebugEnabled();
+        if (debugEnabled)
+        {
+            _logger.debug("Message received in session with channel id " + _channelId);
+        }
+
+        _queue.add(message);
+    }
+
+    /**
+     * Acknowledges one message, or every message up to and including it, by writing an ack frame.
+     * <p>
+     * Called either via AbstractJMSMessage (when the mode is CLIENT_ACK) or from a BasicConsumer
+     * (when the mode is AUTO_ACK or similar).
+     *
+     * @param deliveryTag the tag of the last message to be acknowledged
+     * @param multiple if true, acknowledges all messages up to and including the one identified by
+     * the delivery tag
+     */
+    public void acknowledgeMessage(long deliveryTag, boolean multiple)
+    {
+        final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, deliveryTag, multiple);
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
+        }
+
+        final AMQProtocolHandler handler = _connection.getProtocolHandler();
+        handler.writeFrame(ackFrame);
+    }
+
+    /**
+     * @return the default prefetch value held by this session
+     */
+    public int getDefaultPrefetch()
+    {
+        return _defaultPrefetch;
+    }
+
+    /**
+     * @return the id of the channel this session writes its frames on
+     */
+    public int getChannelId()
+    {
+        return _channelId;
+    }
+
+    /**
+     * Starts message dispatch for this session by creating and starting a new daemon Dispatcher thread.
+     * <p>
+     * If a dispatcher already exists, the session is treated as being restarted and a channel-flow
+     * frame is sent first to ask the server to resume delivery.
+     * <p>
+     * NOTE(review): this method does not itself reset the stopped flag set by stop(); presumably the
+     * Dispatcher handles that - confirm.
+     */
+    void start()
+    {
+        if(_dispatcher != null)
+        {
+            //then we stopped this and are restarting, so signal server to resume delivery
+            unsuspendChannel();
+        }
+        // a fresh dispatcher is created on every start; the previous instance (if any) is not reused
+        _dispatcher = new Dispatcher();
+        _dispatcher.setDaemon(true);
+        _dispatcher.start();
+    }
+
+    /**
+     * Stops message dispatch for this session: first asks the server to suspend delivery on the
+     * channel, then sets the stopped flag.
+     */
+    void stop()
+    {
+        //stop the server delivering messages to this session
+        suspendChannel();
+
+        //stop the dispatcher thread
+        _stopped.set(true);
+    }
+
+    /**
+     * @return the current value of the stopped flag set by stop()
+     */
+    boolean isStopped()
+    {
+        return _stopped.get();
+    }
+
+    /**
+     * Callers must hold the failover mutex before calling this method.
+     * @param consumer
+     * @throws AMQException
+     */
+    void registerConsumer(BasicMessageConsumer consumer) throws AMQException
+    {
+        AMQDestination amqd = consumer.getDestination();
+
+        AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
+
+        declareExchange(amqd, protocolHandler);
+
+        String queueName = declareQueue(amqd, protocolHandler);
+
+        bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
+
+        String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetch(), consumer.isNoLocal(),
+                                       consumer.isExclusive(), consumer.getAcknowledgeMode());
+
+        consumer.setConsumerTag(consumerTag);
+        _consumers.put(consumerTag, consumer);
+    }
+
+    /**
+     * Called by the MessageConsumer when closing, to deregister the consumer from the
+     * map from consumerTag to consumer instance.
+     * @param consumerTag the consumer tag under which the consumer was registered (the tag assigned
+     * by this session in registerConsumer)
+     */
+    void deregisterConsumer(String consumerTag)
+    {
+        _consumers.remove(consumerTag);
+    }
+
+    private void registerProducer(long producerId, MessageProducer producer)
+    {
+        _producers.put(new Long(producerId), producer);
+    }
+
+    void deregisterProducer(long producerId)
+    {
+        _producers.remove(new Long(producerId));
+    }
+
+    /**
+     * Advances the producer id counter and returns the new value.
+     *
+     * @return the next producer id
+     */
+    private long getNextProducerId()
+    {
+        _nextProducerId++;
+        return _nextProducerId;
+    }
+
+    /**
+     * Resubscribes all producers and then all consumers of this session. This is called when
+     * performing failover.
+     * @throws AMQException if resubscribing any producer or consumer fails
+     */
+    void resubscribe() throws AMQException
+    {
+        resubscribeProducers();
+        resubscribeConsumers();
+    }
+
+    private void resubscribeProducers() throws AMQException
+    {
+        ArrayList producers = new ArrayList(_producers.values());
+        _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: remove
+        for (Iterator it = producers.iterator(); it.hasNext();)
+        {
+            BasicMessageProducer producer = (BasicMessageProducer) it.next();
+            producer.resubscribe();
+        }
+    }
+
+    /**
+     * Re-registers every consumer known to this session.
+     * <p>
+     * The consumer map is snapshotted and cleared first; registerConsumer() then puts each consumer
+     * back under the tag it assigns.
+     *
+     * @throws AMQException if a consumer cannot be registered again
+     */
+    private void resubscribeConsumers() throws AMQException
+    {
+        ArrayList snapshot = new ArrayList(_consumers.values());
+        _consumers.clear();
+
+        for (Object registered : snapshot)
+        {
+            registerConsumer((BasicMessageConsumer) registered);
+        }
+    }
+
+    private void suspendChannel()
+    {
+        _logger.warn("Suspending channel");
+        AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, false);
+        _connection.getProtocolHandler().writeFrame(channelFlowFrame);
+    }
+
+    private void unsuspendChannel()
+    {
+        _logger.warn("Unsuspending channel");
+        AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, true);
+        _connection.getProtocolHandler().writeFrame(channelFlowFrame);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQTemporaryQueue.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQTemporaryQueue.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQTemporaryQueue.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.JMSException;
+import javax.jms.TemporaryQueue;
+
+/**
+ * AMQ implementation of a TemporaryQueue.
+ */
+final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue {
+
+    /**
+     * Create a new instance of an AMQTemporaryQueue
+     */
+    public AMQTemporaryQueue() {
+        super("TempQueue" + Long.toString(System.currentTimeMillis()),
+                null, true, true);
+    }
+
+    /**
+     * @see javax.jms.TemporaryQueue#delete()
+     */
+    public void delete() throws JMSException {
+        throw new UnsupportedOperationException("Delete not supported, " +
+            "will auto-delete when connection closed");
+    }
+    
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQTemporaryQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQTemporaryTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQTemporaryTopic.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQTemporaryTopic.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQTemporaryTopic.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.JMSException;
+import javax.jms.TemporaryTopic;
+
+/**
+ * AMQ implementation of a {@link TemporaryTopic}.
+ * <p>
+ * The topic name is "TempQueue" followed by the current time in milliseconds at construction.
+ * NOTE(review): the "TempQueue" prefix is shared with AMQTemporaryQueue - confirm this is intended.
+ */
+class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic
+{
+    /**
+     * Creates a temporary topic whose name is derived from the current system time.
+     */
+    public AMQTemporaryTopic()
+    {
+        super("TempQueue" + System.currentTimeMillis());
+    }
+
+    /**
+     * Explicit deletion is not supported.
+     *
+     * @throws UnsupportedOperationException always
+     * @see javax.jms.TemporaryTopic#delete()
+     */
+    public void delete() throws JMSException
+    {
+        final String reason = "Delete not supported, " +
+                "will auto-delete when connection closed";
+        throw new UnsupportedOperationException(reason);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQTemporaryTopic.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQTopic.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQTopic.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQTopic.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,89 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+import javax.jms.JMSException;
+import javax.jms.Topic;
+
+/**
+ * A topic destination bound to the default topic exchange (or to whatever a binding URL specifies).
+ */
+public class AMQTopic extends AMQDestination implements Topic
+{
+    /**
+     * Creates a topic from a binding URL.
+     *
+     * @param binding the binding URL object
+     */
+    public AMQTopic(BindingURL binding)
+    {
+        super(binding);
+    }
+
+    /**
+     * Creates a non-durable topic with the given name on the default topic exchange.
+     *
+     * @param name the topic name
+     */
+    public AMQTopic(String name)
+    {
+        super(ExchangeDefaults.TOPIC_EXCHANGE_NAME,
+              ExchangeDefaults.TOPIC_EXCHANGE_CLASS,
+              name,
+              true,
+              true,
+              null);
+        _isDurable = false;
+    }
+
+    /**
+     * Creates a durable topic that represents a durable subscription. The last argument handed to the
+     * superclass is {@code clientId + ":" + subscriptionName}.
+     *
+     * @param topic the topic being subscribed to; its destination name is reused
+     * @param clientId the client id of the subscribing connection
+     * @param subscriptionName the name of the durable subscription
+     */
+    public AMQTopic(AMQTopic topic, String clientId, String subscriptionName)
+    {
+        super(ExchangeDefaults.TOPIC_EXCHANGE_NAME,
+              ExchangeDefaults.TOPIC_EXCHANGE_CLASS,
+              topic.getDestinationName(),
+              true,
+              false,
+              clientId + ":" + subscriptionName);
+        _isDurable = true;
+    }
+
+    /**
+     * @return the destination name, as reported by the superclass
+     */
+    public String getTopicName() throws JMSException
+    {
+        return super.getDestinationName();
+    }
+
+    /**
+     * @return the destination name prefixed with the letter 'T'
+     */
+    public String getEncodedName()
+    {
+        return "T" + getDestinationName();
+    }
+
+    /**
+     * @return the routing key, which for a topic is its destination name
+     */
+    public String getRoutingKey()
+    {
+        return getDestinationName();
+    }
+
+    /**
+     * @return always false: topics always rely on a server generated queue name
+     */
+    public boolean isNameRequired()
+    {
+        return false;
+    }
+
+    /**
+     * Deliberately does nothing.
+     * <p>
+     * The queue behind a topic is always private and its name must not be changed through this setter.
+     * Otherwise, reusing the topic when registering consumers would make all consumers listen on the
+     * same (private) queue rather than each getting their own.
+     * <p>
+     * This is relatively nasty, but a more elegant solution is hard to find given that AMQQueue (and
+     * possibly other AMQDestination subclasses) need to use the underlying queue name even where it is
+     * server generated.
+     *
+     * @param queueName ignored
+     */
+    public void setQueueName(String queueName)
+    {
+        // intentionally empty - see the method comment
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQTopic.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,499 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.jms.MessageConsumer;
+import org.apache.qpid.jms.Session;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class BasicMessageConsumer extends Closeable implements MessageConsumer
+{
+    private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class);
+
+    /**
+     * The connection being used by this consumer
+     */
+    private AMQConnection _connection;
+
+    private String _messageSelector;
+
+    private boolean _noLocal;
+
+    private AMQDestination _destination;
+
+    /**
+     * When true indicates that a blocking receive call is in progress
+     */
+    private final AtomicBoolean _receiving = new AtomicBoolean(false);
+    /**
+     * Holds an atomic reference to the listener installed.
+     */
+    private final AtomicReference _messageListener = new AtomicReference();
+
+    /**
+     * The consumer tag allows us to close the consumer by sending a jmsCancel method to the
+     * broker
+     */
+    private String _consumerTag;
+
+    /**
+     * We need to know the channel id when constructing frames
+     */
+    private int _channelId;
+
+    /**
+     * Used in the blocking receive methods to receive a message from
+     * the Session thread. Argument true indicates we want strict FIFO semantics
+     */
+    private final SynchronousQueue _synchronousQueue = new SynchronousQueue(true);
+
+    private MessageFactoryRegistry _messageFactory;
+
+    private AMQSession _session;
+
+    private AMQProtocolHandler _protocolHandler;
+
+    /**
+     * We need to store the "raw" field table so that we can resubscribe in the event of failover being required
+     */
+    private FieldTable _rawSelectorFieldTable;
+
+    /**
+     * We store the prefetch field in order to be able to reuse it when resubscribing in the event of failover
+     */
+    private int _prefetch;
+
+    /**
+     * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
+     */
+    private boolean _exclusive;
+
+    /**
+     * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes
+     * per consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
+     * implementation.
+     */
+    private int _acknowledgeMode;
+
+    /**
+     * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
+     */
+    private int _outstanding;
+
+    /**
+     * Tag of the last message delivered, which should be acknowledged on commit in
+     * transaction mode.
+     */
+    private long _lastDeliveryTag;
+
+    /**
+     * Creates a consumer and stores every argument in the corresponding field; nothing is sent to the
+     * broker here.
+     *
+     * @param channelId the channel id used when constructing frames
+     * @param connection the connection being used by this consumer
+     * @param destination the destination to consume from
+     * @param messageSelector the message selector expression
+     * @param noLocal the no-local flag
+     * @param messageFactory the registry used to turn raw frames into JMS messages
+     * @param session the owning session
+     * @param protocolHandler the handler frames are written to
+     * @param rawSelectorFieldTable the raw field table, kept so the consumer can resubscribe on failover
+     * @param prefetch the prefetch value, kept so the consumer can resubscribe on failover
+     * @param exclusive the exclusive flag, kept so the consumer can resubscribe on failover
+     * @param acknowledgeMode the acknowledge mode in force for this consumer
+     */
+    BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
+                         boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+                         AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetch,
+                         boolean exclusive, int acknowledgeMode)
+    {
+        // transport and session plumbing
+        this._channelId = channelId;
+        this._connection = connection;
+        this._session = session;
+        this._protocolHandler = protocolHandler;
+        this._messageFactory = messageFactory;
+
+        // subscription settings
+        this._destination = destination;
+        this._messageSelector = messageSelector;
+        this._rawSelectorFieldTable = rawSelectorFieldTable;
+        this._noLocal = noLocal;
+        this._exclusive = exclusive;
+        this._prefetch = prefetch;
+        this._acknowledgeMode = acknowledgeMode;
+    }
+
+    /**
+     * @return the destination this consumer was created for
+     */
+    public AMQDestination getDestination()
+    {
+        return _destination;
+    }
+
+    /**
+     * @return the message selector this consumer was created with
+     */
+    public String getMessageSelector() throws JMSException
+    {
+        return _messageSelector;
+    }
+
+    /**
+     * @return the currently installed listener, or null if none is installed
+     */
+    public MessageListener getMessageListener() throws JMSException
+    {
+        return (MessageListener) _messageListener.get();
+    }
+
+    /**
+     * @return the acknowledge mode in force for this consumer
+     */
+    public int getAcknowledgeMode()
+    {
+        return _acknowledgeMode;
+    }
+
+    /**
+     * @return true if a message listener is currently installed
+     */
+    private boolean isMessageListenerSet()
+    {
+        return _messageListener.get() != null;
+    }
+
+    /**
+     * Installs (or, while the session is stopped, replaces or clears) the message listener.
+     * <p>
+     * While the session is stopped the listener is set unconditionally. While it is started, the call
+     * is rejected if a synchronous receive is in progress or if a listener is already installed. After
+     * a successful install, one message that the dispatcher may be waiting to hand over is taken from
+     * the synchronous queue and delivered to the new listener.
+     *
+     * @param messageListener the listener to install; may be null
+     * @throws JMSException if the consumer is closed (via checkNotClosed()), or an IllegalStateException
+     * if another thread is receiving synchronously or the listener is changed while the session is started
+     */
+    public void setMessageListener(MessageListener messageListener) throws JMSException
+    {
+        checkNotClosed();
+
+        //if the current listener is non-null and the session is not stopped, then
+        //it is an error to call this method.
+
+        //i.e. it is only valid to call this method if
+        //
+        //    (a) the session is stopped, in which case the dispatcher is not running
+        //    OR
+        //    (b) the listener is null AND we are not receiving synchronously at present
+        //
+
+        if (_session.isStopped())
+        {
+            _messageListener.set(messageListener);
+            _logger.debug("Message listener set for destination " + _destination);
+        }
+        else
+        {
+            if (_receiving.get())
+            {
+                throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
+            }
+            // atomically install the listener only if none is currently set
+            if (!_messageListener.compareAndSet(null, messageListener))
+            {
+                throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started.");
+            }
+            _logger.debug("Message listener set for destination " + _destination);
+
+            if (messageListener != null)
+            {
+                //handle case where connection has already been started, and the dispatcher is blocked
+                //doing a put on the _synchronousQueue
+                Object msg = _synchronousQueue.poll();
+                if (msg != null)
+                {
+                    AbstractJMSMessage jmsMsg = (AbstractJMSMessage) msg;
+                    messageListener.onMessage(jmsMsg);
+                    postDeliver(jmsMsg);
+                }
+            }
+        }
+    }
+
+    /**
+     * Marks this consumer as having a synchronous receive in progress.
+     * <p>
+     * Callers invoke this before entering their try/finally, so when this method throws nothing else
+     * will call releaseReceiving(). The flag is therefore cleared here before the listener check fails;
+     * otherwise it would stay set and every later receive (and setMessageListener on a started
+     * session) would be rejected.
+     *
+     * @throws JMSException an IllegalStateException if another thread is already receiving or a message
+     * listener is installed
+     */
+    private void acquireReceiving() throws JMSException
+    {
+        if (!_receiving.compareAndSet(false, true))
+        {
+            throw new javax.jms.IllegalStateException("Another thread is already receiving.");
+        }
+        if (isMessageListenerSet())
+        {
+            // we own the flag at this point, so give it back before reporting the error
+            _receiving.set(false);
+            throw new javax.jms.IllegalStateException("A listener has already been set.");
+        }
+    }
+
+    /**
+     * Clears the flag that marks a synchronous receive as being in progress.
+     */
+    private void releaseReceiving()
+    {
+        _receiving.set(false);
+    }
+
+    /**
+     * @return the raw selector field table, stored so the consumer can resubscribe on failover
+     */
+    public FieldTable getRawSelectorFieldTable()
+    {
+        return _rawSelectorFieldTable;
+    }
+
+    /**
+     * @return the prefetch value this consumer was created with
+     */
+    public int getPrefetch()
+    {
+        return _prefetch;
+    }
+
+    /**
+     * @return the no-local flag this consumer was created with
+     */
+    public boolean isNoLocal()
+    {
+        return _noLocal;
+    }
+
+    /**
+     * @return the exclusive flag this consumer was created with
+     */
+    public boolean isExclusive()
+    {
+        return _exclusive;
+    }
+
+    /**
+     * Receives the next message, waiting indefinitely; equivalent to {@code receive(0)}.
+     */
+    public Message receive() throws JMSException
+    {
+        return receive(0);
+    }
+
+    /**
+     * Receives the next message handed over by the session, waiting up to the given timeout.
+     *
+     * @param l the timeout in milliseconds; a value of zero or less waits indefinitely
+     * @return the message, or null if the timeout expired or the calling thread was interrupted while
+     * waiting (in which case the thread's interrupt status is restored before returning)
+     * @throws JMSException if the consumer is closed (via checkNotClosed()), if another receive is in
+     * progress or a listener is installed, or if an error was passed through the queue instead of a message
+     */
+    public Message receive(long l) throws JMSException
+    {
+        checkNotClosed();
+
+        // Must stay outside the try block: if acquiring fails, the finally clause must not clear a
+        // flag that belongs to another thread's receive.
+        acquireReceiving();
+
+        try
+        {
+            Object o = null;
+            if (l > 0)
+            {
+                o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+            }
+            else
+            {
+                o = _synchronousQueue.take();
+            }
+            final AbstractJMSMessage m = returnMessageOrThrow(o);
+            if (m != null)
+            {
+                postDeliver(m);
+            }
+            return m;
+        }
+        catch (InterruptedException e)
+        {
+            // Do not swallow the interruption: restore the flag so the caller can still observe it.
+            Thread.currentThread().interrupt();
+            return null;
+        }
+        finally
+        {
+            releaseReceiving();
+        }
+    }
+
+    /**
+     * Receives a message only if one is immediately available from the synchronous queue.
+     *
+     * @return the message, or null if none was available
+     * @throws JMSException if the consumer is closed (via checkNotClosed()), if another receive is in
+     * progress or a listener is installed, or if an error was passed through the queue instead of a message
+     */
+    public Message receiveNoWait() throws JMSException
+    {
+        checkNotClosed();
+
+        acquireReceiving();
+
+        try
+        {
+            final AbstractJMSMessage message = returnMessageOrThrow(_synchronousQueue.poll());
+            if (message == null)
+            {
+                return null;
+            }
+            postDeliver(message);
+            return message;
+        }
+        finally
+        {
+            releaseReceiving();
+        }
+    }
+
+    /**
+     * We can get back either a Message or an exception from the queue. This method examines the argument and deals
+     * with it by throwing it (if an exception) or returning it (in any other case).
+     * @param o
+     * @return a message only if o is a Message
+     * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not
+     * a JMSException is created with the linked exception set appropriately
+     */
+    private AbstractJMSMessage returnMessageOrThrow(Object o)
+            throws JMSException
+    {
+        // errors are passed via the queue too since there is no way of interrupting the poll() via the API.
+        if (o instanceof Throwable)
+        {
+            JMSException e = new JMSException("Message consumer forcibly closed due to error: " + o);
+            if (o instanceof Exception)
+            {
+                e.setLinkedException((Exception) o);
+            }
+            throw e;
+        }
+        else
+        {
+            return (AbstractJMSMessage) o;
+        }
+    }
+
+    /**
+     * Closes this consumer: cancels the subscription on the broker, waits for the cancel-ok reply and
+     * deregisters the consumer. Calling it again after the closed flag is set has no effect.
+     * <p>
+     * The whole operation runs while holding the connection's failover mutex.
+     *
+     * @throws JMSException if the cancel request fails; the underlying AMQException is attached as the
+     * linked exception so the root cause is not lost
+     */
+    public void close() throws JMSException
+    {
+        synchronized (_connection.getFailoverMutex())
+        {
+            // getAndSet makes the close idempotent: only the first caller sends the cancel
+            if (!_closed.getAndSet(true))
+            {
+                final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
+
+                try
+                {
+                    _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+                }
+                catch (AMQException e)
+                {
+                    _logger.error("Error closing consumer: " + e, e);
+                    final JMSException jmse = new JMSException("Error closing consumer: " + e);
+                    jmse.setLinkedException(e);
+                    throw jmse;
+                }
+
+                deregisterConsumer();
+            }
+        }
+    }
+
+    /**
+     * Called when you need to invalidate a consumer without sending anything to the broker. Used for
+     * example when failover has occurred and the client has vetoed automatic resubscription.
+     * Sets the closed flag and deregisters the consumer.
+     * The caller must hold the failover mutex.
+     */
+    void markClosed()
+    {
+        _closed.set(true);
+        deregisterConsumer();
+    }
+
+    /**
+     * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case
+     * of a message listener or a synchronous receive() caller.
+     * @param messageFrame the raw unprocessed mesage
+     * @param channelId channel on which this message was sent
+     */
+    void notifyMessage(UnprocessedMessage messageFrame, int channelId)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("notifyMessage called with message number " + messageFrame.deliverBody.deliveryTag);
+        }
+        try
+        {
+            AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliverBody.deliveryTag,
+                                                                       messageFrame.deliverBody.redelivered,
+                                                                       messageFrame.contentHeader,
+                                                                       messageFrame.bodies);
+
+            _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
+
+            preDeliver(jmsMessage);
+
+            if (isMessageListenerSet())
+            {
+                //we do not need a lock around the test above, and the dispatch below as it is invalid
+                //for an application to alter an installed listener while the session is started
+                getMessageListener().onMessage(jmsMessage);
+                postDeliver(jmsMessage);
+            }
+            else
+            {
+                _synchronousQueue.put(jmsMessage);
+            }
+        }
+        catch (Exception e)
+        {
+            if (e instanceof InterruptedException)
+            {
+                _logger.info("SynchronousQueue.put interupted. Usually result of connection closing");
+            }
+            else
+            {
+                _logger.error("Caught exception (dump follows) - ignoring...", e);
+            }
+        }
+    }
+
+    private void preDeliver(AbstractJMSMessage msg)
+    {
+        switch (_acknowledgeMode)
+        {
+            case Session.PRE_ACKNOWLEDGE:
+                _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                break;
+            case Session.CLIENT_ACKNOWLEDGE:
+                // we set the session so that when the user calls acknowledge() it can call the method on session
+                // to send out the appropriate frame
+                msg.setAMQSession(_session);
+                break;
+        }
+    }
+
+    private void postDeliver(AbstractJMSMessage msg)
+    {
+        switch (_acknowledgeMode)
+        {
+            case Session.DUPS_OK_ACKNOWLEDGE:
+                if (++_outstanding >= _prefetch)
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+                }
+                break;
+            case Session.AUTO_ACKNOWLEDGE:
+                _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                break;
+            case Session.SESSION_TRANSACTED:
+                _lastDeliveryTag = msg.getDeliveryTag();
+                break;
+        }
+    }
+
+    /**
+     * Acknowledges everything up to and including the last delivered message, if there is one. Used when
+     * committing. After acknowledging, the remembered delivery tag is cleared so that a second call without
+     * an intervening delivery does nothing.
+     */
+    void acknowledgeLastDelivered()
+    {
+        if (_lastDeliveryTag <= 0)
+        {
+            // nothing has been delivered since the last acknowledgement
+            return;
+        }
+        _session.acknowledgeMessage(_lastDeliveryTag, true);
+        _lastDeliveryTag = -1;
+    }
+
+    void notifyError(Throwable cause)
+    {
+        _closed.set(true);
+
+        // we have no way of propagating the exception to a message listener - a JMS limitation - so we
+        // deal with the case where we have a synchronous receive() waiting for a message to arrive
+        if (!isMessageListenerSet())
+        {
+            // offer only succeeds if there is a thread waiting for an item from the queue
+            if (_synchronousQueue.offer(cause))
+            {
+                _logger.debug("Passed exception to synchronous queue for propagation to receive()");
+            }
+        }
+        deregisterConsumer();
+    }
+
+    /**
+     * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean
+     * case and in the case of an error occurring.
+     * <p>
+     * Delegates to the owning session, identifying this consumer by its consumer tag.
+     */
+    private void deregisterConsumer()
+    {
+        _session.deregisterConsumer(_consumerTag);
+    }
+
+    /**
+     * @return the consumer tag currently held by this consumer; this is the tag used when deregistering
+     * the consumer from its session
+     */
+    public String getConsumerTag()
+    {
+        return _consumerTag;
+    }
+
+    /**
+     * Replaces the consumer tag held by this consumer.
+     *
+     * @param consumerTag the new consumer tag
+     */
+    public void setConsumerTag(String consumerTag)
+    {
+        _consumerTag = consumerTag;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageProducer.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageProducer.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageProducer.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,480 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.JMSBytesMessage;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.*;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import java.io.UnsupportedEncodingException;
+
+/**
+ * Message producer that publishes JMS messages as a basic.publish frame followed by a content header frame
+ * and zero or more content body frames.
+ * <p>
+ * A producer may be created with a default destination, in which case the <code>send(Message, ...)</code>
+ * methods can be used, or without one, in which case a destination must be supplied on each send.
+ * All send methods take the connection's failover mutex for the duration of the send.
+ */
+public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
+{
+    protected final Logger _logger = Logger.getLogger(getClass());
+
+    private AMQConnection _connection;
+
+    /**
+     * If true, messages will not get a timestamp.
+     */
+    private boolean _disableTimestamps;
+
+    /**
+     * Priority of messages created by this producer.
+     */
+    private int _messagePriority;
+
+    /**
+     * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
+     */
+    private long _timeToLive;
+
+    /**
+     * Delivery mode used for this producer.
+     */
+    private int _deliveryMode = DeliveryMode.PERSISTENT;
+
+    /**
+     * The Destination used for this producer, if specified upon creation.
+     */
+    protected AMQDestination _destination;
+
+    /**
+     * Default encoding used for messages produced by this producer.
+     */
+    private String _encoding;
+
+    /**
+     * Default mime type used for messages produced by this producer.
+     */
+    private String _mimeType;
+
+    private AMQProtocolHandler _protocolHandler;
+
+    /**
+     * True if this producer was created from a transacted session
+     */
+    private boolean _transacted;
+
+    private int _channelId;
+
+    /**
+     * This is an id generated by the session and is used to tie individual producers to the session. This means we
+     * can deregister a producer with the session when the producer is closed. We need to be able to tie producers
+     * to the session so that when an error is propagated to the session it can close the producer (meaning that
+     * a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
+     */
+    private long _producerId;
+
+    /**
+     * The session used to create this producer
+     */
+    private AMQSession _session;
+
+    /**
+     * Default value of the immediate flag for sends that do not specify it.
+     */
+    private final boolean _immediate;
+
+    /**
+     * Default value of the mandatory flag for sends that do not specify it.
+     */
+    private final boolean _mandatory;
+
+    /**
+     * Default value of the wait flag passed to the protocol handler for sends that do not specify it.
+     */
+    private final boolean _waitUntilSent;
+
+    /**
+     * Creates the producer and, when a default destination is given, declares that destination's exchange.
+     *
+     * @param connection      connection whose failover mutex guards the send methods
+     * @param destination     default destination, or null if one is supplied on each send
+     * @param transacted      true if the creating session is transacted
+     * @param channelId       channel on which frames are written
+     * @param session         session that created this producer
+     * @param protocolHandler handler used to write frames
+     * @param producerId      id under which the session registered this producer
+     * @param immediate       default immediate flag
+     * @param mandatory       default mandatory flag
+     * @param waitUntilSent   default wait flag handed to the protocol handler when writing
+     */
+    protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
+                                   int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
+                                   long producerId, boolean immediate, boolean mandatory, boolean waitUntilSent)
+    {
+        _connection = connection;
+        _destination = destination;
+        _transacted = transacted;
+        _protocolHandler = protocolHandler;
+        _channelId = channelId;
+        _session = session;
+        _producerId = producerId;
+        if (destination != null)
+        {
+            declareDestination(destination);
+        }
+        _immediate = immediate;
+        _mandatory = mandatory;
+        _waitUntilSent = waitUntilSent;
+    }
+
+    /**
+     * Re-declares the exchange of the default destination, if this producer has one.
+     *
+     * @throws AMQException declared for callers; nothing in this method currently throws it
+     */
+    void resubscribe() throws AMQException
+    {
+         if (_destination != null)
+         {
+             declareDestination(_destination);
+         }
+    }
+
+    /**
+     * Writes an exchange declare frame for the exchange of the given destination.
+     *
+     * @param destination destination whose exchange name and class are declared
+     */
+    private void declareDestination(AMQDestination destination)
+    {
+        // Declare the exchange
+        // Note that the durable and internal arguments are ignored since passive is set to false
+        AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
+                                                              destination.getExchangeClass(), false,
+                                                              false, false, false, true, null);
+        _protocolHandler.writeFrame(declare);
+    }
+
+    /**
+     * Accepted for JMS compatibility but ignored.
+     */
+    public void setDisableMessageID(boolean b) throws JMSException
+    {
+        checkNotClosed();
+        // IGNORED
+    }
+
+    /**
+     * @return always false
+     */
+    public boolean getDisableMessageID() throws JMSException
+    {
+        checkNotClosed();
+        // Always false for AMQP
+        return false;
+    }
+
+    /**
+     * @param b true to stop this producer from setting a timestamp (and an expiration) on sent messages
+     */
+    public void setDisableMessageTimestamp(boolean b) throws JMSException
+    {
+        checkNotClosed();
+        _disableTimestamps = b;
+    }
+
+    public boolean getDisableMessageTimestamp() throws JMSException
+    {
+        checkNotClosed();
+        return _disableTimestamps;
+    }
+
+    /**
+     * @param i either DeliveryMode.NON_PERSISTENT or DeliveryMode.PERSISTENT
+     * @throws JMSException if any other value is supplied
+     */
+    public void setDeliveryMode(int i) throws JMSException
+    {
+        checkNotClosed();
+        if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT)
+        {
+            throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i +
+                                   " is illegal");
+        }
+        _deliveryMode = i;
+    }
+
+    public int getDeliveryMode() throws JMSException
+    {
+        checkNotClosed();
+        return _deliveryMode;
+    }
+
+    /**
+     * @param i default priority, in the range 0 to 9
+     * @throws IllegalArgumentException if the value is outside that range
+     */
+    public void setPriority(int i) throws JMSException
+    {
+        checkNotClosed();
+        if (i < 0 || i > 9)
+        {
+            throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
+        }
+        _messagePriority = i;
+    }
+
+    public int getPriority() throws JMSException
+    {
+        checkNotClosed();
+        return _messagePriority;
+    }
+
+    /**
+     * @param l default time to live in milliseconds; must not be negative
+     * @throws IllegalArgumentException if the value is negative
+     */
+    public void setTimeToLive(long l) throws JMSException
+    {
+        checkNotClosed();
+        if (l < 0)
+        {
+            throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
+        }
+        _timeToLive = l;
+    }
+
+    public long getTimeToLive() throws JMSException
+    {
+        checkNotClosed();
+        return _timeToLive;
+    }
+
+    /**
+     * @return the default destination, or null if none was specified upon creation
+     */
+    public Destination getDestination() throws JMSException
+    {
+        checkNotClosed();
+        return _destination;
+    }
+
+    /**
+     * Marks this producer as closed and deregisters it from its session.
+     */
+    public void close() throws JMSException
+    {
+        _closed.set(true);
+        _session.deregisterProducer(_producerId);
+    }
+
+    /**
+     * Precondition check shared by the send methods that use the default destination: the producer must be
+     * open and must have been created with a destination.
+     *
+     * @throws UnsupportedOperationException if this producer has no default destination
+     */
+    private void checkDefaultDestination()
+    {
+        checkNotClosed();
+        if (_destination == null)
+        {
+            throw new UnsupportedOperationException("This producer was created without a default destination; " +
+                                                    "a destination must be supplied when sending");
+        }
+    }
+
+    /**
+     * Sends to the default destination using the producer's default delivery mode, priority and time to live.
+     */
+    public void send(Message message) throws JMSException
+    {
+        checkDefaultDestination();
+        synchronized (_connection.getFailoverMutex())
+        {
+            sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
+                     _mandatory, _immediate);
+        }
+    }
+
+    /**
+     * Sends to the default destination with the given delivery mode.
+     */
+    public void send(Message message, int deliveryMode) throws JMSException
+    {
+        checkDefaultDestination();
+        synchronized (_connection.getFailoverMutex())
+        {
+            sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
+                     _mandatory, _immediate);
+        }
+    }
+
+    /**
+     * Sends to the default destination with the given delivery mode and immediate flag.
+     */
+    public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
+    {
+        checkDefaultDestination();
+        synchronized (_connection.getFailoverMutex())
+        {
+            sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
+                     _mandatory, immediate);
+        }
+    }
+
+    /**
+     * Sends to the default destination with the given delivery mode, priority and time to live.
+     */
+    public void send(Message message, int deliveryMode, int priority,
+                     long timeToLive) throws JMSException
+    {
+        checkDefaultDestination();
+        synchronized (_connection.getFailoverMutex())
+        {
+            sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory,
+                     _immediate);
+        }
+    }
+
+    /**
+     * Sends to the given destination using the producer's defaults for everything else.
+     */
+    public void send(Destination destination, Message message) throws JMSException
+    {
+        checkNotClosed();
+        synchronized (_connection.getFailoverMutex())
+        {
+            validateDestination(destination);
+            sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
+                     _mandatory, _immediate);
+        }
+    }
+
+    /**
+     * Sends to the given destination with the given delivery mode, priority and time to live.
+     */
+    public void send(Destination destination, Message message, int deliveryMode,
+                     int priority, long timeToLive)
+            throws JMSException
+    {
+        checkNotClosed();
+        synchronized (_connection.getFailoverMutex())
+        {
+            validateDestination(destination);
+            sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+                     _mandatory, _immediate);
+        }
+    }
+
+    /**
+     * As {@link #send(Destination, Message, int, int, long)} but with an explicit mandatory flag.
+     */
+    public void send(Destination destination, Message message, int deliveryMode,
+                     int priority, long timeToLive, boolean mandatory)
+            throws JMSException
+    {
+        checkNotClosed();
+        synchronized (_connection.getFailoverMutex())
+        {
+            validateDestination(destination);
+            sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+                     mandatory, _immediate);
+        }
+    }
+
+    /**
+     * As {@link #send(Destination, Message, int, int, long)} but with explicit mandatory and immediate flags.
+     */
+    public void send(Destination destination, Message message, int deliveryMode,
+                     int priority, long timeToLive, boolean mandatory, boolean immediate)
+            throws JMSException
+    {
+        checkNotClosed();
+        synchronized (_connection.getFailoverMutex())
+        {
+            validateDestination(destination);
+            sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+                     mandatory, immediate);
+        }
+    }
+
+    /**
+     * As {@link #send(Destination, Message, int, int, long)} but with explicit mandatory, immediate and
+     * wait flags.
+     */
+    public void send(Destination destination, Message message, int deliveryMode,
+                     int priority, long timeToLive, boolean mandatory,
+                     boolean immediate, boolean waitUntilSent)
+            throws JMSException
+    {
+        checkNotClosed();
+        synchronized (_connection.getFailoverMutex())
+        {
+            validateDestination(destination);
+            sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+                     mandatory, immediate, waitUntilSent);
+        }
+    }
+
+    /**
+     * Checks that the destination is an AMQDestination and, if so, declares its exchange.
+     *
+     * @param destination the destination supplied by the caller; may be null
+     * @throws JMSException if the destination is null or of an unsupported class
+     */
+    private void validateDestination(Destination destination) throws JMSException
+    {
+        if (!(destination instanceof AMQDestination))
+        {
+            throw new JMSException("Unsupported destination class: " +
+                                   (destination != null?destination.getClass():null));
+        }
+        declareDestination((AMQDestination)destination);
+    }
+
+    /**
+     * Sends using this producer's default wait flag. The caller of this method must hold the failover mutex.
+     */
+    protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
+                          long timeToLive, boolean mandatory, boolean immediate) throws JMSException
+    {
+        sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
+    }
+
+    /**
+     * Builds the publish, content header and content body frames for the message and writes them as a single
+     * composite data block.
+     * <p>
+     * The caller of this method must hold the failover mutex.
+     *
+     * @param destination  destination supplying the exchange name and routing key
+     * @param message      the message to send
+     * @param deliveryMode delivery mode stored in the content header
+     * @param priority     priority stored in the content header
+     * @param timeToLive   time to live in milliseconds; when positive (and timestamps are enabled) the
+     *                     expiration is set to the current time plus this value, otherwise to zero
+     * @param mandatory    mandatory flag of the publish frame
+     * @param immediate    immediate flag of the publish frame
+     * @param wait         passed through to the protocol handler when writing the frames
+     * @throws JMSException if the message rejects one of the changes made to it
+     */
+    protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
+                          long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
+    {
+        AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
+                                                                destination.getRoutingKey(), mandatory, immediate);
+
+        long currentTime = 0;
+        if (!_disableTimestamps)
+        {
+            currentTime = System.currentTimeMillis();
+            message.setJMSTimestamp(currentTime);
+        }
+        //
+        // Very nasty temporary hack for GRM-206. Will be altered ASAP.
+        //
+        if(message instanceof JMSBytesMessage)
+        {
+            JMSBytesMessage msg = (JMSBytesMessage) message;
+            if(!msg.isReadable())
+            {
+                msg.reset();
+            }
+        }
+        ByteBuffer payload = message.getData();
+        BasicContentHeaderProperties contentHeaderProperties = message.getJmsContentHeaderProperties();
+
+        // the expiration is derived from the timestamp, so it is only touched when timestamps are enabled
+        if (!_disableTimestamps)
+        {
+            if (timeToLive > 0)
+            {
+                contentHeaderProperties.setExpiration(currentTime + timeToLive);
+            }
+            else
+            {
+                contentHeaderProperties.setExpiration(0);
+            }
+        }
+        contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
+        contentHeaderProperties.setPriority((byte) priority);
+
+        // Use remaining() so that the size announced in the header always matches the bytes that
+        // createContentBodies() places in the body frames; a missing payload is sent as an empty body.
+        final int size = (payload == null) ? 0 : payload.remaining();
+        ContentBody[] contentBodies = createContentBodies(payload);
+        AMQFrame[] frames = new AMQFrame[2 + contentBodies.length];
+        for (int i = 0; i < contentBodies.length; i++)
+        {
+            frames[2 + i] = ContentBody.createAMQFrame(_channelId, contentBodies[i]);
+        }
+        if (contentBodies.length > 0 && _logger.isDebugEnabled())
+        {
+            _logger.debug("Sending content body frames to " + destination);
+        }
+
+        // weight argument of zero indicates no child content headers, just bodies
+        AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.CLASS_ID, 0,
+                                                                       contentHeaderProperties,
+                                                                       size);
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Sending content header frame to " + destination);
+        }
+
+        frames[0] = publishFrame;
+        frames[1] = contentHeaderFrame;
+        CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+        _protocolHandler.writeFrame(compositeFrame, wait);
+    }
+
+    /**
+     * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+     * maximum frame size.
+     * <p>
+     * When the payload fits in one frame the buffer itself becomes the body payload. Otherwise each body
+     * receives a slice of the buffer, and the buffer's position and limit are restored afterwards so that
+     * the message data is left as it was found.
+     *
+     * @param payload the message data between its position and its limit; may be null
+     * @return the array of content bodies; never null, empty when there is no data to send
+     */
+    private ContentBody[] createContentBodies(ByteBuffer payload)
+    {
+        if (payload == null || payload.remaining() == 0)
+        {
+            return new ContentBody[0];
+        }
+        // we subtract one from the total frame maximum size to account for the end of frame marker in a body frame
+        // (0xCE byte).
+        final int dataLength = payload.remaining();
+        final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+        final int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
+        final int frameCount = (int) (dataLength/framePayloadMax) + lastFrame;
+        final ContentBody[] bodies = new ContentBody[frameCount];
+
+        if (frameCount == 1)
+        {
+            bodies[0] = new ContentBody();
+            bodies[0].payload = payload;
+        }
+        else
+        {
+            // slice relative to where the data actually starts rather than assuming position zero
+            final int start = payload.position();
+            final int end = payload.limit();
+            long remaining = dataLength;
+            for (int i = 0; i < bodies.length; i++)
+            {
+                bodies[i] = new ContentBody();
+                final int length = (remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining;
+                final int chunkStart = start + (int) (framePayloadMax * i);
+                // widen the limit first so that the new position is always within bounds
+                payload.limit(end);
+                payload.position(chunkStart);
+                payload.limit(chunkStart + length);
+                bodies[i].payload = payload.slice();
+                remaining -= length;
+            }
+            // leave the caller's buffer exactly as it was handed to us
+            payload.limit(end);
+            payload.position(start);
+        }
+        return bodies;
+    }
+
+    /**
+     * @param mimeType default mime type for this producer; stored but not otherwise used by this class
+     */
+    public void setMimeType(String mimeType)
+    {
+        checkNotClosed();
+        _mimeType = mimeType;
+    }
+
+    /**
+     * @param encoding default encoding for this producer; stored without validation and not otherwise used
+     *                 by this class
+     * @throws UnsupportedEncodingException declared for callers; this implementation never throws it
+     */
+    public void setEncoding(String encoding) throws UnsupportedEncodingException
+    {
+        checkNotClosed();
+        _encoding = encoding;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/BasicMessageProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/Closeable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/Closeable.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/Closeable.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/Closeable.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.JMSException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Base class for objects that can be shut down in an orderly fashion. It holds the closed flag and offers
+ * helpers to query it; subclasses supply the actual {@link #close()} behaviour.
+ */
+public abstract class Closeable
+{
+    /**
+     * The closed flag. An atomic boolean is used so that access to the flag does not have to be
+     * synchronized; synchronizing it would mean having a synchronized block in every method.
+     */
+    protected final AtomicBoolean _closed = new AtomicBoolean(false);
+
+    /**
+     * Closes this object.
+     *
+     * @throws JMSException if the subclass fails to close
+     */
+    public abstract void close() throws JMSException;
+
+    /**
+     * @return true once the closed flag has been set
+     */
+    public boolean isClosed()
+    {
+        return _closed.get();
+    }
+
+    /**
+     * Guard for operations that are only legal while the object is open.
+     *
+     * @throws IllegalStateException if the closed flag has been set
+     */
+    protected void checkNotClosed()
+    {
+        if (!_closed.get())
+        {
+            return;
+        }
+        throw new IllegalStateException("Object " + toString() + " has been closed");
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/Closeable.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/ConnectionTuneParameters.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/ConnectionTuneParameters.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/ConnectionTuneParameters.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/ConnectionTuneParameters.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,69 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+/**
+ * Simple mutable holder for the four connection tuning values: maximum frame size, maximum channel number,
+ * heartbeat and transaction limit. All values default to zero until set. The class performs no validation
+ * and is not synchronized.
+ */
+public class ConnectionTuneParameters
+{
+    private long _frameMax;
+
+    private int _channelMax;
+
+    private int _heartbeat;
+
+    private long _txnLimit;
+
+    /**
+     * @return the frame maximum last set, or zero if never set
+     */
+    public long getFrameMax()
+    {
+        return _frameMax;
+    }
+
+    /**
+     * @param frameMax the frame maximum to store
+     */
+    public void setFrameMax(long frameMax)
+    {
+        _frameMax = frameMax;
+    }
+
+    /**
+     * @return the channel maximum last set, or zero if never set
+     */
+    public int getChannelMax()
+    {
+        return _channelMax;
+    }
+
+    /**
+     * @param channelMax the channel maximum to store
+     */
+    public void setChannelMax(int channelMax)
+    {
+        _channelMax = channelMax;
+    }
+
+    /**
+     * @return the heartbeat value last set, or zero if never set
+     */
+    public int getHeartbeat()
+    {
+        return _heartbeat;
+    }
+
+    /**
+     * @param heartbeat the heartbeat value to store
+     */
+    public void setHeartbeat(int heartbeat)
+    {
+        _heartbeat = heartbeat;
+    }
+
+    /**
+     * @return the transaction limit last set, or zero if never set
+     */
+    public long getTxnLimit()
+    {
+        return _txnLimit;
+    }
+
+    /**
+     * @param txnLimit the transaction limit to store
+     */
+    public void setTxnLimit(long txnLimit)
+    {
+        _txnLimit = txnLimit;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/ConnectionTuneParameters.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message