qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject [4/8] qpid-jms git commit: add support for closing all the links opened on the last session
Date Fri, 06 Mar 2015 16:35:46 GMT
add support for closing all the links opened on the last session


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/df219312
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/df219312
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/df219312

Branch: refs/heads/master
Commit: df219312638023ca078d9054db23113a82007dd6
Parents: d509154
Author: Robert Gemmell <robbie@apache.org>
Authored: Fri Mar 6 15:24:16 2015 +0000
Committer: Robert Gemmell <robbie@apache.org>
Committed: Fri Mar 6 15:24:16 2015 +0000

----------------------------------------------------------------------
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 68 +++++++++++++++++++-
 1 file changed, 65 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df219312/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 90a9b40..8127a8f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -110,6 +111,7 @@ public class TestAmqpPeer implements AutoCloseable
     private byte[] _deferredBytes;
     private int _lastInitiatedChannel = -1;
     private UnsignedInteger _lastInitiatedLinkHandle = null;
+    private Map<Integer, List<UnsignedInteger>> _channelToLinkMap = new ConcurrentHashMap<Integer,
List<UnsignedInteger>>();
 
     public TestAmqpPeer() throws IOException
     {
@@ -547,8 +549,9 @@ public class TestAmqpPeer implements AutoCloseable
             public void setValues()
             {
                 Object receivedHandle = attachMatcher.getReceivedHandle();
+                int receivedChannel = attachMatcher.getActualChannel();
 
-                attachResponseSender.setChannel(attachMatcher.getActualChannel());
+                attachResponseSender.setChannel(receivedChannel);
                 attachResponse.setHandle(receivedHandle);
                 attachResponse.setName(attachMatcher.getReceivedName());
                 attachResponse.setSource(attachMatcher.getReceivedSource());
@@ -560,6 +563,7 @@ public class TestAmqpPeer implements AutoCloseable
                 attachResponse.setTarget(t);
 
                 _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
+                recordLinkCreation(receivedChannel, (UnsignedInteger) receivedHandle);
             }
         });
 
@@ -625,8 +629,9 @@ public class TestAmqpPeer implements AutoCloseable
             public void setValues()
             {
                 Object receivedHandle = attachMatcher.getReceivedHandle();
+                int receivedChannel = attachMatcher.getActualChannel();
 
-                attachResponseSender.setChannel(attachMatcher.getActualChannel());
+                attachResponseSender.setChannel(receivedChannel);
                 attachResponse.setHandle(receivedHandle);
                 attachResponse.setName(attachMatcher.getReceivedName());
                 attachResponse.setSource(trimSourceOutcomesCapabilities(createSourceObjectFromDescribedType(attachMatcher.getReceivedSource())));
@@ -637,6 +642,7 @@ public class TestAmqpPeer implements AutoCloseable
                 }
 
                 _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
+                recordLinkCreation(receivedChannel, (UnsignedInteger) receivedHandle);
             }
         });
 
@@ -720,8 +726,9 @@ public class TestAmqpPeer implements AutoCloseable
             public void setValues()
             {
                 Object receivedHandle = attachMatcher.getReceivedHandle();
+                int receivedChannel = attachMatcher.getActualChannel();
 
-                attachResponseSender.setChannel(attachMatcher.getActualChannel());
+                attachResponseSender.setChannel(receivedChannel);
                 attachResponse.setHandle(receivedHandle);
                 attachResponse.setName(attachMatcher.getReceivedName());
                 attachResponse.setTarget(attachMatcher.getReceivedTarget());
@@ -732,6 +739,7 @@ public class TestAmqpPeer implements AutoCloseable
                 }
 
                 _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
+                recordLinkCreation(receivedChannel, (UnsignedInteger) receivedHandle);
             }
         });
 
@@ -1224,6 +1232,50 @@ public class TestAmqpPeer implements AutoCloseable
         }
     }
 
+    /**
+     * All links and sessions must have been created before calling this method, unlike
+     * {@link #remotelyDetachLastOpenedLinkOnLastOpenedSession(boolean, boolean)}
+     */
+    public void remotelyDetachLinksOnLastOpenedSession(boolean expectDetachResponse, boolean
closed) {
+        synchronized (_handlersLock) {
+            CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
+
+            int channel = _lastInitiatedChannel;
+            List<UnsignedInteger> links = _channelToLinkMap.get(channel);
+            if(links == null || links.isEmpty())
+            {
+                throw new IllegalStateException("No links found for channel: " + channel);
+            }
+
+            for (UnsignedInteger linkHandle : links)
+            {
+                // Now generate the Detach for the appropriate link on the appropriate session
+                final DetachFrame detachFrame = new DetachFrame();
+                detachFrame.setClosed(closed);
+                detachFrame.setHandle(linkHandle);
+                // TODO: add an optional error msg+condition?
+
+                final FrameSender frameSender = new FrameSender(this, FrameType.AMQP, channel,
detachFrame, null);
+                comp.add(frameSender);
+
+                if (expectDetachResponse) {
+                    Matcher<Boolean> closeMatcher = null;
+                    if (closed) {
+                        closeMatcher = equalTo(true);
+                    } else {
+                        closeMatcher = Matchers.anyOf(equalTo(false), nullValue());
+                    }
+
+                    // Expect a response to our Detach.
+                    final DetachMatcher detachMatcher = new DetachMatcher().withClosed(closeMatcher);
+                    detachMatcher.withHandle(equalTo(linkHandle));
+                    // TODO: enable matching on the channel number of the response.
+                    addHandler(detachMatcher);
+                }
+            }
+        }
+    }
+
     private CompositeAmqpPeerRunnable insertCompsiteActionForLastHandler() {
         CompositeAmqpPeerRunnable comp = new CompositeAmqpPeerRunnable();
         Handler h = getLastHandler();
@@ -1234,4 +1286,14 @@ public class TestAmqpPeer implements AutoCloseable
         h.onSuccess(comp);
         return comp;
     }
+
+    private void recordLinkCreation(int channel, UnsignedInteger handle) {
+        List<UnsignedInteger> links = _channelToLinkMap.get(channel);
+        if(links == null) {
+            links = new ArrayList<UnsignedInteger>();
+            _channelToLinkMap.put(channel, links);
+        }
+
+        links.add(handle);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message