qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tom...@apache.org
Subject svn commit: r537019 - in /incubator/qpid/branches/M2/dotnet: Qpid.Client.Tests/undeliverable/ Qpid.Client/ Qpid.Client/Client/ Qpid.Client/Client/Handler/ Qpid.Common/ Qpid.Common/Protocol/
Date Thu, 10 May 2007 22:25:04 GMT
Author: tomasr
Date: Thu May 10 15:25:01 2007
New Revision: 537019

URL: http://svn.apache.org/viewvc?view=rev&rev=537019
Log:
QPID-441 Fix handling of bounced messages

Modified:
    incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
    incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs
    incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
    incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
    incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj
    incubator/qpid/branches/M2/dotnet/Qpid.Common/Protocol/AMQConstant.cs
    incubator/qpid/branches/M2/dotnet/Qpid.Common/Qpid.Common.csproj

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs?view=diff&rev=537019&r1=537018&r2=537019
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
(original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
Thu May 10 15:25:01 2007
@@ -26,69 +26,103 @@
 
 namespace Qpid.Client.Tests
 {
-    [TestFixture]
-    public class UndeliverableTest : BaseMessagingTestFixture
-    {
-        private static ILog _logger = LogManager.GetLogger(typeof(UndeliverableTest));
-
-        [SetUp]
-        public override void Init()
-        {
-            base.Init();
-
-            try
-            {
-                _connection.ExceptionListener = new ExceptionListenerDelegate(OnException);
-            }
-            catch (QpidException e)
-            {
-                _logger.Error("Could not add ExceptionListener", e);
-            }
-        }
-
-        public static void OnException(Exception e)
-        {
-            // Here we dig out the AMQUndelivered exception (if present) in order to log
the returned message.
-
-            _logger.Error("OnException handler received connection-level exception", e);
-            if (e is QpidException)
-            {
-                QpidException qe = (QpidException)e;
-                if (qe.InnerException is AMQUndeliveredException)
-                {
-                    AMQUndeliveredException ue = (AMQUndeliveredException)qe.InnerException;
-                    _logger.Error("inner exception is AMQUndeliveredException", ue);
-                    _logger.Error(string.Format("Returned message = {0}", ue.GetUndeliveredMessage()));
-
-                }
-            }
-        }
-
-        [Test]
-        public void SendUndeliverableMessage()
-        {
-            SendOne("default exchange", null);
-            SendOne("direct exchange", ExchangeNameDefaults.DIRECT);
-            SendOne("topic exchange", ExchangeNameDefaults.TOPIC);
-            SendOne("headers exchange", ExchangeNameDefaults.HEADERS);
-
-            Thread.Sleep(1000); // Wait for message returns!
-        }
-
-        private void SendOne(string exchangeNameFriendly, string exchangeName)
-        {
-            _logger.Info("Sending undeliverable message to " + exchangeNameFriendly);
-
-            // Send a test message to a non-existant queue on the default exchange. See if
message is returned!
-            MessagePublisherBuilder builder = _channel.CreatePublisherBuilder()
-                .WithRoutingKey("Non-existant route key!")
-                .WithMandatory(true);
-            if (exchangeName != null)
+   /// <summary>
+   /// Tests that when sending undeliverable messages with the 
+   /// mandatory flag set, an exception is raised on the connection
+   /// as the message is bounced back by the broker
+   /// </summary>
+   [TestFixture]
+   public class UndeliverableTest : BaseMessagingTestFixture
+   {
+      private static ILog _logger = LogManager.GetLogger(typeof(UndeliverableTest));
+      private ManualResetEvent _event;
+      public const int TIMEOUT = 1000;
+      private Exception _lastException;
+
+      [SetUp]
+      public override void Init()
+      {
+         base.Init();
+         _event = new ManualResetEvent(false);
+         _lastException = null;
+
+         try
+         {
+            _connection.ExceptionListener = new ExceptionListenerDelegate(OnException);
+         } catch ( QpidException e )
+         {
+            _logger.Error("Could not add ExceptionListener", e);
+         }
+      }
+
+      public void OnException(Exception e)
+      {
+         // Here we dig out the AMQUndelivered exception (if present) in order to log the
returned message.
+
+         _lastException = e;
+         _logger.Error("OnException handler received connection-level exception", e);
+         if ( e is QpidException )
+         {
+            QpidException qe = (QpidException)e;
+            if ( qe.InnerException is AMQUndeliveredException )
             {
-                builder.WithExchangeName(exchangeName);
+               AMQUndeliveredException ue = (AMQUndeliveredException)qe.InnerException;
+               _logger.Error("inner exception is AMQUndeliveredException", ue);
+               _logger.Error(string.Format("Returned message = {0}", ue.GetUndeliveredMessage()));
             }
-            IMessagePublisher publisher = builder.Create();
-            publisher.Send(_channel.CreateTextMessage("Hiya!"));
-        }
-    }
+         }
+         _event.Set();
+      }
+
+      [Test]
+      public void SendUndeliverableMessageOnDefaultExchange()
+      {
+         SendOne("default exchange", null);
+      }
+      [Test]
+      public void SendUndeliverableMessageOnDirectExchange()
+      {
+         SendOne("direct exchange", ExchangeNameDefaults.DIRECT);
+      }
+      [Test]
+      public void SendUndeliverableMessageOnTopicExchange()
+      {
+         SendOne("topic exchange", ExchangeNameDefaults.TOPIC);
+      }
+      [Test]
+      public void SendUndeliverableMessageOnHeadersExchange()
+      {
+         SendOne("headers exchange", ExchangeNameDefaults.HEADERS);
+      }
+
+      private void SendOne(string exchangeNameFriendly, string exchangeName)
+      {
+         _logger.Info("Sending undeliverable message to " + exchangeNameFriendly);
+
+         // Send a test message to a non-existant queue 
+         // on the specified exchange. See if message is returned!
+         MessagePublisherBuilder builder = _channel.CreatePublisherBuilder()
+             .WithRoutingKey("Non-existant route key!")
+             .WithMandatory(true); // necessary so that the server bounces the message back
+         if ( exchangeName != null )
+         {
+            builder.WithExchangeName(exchangeName);
+         }
+         IMessagePublisher publisher = builder.Create();
+         publisher.Send(_channel.CreateTextMessage("Hiya!"));
+
+         // check we received an exception on the connection
+         // and that it is of the right type
+         _event.WaitOne(TIMEOUT, true);
+
+         Type expectedException = typeof(AMQUndeliveredException);
+         Exception ex = _lastException;
+         Assert.IsNotNull(ex, "No exception was thrown by the test. Expected " + expectedException);
+
+         if ( ex.InnerException != null )
+            ex = ex.InnerException;
+
+         Assert.IsInstanceOfType(expectedException, ex);
+      }
+   }
 }

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=537019&r1=537018&r2=537019
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs Thu May 10 15:25:01
2007
@@ -28,6 +28,7 @@
 using Qpid.Collections;
 using Qpid.Framing;
 using Qpid.Messaging;
+using Qpid.Protocol;
 
 namespace Qpid.Client
 {
@@ -568,8 +569,14 @@
             if (_logger.IsDebugEnabled)
             {
                 _logger.Debug("Message received in session with channel id " + _channelId);
-            }            
-            _queue.EnqueueBlocking(message);         
+            }
+            if ( message.DeliverBody == null )
+            {
+               ReturnBouncedMessage(message);
+            } else
+            {
+               _queue.EnqueueBlocking(message);
+            }
         }
 
         public int DefaultPrefetch
@@ -986,5 +993,42 @@
             // FIXME: lock FailoverMutex here?
             _connection.ProtocolWriter.Write(ackFrame);
         }
+
+       /// <summary>
+       /// Handle a message that bounced from the server, creating
+       /// the corresponding exception and notifying the connection about it
+       /// </summary>
+       /// <param name="message">Unprocessed message</param>
+       private void ReturnBouncedMessage(UnprocessedMessage message)
+       {
+          try
+          {
+             AbstractQmsMessage bouncedMessage =
+                _messageFactoryRegistry.CreateMessage(
+                     0, false, message.ContentHeader,
+                     message.Bodies
+                );
+
+             int errorCode = message.BounceBody.ReplyCode;
+             string reason = message.BounceBody.ReplyText;
+             _logger.Debug("Message returned with error code " + errorCode + " (" + reason
+ ")");
+             AMQException exception;
+             if ( errorCode == AMQConstant.NO_CONSUMERS.Code )
+             {
+                exception = new AMQNoConsumersException(reason, bouncedMessage);
+             } else if ( errorCode == AMQConstant.NO_ROUTE.Code )
+             {
+                exception = new AMQNoRouteException(reason, bouncedMessage);
+             } else
+             {
+                exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage);
+             }
+             _connection.ExceptionReceived(exception);
+          } catch ( Exception ex )
+          {
+             _logger.Error("Caught exception trying to raise undelivered message exception
(dump follows) - ignoring...", ex);
+          }
+
+       }
     }
 }

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs?view=diff&rev=537019&r1=537018&r2=537019
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
(original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
Thu May 10 15:25:01 2007
@@ -32,7 +32,7 @@
 
         public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) 
         {
-            _logger.Debug("New JmsBounce method received");
+            _logger.Debug("New Basic.Return method received");
             UnprocessedMessage msg = new UnprocessedMessage();
             msg.DeliverBody = null;
             msg.BounceBody = (BasicReturnBody) evt.Method;

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs?view=diff&rev=537019&r1=537018&r2=537019
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
(original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
Thu May 10 15:25:01 2007
@@ -44,11 +44,20 @@
 
             AMQFrame frame = ChannelCloseOkBody.CreateAMQFrame(evt.ChannelId);
             evt.ProtocolSession.WriteFrame(frame);
-            // HACK
+
             if ( errorCode != AMQConstant.REPLY_SUCCESS.Code )
             {
-                _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing
exception");
-                evt.ProtocolSession.AMQConnection.ExceptionReceived(new AMQChannelClosedException(errorCode,
"Error: " + reason));
+               _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing
exception");
+               if ( errorCode == AMQConstant.NO_CONSUMERS.Code )
+                  throw new AMQNoConsumersException(reason);
+               if ( errorCode == AMQConstant.NO_ROUTE.Code )
+                  throw new AMQNoRouteException(reason);
+               if ( errorCode == AMQConstant.INVALID_ARGUMENT.Code )
+                  throw new AMQInvalidArgumentException(reason);
+               if ( errorCode == AMQConstant.INVALID_ROUTING_KEY.Code )
+                  throw new AMQInvalidRoutingKeyException(reason);
+               // any other
+               throw new AMQChannelClosedException(errorCode, "Error: " + reason);
             }
             evt.ProtocolSession.ChannelClosed(evt.ChannelId, errorCode, reason);
         }

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj?view=diff&rev=537019&r1=537018&r2=537019
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj Thu May 10 15:25:01 2007
@@ -43,6 +43,8 @@
     <Compile Include="Client\AMQDestination.cs" />
     <Compile Include="Client\AmqChannel.cs" />
     <Compile Include="Client\AMQAuthenticationException.cs" />
+    <Compile Include="Client\AMQNoConsumersException.cs" />
+    <Compile Include="Client\AMQNoRouteException.cs" />
     <Compile Include="Client\Configuration\AuthenticationConfigurationSectionHandler.cs"
/>
     <Compile Include="Client\Message\QpidHeaders.cs" />
     <Compile Include="Client\QpidConnectionInfo.cs" />
@@ -144,4 +146,4 @@
   <Target Name="AfterBuild">
   </Target>
   -->
-</Project>
+</Project>
\ No newline at end of file

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Common/Protocol/AMQConstant.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Common/Protocol/AMQConstant.cs?view=diff&rev=537019&r1=537018&r2=537019
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Common/Protocol/AMQConstant.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Common/Protocol/AMQConstant.cs Thu May 10 15:25:01
2007
@@ -77,17 +77,20 @@
       public static readonly AMQConstant NO_ROUTE = new AMQConstant(312, "no route", true);
       public static readonly AMQConstant NO_CONSUMERS = new AMQConstant(313, "no consumers",
true);
       public static readonly AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in
use", true);
-      public static readonly AMQConstant CONTEXT_UNKNOWN = new AMQConstant(321, "context
unknown", true);
-      public static readonly AMQConstant INVALID_SELECTOR = new AMQConstant(322, "selector
invalid", true);
       public static readonly AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path",
true);
       public static readonly AMQConstant ACCESS_REFUSED = new AMQConstant(403, "access refused",
true);
       public static readonly AMQConstant NOT_FOUND = new AMQConstant(404, "not found", true);
+      public static readonly AMQConstant ALREADY_EXISTS = new AMQConstant(405, "already exists",
true);
+      public static readonly AMQConstant IN_USE = new AMQConstant(406, "in use", true);
+      public static readonly AMQConstant INVALID_ROUTING_KEY = new AMQConstant(407, "routing
key invalid", true);
+      public static readonly AMQConstant REQUEST_TIMEOUT = new AMQConstant(408, "request
timeout", true);
+      public static readonly AMQConstant INVALID_ARGUMENT = new AMQConstant(409, "argument
invalid", true);
       public static readonly AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error",
true);
       public static readonly AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error",
true);
       public static readonly AMQConstant COMMAND_INVALID = new AMQConstant(503, "command
invalid", true);
       public static readonly AMQConstant CHANNEL_ERROR = new AMQConstant(504, "channel error",
true);
       public static readonly AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource
error", true);
-      public static readonly AMQConstant NOT_ALLOWED = new AMQConstant(530, "not allowed",
true);
+      public static readonly AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed",
true);
       public static readonly AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented",
true);
       public static readonly AMQConstant INTERNAL_ERROR = new AMQConstant(541, "internal
error", true);
 

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Common/Qpid.Common.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Common/Qpid.Common.csproj?view=diff&rev=537019&r1=537018&r2=537019
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Common/Qpid.Common.csproj (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Common/Qpid.Common.csproj Thu May 10 15:25:01 2007
@@ -44,6 +44,8 @@
     <Compile Include="AMQConnectionClosedException.cs" />
     <Compile Include="AMQDisconnectedException.cs" />
     <Compile Include="AMQException.cs" />
+    <Compile Include="AMQInvalidArgumentException.cs" />
+    <Compile Include="AMQInvalidRoutingKeyException.cs" />
     <Compile Include="AMQUndeliveredException.cs" />
     <Compile Include="AssemblySettings.cs" />
     <Compile Include="Collections\LinkedHashtable.cs" />
@@ -208,4 +210,4 @@
   <Target Name="AfterBuild">
   </Target>
   -->
-</Project>
+</Project>
\ No newline at end of file



Mime
View raw message