qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arnaudsi...@apache.org
Subject svn commit: r593486 - /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Date Fri, 09 Nov 2007 10:43:47 GMT
Author: arnaudsimon
Date: Fri Nov  9 02:43:47 2007
New Revision: 593486

URL: http://svn.apache.org/viewvc?rev=593486&view=rev
Log:
removed remote exchange class querying when standard exchanges are used

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=593486&r1=593485&r2=593486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Fri Nov  9 02:43:47 2007
@@ -27,6 +27,7 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpidity.api.Message;
 import org.apache.qpidity.nclient.Session;
@@ -57,7 +58,7 @@
     /**
      * Number of received message so far
      */
-      private final AtomicLong _messagesReceived = new AtomicLong(0);
+    private final AtomicLong _messagesReceived = new AtomicLong(0);
 
     /**
      * This class logger
@@ -95,7 +96,7 @@
         super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
               rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
         _0_10session = (AMQSession_0_10) session;
-        if (messageSelector != null && ! messageSelector.equals("") )
+        if (messageSelector != null && !messageSelector.equals(""))
         {
             try
             {
@@ -110,7 +111,7 @@
                 _preAcquire = false;
             }
         }
-         _isStarted  = connection.started();
+        _isStarted = connection.started();
     }
 
     // ----- Interface org.apache.qpidity.client.util.MessageListener
@@ -136,7 +137,7 @@
             catch (Exception e1)
             {
                 // the receiver may be waiting for a message
-                if( _messageCounter.get() >= 0)
+                if (_messageCounter.get() >= 0)
                 {
                     _messageCounter.decrementAndGet();
                     _synchronousQueue.add(new NullTocken());
@@ -153,13 +154,13 @@
 
     public void onMessage(Message message)
     {
-        if( isMessageListenerSet())
+        if (isMessageListenerSet())
         {
             _messagesReceived.incrementAndGet();
-            if( _messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH )
+            if (_messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH)
             {
                 // require more credit
-                 _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+                _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
                                                           org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
                                                           AMQSession_0_10.MAX_PREFETCH);
                 _messagesReceived.set(0);
@@ -182,11 +183,9 @@
             getSession().getAMQConnection().exceptionReceived(e);
         }
         Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
-         // if there is a replyto destination then we need to request the exchange info
+        // if there is a replyto destination then we need to request the exchange info
         ReplyTo replyTo = message.getMessageProperties().getReplyTo();
-        if (replyTo != null &&
-            replyTo.getExchangeName() != null &&
-            !replyTo.getExchangeName().equals(""))
+        if (replyTo != null && replyTo.getExchangeName() != null && !replyTo.getExchangeName().equals(""))
         {
             // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
             // the exchnage class will be set later from within the sesion thread
@@ -234,20 +233,41 @@
         super.postDeliver(msg);
     }
 
-  void notifyMessage(UnprocessedMessage messageFrame, int channelId)
+    void notifyMessage(UnprocessedMessage messageFrame, int channelId)
     {
-       // if there is a replyto destination then we need to request the exchange info
-        String replyToURL = messageFrame.getReplyToURL() ;
-        if (replyToURL != null && ! replyToURL.equals(""))
-        {
-            String exchangeName = replyToURL.substring(0, replyToURL.indexOf('/'));
-            Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(exchangeName);
-            ExchangeQueryResult res = future.get();
-            // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
-            String replyToUrl = res.getType() +  "://" + replyToURL;
+        // if there is a replyto destination then we need to request the exchange info
+        String replyToURL = messageFrame.getReplyToURL();
+        if (replyToURL != null && !replyToURL.equals(""))
+        {
+            AMQShortString  shortExchangeName = new AMQShortString( replyToURL.substring(0, replyToURL.indexOf('/')));
+            String replyToUrl = "://" + replyToURL;
+            if (shortExchangeName.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME))
+            {
+                replyToUrl = ExchangeDefaults.TOPIC_EXCHANGE_CLASS + replyToUrl;
+            }
+            else if (shortExchangeName.equals(ExchangeDefaults.DIRECT_EXCHANGE_NAME))
+            {
+                replyToUrl = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + replyToUrl;
+            }
+            else if (shortExchangeName.equals(ExchangeDefaults.HEADERS_EXCHANGE_NAME))
+            {
+                replyToUrl = ExchangeDefaults.HEADERS_EXCHANGE_CLASS + replyToUrl;
+            }
+            else if (shortExchangeName.equals(ExchangeDefaults.FANOUT_EXCHANGE_NAME))
+            {
+                replyToUrl = ExchangeDefaults.FANOUT_EXCHANGE_CLASS + replyToUrl;
+            }
+            else
+            {
+                Future<ExchangeQueryResult> future =
+                        ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(shortExchangeName.toString());
+                ExchangeQueryResult res = future.get();
+                // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+                replyToUrl = res.getType() + replyToUrl;
+            }
             ((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl);
         }
-      super.notifyMessage(messageFrame, channelId);
+        super.notifyMessage(messageFrame, channelId);
     }
 
     public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
@@ -273,7 +293,7 @@
         // TODO Use a tag for fiding out if message filtering is done here or by the broker.
         try
         {
-            if (getMessageSelector() != null && ! getMessageSelector().equals(""))
+            if (getMessageSelector() != null && !getMessageSelector().equals(""))
             {
                 messageOk = _filter.matches(message);
             }
@@ -283,7 +303,7 @@
             throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e);
         }
 
-       if (_logger.isDebugEnabled())
+        if (_logger.isDebugEnabled())
         {
             _logger.debug("messageOk " + messageOk);
             _logger.debug("_preAcquire " + _preAcquire);
@@ -308,7 +328,7 @@
         }
         // now we need to acquire this message if needed
         // this is the case of queue with a message selector set
-        if (!_preAcquire && messageOk && ! isNoConsume())
+        if (!_preAcquire && messageOk && !isNoConsume())
         {
             if (_logger.isDebugEnabled())
             {
@@ -316,7 +336,7 @@
             }
             messageOk = acquireMessage(message);
         }
-        if( ! messageOk )
+        if (!messageOk)
         {
             requestCreditIfCreditMode();
         }
@@ -330,14 +350,17 @@
             // the current message received is not good, so we need to get a message.
             if (getMessageListener() == null)
             {
-                  int oldval = _messageCounter.intValue();
+                int oldval = _messageCounter.intValue();
                 _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
-                    org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
-                _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+                                                          org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                                                          1);
+                _0_10session.getQpidSession()
+                        .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
                 _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
                 _0_10session.getQpidSession().sync();
-                _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
-                if( _messageCounter.intValue() <= oldval )
+                _0_10session.getQpidSession()
+                        .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+                if (_messageCounter.intValue() <= oldval)
                 {
                     // we haven't received a message so tell the receiver to return null
                     _synchronousQueue.add(new NullTocken());
@@ -350,9 +373,11 @@
             // we now need to check if we have received a message
 
         }
-        catch(Exception e)
+        catch (Exception e)
         {
-            _logger.error("Error getting message listener, couldn't request credit after releasing a message that failed the selector test",e);
+            _logger.error(
+                    "Error getting message listener, couldn't request credit after releasing a message that failed the selector test",
+                    e);
         }
     }
 
@@ -390,10 +415,10 @@
         }
     }
 
-      protected void rollbackReceivedMessages()
-      {
-          // do nothing as the rollback operation will do the job.
-      }
+    protected void rollbackReceivedMessages()
+    {
+        // do nothing as the rollback operation will do the job.
+    }
 
     /**
      * Acquire a message
@@ -430,17 +455,19 @@
         if (messageListener == null)
         {
             _0_10session.getQpidSession().messageStop(getConsumerTag().toString());
-            _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
+            _0_10session.getQpidSession()
+                    .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
             _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
-                    org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
-                    0xFFFFFFFF);
+                                                      org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
+                                                      0xFFFFFFFF);
             _0_10session.getQpidSession().sync();
         }
         else
         {
             if (_connection.started())
             {
-                _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
+                _0_10session.getQpidSession()
+                        .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
                 _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
                                                           org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
                                                           AMQSession_0_10.MAX_PREFETCH);
@@ -448,20 +475,21 @@
                                                           org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
                                                           0xFFFFFFFF);
                 _0_10session.getQpidSession().sync();
-                _messagesReceived.set(0);;
+                _messagesReceived.set(0);
+                ;
             }
         }
     }
 
     public Object getMessageFromQueue(long l) throws InterruptedException
     {
-        if( !_isStarted )
+        if (!_isStarted)
         {
             return null;
         }
         Object o;
         _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
-                org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
+                                                  org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
 
         if (l == 0)
         {
@@ -480,24 +508,25 @@
             if (o == null)
             {
                 _logger.debug("Message Didn't arrive in time, checking if one is inflight");
-               // checking if one is inflight
+                // checking if one is inflight
                 _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
                 _0_10session.getQpidSession().sync();
-                _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
-                if( _messageCounter.get() > 0 )
+                _0_10session.getQpidSession()
+                        .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+                if (_messageCounter.get() > 0)
                 {
-                      o = _synchronousQueue.take();
+                    o = _synchronousQueue.take();
                 }
             }
         }
-        if( o instanceof NullTocken )
+        if (o instanceof NullTocken)
         {
             o = null;
         }
         return o;
     }
 
-     protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+    protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
     {
         _messageCounter.decrementAndGet();
         super.preApplicationProcessing(jmsMsg);
@@ -507,14 +536,14 @@
     {
 
     }
-        
+
     public void start()
     {
-       _isStarted = true;
+        _isStarted = true;
     }
 
     public void stop()
     {
-       _isStarted = false;
+        _isStarted = false;
     }
 }



Mime
View raw message