activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1043076 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/policy/PriorityNetworkDispatchPolicy.java test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java
Date Tue, 07 Dec 2010 15:00:32 GMT
Author: gtully
Date: Tue Dec  7 15:00:32 2010
New Revision: 1043076

URL: http://svn.apache.org/viewvc?rev=1043076&view=rev
Log:
resolve https://issues.apache.org/jira/browse/AMQ-3071 and add test

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PriorityNetworkDispatchPolicy.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PriorityNetworkDispatchPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PriorityNetworkDispatchPolicy.java?rev=1043076&r1=1043075&r2=1043076&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PriorityNetworkDispatchPolicy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PriorityNetworkDispatchPolicy.java
Tue Dec  7 15:00:32 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region.policy;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.activemq.broker.region.MessageReference;
@@ -47,10 +48,11 @@ public class PriorityNetworkDispatchPoli
                 ConsumerInfo info = sub.getConsumerInfo();
                 if (info.isNetworkSubscription()) {    
                     boolean highestPrioritySub = true;
-                    for (Subscription candidate: duplicateFreeSubs) {
+                    for (Iterator<Subscription> it =  duplicateFreeSubs.iterator();
it.hasNext(); ) {
+                        Subscription candidate = it.next();
                         if (matches(candidate, info)) {
                             if (hasLowerPriority(candidate, info)) {
-                                duplicateFreeSubs.remove(candidate);
+                                it.remove();
                             } else {
                                 // higher priority matching sub exists
                                 highestPrioritySub = false;

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java?rev=1043076&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java
Tue Dec  7 15:00:32 2010
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.TopicSubscription;
+import org.apache.activemq.broker.region.policy.PriorityNetworkDispatchPolicy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.usage.SystemUsage;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import static org.junit.Assert.assertEquals;
+
+public class PriorityNetworkDispatchPolicyTest {
+
+    PriorityNetworkDispatchPolicy underTest = new PriorityNetworkDispatchPolicy();
+    SystemUsage usageManager = new SystemUsage();
+    ConsumerInfo info = new ConsumerInfo();
+    ActiveMQMessage node = new ActiveMQMessage();
+    ConsumerId id = new ConsumerId();
+    ConnectionContext context = new ConnectionContext();
+
+    @Before
+    public void init() throws Exception {
+        info.setDestination(ActiveMQDestination.createDestination("test", ActiveMQDestination.TOPIC_TYPE));
+        info.setConsumerId(id);
+        info.setNetworkSubscription(true);
+        info.setNetworkConsumerPath(new ConsumerId[]{id});
+    }
+
+    @Test
+    public void testRemoveLowerPriorityDup() throws Exception {
+        List<Subscription> consumers = new ArrayList<Subscription>();
+
+        for (int i=0; i<3; i++) {
+            ConsumerInfo instance = info.copy();
+            instance.setPriority((byte)i);
+            consumers.add(new TopicSubscription(null, context, instance, usageManager));
+        }
+        underTest.dispatch(node, null, consumers);
+
+        long count = 0;
+        for (Subscription consumer : consumers) {
+            count += consumer.getEnqueueCounter();
+        }
+        assertEquals("only one sub got message", 1, count);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message