qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject [3/3] qpid-jms git commit: update anonymous producers to use a connection capability to signal support of the anonymous relay node
Date Tue, 18 Nov 2014 17:34:10 GMT
update anonymous producers to use a connection capability to signal support of the anonymous
relay node


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/03be3ff0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/03be3ff0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/03be3ff0

Branch: refs/heads/master
Commit: 03be3ff087b9de9e3f864f84c24b2c5a3c6fbfce
Parents: 428c782
Author: Robert Gemmell <robbie@apache.org>
Authored: Tue Nov 18 17:08:39 2014 +0000
Committer: Robert Gemmell <robbie@apache.org>
Committed: Tue Nov 18 17:08:39 2014 +0000

----------------------------------------------------------------------
 .../amqp/AmqpAnonymousProducerWrapper.java      | 115 -------------------
 .../qpid/jms/provider/amqp/AmqpConnection.java  |   1 -
 .../provider/amqp/AmqpConnectionProperties.java |  17 ++-
 .../qpid/jms/provider/amqp/AmqpSession.java     |   6 +-
 .../jms/integration/SessionIntegrationTest.java |  73 +++++++++---
 5 files changed, 75 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/03be3ff0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducerWrapper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducerWrapper.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducerWrapper.java
deleted file mode 100644
index cc63ffa..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducerWrapper.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.provider.amqp;
-
-import java.io.IOException;
-
-import javax.jms.JMSException;
-
-import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
-import org.apache.qpid.jms.meta.JmsProducerInfo;
-import org.apache.qpid.jms.provider.AsyncResult;
-import org.apache.qpid.jms.provider.WrappedAsyncResult;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handles the case of anonymous JMS MessageProducers.
- *
- * In order to simulate the anonymous producer we must create a sender for each message
- * send attempt and close it following a successful send.
- */
-public class AmqpAnonymousProducerWrapper extends AmqpProducer {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AmqpAnonymousProducerWrapper.class);
-    AmqpProducer delegate;
-
-    /**
-     * Creates the Anonymous Producer object.
-     *
-     * @param session
-     *        the session that owns this producer
-     * @param info
-     *        the JmsProducerInfo for this producer.
-     */
-    public AmqpAnonymousProducerWrapper(AmqpSession session, JmsProducerInfo info) {
-        super(session, info);
-
-        delegate = new AmqpFixedProducer(session, info);
-    }
-
-    @Override
-    public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws
IOException, JMSException {
-        LOG.trace("Delegating anonymous send to underlying producer: {}", getProducerId());
-
-       return delegate.send(envelope, request);
-    }
-
-    @Override
-    public void open(AsyncResult request) {
-        AnonymousRelayRequest anonRelayRequest = new AnonymousRelayRequest(request);
-        delegate.open(anonRelayRequest);
-    }
-
-    @Override
-    public void close(AsyncResult request) {
-        delegate.close(request);
-    }
-
-    @Override
-    public boolean isAnonymous() {
-        return true;
-    }
-
-    @Override
-    public EndpointState getLocalState() {
-        return delegate.getLocalState();
-    }
-
-    @Override
-    public EndpointState getRemoteState() {
-        return delegate.getRemoteState();
-    }
-
-    @Override
-    public void setPresettle(boolean presettle) {
-        delegate.setPresettle(presettle);
-    };
-
-    private class AnonymousRelayRequest extends WrappedAsyncResult {
-
-        public AnonymousRelayRequest(AsyncResult openResult) {
-            super(openResult);
-        }
-
-        /**
-         * If creation of the producer to the anonymous-relay failed, we try to
-         * enter fallback mode rather than immediately failing.
-         */
-        @Override
-        public void onFailure(Throwable result) {
-            LOG.debug("Attempt to open producer to anonymous relay failed, entering fallback
mode");
-
-            AmqpProducer newProducer = new AmqpAnonymousFallbackProducer(session, getJmsResource());
-            newProducer.setPresettle(delegate.isPresettle());
-            delegate = newProducer;
-
-            delegate.open(getWrappedRequest());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/03be3ff0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index 1cc5005..2badbcd 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -31,7 +31,6 @@ import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFactory;
 import org.apache.qpid.jms.util.IOExceptionSupport;
-import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Sasl;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/03be3ff0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
index 814c0f0..c3b0297 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
@@ -16,6 +16,8 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.qpid.proton.amqp.Symbol;
@@ -27,6 +29,10 @@ import org.apache.qpid.proton.amqp.Symbol;
  */
 public class AmqpConnectionProperties {
 
+    public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+
+    private boolean anonymousRelaySupported = false;
+
     /**
      * Creates a new instance of this class from the given remote capabilities and properties.
      *
@@ -46,10 +52,19 @@ public class AmqpConnectionProperties {
     }
 
     protected void processCapabilities(Symbol[] capabilities) {
-        // TODO - Inspect capabilities for configuration options
+        List<Symbol> list = Arrays.asList(capabilities);
+        if (list.contains(ANONYMOUS_RELAY)) {
+            anonymousRelaySupported = true;
+        }
+
+        // TODO - Inspect capabilities for any other configuration options
     }
 
     protected void processProperties(Map<Symbol, Object> properties) {
         // TODO - Inspect properties for configuration options
     }
+
+    public boolean isAnonymousRelaySupported() {
+        return anonymousRelaySupported;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/03be3ff0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index d707f35..d6e0abc 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -102,12 +102,12 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo,
Session> {
     public AmqpProducer createProducer(JmsProducerInfo producerInfo) {
         AmqpProducer producer = null;
 
-        if (producerInfo.getDestination() != null) {
+        if (producerInfo.getDestination() != null || connection.getProperties().isAnonymousRelaySupported())
{
             LOG.debug("Creating AmqpFixedProducer for: {}", producerInfo.getDestination());
             producer = new AmqpFixedProducer(this, producerInfo);
         } else {
-            LOG.debug("Creating an AmqpAnonymousProducerWrapper");
-            producer = new AmqpAnonymousProducerWrapper(this, producerInfo);
+            LOG.debug("Creating an AmqpAnonymousFallbackProducer");
+            producer = new AmqpAnonymousFallbackProducer(this, producerInfo);
         }
 
         producer.setPresettle(connection.isPresettleProducers());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/03be3ff0/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index ed97272..93fae3c 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -41,6 +41,7 @@ import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 
+import org.apache.qpid.jms.provider.amqp.AmqpConnectionProperties;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.DescriptorMatcher;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
@@ -58,6 +59,7 @@ import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionM
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
 import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
 import org.junit.Test;
 
 public class SessionIntegrationTest extends QpidJmsTestCase {
@@ -184,7 +186,10 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
     @Test(timeout = 5000)
     public void testCreateAnonymousProducerWhenAnonymousRelayNodeIsSupported() throws Exception
{
         try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
-            Connection connection = testFixture.establishConnecton(testPeer);
+            //Add capability to indicate support for ANONYMOUS-RELAY
+            Symbol[] serverCapabilities = new Symbol[]{AmqpConnectionProperties.ANONYMOUS_RELAY};
+
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
             connection.start();
 
             testPeer.expectBegin(true);
@@ -227,16 +232,54 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 5000)
-    public void testCreateProducerFailsWhenLinkRefusedAndAttachFrameWriteIsNotDeferred()
throws Exception {
+    public void testCreateAnonymousProducerFailsWhenAnonymousRelayNodeIsSupportedButLinkRefusedAndAttachResponseWriteIsNotDeferred()
throws Exception {
+        doCreateAnonymousProducerFailsWhenAnonymousRelayNodeIsSupportedButLinkRefusedTestImpl(false);
+    }
+
+    @Test(timeout = 5000)
+    public void testCreateAnonymousProducerFailsWhenAnonymousRelayNodeIsSupportedButLinkRefusedAndAttachResponseWriteIsDeferred()
throws Exception {
+        doCreateAnonymousProducerFailsWhenAnonymousRelayNodeIsSupportedButLinkRefusedTestImpl(true);
+    }
+
+    private void doCreateAnonymousProducerFailsWhenAnonymousRelayNodeIsSupportedButLinkRefusedTestImpl(boolean
deferAttachFrameWrite) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+            //Add capability to indicate support for ANONYMOUS-RELAY
+            Symbol[] serverCapabilities = new Symbol[]{AmqpConnectionProperties.ANONYMOUS_RELAY};
+
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+            connection.start();
+
+            testPeer.expectBegin(true);
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            //Expect and refuse a link to the anonymous relay node
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(nullValue());
+            targetMatcher.withDynamic(nullValue());//default = false
+            targetMatcher.withDurable(nullValue());//default = none/0
+
+            testPeer.expectSenderAttach(targetMatcher, true, false);
+
+            try {
+                session.createProducer(null);
+                fail("Expected producer creation to fail if anonymous-relay link refused");
+            } catch (JMSException jmse) {
+                //expected
+            }
+        }
+    }
+
+    @Test(timeout = 5000)
+    public void testCreateProducerFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred()
throws Exception {
         doCreateProducerFailsWhenLinkRefusedTestImpl(false);
     }
 
     @Test(timeout = 5000)
-    public void testCreateProducerFailsWhenLinkRefusedAndAttachFrameWriteIsDeferred() throws
Exception {
+    public void testCreateProducerFailsWhenLinkRefusedAndAttachResponseWriteIsDeferred()
throws Exception {
         doCreateProducerFailsWhenLinkRefusedTestImpl(true);
     }
 
-    private void doCreateProducerFailsWhenLinkRefusedTestImpl(boolean deferAttachFrameWrite)
throws JMSException, InterruptedException, Exception, IOException {
+    private void doCreateProducerFailsWhenLinkRefusedTestImpl(boolean deferAttachResponseWrite)
throws JMSException, InterruptedException, Exception, IOException {
         try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
             Connection connection = testFixture.establishConnecton(testPeer);
             connection.start();
@@ -253,7 +296,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             targetMatcher.withDynamic(nullValue());//default = false
             targetMatcher.withDurable(nullValue());//default = none/0
 
-            testPeer.expectSenderAttach(targetMatcher, true, deferAttachFrameWrite);
+            testPeer.expectSenderAttach(targetMatcher, true, deferAttachResponseWrite);
             //Expect the detach response to the test peer closing the producer link after
refusal.
             testPeer.expectDetach(true, false, false);
 
@@ -272,6 +315,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
     @Test(timeout = 5000)
     public void testCreateAnonymousProducerWhenAnonymousRelayNodeIsNotSupported() throws
Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+
+            //DO NOT add capability to indicate server support for ANONYMOUS-RELAY
+
             Connection connection = testFixture.establishConnecton(testPeer);
             connection.start();
 
@@ -281,15 +327,8 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             String topicName = "myTopic";
             Topic dest = session.createTopic(topicName);
 
-            //Expect and refuse a link to the anonymous relay node
-            TargetMatcher targetMatcher = new TargetMatcher();
-            targetMatcher.withAddress(nullValue());
-            targetMatcher.withDynamic(nullValue());//default = false
-            targetMatcher.withDurable(nullValue());//default = none/0
-
-            testPeer.expectSenderAttach(targetMatcher, true, false);
-            //Expect the detach response to the test peer closing the producer link after
refusal.
-            testPeer.expectDetach(true, false, false);
+            // Expect no AMQP traffic when we create the anonymous producer, as it will wait
+            // for an actual send to occur on the producer before anything occurs on the
wire
 
             //Create an anonymous producer
             MessageProducer producer = session.createProducer(null);
@@ -297,7 +336,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
 
             //Expect a new message sent by the above producer to cause creation of a new
             //sender link to the given destination, then closing the link after the message
is sent.
-            TargetMatcher targetMatcher2 = new TargetMatcher();
+            TargetMatcher targetMatcher = new TargetMatcher();
             targetMatcher.withAddress(equalTo("topic://" + topicName)); //TODO: remove prefix
             targetMatcher.withDynamic(nullValue());//default = false
             targetMatcher.withDurable(nullValue());//default = none/0
@@ -308,7 +347,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             messageMatcher.setHeadersMatcher(headersMatcher);
             messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
 
-            testPeer.expectSenderAttach(targetMatcher2, false, false);
+            testPeer.expectSenderAttach(targetMatcher, false, false);
             testPeer.expectTransfer(messageMatcher);
             testPeer.expectDetach(true, true, true);
 
@@ -316,7 +355,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             producer.send(dest, message);
 
             //Repeat the send and observe another attach->transfer->detach.
-            testPeer.expectSenderAttach(targetMatcher2, false, false);
+            testPeer.expectSenderAttach(targetMatcher, false, false);
             testPeer.expectTransfer(messageMatcher);
             testPeer.expectDetach(true, true, true);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message