PROTON-853: don't return the cached links if they are already in the closed state; instead, create
a new object and ensure the old links also get freed. Also fixes behaviour similar to that in PROTON-850.
This closes #21
(cherry picked from commit f2d7d669155a2ca57606c9381f4f1720739be79b)
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/77c1ee07
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/77c1ee07
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/77c1ee07
Branch: refs/heads/0.9.x
Commit: 77c1ee076ae4a4bcbec850572deb276ebd0b8c7b
Parents: 1b1c07d
Author: Robert Gemmell <robbie@apache.org>
Authored: Mon Apr 20 17:41:10 2015 +0100
Committer: Robert Gemmell <robbie@apache.org>
Committed: Sat Apr 25 20:24:17 2015 +0100
----------------------------------------------------------------------
.../qpid/proton/engine/impl/SessionImpl.java | 51 +++++++++++++++++++-
.../qpid/proton/engine/impl/TransportImpl.java | 6 ---
2 files changed, 50 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/77c1ee07/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
index 0b3524a..45fcb70 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
@@ -21,6 +21,7 @@
package org.apache.qpid.proton.engine.impl;
import java.util.*;
+
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.ProtonJSession;
import org.apache.qpid.proton.engine.Session;
@@ -32,6 +33,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
private Map<String, SenderImpl> _senders = new LinkedHashMap<String, SenderImpl>();
private Map<String, ReceiverImpl> _receivers = new LinkedHashMap<String, ReceiverImpl>();
+ private List<LinkImpl> _oldLinksToFree = new ArrayList<LinkImpl>();
private TransportSession _transportSession;
private int _incomingCapacity = 1024*1024;
private int _incomingBytes = 0;
@@ -58,6 +60,17 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
sender = new SenderImpl(this, name);
_senders.put(name, sender);
}
+ else
+ {
+ if(sender.getLocalState() == EndpointState.CLOSED
+ && sender.getRemoteState() == EndpointState.CLOSED)
+ {
+ _oldLinksToFree.add(sender);
+
+ sender = new SenderImpl(this, name);
+ _senders.put(name, sender);
+ }
+ }
return sender;
}
@@ -69,6 +82,17 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
receiver = new ReceiverImpl(this, name);
_receivers.put(name, receiver);
}
+ else
+ {
+ if(receiver.getLocalState() == EndpointState.CLOSED
+ && receiver.getRemoteState() == EndpointState.CLOSED)
+ {
+ _oldLinksToFree.add(receiver);
+
+ receiver = new ReceiverImpl(this, name);
+ _receivers.put(name, receiver);
+ }
+ }
return receiver;
}
@@ -115,6 +139,11 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
receiver.free();
}
_receivers.clear();
+
+ List<LinkImpl> links = new ArrayList<LinkImpl>(_oldLinksToFree);
+ for(LinkImpl link : links) {
+ link.free();
+ }
}
void modifyEndpoints() {
@@ -145,12 +174,32 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
void freeSender(SenderImpl sender)
{
- _senders.remove(sender.getName());
+ String name = sender.getName();
+ SenderImpl existing = _senders.get(name);
+ if (sender.equals(existing))
+ {
+ _senders.remove(name);
+ }
+ else
+ {
+ _oldLinksToFree.remove(sender);
+ }
}
void freeReceiver(ReceiverImpl receiver)
{
_receivers.remove(receiver.getName());
+
+ String name = receiver.getName();
+ ReceiverImpl existing = _receivers.get(name);
+ if (receiver.equals(existing))
+ {
+ _receivers.remove(name);
+ }
+ else
+ {
+ _oldLinksToFree.remove(receiver);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/77c1ee07/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 8a7fb32..c40cdee 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -22,15 +22,12 @@ import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourBufferToArr
import java.nio.ByteBuffer;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedShort;
-import org.apache.qpid.proton.amqp.security.SaslCode;
import org.apache.qpid.proton.amqp.transport.Attach;
import org.apache.qpid.proton.amqp.transport.Begin;
import org.apache.qpid.proton.amqp.transport.Close;
@@ -55,12 +52,9 @@ import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Ssl;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.SslPeerDetails;
-import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.engine.TransportResult;
import org.apache.qpid.proton.engine.TransportResultFactory;
-import org.apache.qpid.proton.engine.Sasl.SaslOutcome;
-import org.apache.qpid.proton.engine.impl.ssl.ProtonSslEngineProvider;
import org.apache.qpid.proton.engine.impl.ssl.SslImpl;
import org.apache.qpid.proton.framing.TransportFrame;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
|