qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ritch...@apache.org
Subject svn commit: r887994 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/message/ systests/src/main/java/org/apache/qpid/test/client/message/ test-profiles/
Date Mon, 07 Dec 2009 16:47:55 GMT
Author: ritchiem
Date: Mon Dec  7 16:47:53 2009
New Revision: 887994

URL: http://svn.apache.org/viewvc?rev=887994&view=rev
Log:
QPID-2242 : Update to the 0-8/9 code path to use the 0-10 static lookup tables for the Destination
type when JMS_QPID_DESTTYPE has not been set.

Added:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
    qpid/trunk/qpid/java/test-profiles/08Excludes

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java?rev=887994&r1=887993&r2=887994&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
Mon Dec  7 16:47:53 2009
@@ -22,7 +22,6 @@
 package org.apache.qpid.client.message;
 
 import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=887994&r1=887993&r2=887994&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
Mon Dec  7 16:47:53 2009
@@ -41,12 +41,13 @@
 import javax.jms.MessageFormatException;
 import javax.jms.DeliveryMode;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.net.URISyntaxException;
-import java.nio.charset.Charset;
-import org.apache.qpid.exchange.ExchangeDefaults;
 
-public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
+/**
+ * This extends AbstractAMQMessageDelegate which contains common code between
+ * both the 0_8 and 0_10 Message types.
+ *
+ */
+public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
 {
     private static final Map<ReplyTo, Destination> _destinationCache = Collections.synchronizedMap(new
ReferenceMap());
 
@@ -64,27 +65,6 @@
     private AMQSession _session;
     private final long _deliveryTag;
 
-    private static Map<AMQShortString,Integer> _exchangeTypeMap = new ConcurrentHashMap<AMQShortString,
Integer>();
-    private static Map<String,Integer> _exchangeTypeStringMap = new ConcurrentHashMap<String,
Integer>();
-    private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String,
Integer>();;
-
-    static
-    {
-        _exchangeTypeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME, AMQDestination.QUEUE_TYPE);
-        _exchangeTypeMap.put(AMQShortString.EMPTY_STRING, AMQDestination.QUEUE_TYPE);
-        _exchangeTypeMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE);
-        _exchangeTypeMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE);
-
-        _exchangeTypeStringMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(), AMQDestination.QUEUE_TYPE);
-        _exchangeTypeStringMap.put("", AMQDestination.QUEUE_TYPE);
-        _exchangeTypeStringMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
-        _exchangeTypeStringMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
-
-
-        _exchangeTypeToDestinationType.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.toString(),
AMQDestination.QUEUE_TYPE);
-        _exchangeTypeToDestinationType.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(),
AMQDestination.TOPIC_TYPE);
-        _exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(),
AMQDestination.TOPIC_TYPE);
-    }
 
     protected AMQMessageDelegate_0_10()
     {
@@ -92,80 +72,49 @@
         _readableProperties = false;
     }
 
-    private AMQDestination generateDestination(AMQShortString exchange, AMQShortString routingKey)
-    {
-        AMQDestination dest;
-        switch(getExchangeType(exchange))
-        {
-            case AMQDestination.QUEUE_TYPE:
-                dest = new AMQQueue(exchange, routingKey, routingKey);
-                break;
-            case  AMQDestination.TOPIC_TYPE:
-                dest = new AMQTopic(exchange, routingKey, null);
-                break;
-            default:
-                dest = new AMQUndefinedDestination(exchange, routingKey, null);
-
-        }
-
-        return dest;
-    }
-
-    private int getExchangeType(AMQShortString exchange)
+    protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties
deliveryProps, long deliveryTag)
     {
-        Integer type = _exchangeTypeMap.get(exchange == null ? AMQShortString.EMPTY_STRING
: exchange);
-
-        if(type == null)
-        {
-            return AMQDestination.UNKNOWN_TYPE;
-        }
+        _messageProps = messageProps;
+        _deliveryProps = deliveryProps;
+        _deliveryTag = deliveryTag;
+        _readableProperties = (_messageProps != null);
 
+        AMQDestination dest;
 
-        return type;
+        dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()),
+                                   new AMQShortString(_deliveryProps.getRoutingKey()));
+        setJMSDestination(dest);        
     }
 
-
+    /**
+     * Use the 0-10 ExchangeQuery call to validate the exchange type.
+     *
+     * This is used primarily to provide the correct JMSDestination value.
+     *
+     * The query is performed synchronously iff the map exchange is not already
+     * present in the exchange Map.
+     *
+     * @param header The message headers, from which the exchange name can be extracted
+     * @param session The 0-10 session to use to call ExchangeQuery
+     */
     public static void updateExchangeTypeMapping(Header header, org.apache.qpid.transport.Session
session)
     {
         DeliveryProperties deliveryProps = header.get(DeliveryProperties.class);
-        if(deliveryProps != null)
+        if (deliveryProps != null)
         {
             String exchange = deliveryProps.getExchange();
 
-            if(exchange != null && !_exchangeTypeStringMap.containsKey(exchange))
+            if (exchange != null && !exchangeMapContains(exchange))
             {
-
-                AMQShortString exchangeShortString = new AMQShortString(exchange);
                 Future<ExchangeQueryResult> future =
-                                session.exchangeQuery(exchange.toString());
+                        session.exchangeQuery(exchange.toString());
                 ExchangeQueryResult res = future.get();
 
-                Integer type = _exchangeTypeToDestinationType.get(res.getType());
-                if(type == null)
-                {
-                    type = AMQDestination.UNKNOWN_TYPE;
-                }
-                _exchangeTypeStringMap.put(exchange, type);
-                _exchangeTypeMap.put(exchangeShortString, type);
-
+                updateExchangeType(exchange, res.getType());
             }
         }
     }
 
-    protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties
deliveryProps, long deliveryTag)
-    {
-        _messageProps = messageProps;
-        _deliveryProps = deliveryProps;
-        _deliveryTag = deliveryTag;
-        _readableProperties = (_messageProps != null);
-
-        AMQDestination dest;
-
-        dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()),
-                                   new AMQShortString(_deliveryProps.getRoutingKey()));
-        setJMSDestination(dest);        
-    }
-
 
     public String getJMSMessageID() throws JMSException
     {

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=887994&r1=887993&r2=887994&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
Mon Dec  7 16:47:53 2009
@@ -46,7 +46,7 @@
 import java.util.UUID;
 import java.net.URISyntaxException;
 
-public class AMQMessageDelegate_0_8 implements AMQMessageDelegate
+public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
 {
     private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
 
@@ -65,6 +65,16 @@
     private AMQSession _session;
     private final long _deliveryTag;
 
+    // The base set of items that needs to be set. 
+    private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
+    {
+        _contentHeaderProperties = properties;
+        _deliveryTag = deliveryTag;
+        _readableProperties = (_contentHeaderProperties != null);
+        _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
+    }
+
+    // Used for the creation of new messages
     protected AMQMessageDelegate_0_8()
     {
         this(new BasicContentHeaderProperties(), -1);
@@ -73,6 +83,7 @@
 
     }
 
+    // Used when generating a received message object
     protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader,
AMQShortString exchange,
                                      AMQShortString routingKey) 
     {
@@ -80,41 +91,33 @@
 
         Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
 
-        if(type == null)
+        AMQDestination dest = null;
+
+        // If we have a type set the attempt to use that.
+        if (type != null)
         {
-            type = AMQDestination.UNKNOWN_TYPE;
+            switch (type.intValue())
+            {
+                case AMQDestination.QUEUE_TYPE:
+                    dest = new AMQQueue(exchange, routingKey, routingKey);
+                    break;
+                case AMQDestination.TOPIC_TYPE:
+                    dest = new AMQTopic(exchange, routingKey, null);
+                    break;
+                default:
+                    // Use the generateDestination method
+                    dest = null;
+            }
         }
 
-        AMQDestination dest;
-
-        switch(type.intValue())
+        if (dest == null)
         {
-            case AMQDestination.QUEUE_TYPE:
-                dest = new AMQQueue(exchange, routingKey, routingKey);
-                break;
-            case  AMQDestination.TOPIC_TYPE:
-                dest = new AMQTopic(exchange, routingKey, null);
-                break;
-            default:
-                dest = new AMQUndefinedDestination(exchange, routingKey, null);
+            dest = generateDestination(exchange, routingKey);
         }
-        
 
-
-        // Destination dest = AMQDestination.createDestination(url);
         setJMSDestination(dest);
-
-
-
     }
 
-    protected AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
-    {
-        _contentHeaderProperties = properties;
-        _deliveryTag = deliveryTag;
-        _readableProperties = (_contentHeaderProperties != null);
-        _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
-    }
 
 
     public String getJMSMessageID() throws JMSException

Added: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java?rev=887994&view=auto
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
(added)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
Mon Dec  7 16:47:53 2009
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQUndefinedDestination;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This abstract class provides exchange lookup functionality that is shared
+ * between all MessageDelegates. Update facilities are provided so that the 0-10
+ * code base can update the mappings. The 0-8 code base does not have the
+ * facility to update the exchange map so it can only use the default mappings.
+ *
+ * That said any updates that a 0-10 client performs will also benefit any 0-8
+ * connections in this VM.
+ *
+ */
+public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
+{
+
+    private static Map<AMQShortString, Integer> _exchangeTypeMap = new ConcurrentHashMap<AMQShortString,
Integer>();
+    private static Map<String, Integer> _exchangeTypeStringMap = new ConcurrentHashMap<String,
Integer>();
+    private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String,
Integer>();
+
+    /**
+     * Add default Mappings for the Direct, Default, Topic and Fanout exchanges.
+     */
+    static
+    {
+        _exchangeTypeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME, AMQDestination.QUEUE_TYPE);
+        _exchangeTypeMap.put(AMQShortString.EMPTY_STRING, AMQDestination.QUEUE_TYPE);
+        _exchangeTypeMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE);
+        _exchangeTypeMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE);
+
+        _exchangeTypeStringMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(), AMQDestination.QUEUE_TYPE);
+        _exchangeTypeStringMap.put("", AMQDestination.QUEUE_TYPE);
+        _exchangeTypeStringMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
+        _exchangeTypeStringMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
+
+        _exchangeTypeToDestinationType.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(),
AMQDestination.QUEUE_TYPE);
+        _exchangeTypeToDestinationType.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(),
AMQDestination.TOPIC_TYPE);
+        _exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(),
AMQDestination.TOPIC_TYPE);
+    }
+
+    /**
+     * Called when a Destination is requried.
+     *
+     * This will create the AMQDestination that is the correct type and value
+     * based on the incomming values.
+     * @param exchange The exchange name
+     * @param routingKey The routing key to be used for the Destination
+     * @return AMQDestination of the correct subtype
+     */
+    protected AMQDestination generateDestination(AMQShortString exchange, AMQShortString
routingKey)
+    {
+        AMQDestination dest;
+        switch (getExchangeType(exchange))
+        {
+            case AMQDestination.QUEUE_TYPE:
+                dest = new AMQQueue(exchange, routingKey, routingKey);
+                break;
+            case AMQDestination.TOPIC_TYPE:
+                dest = new AMQTopic(exchange, routingKey, null);
+                break;
+            default:
+                dest = new AMQUndefinedDestination(exchange, routingKey, null);
+        }
+
+        return dest;
+    }
+
+    /**
+     * Update the exchange name to type mapping.
+     *
+     * If the newType is not known then an UNKNOWN_TYPE is created. Only if the
+     * exchange is of a known type: amq.direct, amq.topic, amq.fanout can we
+     * create a suitable AMQDestination representation
+     *
+     * @param exchange the name of the exchange
+     * @param newtype the AMQP exchange class name i.e. amq.direct
+     */
+    protected static void updateExchangeType(String exchange, String newtype)
+    {
+        Integer type = _exchangeTypeToDestinationType.get(newtype);
+        if (type == null)
+        {
+            type = AMQDestination.UNKNOWN_TYPE;
+        }
+        _exchangeTypeStringMap.put(exchange, type);
+        _exchangeTypeMap.put(new AMQShortString(exchange), type);
+    }
+
+    /**
+     * Accessor method to allow lookups of the given exchange name.
+     *
+     * This check allows the prevention of extra work required such as asking
+     * the broker for the exchange class name.
+     *
+     * @param exchange the exchange name to lookup
+     * @return true if there is a mapping for this exchange
+     */
+    protected static boolean exchangeMapContains(String exchange)
+    {
+        return _exchangeTypeStringMap.containsKey(exchange);
+    }
+
+    /**
+     * Returns an int representing the exchange type. This is used in the
+     * createDestination method to ensure the correct AMQDestiation is created. 
+     *
+     * @param exchange the exchange name to lookup
+     * @return int representing the Exchange type
+     */
+    private int getExchangeType(AMQShortString exchange)
+    {
+        Integer type = _exchangeTypeMap.get(exchange == null ? AMQShortString.EMPTY_STRING
: exchange);
+
+        if (type == null)
+        {
+            return AMQDestination.UNKNOWN_TYPE;
+        }
+
+        return type;
+    }
+
+}

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java?rev=887994&r1=887993&r2=887994&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
Mon Dec  7 16:47:53 2009
@@ -57,6 +57,9 @@
 
     public void setUp() throws Exception
     {
+        //Ensure JMX management is enabled for MovedToQueue test 
+        setConfigurationProperty("management.enabled", "true");
+        
         super.setUp();
 
         _connection = getConnection();

Modified: qpid/trunk/qpid/java/test-profiles/08Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/08Excludes?rev=887994&r1=887993&r2=887994&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/08Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/08Excludes Mon Dec  7 16:47:53 2009
@@ -18,5 +18,3 @@
 org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
 org.apache.qpid.server.queue.ModelTest#*
 
-// QPID-2242 exclude till issue has been resolved
-org.apache.qpid.test.client.message.JMSDestinationTest#*



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message