qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1643197 - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/test/java/org/apache/qpid/server/queue/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
Date Fri, 05 Dec 2014 08:47:22 GMT
Author: kwall
Date: Fri Dec  5 08:47:22 2014
New Revision: 1643197

URL: http://svn.apache.org/viewvc?rev=1643197&view=rev
Log:
QPID-6258: [Java Broker] Remove SubFlushRunner leaving QueueRunner solely responsible for
asynchronous message delivery

Removed:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1643197&r1=1643196&r2=1643197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Fri Dec  5 08:47:22 2014
@@ -244,6 +244,8 @@ public abstract class AbstractQueue<X ex
     private final AtomicBoolean _recovering = new AtomicBoolean(true);
     private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
 
+    private final QueueRunner _queueRunner = new QueueRunner(this);
+
     protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost)
     {
         super(parentsMap(virtualHost), attributes);
@@ -745,7 +747,7 @@ public abstract class AbstractQueue<X ex
         childAdded(consumer);
         consumer.addChangeListener(_deletedChildListener);
 
-        deliverAsync(consumer);
+        deliverAsync();
 
         return consumer;
 
@@ -1006,14 +1008,7 @@ public abstract class AbstractQueue<X ex
             {
                 checkConsumersNotAheadOfDelivery(entry);
 
-                if (exclusiveSub != null)
-                {
-                    deliverAsync(exclusiveSub);
-                }
-                else
-                {
-                    deliverAsync();
-                }
+                deliverAsync();
             }
 
             checkForNotification(entry.getMessage());
@@ -1490,7 +1485,7 @@ public abstract class AbstractQueue<X ex
                 _activeSubscriberCount.incrementAndGet();
 
             }
-            deliverAsync(sub);
+            deliverAsync();
         }
     }
 
@@ -1859,8 +1854,6 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    private final QueueRunner _queueRunner = new QueueRunner(this);
-
     public void deliverAsync()
     {
         _stateChangeCount.incrementAndGet();
@@ -1869,20 +1862,6 @@ public abstract class AbstractQueue<X ex
 
     }
 
-    public void deliverAsync(QueueConsumer<?> sub)
-    {
-        if(_exclusiveSubscriber == null)
-        {
-            deliverAsync();
-        }
-        else
-        {
-            SubFlushRunner flusher = sub.getRunner();
-            flusher.execute();
-        }
-
-    }
-
     void flushConsumer(QueueConsumer<?> sub)
     {
 
@@ -2100,10 +2079,7 @@ public abstract class AbstractQueue<X ex
      *
      * A queue Runner is started whenever a state change occurs, e.g when a new
      * message arrives on the queue and cannot be immediately delivered to a
-     * consumer (i.e. asynchronous delivery is required). Unless there are
-     * SubFlushRunners operating (due to consumers unsuspending) which are
-     * capable of accepting/delivering all messages then these messages would
-     * otherwise remain on the queue.
+     * consumer (i.e. asynchronous delivery is required).
      *
      * processQueue should be running while there are messages on the queue AND
      * there are consumers that can deliver them. If there are no
@@ -2412,7 +2388,7 @@ public abstract class AbstractQueue<X ex
         public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State
newState)
         {
             entry.removeStateChangeListener(this);
-            deliverAsync(_sub);
+            deliverAsync();
         }
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1643197&r1=1643196&r2=1643197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
(original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
Fri Dec  5 08:47:22 2014
@@ -44,8 +44,6 @@ public interface QueueConsumer<X extends
 
     void queueDeleted();
 
-    SubFlushRunner getRunner();
-
     AMQQueue getQueue();
 
     boolean resend(QueueEntry e);

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1643197&r1=1643196&r2=1643197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
(original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
Fri Dec  5 08:47:22 2014
@@ -86,7 +86,6 @@ class QueueConsumerImpl
     }
 
     private final ConsumerTarget _target;
-    private final SubFlushRunner _runner = new SubFlushRunner(this);
     private final StateChangeListener<ConsumerTarget, ConsumerTarget.State>
             _listener;
     private volatile QueueContext _queueContext;
@@ -210,7 +209,7 @@ class QueueConsumerImpl
     @Override
     public void externalStateChange()
     {
-        _queue.deliverAsync(this);
+        _queue.deliverAsync();
     }
 
     @Override
@@ -324,11 +323,6 @@ class QueueConsumerImpl
         return getQueue().resend(entry, this);
     }
 
-    public final SubFlushRunner getRunner()
-    {
-        return _runner;
-    }
-
     public final long getConsumerNumber()
     {
         return _consumerNumber;

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1643197&r1=1643196&r2=1643197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
(original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
Fri Dec  5 08:47:22 2014
@@ -261,7 +261,7 @@ abstract class AbstractQueueTestBase ext
         _queue.enqueue(messageB, postEnqueueAction);
         _queue.enqueue(messageC, postEnqueueAction);
 
-        Thread.sleep(150);  // Work done by SubFlushRunner/QueueRunner Threads
+        Thread.sleep(150);  // Work done by QueueRunner Thread
 
         assertEquals("Unexpected total number of messages sent to consumer",
                      3,
@@ -274,7 +274,7 @@ abstract class AbstractQueueTestBase ext
 
         queueEntries.get(0).release();
 
-        Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
+        Thread.sleep(150); // Work done by QueueRunner Thread
 
         assertEquals("Unexpected total number of messages sent to consumer",
                      4,
@@ -311,7 +311,7 @@ abstract class AbstractQueueTestBase ext
         _queue.enqueue(messageA, postEnqueueAction);
 
         int subFlushWaitTime = 150;
-        Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
+        Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread
 
         assertEquals("Unexpected total number of messages sent to consumer",
                      1,
@@ -322,7 +322,7 @@ abstract class AbstractQueueTestBase ext
         Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10);
         queueEntries.get(0).release();
 
-        Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
+        Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread
 
         assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
         assertEquals("Total number of messages sent should not have changed",
@@ -360,7 +360,7 @@ abstract class AbstractQueueTestBase ext
         _queue.enqueue(messageB, postEnqueueAction);
         _queue.enqueue(messageC, postEnqueueAction);
 
-        Thread.sleep(150);  // Work done by SubFlushRunner/QueueRunner Threads
+        Thread.sleep(150);  // Work done by QueueRunner Thread
 
         assertEquals("Unexpected total number of messages sent to consumer",
                      3,
@@ -374,7 +374,7 @@ abstract class AbstractQueueTestBase ext
         queueEntries.get(2).release();
         queueEntries.get(0).release();
 
-        Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
+        Thread.sleep(150); // Work done by QueueRunner Thread
 
         assertEquals("Unexpected total number of messages sent to consumer",
                      5,
@@ -417,7 +417,7 @@ abstract class AbstractQueueTestBase ext
         _queue.enqueue(messageA, postEnqueueAction);
         _queue.enqueue(messageB, postEnqueueAction);
 
-        Thread.sleep(150);  // Work done by SubFlushRunner/QueueRunner Threads
+        Thread.sleep(150);  // Work done by QueueRunner Thread
 
         assertEquals("Unexpected total number of messages sent to both after enqueue",
                      2,
@@ -426,7 +426,7 @@ abstract class AbstractQueueTestBase ext
         /* Now release the first message only, causing it to be requeued */
         queueEntries.get(0).release();
 
-        Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
+        Thread.sleep(150); // Work done by QueueRunner Thread
 
         assertEquals("Unexpected total number of messages sent to both consumers after release",
                      3,
@@ -645,88 +645,6 @@ abstract class AbstractQueueTestBase ext
         assertEquals("Message ID was wrong", msgID, 10L);
     }
 
-
-    /**
-     * processQueue() is used when asynchronously delivering messages to
-     * consumers which could not be delivered immediately during the
-     * enqueue() operation.
-     *
-     * A defect within the method would mean that delivery of these messages may
-     * not occur should the Runner stop before all messages have been processed.
-     * Such a defect was discovered when Selectors were used such that one and
-     * only one consumer can/will accept any given messages, but multiple
-     * consumers are present, and one of the earlier consumers receives
-     * more messages than the others.
-     *
-     * This test is to validate that the processQueue() method is able to
-     * correctly deliver all of the messages present for asynchronous delivery
-     * to consumers in such a scenario.
-     */
-    public void testProcessQueueWithUniqueSelectors() throws Exception
-    {
-        AbstractQueue testQueue = createNonAsyncDeliverQueue();
-        testQueue.open();
-
-        // retrieve the QueueEntryList the queue creates and insert the test
-        // messages, thus avoiding straight-through delivery attempts during
-        //enqueue() process.
-        QueueEntryList list = testQueue.getEntries();
-        assertNotNull("QueueEntryList should have been created", list);
-
-        QueueEntry msg1 = list.add(createMessage(1L));
-        QueueEntry msg2 = list.add(createMessage(2L));
-        QueueEntry msg3 = list.add(createMessage(3L));
-        QueueEntry msg4 = list.add(createMessage(4L));
-        QueueEntry msg5 = list.add(createMessage(5L));
-
-        // Create lists of the entries each consumer should be interested
-        // in.Bias over 50% of the messages to the first consumer so that
-        // the later consumers reject them and report being done before
-        // the first consumer as the processQueue method proceeds.
-        List<String> msgListSub1 = createEntriesList(msg1, msg2, msg3);
-        List<String> msgListSub2 = createEntriesList(msg4);
-        List<String> msgListSub3 = createEntriesList(msg5);
-
-        MockConsumer sub1 = new MockConsumer(msgListSub1);
-        MockConsumer sub2 = new MockConsumer(msgListSub2);
-        MockConsumer sub3 = new MockConsumer(msgListSub3);
-
-        // register the consumers
-        testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test",
-                              EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
-        testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test",
-                              EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
-        testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test",
-                              EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
-
-        //check that no messages have been delivered to the
-        //consumers during registration
-        assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size());
-        assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size());
-        assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
-
-        // call processQueue to deliver the messages
-        testQueue.processQueue(new QueueRunner(testQueue)
-        {
-            @Override
-            public void run()
-            {
-                // we don't actually want/need this runner to do any work
-                // because we we are already doing it!
-            }
-        });
-
-        // check expected messages delivered to correct consumers
-        verifyReceivedMessages(Arrays.asList((MessageInstance)msg1,msg2,msg3), sub1.getMessages());
-        verifyReceivedMessages(Collections.singletonList((MessageInstance)msg4), sub2.getMessages());
-        verifyReceivedMessages(Collections.singletonList((MessageInstance)msg5), sub3.getMessages());
-    }
-
-    private AbstractQueue createNonAsyncDeliverQueue()
-    {
-        return new NonAsyncDeliverQueue(getVirtualHost());
-    }
-
     /**
      * Tests that dequeued message is not present in the list returned form
      * {@link AbstractQueue#getMessagesOnTheQueue()}
@@ -1055,16 +973,6 @@ abstract class AbstractQueueTestBase ext
         return entry;
     }
 
-    private List<String> createEntriesList(QueueEntry... entries)
-    {
-        ArrayList<String> entriesList = new ArrayList<String>();
-        for (QueueEntry entry : entries)
-        {
-            entriesList.add(entry.getMessage().getMessageHeader().getMessageId());
-        }
-        return entriesList;
-    }
-
     protected void verifyReceivedMessages(List<MessageInstance> expected,
                                         List<MessageInstance> delivered)
     {
@@ -1210,90 +1118,4 @@ abstract class AbstractQueueTestBase ext
         return _consumerTarget;
     }
 
-    private static class NonAsyncDeliverEntry extends OrderedQueueEntry
-    {
-
-        public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList)
-        {
-            super(queueEntryList);
-        }
-
-        public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList,
-                                    final ServerMessage message,
-                                    final long entryId)
-        {
-            super(queueEntryList, message, entryId);
-        }
-
-        public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList, final ServerMessage
message)
-        {
-            super(queueEntryList, message);
-        }
-    }
-
-    private static class NonAsyncDeliverList extends OrderedQueueEntryList
-    {
-
-        private static final HeadCreator HEAD_CREATOR =
-                new HeadCreator()
-                {
-
-                    @Override
-                    public NonAsyncDeliverEntry createHead(final QueueEntryList list)
-                    {
-                        return new NonAsyncDeliverEntry((NonAsyncDeliverList) list);
-                    }
-                };
-
-        public NonAsyncDeliverList(final NonAsyncDeliverQueue queue)
-        {
-            super(queue, HEAD_CREATOR);
-        }
-
-        @Override
-        protected NonAsyncDeliverEntry createQueueEntry(final ServerMessage<?> message)
-        {
-            return new NonAsyncDeliverEntry(this,message);
-        }
-    }
-
-
-    private static class NonAsyncDeliverQueue extends AbstractQueue<NonAsyncDeliverQueue>
-    {
-        private QueueEntryList _entries = new NonAsyncDeliverList(this);
-
-        public NonAsyncDeliverQueue(VirtualHostImpl vhost)
-        {
-            super(attributes(), vhost);
-        }
-
-        @Override
-        protected void onOpen()
-        {
-            super.onOpen();
-        }
-
-        @Override
-        QueueEntryList getEntries()
-        {
-            return _entries;
-        }
-
-        private static Map<String,Object> attributes()
-        {
-            Map<String,Object> attributes = new HashMap<String, Object>();
-            attributes.put(Queue.ID, UUID.randomUUID());
-            attributes.put(Queue.NAME, "test");
-            attributes.put(Queue.DURABLE, false);
-            attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
-            return attributes;
-        }
-
-        @Override
-        public void deliverAsync(QueueConsumer<?> sub)
-        {
-            // do nothing, i.e prevent deliveries by the SubFlushRunner
-            // when registering the new consumers
-        }
-    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1643197&r1=1643196&r2=1643197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
(original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
Fri Dec  5 08:47:22 2014
@@ -180,14 +180,7 @@ public class StandardQueueTest extends A
         queueAttributes.put(Queue.ID, UUID.randomUUID());
         queueAttributes.put(Queue.NAME, "test");
         // create queue with overridden method deliverAsync
-        StandardQueueImpl testQueue = new StandardQueueImpl(queueAttributes, getVirtualHost())
-        {
-            @Override
-            public void deliverAsync(QueueConsumer sub)
-            {
-                // do nothing
-            }
-        };
+        StandardQueueImpl testQueue = new StandardQueueImpl(queueAttributes, getVirtualHost());
         testQueue.create();
 
         // put messages

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1643197&r1=1643196&r2=1643197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
(original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
Fri Dec  5 08:47:22 2014
@@ -307,7 +307,7 @@ public class ServerConnectionDelegate ex
     @Override public void sessionDetach(Connection conn, SessionDetach dtc)
     {
         // To ensure a clean detach, we stop any remaining subscriptions. Stop ensures
-        // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before
the stop
+        // that any in-progress delivery (QueueRunner) is completed before the stop
         // completes.
         stopAllSubscriptions(conn, dtc);
         Session ssn = conn.getSession(dtc.getChannel());



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message