cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject [2/3] cxf git commit: [CXF-7414] More updates to resume the "next" message instead of whichever was first in teh continuation list
Date Tue, 20 Jun 2017 06:48:17 GMT
[CXF-7414] More updates to resume the "next" message instead of whichever was first in teh
continuation list


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/b8093370
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/b8093370
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/b8093370

Branch: refs/heads/3.1.x-fixes
Commit: b80933704fdd12e9e7c872f9ffbae2d554b2b18d
Parents: 0a6ff4c
Author: Daniel Kulp <dkulp@apache.org>
Authored: Mon Jun 19 11:49:03 2017 -0400
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Tue Jun 20 08:37:52 2017 +0200

----------------------------------------------------------------------
 .../apache/cxf/ws/rm/DestinationSequence.java   | 29 ++++++++++++++------
 1 file changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/b8093370/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
index 87c7ecb..7ac5128 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
@@ -24,10 +24,11 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TimerTask;
+import java.util.TreeMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -67,7 +68,8 @@ public class DestinationSequence extends AbstractSequence {
     private volatile long inProcessNumber;
     private volatile long highNumberCompleted;
     private long nextInOrder;
-    private List<Continuation> continuations = new LinkedList<Continuation>();
+    //be careful, must be used in sync block
+    private Map<Long, Continuation> continuations = new TreeMap<Long, Continuation>();
     // this map is used for robust and redelivery tracking. for redelivery it holds the beingDeliverd
messages
     private Set<Long> deliveringMessageNumbers = new HashSet<Long>();
     
@@ -383,7 +385,7 @@ public class DestinationSequence extends AbstractSequence {
             if (continuation != null) {
                 continuation.setObject(message);
                 if (continuation.suspend(-1)) {
-                    continuations.add(continuation);
+                    continuations.put(mn, continuation);
                     throw new SuspendedInvocationException();
                 }
             }
@@ -396,17 +398,28 @@ public class DestinationSequence extends AbstractSequence {
             }
         }
     }
-    synchronized void wakeupAll() {
-        if (!continuations.isEmpty()) {
-            continuations.remove(0).resume();
+    synchronized void wakeupNext(long i) {
+        try {
+            Continuation c = continuations.remove(i + 1);
+            if (c != null) {
+                //next was found, don't resume everything, just the next one
+                c.resume();
+                return;
+            }
+            //next wasn't found, just resume whatever is first...
+            for (Map.Entry<Long, Continuation> entry : continuations.entrySet()) {
+                entry.getValue().resume();
+                return;
+            }
+        } finally {
+            notifyAll();
         }
-        notifyAll();
     }
     
     synchronized void processingComplete(long mn) {
         inProcessNumber = 0;
         highNumberCompleted = mn;
-        wakeupAll();
+        wakeupNext(mn);
     }
     
     void purgeAcknowledged(long messageNr) {


Mime
View raw message