qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arnaudsi...@apache.org
Subject svn commit: r584728 - /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Date Mon, 15 Oct 2007 10:59:03 GMT
Author: arnaudsimon
Date: Mon Oct 15 03:59:02 2007
New Revision: 584728

URL: http://svn.apache.org/viewvc?rev=584728&view=rev
Log:
Changed handling of replyTo

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=584728&r1=584727&r2=584728&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Mon Oct 15 03:59:02 2007
@@ -147,17 +147,15 @@
             getSession().getAMQConnection().exceptionReceived(e);
         }
         Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
-        // if there is a replyto destination then we need to request the exchange info
+         // if there is a replyto destination then we need to request the exchange info
         ReplyTo replyTo = message.getMessageProperties().getReplyTo();
         if (replyTo != null &&
             replyTo.getExchangeName() != null &&
             !replyTo.getExchangeName().equals(""))
         {
-            Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
-                    .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
-            ExchangeQueryResult res = future.get();
             // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
-            String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
+            // the exchange class will be set later from within the session thread
+            String replyToUrl =  message.getMessageProperties().getReplyTo()
                     .getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
                     .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
             newMessage.setReplyToURL(replyToUrl);
@@ -199,6 +197,21 @@
         super.postDeliver(msg);
     }
 
+  void notifyMessage(UnprocessedMessage messageFrame, int channelId)
+    {
+       // if there is a replyto destination then we need to request the exchange info
+        String replyToURL = messageFrame.getReplyToURL() ;
+        if (replyToURL != null && ! replyToURL.equals(""))
+        {
+            String exchangeName = replyToURL.substring(0, replyToURL.indexOf('/'));
+            Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(exchangeName);
+            ExchangeQueryResult res = future.get();
+            // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+            String replyToUrl = res.getType() +  "://" + replyToURL;
+            ((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl);
+        }
+      super.notifyMessage(messageFrame, channelId);
+    }
 
     public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
             UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception



Mime
View raw message