qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ritch...@apache.org
Subject svn commit: r532786 - /incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Date Thu, 26 Apr 2007 15:59:25 GMT
Author: ritchiem
Date: Thu Apr 26 08:59:24 2007
New Revision: 532786

URL: http://svn.apache.org/viewvc?view=rev&rev=532786
Log:
QPID-466 Create STRICT_AMQP System property to disable JMS extensions in Java client. 

This disables the JMS features that rely upon Qpid Java broker specific features.

Modified:
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=532786&r1=532785&r2=532786
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Thu Apr 26 08:59:24 2007
@@ -202,11 +202,20 @@
     /** Boolean to control immediate prefetch. Records the first call to the dispatcher to prevent further flow(true) */
     private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
 
+    /** System property to enable strict AMQP compliance */
+    public static final String STRICT_AMQP = "STRICT_AMQP";
+    /** Strict AMQP default */
+    public static final String STRICT_AMQP_DEFAULT = "false";
+
+    private final boolean _strictAMQP;
+
+
     /** System property to enable immediate message prefetching */
     public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
     /** Immediate message prefetch default */
     public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
 
+    private final boolean _immediatePrefetch;
 
     private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
 
@@ -435,6 +444,10 @@
     AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
                MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark,
int defaultPrefetchLowMark)
     {
+
+        _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
+        _immediatePrefetch = Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
+
         _connection = con;
         _transacted = transacted;
         if (transacted)
@@ -921,15 +934,27 @@
                 _dispatcher.rollback();
             }
 
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation
for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
-                                                                                       getProtocolMajorVersion(),
-                                                                                       getProtocolMinorVersion(),
-                                                                                       false)
   // requeue
-                    , BasicRecoverOkBody.class);
+            if (isStrictAMQP())
+            {
+                // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
+                _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+                                                                                        
   getProtocolMajorVersion(),
+                                                                                        
   getProtocolMinorVersion(),
+                                                                                        
   false));    // requeue
+                _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
+            }
+            else
+            {
 
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
+                                                                                        
  getProtocolMajorVersion(),
+                                                                                        
  getProtocolMinorVersion(),
+                                                                                        
  false)    // requeue
+                        , BasicRecoverOkBody.class);
+            }
             if (!isSuspended)
             {
                 suspendChannel(false);
@@ -1433,7 +1458,6 @@
     private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
AMQProtocolHandler protocolHandler,
                                   boolean nowait, String messageSelector) throws AMQException
     {
-        //fixme prefetch values are not used here. Do we need to have them as parametsrs?
         //need to generate a consumer tag on the client so we can exploit the nowait flag
         AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
 
@@ -1709,11 +1733,21 @@
 
     public QueueBrowser createBrowser(Queue queue) throws JMSException
     {
+        if (isStrictAMQP())
+        {
+            throw new UnsupportedOperationException();
+        }
+
         return createBrowser(queue, null);
     }
 
     public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
     {
+        if (isStrictAMQP())
+        {
+            throw new UnsupportedOperationException();
+        }
+
         checkNotClosed();
         checkValidQueue(queue);
         return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
@@ -1762,6 +1796,11 @@
 
     boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString
routingKey) throws JMSException
     {
+        if (isStrictAMQP())
+        {
+            throw new UnsupportedOperationException();
+        }
+
         // TODO: Be aware of possible changes to parameter order as versions change.
         AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
                                                                getProtocolMajorVersion(),
getProtocolMinorVersion(),    // AMQP version (major, minor)
@@ -1940,7 +1979,7 @@
     synchronized void startDistpatcherIfNecessary()
     {
         // If IMMEDIATE_PREFETCH is not set then we need to start fetching          
-        if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH,
IMMEDIATE_PREFETCH_DEFAULT)))
+        if (!_immediatePrefetch)
         {
             // We do this now if this is the first call on a started connection
             if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false))
@@ -2005,7 +2044,7 @@
         bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
 
         // If IMMEDIATE_PREFETCH is not required then suspend the channel to delay prefetch
-        if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH,
IMMEDIATE_PREFETCH_DEFAULT)))
+        if (!_immediatePrefetch)
         {
             // The dispatcher will be null if we have just created this session
             // so suspend the channel before we register our consumer so that we don't
@@ -2390,6 +2429,11 @@
 
             _connection.getProtocolHandler().writeFrame(basicRejectBody);
         }
+    }
+
+    public boolean isStrictAMQP()
+    {
+        return _strictAMQP;
     }
 
 }



Mime
View raw message