qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From grk...@apache.org
Subject svn commit: r966722 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/subscription/ broker/src/main/java/org/apache/qpid/server/transport/ client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/ha...
Date Thu, 22 Jul 2010 16:09:40 GMT
Author: grkvlt
Date: Thu Jul 22 16:09:40 2010
New Revision: 966722

URL: http://svn.apache.org/viewvc?rev=966722&view=rev
Log:
QPID-2657: Make Exceptions propagate to client for 0-10

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=966722&r1=966721&r2=966722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
Thu Jul 22 16:09:40 2010
@@ -79,7 +79,7 @@ public class Subscription_0_10 implement
 
                                                 public void stateChange(Subscription sub,
State oldState, State newState)
                                                 {
-
+                                                    // TODO something ? log a message here
?
                                                 }
                                             };
     private AMQQueue _queue;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=966722&r1=966721&r2=966722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
Thu Jul 22 16:09:40 2010
@@ -105,7 +105,7 @@ public class ServerConnection extends Co
     public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
throws AMQException
     {
         ExecutionException ex = new ExecutionException();
-        ex.setErrorCode(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED);
+        ex.setErrorCode(ExecutionErrorCode.get(cause.getCode()));
         ex.setDescription(message);
         ((ServerSession)session).invoke(ex);
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=966722&r1=966721&r2=966722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
Thu Jul 22 16:09:40 2010
@@ -26,7 +26,6 @@ import java.util.Collection;
 import java.util.Map;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.AMQUnknownExchangeType;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -233,8 +232,13 @@ public class ServerSessionDelegate exten
                     }
                     catch (AMQException e)
                     {
-                        // TODO
-                        throw new RuntimeException(e);
+                        ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+                        if (e.getErrorCode() != null)
+                        {
+                            errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+                        }
+                        String description = "Cannot subscribe to '" + destination + "':
" + e.getMessage();
+                        exception(session, method, errorCode, description);
                     }
                 }
             }
@@ -259,7 +263,7 @@ public class ServerSessionDelegate exten
         {
             exchange = exchangeRegistry.getDefaultExchange();
         }
-
+        
 
         DeliveryProperties delvProps = null;
         if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class))
!= null && delvProps.hasTtl() && !delvProps.hasExpiration())
@@ -268,6 +272,17 @@ public class ServerSessionDelegate exten
         }
 
         MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
+        
+        if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(),
messageMetaData.getRoutingKey(), exchange.getName()))
+        {
+            ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS;
+            String description = "Permission denied: exchange-name '" + exchange.getName()
+ "'";
+            exception(ssn, xfr, errorCode, description);
+            
+	        ssn.processed(xfr);
+            return;
+        }
+        
         final MessageStore store = getVirtualHost(ssn).getMessageStore();
         StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
         ByteBuffer body = xfr.getBody();
@@ -365,8 +380,13 @@ public class ServerSessionDelegate exten
             }
             catch (AMQException e)
             {
-                //TODO
-                throw new RuntimeException(e);
+                ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+                if (e.getErrorCode() != null)
+                {
+                    errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+                }
+                String description = "Cannot flush subscription '" + destination + "': "
+ e.getMessage();
+                exception(session, method, errorCode, description);
             }
         }
     }
@@ -453,17 +473,15 @@ public class ServerSessionDelegate exten
                 {
                     exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange
Type: " + method.getType());
                 }
-                catch (AMQSecurityException e)
-                {
-                    ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED;
-                    String description = "Permission denied: exchange-name '" + exchangeName
+ "'";
-
-                    exception(session, method, errorCode, description);
-                }
                 catch (AMQException e)
                 {
-                    //TODO
-                    throw new RuntimeException(e);
+                    ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+                    if (e.getErrorCode() != null)
+                    {
+                        errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+                    }
+                    String description = "Cannot declare exchange '" + exchangeName + "':
" + e.getMessage();
+                    exception(session, method, errorCode, description);
                 }
             }
             else
@@ -486,6 +504,7 @@ public class ServerSessionDelegate exten
 
         session.invoke(ex);
 
+        session.close();
     }
 
     private Exchange getExchange(Session session, String exchangeName)
@@ -543,14 +562,15 @@ public class ServerSessionDelegate exten
         {
             exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Exchange
in use");
         }
-        catch (AMQSecurityException e)
-        {
-            exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied:
" + method.getExchange());
-        }
         catch (AMQException e)
         {
-            // TODO
-            throw new RuntimeException(e);
+            ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+            if (e.getErrorCode() != null)
+            {
+                errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+            }
+            String description = "Cannot delete exchange '" + method.getExchange() + "':
" + e.getMessage();
+            exception(session, method, errorCode, description);
         }
     }
 
@@ -630,10 +650,15 @@ public class ServerSessionDelegate exten
                     {
                         virtualHost.getBindingFactory().addBinding(method.getBindingKey(),
queue, exchange, method.getArguments());
                     }
-                    catch (AMQSecurityException e)
+                    catch (AMQException e)
                     {
-                        exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Bind
Exchange: '" + method.getExchange()
-                                + "' to Queue: '" + method.getQueue() + "' not allowed");
+                        ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+                        if (e.getErrorCode() != null)
+                        {
+                            errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+                        }
+                        String description = "Cannot add binding '" + method.getBindingKey()
+ "': " + e.getMessage();
+                        exception(session, method, errorCode, description);
                     }
                 }
                 else
@@ -686,9 +711,15 @@ public class ServerSessionDelegate exten
                 {
                     virtualHost.getBindingFactory().removeBinding(method.getBindingKey(),
queue, exchange, null);
                 }
-                catch (AMQSecurityException e)
+                catch (AMQException e)
                 {
-                    exception(session,method, ExecutionErrorCode.NOT_ALLOWED, "Permission
denied");
+                    ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+                    if (e.getErrorCode() != null)
+                    {
+                        errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+                    }
+                    String description = "Cannot remove binding '" + method.getBindingKey()
+ "': " + e.getMessage();
+                    exception(session, method, errorCode, description);
                 }
             }
         }
@@ -801,7 +832,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void queueDeclare(Session session, QueueDeclare method)
+    public void queueDeclare(Session session, final QueueDeclare method)
     {
 
         VirtualHost virtualHost = getVirtualHost(session);
@@ -909,8 +940,13 @@ public class ServerSessionDelegate exten
                                         }
                                         catch (AMQException e)
                                         {
-                                            // TODO
-                                            throw new RuntimeException(e);
+                                            ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+                                            if (e.getErrorCode() != null)
+                                            {
+                                                errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+                                            }
+                                            String description = "Cannot delete '" + method.getQueue()
+ "': " + e.getMessage();
+                                            exception(session, method, errorCode, description);
                                         }
                                     }
                                 };
@@ -948,16 +984,15 @@ public class ServerSessionDelegate exten
                             });
                         }
                     }
-                    catch (AMQSecurityException e)
-                    {
-                        String description = "Cannot declare queue('" + queueName + "'),
permission denied";
-                        ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED;
-                        exception(session, method, errorCode, description);
-                    }
                     catch (AMQException e)
                     {
-                        // TODO
-                        throw new RuntimeException(e);
+                        ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+                        if (e.getErrorCode() != null)
+                        {
+                            errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+                        }
+                        String description = "Cannot declare queue '" + queueName + "': "
+ e.getMessage();
+                        exception(session, method, errorCode, description);
                     }
                 }
             }
@@ -976,7 +1011,7 @@ public class ServerSessionDelegate exten
     }
 
     protected AMQQueue createQueue(final String queueName,
-                                   QueueDeclare body,
+                                   final QueueDeclare body,
                                    VirtualHost virtualHost,
                                    final ServerSession session)
             throws AMQException
@@ -1003,8 +1038,13 @@ public class ServerSessionDelegate exten
                                 }
                                 catch (AMQException e)
                                 {
-                                    //TODO
-                                    throw new RuntimeException(e);
+                                    ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+                                    if (e.getErrorCode() != null)
+                                    {
+                                        errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+                                    }
+                                    String description = "Cannot delete queue '" + body.getQueue()
+ "': " + e.getMessage();
+                                    exception(session, body, errorCode, description);
                                 }
                             }
                         }
@@ -1071,14 +1111,15 @@ public class ServerSessionDelegate exten
                             store.removeQueue(queue);
                         }
                     }
-                    catch (AMQSecurityException e)
-                    {
-                        exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Permission
denied: " + queueName);
-                    }
                     catch (AMQException e)
                     {
-                        // TODO
-                        throw new RuntimeException(e);
+                        ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+                        if (e.getErrorCode() != null)
+                        {
+                            errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+                        }
+                        String description = "Cannot delete queue '" + queueName + "': "
+ e.getMessage();
+                        exception(session, method, errorCode, description);
                     }
                 }
             }
@@ -1107,14 +1148,15 @@ public class ServerSessionDelegate exten
                 {
                     queue.clearQueue();
                 }
-                catch (AMQSecurityException e)
-                {
-                    exception(session,method, ExecutionErrorCode.NOT_ALLOWED, "Permission
denied: " + queueName);
-                }
                 catch (AMQException e)
                 {
-                    // TODO
-                    throw new RuntimeException(e);
+                    ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+                    if (e.getErrorCode() != null)
+                    {
+                        errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+                    }
+                    String description = "Cannot purge queue '" + queueName + "': " + e.getMessage();
+                    exception(session, method, errorCode, description);
                 }
             }
         }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=966722&r1=966721&r2=966722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu
Jul 22 16:09:40 2010
@@ -1037,7 +1037,7 @@ public class AMQConnection extends Close
                     {
                         long startCloseTime = System.currentTimeMillis();
 
-                        closeAllSessions(null, timeout, startCloseTime);
+	                    closeAllSessions(null, timeout, startCloseTime);
 
                         //This MUST occur after we have successfully closed all Channels/Sessions
                         _taskPool.shutdown();
@@ -1433,39 +1433,44 @@ public class AMQConnection extends Close
             _protocolHandler.getProtocolSession().notifyError(je);
         }
 
-        if (_exceptionListener != null)
+        // get the failover mutex before trying to close
+        synchronized (getFailoverMutex())
         {
-            _exceptionListener.onException(je);
-        }
-        else
-        {
-            _logger.error("Throwable Received but no listener set: " + cause.getMessage());
-        }
-
-        if (hardError(cause))
-        {
-            try
+            // decide if we are going to close the session
+            if (hardError(cause))
             {
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Closing AMQConnection due to :" + cause.getMessage());
-                }
-
                 closer = (!_closed.getAndSet(true)) || closer;
-                if (closer)
                 {
-                    closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with
RejectedExecutionException from executor.
+                    _logger.info("Closing AMQConnection due to :" + cause);
                 }
             }
-            catch (JMSException e)
+            else
             {
-                _logger.error("Error closing all sessions: " + e, e);
+                _logger.info("Not a hard-error connection not closing: " + cause);
+            }
+            
+            // deliver the exception if there is a listener
+            if (_exceptionListener != null)
+            {
+                _exceptionListener.onException(je);
+            }
+            else
+            {
+                _logger.error("Throwable Received but no listener set: " + cause);
+            }
+    
+            // if we are closing the connection, close sessions first
+            if (closer)
+            {
+                try
+                {
+                    closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with
RejectedExecutionException from executor.
+                }
+                catch (JMSException e)
+                {
+                    _logger.error("Error closing all sessions: " + e, e);
+                }
             }
-
-        }
-        else
-        {
-            _logger.info("Not a hard-error connection not closing: " + cause.getMessage());
         }
     }
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=966722&r1=966721&r2=966722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
Thu Jul 22 16:09:40 2010
@@ -165,13 +165,20 @@ public class AMQConnectionDelegate_0_10 
             _conn._connected = true;
             _conn.setUsername(_qpidConnection.getUserID());
             _conn._failoverPolicy.attainedConnection();
-        } catch (ProtocolVersionException pe)
+        }
+        catch (ProtocolVersionException pe)
         {
             return new ProtocolVersion(pe.getMajor(), pe.getMinor());
-        } catch (ConnectionException e)
+        }
+        catch (ConnectionException ce)
         {
-            throw new AMQException(AMQConstant.CHANNEL_ERROR,
-                    "cannot connect to broker", e);
+            AMQConstant code = AMQConstant.REPLY_SUCCESS;
+            if (ce.getClose() != null && ce.getClose().getReplyCode() != null)
+            {
+                code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue());
+            }
+            String msg = "Cannot connect to broker: " + ce.getMessage();
+            throw new AMQException(code, msg, ce);
         }
 
         return null;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=966722&r1=966721&r2=966722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Thu Jul 22 16:09:40 2010
@@ -128,7 +128,7 @@ public class AMQSession_0_10 extends AMQ
      * The latest qpid Exception that has been raised.
      */
     private Object _currentExceptionLock = new Object();
-    private SessionException _currentException;
+    private AMQException _currentException;
 
     // a ref on the qpid connection
     protected org.apache.qpid.transport.Connection _qpidConnection;
@@ -827,20 +827,9 @@ public class AMQSession_0_10 extends AMQ
         {
             if (_currentException != null)
             {
-                SessionException se = _currentException;
+                AMQException amqe = _currentException;
                 _currentException = null;
-                ExecutionException ee = se.getException();
-                int code;
-                if (ee == null)
-                {
-                    code = 0;
-                }
-                else
-                {
-                    code = ee.getErrorCode().getValue();
-                }
-                throw new AMQException
-                    (AMQConstant.getConstant(code), se.getMessage(), se);
+                throw amqe;
             }
         }
     }
@@ -869,7 +858,19 @@ public class AMQSession_0_10 extends AMQ
     {
         synchronized (_currentExceptionLock)
         {
-            _currentException = exc;
+            ExecutionException ee = exc.getException();
+            int code;
+            if (ee == null)
+            {
+                code = AMQConstant.INTERNAL_ERROR.getCode();
+            }
+            else
+            {
+                code = ee.getErrorCode().getValue();
+            }
+            AMQException amqe = new AMQException(AMQConstant.getConstant(code), exc.getMessage(),
exc.getCause());
+            _connection.exceptionReceived(amqe);
+            _currentException = amqe;
         }
     }
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=966722&r1=966721&r2=966722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Thu Jul 22 16:09:40 2010
@@ -139,36 +139,24 @@ public class BasicMessageConsumer_0_10 e
      */
     @Override public void notifyMessage(AbstractJMSMessage jmsMessage)
     {
-        boolean messageOk = false;
         try
         {
-            messageOk = checkPreConditions(jmsMessage);
-        }
-        catch (AMQException e)
-        {
-            _logger.error("Receivecd an Exception when receiving message",e);
-            try
-            {
-
-                getSession().getAMQConnection().getExceptionListener()
-                        .onException(new JMSAMQException("Error when receiving message",
e));
-            }
-            catch (Exception e1)
+            if (checkPreConditions(jmsMessage))
             {
-                // we should silently log thie exception as it only hanppens when the connection
is closed
-                _logger.error("Exception when receiving message", e1);
+                if (isMessageListenerSet() && capacity == 0)
+                {
+                    _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+                                                              MessageCreditUnit.MESSAGE,
1,
+                                                              Option.UNRELIABLE);
+                }
+                _logger.debug("messageOk, trying to notify");
+                super.notifyMessage(jmsMessage);
             }
         }
-        if (messageOk)
+        catch (AMQException e)
         {
-            if (isMessageListenerSet() && capacity == 0)
-            {
-                _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                          MessageCreditUnit.MESSAGE, 1,
-                                                          Option.UNRELIABLE);
-            }
-            _logger.debug("messageOk, trying to notify");
-            super.notifyMessage(jmsMessage);
+            _logger.error("Receivecd an Exception when receiving message",e);
+            getSession().getAMQConnection().exceptionReceived(e);
         }
     }
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=966722&r1=966721&r2=966722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
Thu Jul 22 16:09:40 2010
@@ -29,6 +29,7 @@ import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.Message;
 
+import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
 import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -220,11 +221,11 @@ public class BasicMessageProducer_0_10 e
             if (sync)
             {
                 ssn.sync();
+                ((AMQSession_0_10) getSession()).getCurrentException();
             }
             
-            
         }
-        catch (RuntimeException e)
+        catch (Exception e)
         {
             JMSException jmse = new JMSException("Exception when sending message");
             jmse.setLinkedException(e);

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?rev=966722&r1=966721&r2=966722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
Thu Jul 22 16:09:40 2010
@@ -22,6 +22,7 @@ package org.apache.qpid.client.handler;
 
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.client.AMQAuthenticationException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQState;
@@ -72,12 +73,18 @@ public class ConnectionCloseMethodHandle
 
             if (errorCode != AMQConstant.REPLY_SUCCESS)
             {
-                if (errorCode == AMQConstant.NOT_ALLOWED || (errorCode == AMQConstant.ACCESS_REFUSED))
+                if (errorCode == AMQConstant.NOT_ALLOWED)
                 {
                     _logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName());
 
                     error = new AMQAuthenticationException(errorCode, reason == null ? null
: reason.toString(), null);
                 }
+                else if (errorCode == AMQConstant.ACCESS_REFUSED)
+                {
+                    _logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName());
+
+                    error = new AMQSecurityException(reason == null ? null : reason.toString(),
null);
+                }
                 else
                 {
                     _logger.info("Connection close received with error code " + errorCode);

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=966722&r1=966721&r2=966722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
Thu Jul 22 16:09:40 2010
@@ -20,29 +20,19 @@
  */
 package org.apache.qpid.transport;
 
-import java.util.Collections;
-
+import static org.apache.qpid.transport.Connection.State.*;
 
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-
-import java.io.UnsupportedEncodingException;
 
 import org.apache.qpid.QpidException;
 
 import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
 
-import static org.apache.qpid.transport.Connection.State.*;
-
-
 /**
  * ServerDelegate
  *
@@ -96,8 +86,7 @@ public class ServerDelegate extends Conn
             SaslServer ss = createSaslServer(mechanism);
             if (ss == null)
             {
-                conn.connectionClose
-                    (ConnectionCloseCode.CONNECTION_FORCED,
+                conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED,
                      "null SASL mechanism: " + mechanism);
                 return;
             }
@@ -107,14 +96,14 @@ public class ServerDelegate extends Conn
         catch (SaslException e)
         {
             conn.exception(e);
+            conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
         }
     }
 
     protected SaslServer createSaslServer(String mechanism)
             throws SaslException
     {
-        SaslServer ss = Sasl.createSaslServer
-            (mechanism, "AMQP", "localhost", null, null);
+        SaslServer ss = Sasl.createSaslServer(mechanism, "AMQP", "localhost", null, null);
         return ss;
     }
 
@@ -141,6 +130,7 @@ public class ServerDelegate extends Conn
         catch (SaslException e)
         {
             conn.exception(e);
+            conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
         }
     }
 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=966722&r1=966721&r2=966722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Thu Jul
22 16:09:40 2010
@@ -734,11 +734,7 @@ public class Session extends SessionInvo
 
             if (lt(maxComplete, point))
             {
-                if (state == CLOSED)
-                {
-                    throw new SessionException(getException());
-                }
-                else
+                if (state != CLOSED)
                 {
                     throw new SessionException
                         (String.format

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=966722&r1=966721&r2=966722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
Thu Jul 22 16:09:40 2010
@@ -139,6 +139,7 @@ public class SessionDelegate
     @Override public void executionException(Session ssn, ExecutionException exc)
     {
         ssn.setException(exc);
+        ssn.getSessionListener().exception(ssn, new SessionException(exc));
     }
 
     @Override public void messageTransfer(Session ssn, MessageTransfer xfr)

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java?rev=966722&r1=966721&r2=966722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
Thu Jul 22 16:09:40 2010
@@ -38,6 +38,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.url.URLSyntaxException;
 
 /**
@@ -59,7 +60,7 @@ import org.apache.qpid.url.URLSyntaxExce
  */
 public class SimpleACLTest extends AbstractACLTestCase
 {    
-    public void testAccessAuthorized() throws AMQException, URLSyntaxException, Exception
+    public void testAccessAuthorizedSuccess() throws AMQException, URLSyntaxException, Exception
     {
         try
         {
@@ -78,7 +79,7 @@ public class SimpleACLTest extends Abstr
         }
     }
     
-    public void testAccessVhostAuthorisedGuest() throws IOException, Exception
+    public void testAccessVhostAuthorisedGuestSuccess() throws IOException, Exception
     {
         //The 'guest' user has no access to the 'test' vhost, as tested below in testAccessNoRights(),
and so
         //is unable to perform actions such as connecting (and by extension, creating a queue,
and consuming 
@@ -117,8 +118,7 @@ public class SimpleACLTest extends Abstr
         }
     }
     
-    // XXX one
-    public void testAccessNoRights() throws Exception
+    public void testAccessNoRightsFailure() throws Exception
     {
         try
         {
@@ -131,13 +131,14 @@ public class SimpleACLTest extends Abstr
         }
         catch (JMSException e)
         {
-            // XXX JMSException -> linkedException -> cause = AMQException.403
+            // JMSException -> linkedException -> cause = AMQException (403 or 320)
             Exception linkedException = e.getLinkedException();
             assertNotNull("There was no linked exception", linkedException);
             Throwable cause = linkedException.getCause();
             assertNotNull("Cause was null", cause);
-            assertTrue("Wrong linked exception type",cause instanceof AMQException);
-            assertEquals("Incorrect error code received", 403, ((AMQException) cause).getErrorCode().getCode());
+            assertTrue("Wrong linked exception type", cause instanceof AMQException);
+            AMQConstant errorCode = isBroker010() ? AMQConstant.CONTEXT_IN_USE : AMQConstant.ACCESS_REFUSED;
+            assertEquals("Incorrect error code received", errorCode, ((AMQException) cause).getErrorCode());
         }
     }
     
@@ -166,7 +167,6 @@ public class SimpleACLTest extends Abstr
         }
     }
     
-    // XXX two
     public void testServerDeleteQueueFailure() throws Exception
     {
         try
@@ -188,12 +188,12 @@ public class SimpleACLTest extends Abstr
         }
         catch (JMSException e)
         {
-            // XXX JMSException -> linedException = AMQException.403
+            // JMSException -> linedException = AMQException.403
             check403Exception(e.getLinkedException());
         }
     }
 
-    public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException,
Exception
+    public void testClientConsumeFromTempQueueSuccess() throws AMQException, URLSyntaxException,
Exception
     {
         try
         {
@@ -213,7 +213,7 @@ public class SimpleACLTest extends Abstr
         }
     }
 
-    public void testClientConsumeFromNamedQueueInvalid() throws NamingException, Exception
+    public void testClientConsumeFromNamedQueueFailure() throws NamingException, Exception
     {
         try
         {
@@ -225,8 +225,6 @@ public class SimpleACLTest extends Abstr
 
             sess.createConsumer(sess.createQueue("IllegalQueue"));
             
-            conn.close();
-
             fail("Test failed as consumer was created.");
         }
         catch (JMSException e)
@@ -235,7 +233,7 @@ public class SimpleACLTest extends Abstr
         }
     }
 
-    public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException,
Exception
+    public void testClientCreateTemporaryQueueSuccess() throws JMSException, URLSyntaxException,
Exception
     {
         try
         {
@@ -257,7 +255,7 @@ public class SimpleACLTest extends Abstr
         }
     }
 
-    public void testClientCreateNamedQueue() throws NamingException, JMSException, AMQException,
Exception
+    public void testClientCreateNamedQueueFailure() throws NamingException, JMSException,
AMQException, Exception
     {
         try
         {
@@ -275,7 +273,6 @@ public class SimpleACLTest extends Abstr
         }
         catch (AMQException e)
         {
-            // XXX AMQException.403
             check403Exception(e);
         }
     }
@@ -405,8 +402,6 @@ public class SimpleACLTest extends Abstr
             conn.start();
 
             sess.createConsumer(sess.createQueue("Invalid"));
-            
-            conn.close();
 
             fail("Test failed as consumer was created.");
         }
@@ -520,7 +515,7 @@ public class SimpleACLTest extends Abstr
 
     /**
      * This test uses both the cilent and sender to validate that the Server is able to publish
to a temporary queue.
-     * The reason the client must be in volved is that the Serve is unable to create its
own Temporary Queues.
+     * The reason the client must be involved is that the Server is unable to create its
own Temporary Queues.
      *
      * @throws AMQException
      * @throws URLSyntaxException

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java?rev=966722&r1=966721&r2=966722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java
Thu Jul 22 16:09:40 2010
@@ -61,6 +61,7 @@ public class GlobalQueuesTest extends Te
 
      */
 
+
     /**
      *  VirtualHost Plugin Configuration
 

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java?rev=966722&r1=966721&r2=966722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java
Thu Jul 22 16:09:40 2010
@@ -172,22 +172,8 @@ public class TestingBaseCase extends Qpi
         startPublisher(_destination);
 
         boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS);
-
-        if (!disconnected && isBroker010())
-        {
-            try
-            {
-                ((AMQSession_0_10) session).sync();
-            }
-            catch (AMQException amqe)
-            {
-                JMSException jmsException = new JMSException(amqe.getMessage());
-                jmsException.setLinkedException(amqe);
-                jmsException.initCause(amqe);
-                _connectionException = jmsException;
-            }
-        }
-
+        
+        assertTrue("Client was not disconnected", disconnected);
         assertTrue("Client was not disconnected.", _connectionException != null);
 
         Exception linked = _connectionException.getLinkedException();
@@ -209,11 +195,11 @@ public class TestingBaseCase extends Qpi
 
         assertNotNull("No linked exception set on:" + _connectionException.getMessage(),
linked);
 
-        assertEquals("Incorrect linked exception received.", AMQChannelClosedException.class,
linked.getClass());
+        assertTrue("Incorrect linked exception received.", linked instanceof AMQException);
 
-        AMQChannelClosedException ccException = (AMQChannelClosedException) linked;
+        AMQException amqException = (AMQException) linked;
 
-        assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR,
ccException.getErrorCode());
+        assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR,
amqException.getErrorCode());
     }
 
 

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java?rev=966722&r1=966721&r2=966722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
Thu Jul 22 16:09:40 2010
@@ -145,7 +145,7 @@ public class ConnectionTest extends Qpid
         catch (AMQConnectionFailureException amqe)
         {
             assertNotNull("No cause set:" + amqe.getMessage(), amqe.getCause());
-            assertEquals("Exception was wrong type", AMQAuthenticationException.class, amqe.getCause().getClass());
+            assertTrue("Exception was wrong type", amqe.getCause() instanceof AMQException);
         }
         finally
         {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message