activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r743521 - in /activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq: flow/FlowController.java queue/SharedQueue.java
Date Wed, 11 Feb 2009 21:44:12 GMT
Author: chirino
Date: Wed Feb 11 21:44:12 2009
New Revision: 743521

URL: http://svn.apache.org/viewvc?rev=743521&view=rev
Log:
Fixing the 10_1_1 p2p case

Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=743521&r1=743520&r2=743521&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java	(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java	Wed Feb 11 21:44:12 2009
@@ -225,14 +225,14 @@
             if (okToAdd(elem)) {
                 ok = true;
                 if (limiter.add(elem)) {
-                    blockSource(sourceController);
                     setUnThrottleListener();
+                    blockSource(sourceController);
                 }
             } else {
                 // Add to overflow queue and block source:
                 overflowQueue.add(elem);
-                blockSource(sourceController);
                 setUnThrottleListener();
+                blockSource(sourceController);
             }
         }
         if (ok) {
@@ -315,8 +315,7 @@
         waitForResume();
 
         if (!blockedSources.contains(source)) {
-            // System.out.println("BLOCKING  : SINK["+this + "], SOURCE[" +
-            // source+"]");
+//            System.out.println("BLOCKING  : SINK[" + this + "], SOURCE[" + source + "]");
             blockedSources.add(source);
             source.onFlowBlock(this);
         }
@@ -341,7 +340,7 @@
         // If we've exceeded the the throttle threshold, register
         // a listener so we can resume the blocked sources after
         // the limiter falls below the threshold:
-        if (!overflowQueue.isEmpty()) {
+        if (!overflowQueue.isEmpty() || limiter.getThrottled()) {
             setUnThrottleListener();
         } else if (notifyUnblock) {
             mutex.notifyAll();
@@ -397,8 +396,7 @@
                     try {
                         Thread.currentThread().setName(name);
                         for (ISourceController<E> source : blockedSources) {
-                            // System.out.println("UNBLOCKING: SINK["+FlowController.this
-                            // + "], SOURCE[" + source+"]");
+//                            System.out.println("UNBLOCKING: SINK[" + FlowController.this + "], SOURCE[" + source + "]");
                             source.onFlowResume(FlowController.this);
                         }
                         for (FlowUnblockListener<E> listener : unblockListeners) {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=743521&r1=743520&r2=743521&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java	(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java	Wed Feb 11 21:44:12 2009
@@ -75,14 +75,15 @@
                     boolean notify = false;
                     if (node.cursor == null) {
                         readyDirectSubs.addLast(node);
-                        // System.out.println("Subscription state change: un-ready direct -> ready direct: "+node);
+//                         System.out.println("Subscription state change: un-ready direct -> ready direct: "+node);
                     } else {
                         if (readyPollingSubs.isEmpty()) {
                             notify = !store.isEmpty();
                         }
                         readyPollingSubs.addLast(node);
-                        // System.out.println("Subscription state change: un-ready polling -> ready polling: "+node);
+//                         System.out.println("Subscription state change: un-ready polling -> ready polling: "+node);
                     }
+                    
                     if (notify) {
                         notifyReady();
                     }
@@ -169,7 +170,7 @@
                         sub.resumeAt(node);
                         unreadyPollingSubs.addLast(sub);
                         matchCount++;
-                        // System.out.println("Subscription state change: un-ready direct -> un-ready polling: "+sub);
+//                         System.out.println("Subscription state change: un-ready direct -> un-ready polling: "+sub);
                     }
                     sub = next;
                 }
@@ -181,7 +182,7 @@
                     subNode.unlink();
                     subNode.resumeAt(node);
                     unreadyPollingSubs.addLast(subNode);
-                    // System.out.println("Subscription state change: ready direct -> un-ready polling: "+subNode);
+//                     System.out.println("Subscription state change: ready direct -> un-ready polling: "+subNode);
                 }
                 matchCount += matches.size();
 
@@ -236,6 +237,8 @@
 
     public boolean pollingDispatch() {
 
+//        System.out.println("polling dispatch");
+        
         // Keep looping until we can find one subscription that we can
         // dispatch a message to.
         while (true) {
@@ -261,7 +264,7 @@
                     } else {
                         // Cursor dried up... this subscriber can now be direct
                         // dispatched.
-                        // System.out.println("Subscription state change: ready polling -> ready direct: "+subNode);
+//                        System.out.println("Subscription state change: ready polling -> ready direct: "+subNode);
                         subNode.unlink();
                         readyDirectSubs.addLast(subNode);
                     }
@@ -291,7 +294,7 @@
                     }
                     return true;
                 } else {
-                    // System.out.println("Subscription state change: ready polling -> un-ready polling: "+subNode);
+//                     System.out.println("Subscription state change: ready polling -> un-ready polling: "+subNode);
                     // Subscription is no longer ready..
                     subNode.cursorUnPeek(storeNode);
                     subNode.unlink();



Mime
View raw message