qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject [10/10] qpid-broker-j git commit: QPID-7531: [Java Broker, AMQP 1.0] Improve reporting of error condition on link attachments
Date Thu, 05 Oct 2017 19:20:12 GMT
QPID-7531: [Java Broker, AMQP 1.0] Improve reporting of error condition on link attachments


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/fb93bae6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/fb93bae6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/fb93bae6

Branch: refs/heads/master
Commit: fb93bae6368dffce01b6e0261617c9b3ce6c48fe
Parents: 2736b3b
Author: Alex Rudyy <orudyy@apache.org>
Authored: Thu Oct 5 20:09:50 2017 +0100
Committer: Alex Rudyy <orudyy@apache.org>
Committed: Thu Oct 5 20:17:36 2017 +0100

----------------------------------------------------------------------
 .../qpid/server/protocol/v1_0/Session_1_0.java  | 52 +++++++++++++-------
 1 file changed, 34 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fb93bae6/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index ce8e401..f54a19f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -1294,8 +1294,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         }
         while(++i != 0);
 
-        // TODO
-        throw new RuntimeException();
+        return null;
     }
 
 
@@ -1353,33 +1352,50 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         @Override
         public void onSuccess(final T endpoint)
         {
-            doOnIOThreadAsync(() -> {
+            doOnIOThreadAsync(() ->
+            {
                 _associatedLinkEndpoints.add(endpoint);
                 _inputHandleToEndpoint.put(_attach.getHandle(), endpoint);
-                endpoint.setLocalHandle(findNextAvailableOutputHandle());
-                if (endpoint instanceof ErrantLinkEndpoint)
+                UnsignedInteger nextAvailableOutputHandle = findNextAvailableOutputHandle();
+                if (nextAvailableOutputHandle == null)
                 {
-                    endpoint.sendAttach();
-                    ((ErrantLinkEndpoint) endpoint).closeWithError();
+                    endpoint.close(new Error(AmqpError.RESOURCE_LIMIT_EXCEEDED,
+                                             String.format(
+                                                     "Cannot find free handle for endpoint '%d' on session '%s'",
+                                                     _attach.getHandle(),
+                                                     endpoint.getSession().toLogString())));
                 }
                 else
                 {
-                    if (endpoint instanceof StandardReceivingLinkEndpoint
-                        && (_blockingEntities.contains(Session_1_0.this)
-                            || _blockingEntities.contains(((StandardReceivingLinkEndpoint) endpoint).getReceivingDestination())))
-                    {
-                        endpoint.setStopped(true);
-                    }
-                    _inputHandleToEndpoint.put(_attach.getHandle(), endpoint);
-                    if (!_endpointToOutputHandle.containsKey(endpoint))
+                    endpoint.setLocalHandle(nextAvailableOutputHandle);
+                    if (endpoint instanceof ErrantLinkEndpoint)
                     {
-                        _endpointToOutputHandle.put(endpoint, endpoint.getLocalHandle());
                         endpoint.sendAttach();
-                        endpoint.start();
+                        ((ErrantLinkEndpoint) endpoint).closeWithError();
                     }
                     else
                     {
-                        // TODO - close connection or session with internal error
+                        if (endpoint instanceof StandardReceivingLinkEndpoint
+                            && (_blockingEntities.contains(Session_1_0.this)
+                                || _blockingEntities.contains(((StandardReceivingLinkEndpoint) endpoint)
+                                                                      .getReceivingDestination())))
+                        {
+                            endpoint.setStopped(true);
+                        }
+
+                        if (!_endpointToOutputHandle.containsKey(endpoint))
+                        {
+                            _endpointToOutputHandle.put(endpoint, endpoint.getLocalHandle());
+                            endpoint.sendAttach();
+                            endpoint.start();
+                        }
+                        else
+                        {
+                            final End end = new End();
+                            end.setError(new Error(AmqpError.INTERNAL_ERROR,
+                                                   "Endpoint is already registered with session."));
+                            endpoint.getSession().end(end);
+                        }
                     }
                 }
             });


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message