qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ai...@apache.org
Subject svn commit: r651133 [3/17] - in /incubator/qpid/trunk/qpid: gentools/ gentools/src/org/apache/qpid/gentools/ java/ java/broker/ java/broker/src/main/grammar/ java/broker/src/main/java/org/apache/qpid/server/exchange/ java/broker/src/main/java/org/apach...
Date Thu, 24 Apr 2008 01:54:42 GMT
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java?rev=651133&r1=651132&r2=651133&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java Wed Apr 23 18:54:20 2008
@@ -39,4 +39,9 @@
     {
         super(error, msg, cause);
     }
+    public boolean isHardError()
+    {
+        return true;
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?rev=651133&r1=651132&r2=651133&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java Wed Apr 23 18:54:20 2008
@@ -58,8 +58,9 @@
             if (transport != null)
             {
                 //todo this list of valid transports should be enumerated somewhere
-                if ((!(transport.equalsIgnoreCase("vm") ||
-                       transport.equalsIgnoreCase("tcp"))))
+                if ((!(transport.equalsIgnoreCase(BrokerDetails.VM) ||
+                       transport.equalsIgnoreCase(BrokerDetails.TCP) ||
+                       transport.equalsIgnoreCase(BrokerDetails.SOCKET))))
                 {
                     if (transport.equalsIgnoreCase("localhost"))
                     {
@@ -164,7 +165,10 @@
             }
             else
             {
-                setPort(port);
+                if (!_transport.equalsIgnoreCase(SOCKET))
+                {
+                    setPort(port);
+                }
             }
 
             String queryString = connection.getQuery();
@@ -271,13 +275,16 @@
         sb.append(_transport);
         sb.append("://");
 
-        if (!(_transport.equalsIgnoreCase("vm")))
+        if (!(_transport.equalsIgnoreCase(VM)))
         {
             sb.append(_host);
         }
 
-        sb.append(':');
-        sb.append(_port);
+        if (!(_transport.equalsIgnoreCase(SOCKET)))
+        {
+            sb.append(':');
+            sb.append(_port);
+        }
 
         sb.append(printOptionsURL());
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=651133&r1=651132&r2=651133&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Wed Apr 23 18:54:20 2008
@@ -1,5 +1,5 @@
 /*
- *
+*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,17 +20,31 @@
  */
 package org.apache.qpid.client;
 
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQProtocolException;
+import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.apache.qpid.jms.Connection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpidity.transport.TransportConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
@@ -49,25 +63,117 @@
 import javax.naming.Reference;
 import javax.naming.Referenceable;
 import javax.naming.StringRefAddr;
-
-import org.apache.qpid.*;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.Connection;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.FailoverPolicy;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpidity.transport.TransportConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
 {
+    public static final class ChannelToSessionMap
+    {
+        private final AMQSession[] _fastAccessSessions = new AMQSession[16];
+        private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
+        private int _size = 0;
+        private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+
+        public AMQSession get(int channelId)
+        {
+            if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+            {
+                return _fastAccessSessions[channelId];
+            }
+            else
+            {
+                return _slowAccessSessions.get(channelId);
+            }
+        }
+
+        public AMQSession put(int channelId, AMQSession session)
+        {
+            AMQSession oldVal;
+            if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+            {
+                oldVal = _fastAccessSessions[channelId];
+                _fastAccessSessions[channelId] = session;
+            }
+            else
+            {
+                oldVal = _slowAccessSessions.put(channelId, session);
+            }
+            if((oldVal != null) && (session == null))
+            {
+                _size--;
+            }
+            else if((oldVal == null) && (session != null))
+            {
+                _size++;
+            }
+
+            return session;
+
+        }
+
+
+        public AMQSession remove(int channelId)
+        {
+            AMQSession session;
+            if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+            {
+                 session = _fastAccessSessions[channelId];
+                _fastAccessSessions[channelId] = null;
+            }
+            else
+            {
+                session = _slowAccessSessions.remove(channelId);
+            }
+
+            if(session != null)
+            {
+                _size--;
+            }
+            return session;
+
+        }
+
+        public Collection<AMQSession> values()
+        {
+            ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
+
+            for(int i = 0; i < 16; i++)
+            {
+                if(_fastAccessSessions[i] != null)
+                {
+                    values.add(_fastAccessSessions[i]);
+                }
+            }
+            values.addAll(_slowAccessSessions.values());
+
+            return values;
+        }
+
+        public int size()
+        {
+            return _size;
+        }
+
+        public void clear()
+        {
+            _size = 0;
+            _slowAccessSessions.clear();
+            for(int i = 0; i<16; i++)
+            {
+                _fastAccessSessions[i] = null;
+            }
+        }
+    }
+
+
     private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
 
     protected AtomicInteger _idFactory = new AtomicInteger(0);
@@ -76,7 +182,9 @@
      * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
      * held by any child objects of this connection such as the session, producers and consumers.
      */
-    protected final Object _failoverMutex = new Object();
+    private final Object _failoverMutex = new Object();
+
+    private final Object _sessionCreationLock = new Object();
 
     /**
      * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
@@ -85,7 +193,7 @@
     protected long _maximumChannelCount;
 
     /** The maximum size of frame supported by the server */
-    protected long _maximumFrameSize;
+    private long _maximumFrameSize;
 
     /**
      * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped
@@ -95,30 +203,31 @@
     protected AMQProtocolHandler _protocolHandler;
 
     /** Maps from session id (Integer) to AMQSession instance */
-    protected final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
+    private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
 
-    protected String _clientName;
+    private String _clientName;
 
     /** The user name to use for authentication */
-    protected String _username;
+    private String _username;
 
     /** The password to use for authentication */
-    protected String _password;
+    private String _password;
 
     /** The virtual path to connect to on the AMQ server */
-    protected String _virtualHost;
+    private String _virtualHost;
+   
 
     protected ExceptionListener _exceptionListener;
 
-    protected ConnectionListener _connectionListener;
+    private ConnectionListener _connectionListener;
 
-    protected ConnectionURL _connectionURL;
+    private ConnectionURL _connectionURL;
 
     /**
      * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
      * publication.
      */
-    protected boolean _started;
+    protected volatile boolean _started;
 
     /** Policy dictating how to failover */
     protected FailoverPolicy _failoverPolicy;
@@ -136,22 +245,23 @@
     /*
      * The connection meta data
      */
-    protected QpidConnectionMetaData _connectionMetaData;
+    private QpidConnectionMetaData _connectionMetaData;
 
     /** Configuration info for SSL */
-    protected SSLConfiguration _sslConfiguration;
+    private SSLConfiguration _sslConfiguration;
 
-    protected AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
-    protected AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
-    protected AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
-    protected AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+    private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+    private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+    private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+    private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
 
     /** Thread Pool for executing connection level processes. Such as returning bounced messages. */
-    protected final ExecutorService _taskPool = Executors.newCachedThreadPool();
-    protected static final long DEFAULT_TIMEOUT = 1000 * 30;
+    private final ExecutorService _taskPool = Executors.newCachedThreadPool();
+    private static final long DEFAULT_TIMEOUT = 1000 * 30;
+    private ProtocolVersion _protocolVersion = ProtocolVersion.v0_9; // FIXME TGM, shouldn't need this
 
     protected AMQConnectionDelegate _delegate;
-
+    
     // this connection maximum number of prefetched messages
     private long _maxPrefetch;
 
@@ -259,7 +369,8 @@
         }
 
         _failoverPolicy = new FailoverPolicy(connectionURL);
-        if (_failoverPolicy.getCurrentBrokerDetails().getTransport().equals(BrokerDetails.VM))
+        BrokerDetails brokerDetails = _failoverPolicy.getNextBrokerDetails();
+        if (brokerDetails.getTransport().equals(BrokerDetails.VM))
         {
             _delegate = new AMQConnectionDelegate_0_8(this);
         }
@@ -272,6 +383,26 @@
             _delegate = new AMQConnectionDelegate_0_10(this);
         }
 
+        final ArrayList<JMSException> exceptions = new ArrayList<JMSException>();
+
+        class Listener implements ExceptionListener
+        {
+            public void onException(JMSException e)
+            {
+                exceptions.add(e);
+            }
+        }
+
+        try
+        {
+            setExceptionListener(new Listener());
+        }
+        catch (JMSException e)
+        {
+            // Shouldn't happen
+            throw new AMQException(null, null, e);
+        }
+
         if (_logger.isInfoEnabled())
         {
             _logger.info("Connection:" + connectionURL);
@@ -288,6 +419,7 @@
         _clientName = connectionURL.getClientName();
         _username = connectionURL.getUsername();
         _password = connectionURL.getPassword();
+
         setVirtualHost(connectionURL.getVirtualHost());
 
         if (connectionURL.getDefaultQueueExchangeName() != null)
@@ -311,7 +443,7 @@
         }
 
 
-        _protocolHandler = new AMQProtocolHandler(this);
+         _protocolHandler = new AMQProtocolHandler(this);
 
         // We are not currently connected
         _connected = false;
@@ -319,11 +451,13 @@
         Exception lastException = new Exception();
         lastException.initCause(new ConnectException());
 
-        while (!_connected && _failoverPolicy.failoverAllowed())
+        // TMG FIXME this seems... wrong...
+        boolean retryAllowed = true;
+        while (!_connected && retryAllowed )
         {
             try
             {
-                makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
+                makeBrokerConnection(brokerDetails);
                 lastException = null;
                 _connected = true;
             }
@@ -346,8 +480,10 @@
                 if (_logger.isInfoEnabled())
                 {
                     _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(),
-                                 e.getCause());
+                            e.getCause());
                 }
+                retryAllowed = _failoverPolicy.failoverAllowed();
+                brokerDetails = _failoverPolicy.getNextBrokerDetails();
             }
         }
 
@@ -359,8 +495,31 @@
         if (!_connected)
         {
             String message = null;
+            try
+            {
+                Thread.sleep(150);
+            }
+            catch (InterruptedException e)
+            {
+                // Eat it, we've hopefully got all the exceptions if this happened
+            }
+            if (exceptions.size() > 0)
+            {
+                JMSException e = exceptions.get(0);
+                int code = -1;
+                try
+                {
+                    code = new Integer(e.getErrorCode()).intValue();
+                }
+                catch (NumberFormatException nfe)
+                {
+                    // Ignore this, we have some error codes and messages swapped around
+                }
 
-            if (lastException != null)
+                throw new AMQConnectionFailureException(AMQConstant.getConstant(code),
+                                                        e.getMessage(), e);
+            }
+            else if (lastException != null)
             {
                 if (lastException.getCause() != null)
                 {
@@ -397,7 +556,7 @@
                 if (e.getCause() != null)
                 {
                     e.initCause(lastException);
-                }
+	        }
             }
 
             throw e;
@@ -406,6 +565,18 @@
         _connectionMetaData = new QpidConnectionMetaData(this);
     }
 
+    protected boolean checkException(Throwable thrown)
+    {
+        Throwable cause = thrown.getCause();
+
+        if (cause == null)
+        {
+            cause = thrown;
+        }
+
+        return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
+    }
+
     private void getDelegate() throws AMQProtocolException
     {
         try
@@ -444,20 +615,6 @@
         _virtualHost = virtualHost;
     }
 
-    protected boolean checkException(Throwable thrown)
-    {
-        Throwable cause = thrown.getCause();
-
-        if (cause == null)
-        {
-            cause = thrown;
-        }
-
-        return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
-    }
-
-
-
     public boolean attemptReconnection(String host, int port)
     {
         BrokerDetails bd = new AMQBrokerDetails(host, port, _sslConfiguration);
@@ -559,8 +716,55 @@
     public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
                                                      final int prefetchHigh, final int prefetchLow) throws JMSException
     {
+        synchronized (_sessionCreationLock)
+        {
+            checkNotClosed();
+            return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow);
+        }
+    }
+
+    private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
+            throws AMQException, FailoverException
+    {
 
-        return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow);
+        ChannelOpenBody channelOpenBody = getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
+
+        // TODO: Be aware of possible changes to parameter order as versions change.
+
+        _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
+
+        BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false);
+
+        // todo send low water mark when protocol allows.
+        // todo Be aware of possible changes to parameter order as versions change.
+        _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class);
+
+        if (transacted)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Issuing TxSelect for " + channelId);
+            }
+
+            TxSelectBody body = getProtocolHandler().getMethodRegistry().createTxSelectBody();
+
+            // TODO: Be aware of possible changes to parameter order as versions change.
+            _protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
+        }
+    }
+
+    private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
+            throws AMQException, FailoverException
+    {
+        try
+        {
+            createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+        }
+        catch (AMQException e)
+        {
+            deregisterSession(channelId);
+            throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
+        }
     }
 
     public void setFailoverPolicy(FailoverPolicy policy)
@@ -659,10 +863,10 @@
         if (!_started)
         {
             _started = true;
-            final Iterator it = _sessions.entrySet().iterator();
+            final Iterator it = _sessions.values().iterator();
             while (it.hasNext())
             {
-                final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue();
+                final AMQSession s = (AMQSession) (it.next());
                 try
                 {
                     s.start();
@@ -673,6 +877,7 @@
                 }
             }
 
+
         }
     }
 
@@ -704,49 +909,89 @@
 
     public void close(long timeout) throws JMSException
     {
-        synchronized (getFailoverMutex())
+        close(new ArrayList<AMQSession>(_sessions.values()),timeout);
+    }
+
+    public void close(List<AMQSession> sessions, long timeout) throws JMSException
+    {
+        synchronized(_sessionCreationLock)
         {
-            if (!_closed.getAndSet(true))
+            if(!sessions.isEmpty())
             {
-                try
+                AMQSession session = sessions.remove(0);
+                synchronized(session.getMessageDeliveryLock())
                 {
-                    long startCloseTime = System.currentTimeMillis();
-
-                    _taskPool.shutdown();
-                    closeAllSessions(null, timeout, startCloseTime);
-
-                    if (!_taskPool.isTerminated())
+                    close(sessions, timeout);
+                }
+            }
+            else
+            {
+                if (!_closed.getAndSet(true))
+                {
+                    synchronized (getFailoverMutex())
                     {
                         try
                         {
+                            long startCloseTime = System.currentTimeMillis();
+
+                        closeAllSessions(null, timeout, startCloseTime);
+
+                        //This MUST occur after we have successfully closed all Channels/Sessions
+                        _taskPool.shutdown();
+
+                        if (!_taskPool.isTerminated())
+                        {
+                                try
+                                {
+                                    // adjust timeout
+                                    long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+
+                                    _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
+                                }
+                                catch (InterruptedException e)
+                                {
+                                    _logger.info("Interrupted while shutting down connection thread pool.");
+                                }
+                            }
+
                             // adjust timeout
-                            long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+                            timeout = adjustTimeout(timeout, startCloseTime);
+                            _delegate.closeConneciton(timeout);
 
-                            _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
+                            // If the task pool hasn't shut down by now, force it with shutdownNow().
+                            // This will interrupt any running tasks.
+                            if (!_taskPool.isTerminated())
+                            {
+                                List<Runnable> tasks = _taskPool.shutdownNow();
+                                for (Runnable r : tasks)
+                                {
+                                    _logger.warn("Connection close forced taskpool to prevent execution:" + r);
+                                }
+                            }
                         }
-                        catch (InterruptedException e)
+                        catch (AMQException e)
                         {
-                            _logger.info("Interrupted while shutting down connection thread pool.");
+                            JMSException jmse = new JMSException("Error closing connection: " + e);
+                            jmse.setLinkedException(e);
+                            throw jmse;
                         }
                     }
-
-                    // adjust timeout
-                    timeout = adjustTimeout(timeout, startCloseTime);
-                    _delegate.closeConneciton(timeout);
-                    //_protocolHandler.closeConnection(timeout);
-
-                }
-                catch (AMQException e)
-                {
-                    JMSException jmse = new JMSException("Error closing connection: " + e);
-                    jmse.setLinkedException(e);
-                    throw jmse;
                 }
             }
         }
     }
 
+    private long adjustTimeout(long timeout, long startTime)
+    {
+        long now = System.currentTimeMillis();
+        timeout -= now - startTime;
+        if (timeout < 0)
+        {
+            timeout = 0;
+        }
 
+        return timeout;
+    }
 
     /**
      * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
@@ -811,19 +1056,6 @@
         }
     }
 
-
-    private long adjustTimeout(long timeout, long startTime)
-    {
-        long now = System.currentTimeMillis();
-        timeout -= now - startTime;
-        if (timeout < 0)
-        {
-            timeout = 0;
-        }
-
-        return timeout;
-    }
-
     public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
                                                        ServerSessionPool sessionPool, int maxMessages) throws JMSException
     {
@@ -889,11 +1121,11 @@
         return _maximumFrameSize;
     }
 
-    public Map getSessions()
+    public ChannelToSessionMap getSessions()
     {
         return _sessions;
     }
-
+    
     public String getUsername()
     {
         return _username;
@@ -1017,18 +1249,7 @@
 
     /**
      * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
-     * to a JMS exception liste
-    {
-        ArrayList sessions = new ArrayList(_sessions.values());
-        _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
-        for (Iterator it = sessions.iterator(); it.hasNext();)
-        {
-            AMQSession s = (AMQSession) it.next();
-            // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
-            reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
-            s.resubscribe();
-        }
-    }ner, if configured, and propagates the exception to sessions, which in turn will
+     * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
      * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
      *
      * @param cause the exception
@@ -1085,8 +1306,12 @@
         {
             _exceptionListener.onException(je);
         }
+        else
+        {
+            _logger.error("Throwable Received but no listener set: " + cause.getMessage());
+        }
 
-        if (!(cause instanceof AMQUndeliveredException) && !(cause instanceof AMQAuthenticationException))
+        if (hardError(cause))
         {
             try
             {
@@ -1110,6 +1335,16 @@
         }
     }
 
+    private boolean hardError(Throwable cause)
+    {
+        if (cause instanceof AMQException)
+        {
+            return ((AMQException)cause).isHardError();
+        }
+
+        return true;
+    }
+
     void registerSession(int channelId, AMQSession session)
     {
         _sessions.put(channelId, session);
@@ -1120,6 +1355,24 @@
         _sessions.remove(channelId);
     }
 
+    /**
+     * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
+     * The caller must hold the failover mutex before calling this method.
+     */
+    public void resubscribeSesssions() throws JMSException, AMQException, FailoverException
+    {
+        ArrayList sessions = new ArrayList(_sessions.values());
+        _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
+        for (Iterator it = sessions.iterator(); it.hasNext();)
+        {
+            AMQSession s = (AMQSession) it.next();
+            // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
+            reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
+            s.resubscribe();
+            s.setFlowControl(true);
+        }
+    }
+
     public String toString()
     {
         StringBuffer buf = new StringBuffer("AMQConnection:\n");
@@ -1205,6 +1458,22 @@
     public AMQSession getSession(int channelId)
     {
         return _sessions.get(channelId);
+    }
+
+    public ProtocolVersion getProtocolVersion()
+    {
+        return _protocolVersion;
+    }
+
+    public void setProtocolVersion(ProtocolVersion protocolVersion)
+    {
+        _protocolVersion = protocolVersion;
+        _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion);
+    }
+
+    public boolean isFailingOver()
+    {
+        return (_protocolHandler.getFailoverLatch() != null);
     }
 
     /**

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java?rev=651133&r1=651132&r2=651133&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java Wed Apr 23 18:54:20 2008
@@ -178,31 +178,25 @@
     private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
             throws AMQException, FailoverException
     {
-
+        ChannelOpenBody channelOpenBody = _conn.getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
         // TODO: Be aware of possible changes to parameter order as versions change.
-
-        _conn._protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(),
-                _conn._protocolHandler.getProtocolMinorVersion(), null), // outOfBand
-                                                                                                                     ChannelOpenOkBody.class);
+        _conn._protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId),  ChannelOpenOkBody.class);
 
         // todo send low water mark when protocol allows.
         // todo Be aware of possible changes to parameter order as versions change.
-        _conn._protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(),
-                _conn._protocolHandler.getProtocolMinorVersion(), false, // global
-                                                               prefetchHigh, // prefetchCount
-                                                               0), // prefetchSize
-                                                                   BasicQosOkBody.class);
-
+        BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
+        _conn._protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
+        
         if (transacted)
         {
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("Issuing TxSelect for " + channelId);
             }
-
+            TxSelectBody body = _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody();
+            
             // TODO: Be aware of possible changes to parameter order as versions change.
-            _conn._protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(),
-                    _conn._protocolHandler.getProtocolMinorVersion()), TxSelectOkBody.class);
+            _conn._protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
         }
     }
 
@@ -212,7 +206,7 @@
      */
     public void resubscribeSessions() throws JMSException, AMQException, FailoverException
     {
-        ArrayList sessions = new ArrayList(_conn._sessions.values());
+        ArrayList sessions = new ArrayList(_conn.getSessions().values());
         _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
         for (Iterator it = sessions.iterator(); it.hasNext();)
         {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java?rev=651133&r1=651132&r2=651133&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java Wed Apr 23 18:54:20 2008
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Hashtable;
+import java.util.UUID;
 
 import javax.jms.*;
 import javax.naming.Context;
@@ -259,7 +260,7 @@
         }
         catch (UnknownHostException e)
         {
-            return null;
+            return "UnknownHost" + UUID.randomUUID();
         }
     }
 
@@ -352,7 +353,9 @@
      * @param name
      * @param ctx
      * @param env
+     *
      * @return AMQConnection,AMQTopic,AMQQueue, or AMQConnectionFactory.
+     *
      * @throws Exception
      */
     public Object getObjectInstance(Object obj, Name name, Context ctx, Hashtable env) throws Exception
@@ -408,8 +411,9 @@
 
     public Reference getReference() throws NamingException
     {
-        return new Reference(AMQConnectionFactory.class.getName(),
-                             new StringRefAddr(AMQConnectionFactory.class.getName(), _connectionDetails.getURL()),
+        return new Reference(
+                AMQConnectionFactory.class.getName(),
+                new StringRefAddr(AMQConnectionFactory.class.getName(), _connectionDetails.getURL()),
                              AMQConnectionFactory.class.getName(), null);          // factory location
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java?rev=651133&r1=651132&r2=651133&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java Wed Apr 23 18:54:20 2008
@@ -27,6 +27,7 @@
 
 import org.apache.qpid.client.url.URLParser;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.url.URLHelper;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=651133&r1=651132&r2=651133&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Wed Apr 23 18:54:20 2008
@@ -56,7 +56,9 @@
     private String _url;
     private AMQShortString _urlAsShortString;
 
-    private boolean _validated;
+    private boolean _checkedForQueueBinding;
+
+    private boolean _exchangeExistsChecked;
 
     private byte[] _byteEncoding;
     private static final int IS_DURABLE_MASK = 0x1;
@@ -234,14 +236,25 @@
 
     }
 
-    public boolean isValidated()
+    public boolean isCheckedForQueueBinding()
+    {
+        return _checkedForQueueBinding; // presumably true once the queue binding has been verified — confirm against callers
+    }
+
+    public void setCheckedForQueueBinding(boolean checkedForQueueBinding)
+    {
+        _checkedForQueueBinding = checkedForQueueBinding; // plain field store; no validation or other side effect here
+    }
+
+
+    public boolean isExchangeExistsChecked()
     {
-        return _validated;
+        return _exchangeExistsChecked;
     }
 
-    public void setValidated(boolean validated)
+    public void setExchangeExistsChecked(final boolean exchangeExistsChecked)
     {
-        _validated = validated;
+        _exchangeExistsChecked = exchangeExistsChecked;
     }
 
     public String toURL()

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java?rev=651133&r1=651132&r2=651133&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java Wed Apr 23 18:54:20 2008
@@ -49,6 +49,6 @@
         //Not sure what the best approach is here, probably to treat this like a topic
         //and allow server to generate names. As it is AMQ specific it doesn't need to
         //fit the JMS API expectations so this is not as yet critical.
-        return false;
+        return getAMQQueueName() == null;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java?rev=651133&r1=651132&r2=651133&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java Wed Apr 23 18:54:20 2008
@@ -22,6 +22,7 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.qpid.AMQException;
 
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
@@ -50,7 +51,9 @@
         _messageSelector = ((messageSelector == null) || (messageSelector.trim().length() == 0)) ? null : messageSelector;
         // Create Consumer to verify message selector.
         BasicMessageConsumer consumer =
-            (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+                (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+        // Close this consumer as we are not looking to consume only to establish that, at least for now,
+        // the QB can be created
         consumer.close();
     }
 
@@ -88,36 +91,37 @@
         checkState();
         final BasicMessageConsumer consumer =
             (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+
         _consumers.add(consumer);
 
         return new Enumeration()
+        {
+
+            Message _nextMessage = consumer == null ? null : consumer.receive(1000);
+
+            public boolean hasMoreElements()
             {
-                Message _nextMessage = consumer.receive(1000);
+                _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
+                return (_nextMessage != null);
+            }
 
-                public boolean hasMoreElements()
+            public Object nextElement()
+            {
+                Message msg = _nextMessage;
+                try
                 {
-                    _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
-                    return (_nextMessage != null);
+                    _logger.info("QB:nextElement about to receive");
+                    _nextMessage = consumer.receive(1000);
+                    _logger.info("QB:nextElement received:" + _nextMessage);
                 }
-
-                public Object nextElement()
+                catch (JMSException e)
                 {
-                    Message msg = _nextMessage;
-                    try
-                    {
-                        _logger.info("QB:nextElement about to receive");
-                        _nextMessage = consumer.receive(1000);
-                        _logger.info("QB:nextElement received:" + _nextMessage);
-                    }
-                    catch (JMSException e)
-                    {
-                        _logger.warn("Exception caught while queue browsing", e);
-                        _nextMessage = null;
-                    }
-
-                    return msg;
+                    _logger.warn("Exception caught while queue browsing", e);
+                    _nextMessage = null;
                 }
-            };
+                return msg;
+            }
+        };
     }
 
     public void close() throws JMSException

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=651133&r1=651132&r2=651133&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Apr 23 18:54:20 2008
@@ -20,14 +20,15 @@
  */
 package org.apache.qpid.client;
 
-
 import java.io.Serializable;
 import java.net.URISyntaxException;
 import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -57,7 +58,9 @@
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
 
+import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.AMQInvalidRoutingKeyException;
@@ -77,11 +80,12 @@
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.url.AMQBindingURL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,6 +110,86 @@
  */
 public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
 {
+    /** Maps integer consumer ids to consumers: ids 0-15 are held in an array, every other id in a concurrent map. */
+    public static final class IdToConsumerMap
+    {
+        /** Number of ids (0 to FAST_ACCESS_SIZE - 1) that are served directly from the array. */
+        private static final int FAST_ACCESS_SIZE = 16;
+
+        private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[FAST_ACCESS_SIZE];
+        private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>();
+
+        /** True when id is in the array-backed range; negative ids have high bits set and therefore use the map. */
+        private static boolean isFastAccess(int id)
+        {
+            return (id & ~(FAST_ACCESS_SIZE - 1)) == 0;
+        }
+
+        /** Returns the consumer registered under id, or null if there is none. */
+        public BasicMessageConsumer get(int id)
+        {
+            return isFastAccess(id) ? _fastAccessConsumers[id] : _slowAccessConsumers.get(id);
+        }
+
+        /**
+         * Registers consumer under id, replacing any existing registration for that id.
+         *
+         * @return the consumer previously registered under id, or null if there was none
+         */
+        public BasicMessageConsumer put(int id, BasicMessageConsumer consumer)
+        {
+            BasicMessageConsumer oldVal;
+            if (isFastAccess(id))
+            {
+                oldVal = _fastAccessConsumers[id];
+                _fastAccessConsumers[id] = consumer;
+            }
+            else
+            {
+                oldVal = _slowAccessConsumers.put(id, consumer);
+            }
+
+            return oldVal;
+        }
+
+        /** Removes the registration for id and returns the consumer that was registered, or null if there was none. */
+        public BasicMessageConsumer remove(int id)
+        {
+            if (!isFastAccess(id))
+            {
+                return _slowAccessConsumers.remove(id);
+            }
+
+            BasicMessageConsumer consumer = _fastAccessConsumers[id];
+            _fastAccessConsumers[id] = null;
+            return consumer;
+        }
+
+        /** Returns a new list holding every registered consumer: array entries first, then the map values. */
+        public Collection<BasicMessageConsumer> values()
+        {
+            ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>();
+
+            for (BasicMessageConsumer consumer : _fastAccessConsumers)
+            {
+                if (consumer != null)
+                {
+                    values.add(consumer);
+                }
+            }
+            values.addAll(_slowAccessConsumers.values());
+
+            return values;
+        }
+
+        /** Removes every registration from both the map and the array. */
+        public void clear()
+        {
+            _slowAccessConsumers.clear();
+            java.util.Arrays.fill(_fastAccessConsumers, null);
+        }
+    }
+
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
 
@@ -155,7 +239,7 @@
     protected boolean _transacted;
 
     /** Holds the sessions acknowledgement mode. */
-    protected int _acknowledgeMode;
+    protected final int _acknowledgeMode;
 
     /** Holds this session unique identifier, used to distinguish it from other sessions. */
     protected int _channelId;
@@ -231,8 +315,16 @@
      * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
      * consumer.
      */
-    protected Map<AMQShortString, BasicMessageConsumer> _consumers =
-            new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+    protected final IdToConsumerMap _consumers = new IdToConsumerMap();
+    
+            //Map<AMQShortString, BasicMessageConsumer> _consumers =
+            //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+
+    /**
+     * Contains a list of consumers which have been removed but which might still have
+     * messages to acknowledge, eg in client ack or transacted modes
+     */
+    private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>();
 
     /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
     private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
@@ -284,6 +376,32 @@
     protected final boolean _strictAMQPFATAL;
     private final Object _messageDeliveryLock = new Object();
 
+    /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
+    private boolean _dirty;
+    /** Has failover occurred on this session */
+    private boolean _failedOver;
+
+
+
+    private static final class FlowControlIndicator
+    {
+        private volatile boolean _flowControl = true; // the flag starts out true
+        // Reads the flag without taking the monitor; the field is volatile.
+        public boolean getFlowControl()
+        {
+            return this._flowControl;
+        }
+        // Stores the new flag value and wakes one thread waiting on this object's monitor.
+        public synchronized void setFlowControl(boolean flowControl)
+        {
+            this._flowControl = flowControl;
+            this.notify();
+        }
+    }
+
+    /** Flow control */
+    private FlowControlIndicator _flowControl = new FlowControlIndicator();
+
     /**
      * Creates a new session on a connection.
      *
@@ -330,24 +448,20 @@
                                                      {
                                                          public void aboveThreshold(int currentValue)
                                                          {
-                                                             if (_acknowledgeMode == NO_ACKNOWLEDGE)
-                                                             {
                                                                  _logger.debug(
                                                                          "Above threshold(" + _defaultPrefetchHighMark
                                                                          + ") so suspending channel. Current value is " + currentValue);
                                                                  new Thread(new SuspenderRunner(true)).start();
-                                                             }
+
                                                          }
 
                                                          public void underThreshold(int currentValue)
                                                          {
-                                                             if (_acknowledgeMode == NO_ACKNOWLEDGE)
-                                                             {
                                                                  _logger.debug(
                                                                          "Below threshold(" + _defaultPrefetchLowMark
                                                                          + ") so unsuspending channel. Current value is " + currentValue);
                                                                  new Thread(new SuspenderRunner(false)).start();
-                                                             }
+
                                                          }
                                                      });
         }
@@ -357,7 +471,22 @@
         }
     }
 
-
+    /**
+     * Creates a new session on a connection with the default message factory registry.
+     *
+     * @param con                 The connection on which to create the session.
+     * @param channelId           The unique identifier for the session.
+     * @param transacted          Indicates whether or not the session is transactional.
+     * @param acknowledgeMode     The acknowledgement mode for the session.
+     * @param defaultPrefetchHigh The maximum number of messages to prefetch before suspending the session.
+     * @param defaultPrefetchLow  The number of prefetched messages at which to resume the session.
+     */
+    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
+               int defaultPrefetchLow)
+    {
+        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
+             defaultPrefetchLow);
+    }
 
     // ===== JMS Session methods.
 
@@ -371,6 +500,12 @@
         close(-1);
     }
 
+    public BytesMessage createBytesMessage() throws JMSException
+    {
+        checkNotClosed(); // runs before construction; presumably throws if the session is closed — confirm in checkNotClosed()
+        return new JMSBytesMessage(); // a fresh JMSBytesMessage is created on every call
+    }
+
     /**
      * Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
      *
@@ -381,6 +516,10 @@
         if (isClosed())
         {
             throw new IllegalStateException("Session is already closed");
+        } 
+        else if (hasFailedOver())
+        {
+            throw new IllegalStateException("has failed over");
         }
 
         while (true)
@@ -405,6 +544,12 @@
      */
     public abstract void acknowledgeMessage(long deliveryTag, boolean multiple);
 
+    public MethodRegistry getMethodRegistry()
+    {
+        // The registry is fetched from the protocol handler on every call; nothing is cached here.
+        return getProtocolHandler().getMethodRegistry();
+    }
+
     /**
      * Binds the named queue, with the specified routing key, to the named exchange.
      *
@@ -471,30 +616,26 @@
     {
         if (_logger.isInfoEnabled())
         {
-            _logger.info("Closing session: " + this );//+ ":"
-                        // + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+            _logger.info("Closing session: " + this); // + ":"
+                         // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
         }
 
-        if( _dispatcher != null )
-        {
-            _dispatcher.setConnectionStopped(true);
-        }
-        synchronized (_messageDeliveryLock)
+        // Ensure we only try and close an open session.
+        if (!_closed.getAndSet(true))
         {
-
-            // We must close down all producers and consumers in an orderly fashion. This is the only method
-            // that can be called from a different thread of control from the one controlling the session.
             synchronized (_connection.getFailoverMutex())
             {
-                // Ensure we only try and close an open session.
-                if (!_closed.getAndSet(true))
+                // We must close down all producers and consumers in an orderly fashion. This is the only method
+                // that can be called from a different thread of control from the one controlling the session.
+                synchronized (_messageDeliveryLock)
                 {
                     // we pass null since this is not an error case
                     closeProducersAndConsumers(null);
 
                     try
                     {
-                        sendClose(timeout);
+                       sendClose(timeout);
                     }
                     catch (AMQException e)
                     {
@@ -527,25 +668,44 @@
      */
     public void closed(Throwable e) throws JMSException
     {
-        synchronized (_messageDeliveryLock)
+        // This method needs to be improved. Throwables only arrive here from the mina : exceptionReceived
+        // calls through connection.closeAllSessions which is also called by the public connection.close()
+        // with a null cause
+        // When we are closing the Session due to a protocol session error we simply create a new AMQException
+        // with the correct error code and text this is clearly WRONG as the instanceof check below will fail.
+        // We need to determine here if the connection should be
+
+        if (e instanceof AMQDisconnectedException)
+        {
+            if (_dispatcher != null)
+            {
+                // Failover failed and ain't coming back. Knife the dispatcher.
+                _dispatcher.interrupt();
+            }
+        }
+
+        if (!_closed.getAndSet(true))
         {
             synchronized (_connection.getFailoverMutex())
             {
-                // An AMQException has an error code and message already and will be passed in when closure occurs as a
-                // result of a channel close request
-                _closed.set(true);
-                AMQException amqe;
-                if (e instanceof AMQException)
-                {
-                    amqe = (AMQException) e;
-                }
-                else
+                synchronized (_messageDeliveryLock)
                 {
-                    amqe = new AMQException(null, "Closing session forcibly", e);
-                }
+                    // An AMQException has an error code and message already and will be passed in when closure occurs as a
+                    // result of a channel close request
+                    AMQException amqe;
+                    if (e instanceof AMQException)
+                    {
+                        amqe = (AMQException) e;
+                    }
+                    else
+                    {
+                        amqe = new AMQException("Closing session forcibly", e);
+                    }
 
-                _connection.deregisterSession(_channelId);
-                closeProducersAndConsumers(amqe);
+
+                    _connection.deregisterSession(_channelId);
+                    closeProducersAndConsumers(amqe);
+                }
             }
         }
     }
@@ -565,10 +725,12 @@
      */
     public void commit() throws JMSException
     {
-        checkTransacted();
+    	checkTransacted();
 
         try
         {
+
+            // TGM FIXME: what about failover?
             // Acknowledge all delivered messages
             while (true)
             {
@@ -580,7 +742,6 @@
 
                 acknowledgeMessage(tag, false);
             }
-
             // Commits outstanding messages and acknowledgments
             sendCommit();
         }
@@ -600,16 +761,10 @@
     {
 
         // Remove the consumer from the map
-        BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
+        BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue());
         if (consumer != null)
         {
-            // fixme this isn't right.. needs to check if _queue contains data for this consumer
-            if (consumer.isAutoClose()) // && _queue.isEmpty())
-            {
-                consumer.closeWhenNoMessages(true);
-            }
-
-            if (!consumer.isNoConsume())
+            if (!consumer.isNoConsume())  // Normal Consumer
             {
                 // Clean the Maps up first
                 // Flush any pending messages for this consumerTag
@@ -620,13 +775,12 @@
                 else
                 {
                     _logger.info("Dispatcher is null so created stopped dispatcher");
-
-                    startDistpatcherIfNecessary(true);
+                    startDispatcherIfNecessary(true);
                 }
 
                 _dispatcher.rejectPending(consumer);
             }
-            else
+            else // Queue Browser
             {
                 // Just close the consumer
                 // fixme  the CancelOK is being processed before the arriving messages..
@@ -634,13 +788,24 @@
                 // has yet to receive before the close comes in.
 
                 // consumer.markClosed();
+
+
+
+                if (consumer.isAutoClose())
+                {     
+                    // There is a small window where the message is between the two queues in the dispatcher.
+                    if (consumer.isClosed())
+                    {
+                        if (_logger.isInfoEnabled())
+                        {
+                            _logger.info("Closing consumer:" + consumer.debugIdentity());
+                        }
+
+                        deregisterConsumer(consumer);
+                    }
+                }
             }
         }
-        else
-        {
-            _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map.");
-        }
-
     }
 
     public QueueBrowser createBrowser(Queue queue) throws JMSException
@@ -675,21 +840,11 @@
                                   messageSelector, null, true, true);
     }
 
-    public BytesMessage createBytesMessage() throws JMSException
-    {
-        synchronized (_connection.getFailoverMutex())
-        {
-            checkNotClosed();
-
-            return new JMSBytesMessage();
-        }
-    }
-
     public MessageConsumer createConsumer(Destination destination) throws JMSException
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null, null,
+        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), null, null,
                                   false, false);
     }
 
@@ -701,11 +856,12 @@
                                   false, false);
     }
 
+
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false,
+        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic),
                                   messageSelector, null, false, false);
     }
 
@@ -714,16 +870,27 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
+        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, (destination instanceof Topic),
+                                  messageSelector, null, false, false);
+    }
+
+
+    public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
+            throws JMSException
+    {
+        checkValidDestination(destination);
+
+        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true,
                                   messageSelector, null, false, false);
     }
 
+
     public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
                                           String selector) throws JMSException
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false);
+        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, false, false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -739,7 +906,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector, false, false);
+        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, false, false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -770,12 +937,8 @@
 
     public MapMessage createMapMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
-        {
-            checkNotClosed();
-
-            return new JMSMapMessage();
-        }
+        checkNotClosed();
+        return new JMSMapMessage();
     }
 
     public javax.jms.Message createMessage() throws JMSException
@@ -785,12 +948,8 @@
 
     public ObjectMessage createObjectMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
-        {
-            checkNotClosed();
-
-            return (ObjectMessage) new JMSObjectMessage();
-        }
+        checkNotClosed();
+        return (ObjectMessage) new JMSObjectMessage();
     }
 
     public ObjectMessage createObjectMessage(Serializable object) throws JMSException
@@ -827,7 +986,7 @@
     {
         checkNotClosed();
 
-        return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic);
+        return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic,false,false), topic);
     }
 
     public Queue createQueue(String queueName) throws JMSException
@@ -874,7 +1033,7 @@
         {
             public Object execute() throws AMQException, FailoverException
             {
-                sendCreateQueue(name, autoDelete,durable,exclusive);
+                sendCreateQueue(name, autoDelete, durable, exclusive);
                 return null;
             }
         }, _connection).execute();
@@ -989,9 +1148,10 @@
         AMQTopic dest = checkValidTopic(topic);
 
         // AMQTopic dest = new AMQTopic(topic.getTopicName());
-        return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+        return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
     }
 
+
     /**
      * Creates a non-durable subscriber with a message selector
      *
@@ -1009,7 +1169,7 @@
         AMQTopic dest = checkValidTopic(topic);
 
         // AMQTopic dest = new AMQTopic(topic.getTopicName());
-        return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
+        return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal));
     }
 
     public abstract TemporaryQueue createTemporaryQueue() throws JMSException;
@@ -1267,14 +1427,14 @@
         }
     }
 
-    public abstract void sendRecover() throws AMQException, FailoverException;
+    abstract void sendRecover() throws AMQException, FailoverException;
 
     public void rejectMessage(UnprocessedMessage message, boolean requeue)
     {
 
-        if (_logger.isTraceEnabled())
+        if (_logger.isDebugEnabled())
         {
-            _logger.trace("Rejecting Unacked message:" + message.getDeliveryTag());
+            _logger.debug("Rejecting Unacked message:" + message.getDeliveryTag());
         }
 
         rejectMessage(message.getDeliveryTag(), requeue);
@@ -1282,9 +1442,9 @@
 
     public void rejectMessage(AbstractJMSMessage message, boolean requeue)
     {
-        if (_logger.isTraceEnabled())
+        if (_logger.isDebugEnabled())
         {
-            _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag());
+            _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag());
         }
 
         rejectMessage(message.getDeliveryTag(), requeue);
@@ -1325,6 +1485,8 @@
 
                 sendRollback();
 
+                markClean();
+
                 if (!isSuspended)
                 {
                     suspendChannel(false);
@@ -1460,6 +1622,7 @@
 
                         AMQDestination amqd = (AMQDestination) destination;
 
+                        final AMQProtocolHandler protocolHandler = getProtocolHandler();
                         // TODO: Define selectors in AMQP
                         // TODO: construct the rawSelector from the selector string if rawSelector == null
                         final FieldTable ft = FieldTableFactory.newFieldTable();
@@ -1499,11 +1662,6 @@
                         {
                             JMSException ex = new JMSException("Error registering consumer: " + e);
 
-                            if (_logger.isDebugEnabled())
-                            {
-                                e.printStackTrace();
-                            }
-
                             ex.setLinkedException(e);
                             throw ex;
                         }
@@ -1531,7 +1689,7 @@
      */
     void deregisterConsumer(BasicMessageConsumer consumer)
     {
-        if (_consumers.remove(consumer.getConsumerTag()) != null)
+        if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null)
         {
             String subscriptionName = _reverseSubscriptionMap.remove(consumer);
             if (subscriptionName != null)
@@ -1547,6 +1705,13 @@
                     _destinationConsumerCount.remove(dest);
                 }
             }
+
+            // Consumers that are closed in a transaction must be stored
+            // so that messages they have received can be acknowledged on commit
+            if (_transacted)
+            {
+                _removedConsumers.add(consumer);
+            }
         }
     }
 
@@ -1582,8 +1747,7 @@
      */
     public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
             throws JMSException;
-
-
+            
     public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;
 
     /**
@@ -1605,6 +1769,7 @@
      */
     void resubscribe() throws AMQException
     {
+        _failedOver = true;
         resubscribeProducers();
         resubscribeConsumers();
     }
@@ -1639,13 +1804,20 @@
         // If the event dispatcher is not running then start it too.
         if (hasMessageListeners())
         {
-            startDistpatcherIfNecessary();
+            startDispatcherIfNecessary();
         }
     }
 
-    synchronized void startDistpatcherIfNecessary()
+    void startDispatcherIfNecessary()
     {
+        //If we are the dispatcher then we don't need to check we are started
+        if (Thread.currentThread() == _dispatcher)
+        {
+            return;
+        }
+
         // If IMMEDIATE_PREFETCH is not set then we need to start fetching
+        // This is final per session so will be multi-thread safe.
         if (!_immediatePrefetch)
         {
             // We do this now if this is the first call on a started connection
@@ -1662,10 +1834,10 @@
             }
         }
 
-        startDistpatcherIfNecessary(false);
+        startDispatcherIfNecessary(false);
     }
 
-    synchronized void startDistpatcherIfNecessary(boolean initiallyStopped)
+    synchronized void startDispatcherIfNecessary(boolean initiallyStopped)
     {
         if (_dispatcher == null)
         {
@@ -1816,7 +1988,7 @@
             }
             else
             {
-                con.close();
+                con.close(false);
             }
         }
         // at this point the _consumers map will be empty
@@ -1891,20 +2063,22 @@
     private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
                                   AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
     {
-        //need to generate a consumer tag on the client so we can exploit the nowait flag
-        AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
+        int tagId = _nextTag++;
+        // need to generate a consumer tag on the client so we can exploit the nowait flag
+        AMQShortString tag = new AMQShortString(Integer.toString(tagId));
+
         consumer.setConsumerTag(tag);
         // we must register the consumer in the map before we actually start listening
-        _consumers.put(tag, consumer);
+        _consumers.put(tagId, consumer);
 
         try
         {
-            sendConsume(consumer,queueName,protocolHandler,nowait,messageSelector,tag);
+            sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tag);
         }
         catch (AMQException e)
         {
             // clean-up the map in the event of an error
-            _consumers.remove(tag);
+            _consumers.remove(tagId);
             throw e;
         }
     }
@@ -1945,6 +2119,34 @@
         declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
     }
 
+
+    /**
+     * Returns the number of messages currently queued for the given destination.
+     *
+     * <p/>Note that this operation automatically retries in the event of fail-over.
+     *
+     * @param amqd            The destination to be checked
+     *
+     * @return the number of queued messages.
+     *
+     * @throws AMQException If the queue depth cannot be retrieved for any reason.
+     */
+    public long getQueueDepth(final AMQDestination amqd)
+            throws AMQException
+    {
+        return new FailoverNoopSupport<Long, AMQException>(
+                new FailoverProtectedOperation<Long, AMQException>()
+                {
+                    public Long execute() throws AMQException, FailoverException
+                    {
+                        return requestQueueDepth(amqd);
+                    }
+                }, _connection).execute();
+
+    }
+
+    abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException;
+
     /**
      * Declares the named exchange and type of exchange.
      *
@@ -1960,7 +2162,7 @@
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     private void declareExchange(final AMQShortString name, final AMQShortString type,
-            final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
+                                 final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
     {
         new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
         {
@@ -2013,7 +2215,7 @@
                             amqd.setQueueName(protocolHandler.generateQueueName());
                         }
 
-                        sendQueueDeclare(amqd,protocolHandler);
+                        sendQueueDeclare(amqd, protocolHandler);
 
                         return amqd.getAMQQueueName();
                     }
@@ -2064,12 +2266,12 @@
         return _connection.getProtocolHandler();
     }
 
-    protected byte getProtocolMajorVersion()
+    public byte getProtocolMajorVersion()
     {
         return getProtocolHandler().getProtocolMajorVersion();
     }
 
-    protected byte getProtocolMinorVersion()
+    public byte getProtocolMinorVersion()
     {
         return getProtocolHandler().getProtocolMinorVersion();
     }
@@ -2227,7 +2429,7 @@
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
-                                  + message.getDeliveryTag());
+                        + message.getDeliveryTag());
                 }
 
                 messages.remove();
@@ -2250,6 +2452,7 @@
         for (Iterator it = consumers.iterator(); it.hasNext();)
         {
             BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
+            consumer.failedOver();
             registerConsumer(consumer, true);
         }
     }
@@ -2276,11 +2479,10 @@
                     // Bounced message is processed here, away from the mina thread
                     AbstractJMSMessage bouncedMessage =
                             _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
-                                                                  msg.getExchange(), msg.getContentHeader(), msg.getBodies());
-
-                    AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
-                    AMQShortString reason = msg.getReplyText();
-                    _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+                            		msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
+                        AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
+                        AMQShortString reason = msg.getReplyText();
+                        _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
 
                     // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
                     if (errorCode == AMQConstant.NO_CONSUMERS)
@@ -2318,7 +2520,7 @@
      *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
-    protected  void suspendChannel(boolean suspend) throws AMQException // , FailoverException
+    protected void suspendChannel(boolean suspend) throws AMQException // , FailoverException
     {
         synchronized (_suspensionLock)
         {
@@ -2339,6 +2541,13 @@
         }
     }
 
+    public abstract void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException;
+
+    Object getMessageDeliveryLock()
+    {
+        return _messageDeliveryLock;
+    }
+
     /**
      * Indicates whether this session consumers pre-fetche messages
      *
@@ -2350,7 +2559,62 @@
     }
 
 
-    public abstract void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException;
+    /** Signifies that the session has pending sends to commit. */
+    public void markDirty()
+    {
+        _dirty = true;
+    }
+
+    /** Signifies that the session has no pending sends to commit. */
+    public void markClean()
+    {
+        _dirty = false;
+        _failedOver = false;
+    }
+
+    /**
+     * Check to see if failover has occurred since the last call to markClean() (commit or rollback).
+     *
+     * @return boolean true if failover has occurred.
+     */
+    public boolean hasFailedOver()
+    {
+        return _failedOver;
+    }
+
+    /**
+     * Check to see if any messages have been sent in this transaction and have not been committed.
+     *
+     * @return boolean true if a message has been sent but not committed
+     */
+    public boolean isDirty()
+    {
+        return _dirty;
+    }
+
+    public void setTicket(int ticket)
+    {
+        _ticket = ticket;
+    }
+
+    public void setFlowControl(final boolean active)
+    {
+        _flowControl.setFlowControl(active);
+    }
+
+
+    public void checkFlowControl() throws InterruptedException
+    {
+        synchronized(_flowControl)
+        {
+            while(!_flowControl.getFlowControl())
+            {
+                _flowControl.wait();
+            }
+        }
+
+    }
+
 
     /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
     class Dispatcher extends Thread
@@ -2361,6 +2625,7 @@
 
         private final Object _lock = new Object();
         private final AtomicLong _rollbackMark = new AtomicLong(-1);
+        private String dispatcherID = "" + System.identityHashCode(this);
 
         public Dispatcher()
         {
@@ -2392,10 +2657,11 @@
                 }
 
                 // Reject messages on pre-receive queue
-                consumer.rollback();
+                consumer.rollbackPendingMessages();
 
                 // Reject messages on pre-dispatch queue
                 rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
+                //Let the dispatcher deal with this when it gets to them.
 
                 // closeConsumer
                 consumer.markClosed();
@@ -2436,6 +2702,13 @@
 
                 }
 
+                for (int i = 0; i < _removedConsumers.size(); i++)
+                {
+                    // Sends acknowledgement to server
+                    _removedConsumers.get(i).rollback();
+                    _removedConsumers.remove(i);
+                }
+
                 setConnectionStopped(isStopped);
             }
 
@@ -2484,9 +2757,16 @@
                         }
                         else
                         {
-                            synchronized (_messageDeliveryLock)
+                            if (message.getDeliveryTag() <= _rollbackMark.get())
                             {
-                                dispatchMessage(message);
+                                rejectMessage(message, true);
+                            }
+                            else
+                            {
+                                synchronized (_messageDeliveryLock)
+                                {
+                                    dispatchMessage(message);
+                                }
                             }
                         }
 
@@ -2535,38 +2815,38 @@
             //This if block is not needed anymore as bounce messages are handled separately
             //if (message.getDeliverBody() != null)
             //{
-                final BasicMessageConsumer consumer =
-                        (BasicMessageConsumer) _consumers.get(new AMQShortString(message.getConsumerTag()));
+            final BasicMessageConsumer consumer =
+                _consumers.get(message.getConsumerTag().toIntValue());
 
-                if ((consumer == null) || consumer.isClosed())
+            if ((consumer == null) || consumer.isClosed())
+            {
+                if (_dispatcherLogger.isInfoEnabled())
                 {
-                    if (_dispatcherLogger.isInfoEnabled())
+                    if (consumer == null)
                     {
-                        if (consumer == null)
-                        {
-                            _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
-                                                   + message.getDeliveryTag() + "] from queue "
-                                                   + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
-                        }
-                        else
-                        {
-                            _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
-                                                   + message.getDeliveryTag() + "] from queue " + " consumer("
-                                                   + consumer.debugIdentity() + ") is closed rejecting(requeue)...");
-                        }
+                        _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "["
+                                + message.getDeliveryTag() + "] from queue "
+                                + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
                     }
-                    // Don't reject if we're already closing
-                    if (!_closed.get())
+                    else
                     {
-                        rejectMessage(message, true);
+                        _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
+                                + message.getDeliveryTag() + "] from queue " + " consumer("
+                                + message.getConsumerTag() + ") is closed rejecting(requeue)...");
                     }
                 }
-                else
+                // Don't reject if we're already closing
+                if (!_closed.get())
                 {
-                    consumer.notifyMessage(message, _channelId);
+                    rejectMessage(message, true);
                 }
             }
-        //}
+            else
+            {
+                consumer.notifyMessage(message);
+            }
+
+        }
     }
 
     /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java?rev=651133&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java Wed Apr 23 18:54:20 2008
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+
+/**
+ * AMQSessionDirtyException represents all failures to send data on a transacted session that is
+ * no longer in a state that the client expects, i.e. failover has occurred so previously sent messages
+ * will not be part of the transaction.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent attempt to perform additional sends on a dirty session.
+ * </table>
+ */
+public class AMQSessionDirtyException extends AMQException
+{
+    public AMQSessionDirtyException(String msg)
+    {
+        super(AMQConstant.RESOURCE_ERROR, msg, null);
+    }
+}

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=651133&r1=651132&r2=651133&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Wed Apr 23 18:54:20 2008
@@ -395,16 +395,16 @@
         try
         {
             preAcquire = ( ! consumer.isNoConsume()  && consumer.getMessageSelector() == null) || !(consumer.getDestination() instanceof AMQQueue);
+            getQpidSession().messageSubscribe(queueName.toString(), tag.toString(),
+                                              getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED,
+                                              preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
+                                              new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
+                                              consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
         }
         catch (JMSException e)
         {
             throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
         }
-        getQpidSession().messageSubscribe(queueName.toString(), tag.toString(),
-                                          (Boolean.getBoolean("noAck") ?Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED),
-                                          preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
-                                          new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
-                                          consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
 
         if (! prefetch())
         {
@@ -746,4 +746,10 @@
 
         return subscriber;
     }
+
+    Long requestQueueDepth(AMQDestination amqd)
+    {
+        return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
+    }
+
 }



Mime
View raw message