qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1226930 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main/java/org/apache/qpid/server/subscription/ broker/src/test/java/org/apache/qpid/server/queue/ client/src/main/java/org/apache/qpid/client...
Date Tue, 03 Jan 2012 19:48:47 GMT
Author: rgodfrey
Date: Tue Jan  3 19:48:46 2012
New Revision: 1226930

URL: http://svn.apache.org/viewvc?rev=1226930&view=rev
Log:
QPID-3720 : [Java Broker] Implement Message Grouping

Added:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
      - copied, changed from r1226752, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Jan  3 19:48:46 2012
@@ -149,7 +149,13 @@ public interface AMQQueue extends Managa
 
     void removeMessagesFromQueue(long fromMessageId, long toMessageId);
 
-
+    static interface Visitor
+    {
+        boolean visit(QueueEntry entry);
+    }
+    
+    void visit(Visitor visitor);
+    
 
     long getMaximumMessageSize();
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Tue Jan  3 19:48:46 2012
@@ -75,6 +75,11 @@ public interface QueueEntry extends Comp
         {
             return State.AVAILABLE;
         }
+
+        public String toString()
+        {
+            return getState().name();
+        }
     }
 
 
@@ -85,6 +90,11 @@ public interface QueueEntry extends Comp
         {
             return State.DEQUEUED;
         }
+
+        public String toString()
+        {
+            return getState().name();
+        }
     }
 
 
@@ -95,6 +105,11 @@ public interface QueueEntry extends Comp
         {
             return State.DELETED;
         }
+
+        public String toString()
+        {
+            return getState().name();
+        }
     }
 
     public final class ExpiredState extends EntryState
@@ -104,6 +119,11 @@ public interface QueueEntry extends Comp
         {
             return State.EXPIRED;
         }
+
+        public String toString()
+        {
+            return getState().name();
+        }
     }
 
 
@@ -113,6 +133,11 @@ public interface QueueEntry extends Comp
         {
             return State.ACQUIRED;
         }
+
+        public String toString()
+        {
+            return getState().name();
+        }
     }
 
     public final class SubscriptionAcquiredState extends EntryState
@@ -134,6 +159,11 @@ public interface QueueEntry extends Comp
         {
             return _subscription;
         }
+
+        public String toString()
+        {
+            return "{" + getState().name() + " : " + _subscription +"}";
+        }
     }
 
     public final class SubscriptionAssignedState extends EntryState
@@ -155,6 +185,12 @@ public interface QueueEntry extends Comp
         {
             return _subscription;
         }
+
+
+        public String toString()
+        {
+            return "{" + getState().name() + " : " + _subscription +"}";
+        }
     }
 
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Tue Jan  3 19:48:46 2012
@@ -35,4 +35,6 @@ public interface QueueEntryList<Q extend
     Q getHead();
 
     void entryDeleted(Q queueEntry);
+    
+    int getPriorities();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Jan  3 19:48:46 2012
@@ -25,7 +25,6 @@ import org.apache.qpid.AMQSecurityExcept
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.pool.ReadWriteRunnable;
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
-import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.binding.Binding;
@@ -33,7 +32,6 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.configuration.ConfiguredObject;
 import org.apache.qpid.server.configuration.QueueConfigType;
 import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.SessionConfig;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
@@ -45,6 +43,7 @@ import org.apache.qpid.server.management
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.subscription.MessageGroupManager;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -68,11 +67,11 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
 {
     private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
+    private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
 
 
     private final VirtualHost _virtualHost;
@@ -189,6 +188,7 @@ public class SimpleAMQQueue implements A
 
     /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
     private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
+    private final MessageGroupManager _messageGroupManager;
 
     protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
     {
@@ -242,25 +242,15 @@ public class SimpleAMQQueue implements A
         _logSubject = new QueueLogSubject(this);
         _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());
 
-        // Log the correct creation message
-
-        // Extract the number of priorities for this Queue.
-        // Leave it as 0 if we are a SimpleQueueEntryList
-        int priorities = 0;
-        if (entryListFactory instanceof PriorityQueueList.Factory)
-        {
-            priorities = ((PriorityQueueList)_entries).getPriorities();
-        }
-
         // Log the creation of this Queue.
         // The priorities display is toggled on if we set priorities > 0
         CurrentActor.get().message(_logSubject,
                                    QueueMessages.CREATED(String.valueOf(_owner),
-                                                          priorities,
-                                                          _owner != null,
-                                                          autoDelete,
-                                                          durable, !durable,
-                                                          priorities > 0));
+                                                         _entries.getPriorities(),
+                                                         _owner != null,
+                                                         autoDelete,
+                                                         durable, !durable,
+                                                         _entries.getPriorities() > 0));
 
         getConfigStore().addConfiguredObject(this);
 
@@ -274,6 +264,15 @@ public class SimpleAMQQueue implements A
             _logger.error("AMQQueue MBean creation has failed ", e);
         }
 
+        if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
+        {
+            _messageGroupManager = new MessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), 255);
+        }
+        else
+        {
+            _messageGroupManager = null;
+        }
+
         resetNotifications();
 
     }
@@ -488,6 +487,32 @@ public class SimpleAMQQueue implements A
             setExclusiveSubscriber(null);
             subscription.setQueueContext(null);
 
+            if(_messageGroupManager != null)
+            {
+                QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+                _messageGroupManager.clearAssignments(subscription);
+                
+                if(entry != null)
+                {
+                    SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+                    // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+                    while (subscriberIter.advance())
+                    {
+                        Subscription sub = subscriberIter.getNode().getSubscription();
+
+                        // we don't make browsers send the same stuff twice
+                        if (sub.seesRequeues())
+                        {
+                            updateSubRequeueEntry(sub, entry);
+                        }
+                    }
+
+                    deliverAsync();
+
+                }
+                
+            }
+
             // auto-delete queues must be deleted if there are no remaining subscribers
 
             if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0  )
@@ -691,21 +716,20 @@ public class SimpleAMQQueue implements A
         {
             try
             {
-                if (subscriptionReadyAndHasInterest(sub, entry)
-                    && !sub.isSuspended())
+                if (!sub.isSuspended() 
+                    && subscriptionReadyAndHasInterest(sub, entry) 
+                    && mightAssign(sub, entry)
+                    && !sub.wouldSuspend(entry))
                 {
-                    if (!sub.wouldSuspend(entry))
+                    if (sub.acquires() && !(assign(sub, entry) && entry.acquire(sub)))
                     {
-                        if (sub.acquires() && !entry.acquire(sub))
-                        {
-                            // restore credit here that would have been taken away by wouldSuspend since we didn't manage
-                            // to acquire the entry for this subscription
-                            sub.restoreCredit(entry);
-                        }
-                        else
-                        {
-                            deliverMessage(sub, entry, false);
-                        }
+                        // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+                        // to acquire the entry for this subscription
+                        sub.restoreCredit(entry);
+                    }
+                    else
+                    {
+                        deliverMessage(sub, entry, false);
                     }
                 }
             }
@@ -716,6 +740,20 @@ public class SimpleAMQQueue implements A
         }
     }
 
+    private boolean assign(final Subscription sub, final QueueEntry entry)
+    {
+        return _messageGroupManager == null || _messageGroupManager.acceptMessage(sub, entry);
+    }
+
+
+    private boolean mightAssign(final Subscription sub, final QueueEntry entry)
+    {
+        if(_messageGroupManager == null || !sub.acquires())
+            return true;
+        Subscription assigned = _messageGroupManager.getAssignedSubscription(entry);
+        return (assigned == null) || (assigned == sub);
+    }
+
     protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
     {
         // This method is only required for queues which mess with ordering
@@ -1020,6 +1058,8 @@ public class SimpleAMQQueue implements A
         public boolean filterComplete();
     }
 
+
+
     public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
     {
         return getMessagesOnTheQueue(new QueueEntryFilter()
@@ -1074,6 +1114,24 @@ public class SimpleAMQQueue implements A
 
     }
 
+    public void visit(final Visitor visitor)
+    {
+        QueueEntryIterator queueListIterator = _entries.iterator();
+
+        while(queueListIterator.advance())
+        {
+            QueueEntry node = queueListIterator.getNode();
+
+            if(!node.isDispensed())
+            {
+                if(visitor.visit(node))
+                {
+                    break;
+                }
+            }
+        }
+    }
+
     /**
      * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
      *
@@ -1708,11 +1766,11 @@ public class SimpleAMQQueue implements A
 
             if (node != null && node.isAvailable())
             {
-                if (sub.hasInterest(node))
+                if (sub.hasInterest(node) && mightAssign(sub, node))
                 {
                     if (!sub.wouldSuspend(node))
                     {
-                        if (sub.acquires() && !node.acquire(sub))
+                        if (sub.acquires() && !(assign(sub, node) && node.acquire(sub)))
                         {
                             // restore credit here that would have been taken away by wouldSuspend since we didn't manage
                             // to acquire the entry for this subscription
@@ -1769,7 +1827,8 @@ public class SimpleAMQQueue implements A
             QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
 
             boolean expired = false;
-            while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node)))
+            while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) ||
+                                    !mightAssign(sub,node)))
             {
                 if (expired)
                 {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Tue Jan  3 19:48:46 2012
@@ -185,6 +185,11 @@ public class SimpleQueueEntryList implem
         advanceHead();
     }
 
+    public int getPriorities()
+    {
+        return 0;
+    }
+
     static class Factory implements QueueEntryListFactory
     {
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Tue Jan  3 19:48:46 2012
@@ -51,13 +51,11 @@ public class SortedQueueEntryList implem
         _propertyName = propertyName;
     }
 
-    @Override
     public AMQQueue getQueue()
     {
         return _queue;
     }
 
-    @Override
     public SortedQueueEntryImpl add(final ServerMessage message)
     {
         synchronized(_lock)
@@ -286,7 +284,6 @@ public class SortedQueueEntryList implem
         return (node == null ? Colour.BLACK : node.getColour()) == colour;
     }
 
-    @Override
     public SortedQueueEntryImpl next(final SortedQueueEntryImpl node)
     {
         synchronized(_lock)
@@ -316,13 +313,11 @@ public class SortedQueueEntryList implem
         }
     }
 
-    @Override
     public QueueEntryIterator<SortedQueueEntryImpl> iterator()
     {
         return new QueueEntryIteratorImpl(_head);
     }
 
-    @Override
     public SortedQueueEntryImpl getHead()
     {
         return _head;
@@ -333,7 +328,6 @@ public class SortedQueueEntryList implem
         return _root;
     }
 
-    @Override
     public void entryDeleted(final SortedQueueEntryImpl entry)
     {
         synchronized(_lock)
@@ -431,6 +425,11 @@ public class SortedQueueEntryList implem
         }
     }
 
+    public int getPriorities()
+    {
+        return 0;
+    }
+
     /**
      * Swaps the position of the node in the tree with it's successor
      * (that is the node with the next highest key)

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java?rev=1226930&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java Tue Jan  3 19:48:46 2012
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class MessageGroupManager
+{
+    private static final Logger _logger = LoggerFactory.getLogger(MessageGroupManager.class);
+
+
+    private final String _groupId;
+    private final ConcurrentHashMap<Integer, Subscription> _groupMap = new ConcurrentHashMap<Integer, Subscription>();
+    private final int _groupMask;
+
+    public MessageGroupManager(final String groupId, final int maxGroups)
+    {
+        _groupId = groupId;
+        _groupMask = pow2(maxGroups)-1;
+    }
+
+    private static int pow2(final int i)
+    {
+        int val = 1;
+        while(val < i) val<<=1;
+        return val;
+    }
+
+    public Subscription getAssignedSubscription(final QueueEntry entry)
+    {
+        Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+        return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask);
+    }
+
+    public boolean acceptMessage(Subscription sub, QueueEntry entry)
+    {
+        Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+        if(groupVal == null)
+        {
+            return true;
+        }
+        else
+        {
+            Integer group = groupVal.hashCode() & _groupMask;
+            Subscription assignedSub = _groupMap.get(group);
+            if(assignedSub == sub)
+            {
+                return true;
+            }
+            else
+            {
+                if(assignedSub == null)
+                {
+                    if(_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Assigning group " + groupVal + " to sub " + sub);
+                    }
+                    assignedSub = _groupMap.putIfAbsent(group, sub);
+                    return assignedSub == null || assignedSub == sub;
+                }
+                else
+                {
+                    return false;
+                }
+            }
+        }
+    }
+    
+    public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub)
+    {
+        EntryFinder visitor = new EntryFinder(sub);
+        sub.getQueue().visit(visitor);
+        return visitor.getEntry();
+    }
+
+    private class EntryFinder implements AMQQueue.Visitor
+    {
+        private QueueEntry _entry;
+        private Subscription _sub;
+
+        public EntryFinder(final Subscription sub)
+        {
+            _sub = sub;
+        }
+
+        public boolean visit(final QueueEntry entry)
+        {
+            if(!entry.isAvailable())
+                return false;
+
+            Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId);
+            if(groupId == null)
+                return false;
+
+            Integer group = groupId.hashCode() & _groupMask;
+            Subscription assignedSub = _groupMap.get(group);
+            if(assignedSub == _sub)
+            {
+                _entry = entry;
+                return true;
+            }
+            else
+            {
+                return false;
+            }
+        }
+
+        public QueueEntry getEntry()
+        {
+            return _entry;
+        }
+    }
+
+    public void clearAssignments(Subscription sub)
+    {
+        Iterator<Subscription> subIter = _groupMap.values().iterator();
+        while(subIter.hasNext())
+        {
+            if(subIter.next() == sub)
+            {
+                subIter.remove();
+            }
+        }
+    }
+}

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Tue Jan  3 19:48:46 2012
@@ -601,20 +601,20 @@ public class MockAMQQueue implements AMQ
 
     }
 
-    @Override
     public int getMaximumDeliveryCount()
     {
         return 0;
     }
 
-    @Override
     public void setMaximumDeliveryCount(int maximumDeliveryCount)
     {
     }
 
-    @Override
     public void setAlternateExchange(String exchangeName)
     {
     }
 
+    public void visit(final Visitor visitor)
+    {
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jan  3 19:48:46 2012
@@ -1583,6 +1583,11 @@ public abstract class AMQSession<C exten
         return _prefetchLowMark;
     }
 
+    public int getPrefetch()
+    {
+        return _prefetchHighMark;
+    }
+
     public AMQShortString getDefaultQueueExchangeName()
     {
         return _connection.getDefaultQueueExchangeName();
@@ -3047,7 +3052,7 @@ public abstract class AMQSession<C exten
      */
     public boolean prefetch()
     {
-        return getAMQConnection().getMaxPrefetch() > 0;
+        return _prefetchHighMark > 0;
     }
 
     /** Signifies that the session has pending sends to commit. */

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Jan  3 19:48:46 2012
@@ -545,7 +545,7 @@ public class BasicMessageConsumer_0_10 e
         }
         else if (getSession().prefetch())
         {
-            capacity = _0_10session.getAMQConnection().getMaxPrefetch();
+            capacity = getSession().getPrefetch();
         }
         return capacity;
     }

Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Tue Jan  3 19:48:46 2012
@@ -588,7 +588,7 @@ public class AMQSession_0_10Test extends
         }
         boolean isTransacted = acknowledgeMode == javax.jms.Session.SESSION_TRANSACTED ? true : false;
         AMQSession_0_10 session = new AMQSession_0_10(createConnection(throwException), amqConnection, 1, isTransacted, acknowledgeMode,
-                 1, 1, "test");
+                 0, 0, "test");
         return session;
     }
 
@@ -600,7 +600,6 @@ public class AMQSession_0_10Test extends
         connection.setSessionFactory(new SessionFactory()
         {
 
-            @Override
             public Session newSession(Connection conn, Binary name, long expiry)
             {
                 return new MockSession(conn, new SessionDelegate(), name, expiry, throwException);
@@ -611,7 +610,6 @@ public class AMQSession_0_10Test extends
 
     private final class MockMessageListener implements MessageListener
     {
-        @Override
         public void onMessage(Message arg0)
         {
         }
@@ -710,23 +708,19 @@ public class AMQSession_0_10Test extends
     {
         private List<ProtocolEvent> _sendEvents = new ArrayList<ProtocolEvent>();
 
-        @Override
         public void setIdleTimeout(int i)
         {
         }
 
-        @Override
         public void send(ProtocolEvent msg)
         {
             _sendEvents.add(msg);
         }
 
-        @Override
         public void flush()
         {
         }
 
-        @Override
         public void close()
         {
         }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Tue Jan  3 19:48:46 2012
@@ -166,6 +166,11 @@ public abstract class AMQTypedValue
 
     private static final class IntTypedValue extends AMQTypedValue
     {
+        @Override
+        public String toString()
+        {
+            return "[INT: " + String.valueOf(_value) + "]";
+        }
 
         private final int _value;
 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1226930&r1=1226929&r2=1226930&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Tue Jan  3 19:48:46 2012
@@ -44,6 +44,7 @@ import static org.apache.qpid.util.Seria
 import static org.apache.qpid.util.Strings.toUTF8;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -828,8 +829,17 @@ public class Session extends SessionInvo
             Waiter w = new Waiter(commands, timeout);
             while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
             {
-                checkFailoverRequired("Session sync was interrupted by failover.");
-                log.debug("%s   waiting for[%d]: %d, %s", this, point, maxComplete, Arrays.asList(commands));
+                checkFailoverRequired("Session sync was interrupted by failover.");                               
+                if(log.isDebugEnabled())
+                {
+                    List<Method> waitingFor =
+                            Arrays.asList(commands)
+                                  .subList(mod(maxComplete,commands.length),
+                                           mod(commandsOut-1, commands.length) < mod(maxComplete, commands.length)
+                                             ? commands.length-1
+                                             : mod(commandsOut-1, commands.length));
+                    log.debug("%s   waiting for[%d]: %d, %s", this, point, maxComplete, waitingFor);
+                }
                 w.await();
             }
 

Copied: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java (from r1226752, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java?p2=qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java&p1=qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java&r1=1226752&r2=1226930&rev=1226930&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java Tue Jan  3 19:48:46 2012
@@ -20,8 +20,13 @@
 */
 package org.apache.qpid.server.queue;
 
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -30,17 +35,14 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.naming.NamingException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.HashMap;
+import java.util.Map;
 
-public class PriorityQueueTest extends QpidBrokerTestCase
+public class MessageGroupQueueTest extends QpidBrokerTestCase
 {
     private static final int TIMEOUT = 1500;
 
-    protected final String QUEUE = "PriorityQueue";
+    protected final String QUEUE = "MessageGroupQueue";
 
     private static final int MSG_COUNT = 50;
 
@@ -49,10 +51,8 @@ public class PriorityQueueTest extends Q
     private Session producerSession;
     private Queue queue;
     private Connection consumerConnection;
-    private Session consumerSession;
-
-    private MessageConsumer consumer;
-
+    
+    
     protected void setUp() throws Exception
     {
         super.setUp();
@@ -63,8 +63,7 @@ public class PriorityQueueTest extends Q
         producerConnection.start();
 
         consumerConnection = getConnection();
-        consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+        
     }
 
     protected void tearDown() throws Exception
@@ -74,125 +73,275 @@ public class PriorityQueueTest extends Q
         super.tearDown();
     }
 
-    public void testPriority() throws JMSException, NamingException, AMQException
+    /**
+     * Pre populate the queue with messages with groups as follows
+     * 
+     *  ONE
+     *  TWO
+     *  ONE
+     *  TWO
+     *  
+     *  Create two consumers with prefetch of 1, the first consumer should then be assigned group ONE, the second
+     *  consumer assigned group TWO if they are started in sequence.
+     *  
+     *  Thus doing
+     *  
+     *  c1 <--- (ONE)
+     *  c2 <--- (TWO)
+     *  c2 ack --->
+     *  
+     *  c2 should now be able to receive a second message from group TWO (skipping over the message from group ONE)
+     *  
+     *  i.e.
+     *  
+     *  c2 <--- (TWO)
+     *  c2 ack --->
+     *  c1 <--- (ONE)
+     *  c1 ack --->
+     *  
+     */
+    public void testSimpleGroupAssignment() throws Exception
     {
         final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-priorities",10);
+        arguments.put("qpid.group_header_key","group");
+        arguments.put("qpid.shared_msg_group","1");
         ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
         queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
 
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
         producer = producerSession.createProducer(queue);
 
-        for (int msg = 0; msg < MSG_COUNT; msg++)
+        String[] groups = { "ONE", "TWO"};
+        
+        for (int msg = 0; msg < 4; msg++)
         {
-            producer.setPriority(msg % 10);
-            producer.send(nextMessage(msg, false, producerSession, producer));
+            producer.send(createMessage(msg, groups[msg % groups.length]));
         }
         producerSession.commit();
         producer.close();
         producerSession.close();
         producerConnection.close();
 
-        consumer = consumerSession.createConsumer(queue);
+        Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+        Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+
+        
+        MessageConsumer consumer1 = cs1.createConsumer(queue);
+        MessageConsumer consumer2 = cs2.createConsumer(queue);
+
         consumerConnection.start();
-        Message received;
-        int receivedCount = 0;
-        Message previous = null;
-        int messageCount = 0;
-        while((received = consumer.receive(1000))!=null)
-        {
-            messageCount++;
-            if(previous != null)
-            {
-                assertTrue("Messages arrived in unexpected order " + messageCount + " " + previous.getIntProperty("msg") + " " + received.getIntProperty("msg") + " " + previous.getJMSPriority() + " " + received.getJMSPriority(), (previous.getJMSPriority() > received.getJMSPriority()) || ((previous.getJMSPriority() == received.getJMSPriority()) && previous.getIntProperty("msg") < received.getIntProperty("msg")) );
-            }
+        Message cs1Received = consumer1.receive(1000);
+        assertNotNull("Consumer 1 should have received first message", cs1Received);
 
-            previous = received;
-            receivedCount++;
-        }
+        Message cs2Received = consumer2.receive(1000);
 
-        assertEquals("Incorrect number of message received", 50, receivedCount);
+        assertNotNull("Consumer 2 should have received first message", cs2Received);
+
+        cs1Received.acknowledge();
+        cs2Received.acknowledge();
+
+        Message cs2Received2 = consumer2.receive(1000);
+
+        assertNotNull("Consumer 2 should have received second message", cs2Received2);
+        assertEquals("Differing groups", cs2Received2.getStringProperty("group"),
+                     cs2Received.getStringProperty("group"));
+
+        Message cs1Received2 = consumer1.receive(1000);
+
+        assertNotNull("Consumer 1 should have received second message", cs1Received2);
+        assertEquals("Differing groups", cs1Received2.getStringProperty("group"),
+                     cs1Received.getStringProperty("group"));
+
+        cs1Received2.acknowledge();
+        cs2Received2.acknowledge();
+        
+        assertNull(consumer1.receive(1000));
+        assertNull(consumer2.receive(1000));
     }
 
-    public void testOddOrdering() throws AMQException, JMSException
+    /**
+     * 
+     * Tests that upon closing a consumer, groups previously assigned to that consumer are reassigned to a different
+     * consumer.
+     * 
+     * Pre-populate the queue as ONE, ONE, TWO, ONE
+     * 
+     * create in sequence two consumers
+     * 
+     * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2)
+     * 
+     * Then close c1 before acking.
+     * 
+     * If we now attempt to receive from c2, then the remaining messages in group ONE should be available (which
+     * requires c2 to go "backwards" in the queue).
+     * 
+     * */
+    public void testConsumerCloseGroupAssignment() throws Exception
     {
         final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-priorities",3);
+        arguments.put("qpid.group_header_key","group");
+        arguments.put("qpid.shared_msg_group","1");
         ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
-        queue = producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+        queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
 
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
         producer = producerSession.createProducer(queue);
 
-        // In order ABC
-        producer.setPriority(9);
-        producer.send(nextMessage(1, false, producerSession, producer));
-        producer.setPriority(4);
-        producer.send(nextMessage(2, false, producerSession, producer));
-        producer.setPriority(1);
-        producer.send(nextMessage(3, false, producerSession, producer));
-
-        // Out of order BAC
-        producer.setPriority(4);
-        producer.send(nextMessage(4, false, producerSession, producer));
-        producer.setPriority(9);
-        producer.send(nextMessage(5, false, producerSession, producer));
-        producer.setPriority(1);
-        producer.send(nextMessage(6, false, producerSession, producer));
-
-        // Out of order BCA
-        producer.setPriority(4);
-        producer.send(nextMessage(7, false, producerSession, producer));
-        producer.setPriority(1);
-        producer.send(nextMessage(8, false, producerSession, producer));
-        producer.setPriority(9);
-        producer.send(nextMessage(9, false, producerSession, producer));
-
-        // Reverse order CBA
-        producer.setPriority(1);
-        producer.send(nextMessage(10, false, producerSession, producer));
-        producer.setPriority(4);
-        producer.send(nextMessage(11, false, producerSession, producer));
-        producer.setPriority(9);
-        producer.send(nextMessage(12, false, producerSession, producer));
+        producer.send(createMessage(1, "ONE"));
+        producer.send(createMessage(2, "ONE"));
+        producer.send(createMessage(3, "TWO"));
+        producer.send(createMessage(4, "ONE"));
+
         producerSession.commit();
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+
+        Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+        Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+
+
+        MessageConsumer consumer1 = cs1.createConsumer(queue);
 
-        consumer = consumerSession.createConsumer(queue);
         consumerConnection.start();
+        MessageConsumer consumer2 = cs2.createConsumer(queue);
+
+        Message cs1Received = consumer1.receive(1000);
+        assertNotNull("Consumer 1 should have received first message", cs1Received);
+
+        Message cs2Received = consumer2.receive(1000);
+
+        assertNotNull("Consumer 2 should have received first message", cs2Received);
+        cs2Received.acknowledge();
+
+        Message cs2Received2 = consumer2.receive(1000);
+
+        assertNull("Consumer 2 should not have received second message", cs2Received2);
+
+        consumer1.close();
+
+        cs1Received.acknowledge();
+        Message cs2Received3 = consumer2.receive(1000);
+
+        assertNotNull("Consumer 2 should have received second message", cs2Received3);
+        assertEquals("Unexpected group", cs2Received3.getStringProperty("group"),
+                     "ONE");
+
+        cs2Received3.acknowledge();
+
+
+        Message cs2Received4 = consumer2.receive(1000);
+
+        assertNotNull("Consumer 2 should have received third message", cs2Received4);
+        assertEquals("Unexpected group", cs2Received4.getStringProperty("group"),
+                     "ONE");
+
+        cs2Received4.acknowledge();
+
+        assertNull(consumer2.receive(1000));
+    }
+
+
+    /**
+     *
+     * Tests that upon closing a consumer and its session, groups previously assigned to that consumer are reassigned
+     * to a different consumer, including messages which were previously delivered but have now been released.
+     *
+     * Pre-populate the queue as ONE, ONE, TWO, ONE
+     *
+     * create in sequence two consumers
+     *
+     * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2)
+     *
+     * Then close c1 and its session without acking.
+     *
+     * If we now attempt to receive from c2, then all the messages in group ONE should be available (which
+     * requires c2 to go "backwards" in the queue). The first such message should be marked as redelivered
+     *
+     */
+    
+    public void testConsumerCloseWithRelease() throws Exception
+    {
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        arguments.put("qpid.group_header_key","group");
+        arguments.put("qpid.shared_msg_group","1");
+
+        ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+        queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+        producer = producerSession.createProducer(queue);
+
+        producer.send(createMessage(1, "ONE"));
+        producer.send(createMessage(2, "ONE"));
+        producer.send(createMessage(3, "TWO"));
+        producer.send(createMessage(4, "ONE"));
+
+        producerSession.commit();
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+
+        Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+        Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+
+
+        MessageConsumer consumer1 = cs1.createConsumer(queue);
+        MessageConsumer consumer2 = cs2.createConsumer(queue);
+
+        consumerConnection.start();
+        Message cs1Received = consumer1.receive(1000);
+        assertNotNull("Consumer 1 should have received first message", cs1Received);
+
+        Message received = consumer2.receive(1000);
+
+        assertNotNull("Consumer 2 should have received first message", received);
+        Message first = received;
+
+        received = consumer2.receive(1000);
+
+        assertNull("Consumer 2 should not have received second message", received);
+
+        consumer1.close();
+        cs1.close();
+        first.acknowledge();
+        received = consumer2.receive(1000);
+
+        assertNotNull("Consumer 2 should have received second message", received);
+        assertEquals("Unexpected group", received.getStringProperty("group"),
+                     "ONE");
+        assertTrue("Expected second message to be marked as redelivered " + received.getIntProperty("msg"),
+                   received.getJMSRedelivered());
+
+        received.acknowledge();
+
+
+        received = consumer2.receive(1000);
+
+        assertNotNull("Consumer 2 should have received third message", received);
+        assertEquals("Unexpected group", received.getStringProperty("group"),
+                     "ONE");
+
+        received.acknowledge();
+
+        received = consumer2.receive(1000);
+
+        assertNotNull("Consumer 2 should have received fourth message", received);
+        assertEquals("Unexpected group", received.getStringProperty("group"),
+                     "ONE");
+
+        received.acknowledge();
+
 
-        Message msg = consumer.receive(TIMEOUT);
-        assertEquals(1, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(5, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(9, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(12, msg.getIntProperty("msg"));
-
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(2, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(4, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(7, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(11, msg.getIntProperty("msg"));
-
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(3, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(6, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(8, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(10, msg.getIntProperty("msg"));
+        assertNull(consumer2.receive(1000));
     }
 
-    private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException
+    
+    private Message createMessage(int msg, String group) throws JMSException
     {
         Message send = producerSession.createTextMessage("Message: " + msg);
         send.setIntProperty("msg", msg);
+        send.setStringProperty("group", group);
 
         return send;
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message