activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [8/9] activemq-artemis git commit: ARTEMIS-637 Port 5.x AMQP test client
Date Wed, 20 Jul 2016 09:35:40 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
new file mode 100644
index 0000000..320d174
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+
+public class AmqpMessage {
+
+   private final AmqpReceiver receiver;
+   private final Message message;
+   private final Delivery delivery;
+
+   private Map<Symbol, Object> deliveryAnnotationsMap;
+   private Map<Symbol, Object> messageAnnotationsMap;
+   private Map<String, Object> applicationPropertiesMap;
+
+   /**
+    * Creates an empty AmqpMessage intended for building an outgoing message.
+    * <p>
+    * A fresh Proton message is allocated and neither a receiver nor a delivery
+    * is associated with the new instance.
+    */
+   public AmqpMessage() {
+      this.message = Proton.message();
+      this.receiver = null;
+      this.delivery = null;
+   }
+
+   /**
+    * Creates a new AmqpMessage that wraps an existing Proton message so it can
+    * be handled as an outgoing message.
+    * <p>
+    * Delegates to the three argument constructor with a null receiver and a null
+    * delivery.  The delegate constructor dereferences the message, so a null
+    * message results in a NullPointerException.
+    *
+    * @param message the Proton message that is to be sent.
+    */
+   public AmqpMessage(Message message) {
+      this(null, message, null);
+   }
+
+   /**
+    * Creates a new AmqpMessage that wraps the information necessary to handle
+    * an incoming delivery.
+    *
+    * @param receiver the AmqpReceiver that received this message.
+    * @param message  the Proton message that was received.
+    * @param delivery the Delivery instance that produced this message.
+    */
+   @SuppressWarnings("unchecked")
+   public AmqpMessage(AmqpReceiver receiver, Message message, Delivery delivery) {
+      this.receiver = receiver;
+      this.message = message;
+      this.delivery = delivery;
+
+      if (message.getMessageAnnotations() != null) {
+         messageAnnotationsMap = message.getMessageAnnotations().getValue();
+      }
+
+      if (message.getApplicationProperties() != null) {
+         applicationPropertiesMap = message.getApplicationProperties().getValue();
+      }
+
+      if (message.getDeliveryAnnotations() != null) {
+         deliveryAnnotationsMap = message.getDeliveryAnnotations().getValue();
+      }
+   }
+
+   //----- Access to internal client resources ------------------------------//
+
+   /**
+    * @return the AMQP Delivery linked to a received message wrapped in a new
+    *         UnmodifiableDelivery, or null if this message has no delivery.
+    */
+   public Delivery getWrappedDelivery() {
+      return delivery == null ? null : new UnmodifiableDelivery(delivery);
+   }
+
+   /**
+    * @return the Proton Message instance wrapped by this object (the same
+    *         instance, not a copy).
+    */
+   public Message getWrappedMessage() {
+      return this.message;
+   }
+
+   /**
+    * @return the AmqpReceiver that consumed this message, or null if this
+    *         message was not created with a receiver.
+    */
+   public AmqpReceiver getAmqpReceiver() {
+      return this.receiver;
+   }
+
+   //----- Message disposition control --------------------------------------//
+
+   /**
+    * Accepts the message marking it as consumed on the remote peer.  The call
+    * is forwarded to the receiver that produced this message.
+    *
+    * @throws IllegalStateException if this message has no associated receiver.
+    * @throws Exception if an error occurs during the accept.
+    */
+   public void accept() throws Exception {
+      if (receiver != null) {
+         receiver.accept(delivery);
+         return;
+      }
+
+      throw new IllegalStateException("Can't accept non-received message.");
+   }
+
+   /**
+    * Marks the message as Modified, indicating whether it failed to deliver and
+    * whether it is not deliverable here.  The call is forwarded to the receiver
+    * that produced this message.
+    *
+    * @param deliveryFailed    indicates that the delivery failed for some reason.
+    * @param undeliverableHere marks the delivery as not being able to be processed by the link it was sent to.
+    * @throws IllegalStateException if this message has no associated receiver.
+    * @throws Exception if an error occurs during the process.
+    */
+   public void modified(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception {
+      if (receiver != null) {
+         receiver.modified(delivery, deliveryFailed, undeliverableHere);
+         return;
+      }
+
+      throw new IllegalStateException("Can't modify non-received message.");
+   }
+
+   /**
+    * Releases the message so that the remote can redeliver it elsewhere.  The
+    * call is forwarded to the receiver that produced this message.
+    *
+    * @throws IllegalStateException if this message has no associated receiver.
+    * @throws Exception if an error occurs during the release.
+    */
+   public void release() throws Exception {
+      if (receiver != null) {
+         receiver.release(delivery);
+         return;
+      }
+
+      throw new IllegalStateException("Can't release non-received message.");
+   }
+
+   //----- Convenience methods for constructing outbound messages -----------//
+
+   /**
+    * Sets the MessageId property on an outbound message using the provided String.
+    * The Properties section is created first if the message does not have one.
+    *
+    * @param messageId the String message ID value to set.
+    * @throws IllegalStateException if the message is read only.
+    */
+   public void setMessageId(String messageId) {
+      checkReadOnly();
+      lazyCreateProperties();
+
+      final Message outbound = getWrappedMessage();
+      outbound.setMessageId(messageId);
+   }
+
+   /**
+    * Return the set MessageId value in String form.  Null is returned when the
+    * message has no Properties section, and also when the Properties section
+    * exists but carries no message ID (for instance after only a correlation ID
+    * or group ID was set).
+    *
+    * @return the set message ID in String form or null if not set.
+    */
+   public String getMessageId() {
+      final Properties properties = message.getProperties();
+      if (properties == null) {
+         return null;
+      }
+
+      // The Properties section can exist without a message ID, guard before toString().
+      final Object messageId = properties.getMessageId();
+      return messageId == null ? null : messageId.toString();
+   }
+
+   /**
+    * Return the set MessageId value in its original (unconverted) form, or null
+    * if the message has no Properties section.
+    *
+    * @return the set message ID in its original form or null if not set.
+    */
+   public Object getRawMessageId() {
+      final Properties properties = message.getProperties();
+      return properties == null ? null : properties.getMessageId();
+   }
+
+   /**
+    * Sets the MessageId property on an outbound message using the provided value
+    * without converting it.  The Properties section is created first if the
+    * message does not have one.
+    *
+    * @param messageId the message ID value to set.
+    * @throws IllegalStateException if the message is read only.
+    */
+   public void setRawMessageId(Object messageId) {
+      checkReadOnly();
+      lazyCreateProperties();
+
+      final Message outbound = getWrappedMessage();
+      outbound.setMessageId(messageId);
+   }
+
+   /**
+    * Sets the CorrelationId property on an outbound message using the provided String.
+    * The Properties section is created first if the message does not have one.
+    *
+    * @param correlationId the String Correlation ID value to set.
+    * @throws IllegalStateException if the message is read only.
+    */
+   public void setCorrelationId(String correlationId) {
+      checkReadOnly();
+      lazyCreateProperties();
+
+      final Message outbound = getWrappedMessage();
+      outbound.setCorrelationId(correlationId);
+   }
+
+   /**
+    * Return the set CorrelationId value in String form.  Null is returned when
+    * the message has no Properties section, and also when the Properties section
+    * exists but carries no correlation ID (for instance after only a message ID
+    * or group ID was set).
+    *
+    * @return the set correlation ID in String form or null if not set.
+    */
+   public String getCorrelationId() {
+      final Properties properties = message.getProperties();
+      if (properties == null) {
+         return null;
+      }
+
+      // The Properties section can exist without a correlation ID, guard before toString().
+      final Object correlationId = properties.getCorrelationId();
+      return correlationId == null ? null : correlationId.toString();
+   }
+
+   /**
+    * Return the set CorrelationId value in its original (unconverted) form, or
+    * null if the message has no Properties section.
+    *
+    * @return the set correlation ID in its original form or null if not set.
+    */
+   public Object getRawCorrelationId() {
+      final Properties properties = message.getProperties();
+      return properties == null ? null : properties.getCorrelationId();
+   }
+
+   /**
+    * Sets the CorrelationId property on an outbound message using the provided
+    * value without converting it.  The Properties section is created first if
+    * the message does not have one.
+    *
+    * @param correlationId the correlation ID value to set.
+    * @throws IllegalStateException if the message is read only.
+    */
+   public void setRawCorrelationId(Object correlationId) {
+      checkReadOnly();
+      lazyCreateProperties();
+
+      final Message outbound = getWrappedMessage();
+      outbound.setCorrelationId(correlationId);
+   }
+
+   /**
+    * Sets the GroupId property on an outbound message using the provided String.
+    * The Properties section is created first if the message does not have one.
+    *
+    * @param groupId the String Group ID value to set.
+    * @throws IllegalStateException if the message is read only.
+    */
+   public void setGroupId(String groupId) {
+      checkReadOnly();
+      lazyCreateProperties();
+
+      final Message outbound = getWrappedMessage();
+      outbound.setGroupId(groupId);
+   }
+
+   /**
+    * Return the set GroupId value, or null if the message has no Properties
+    * section.
+    *
+    * @return the set GroupID in String form or null if not set.
+    */
+   public String getGroupId() {
+      final Properties properties = message.getProperties();
+      return properties == null ? null : properties.getGroupId();
+   }
+
+   /**
+    * Sets the durable header on the outgoing message.  The Header section is
+    * created first if the message does not have one.
+    *
+    * @param durable the boolean durable value to set.
+    * @throws IllegalStateException if the message is read only.
+    */
+   public void setDurable(boolean durable) {
+      checkReadOnly();
+      lazyCreateHeader();
+
+      final Message outbound = getWrappedMessage();
+      outbound.setDurable(durable);
+   }
+
+   /**
+    * Checks the durable value in the Message Header to determine if the message
+    * was sent as a durable Message.  A message without a Header section, or with
+    * a Header whose durable field was never set, is reported as not durable.
+    *
+    * @return true if the message is marked as being durable.
+    */
+   public boolean isDurable() {
+      final Header header = message.getHeader();
+      if (header == null) {
+         return false;
+      }
+
+      // getDurable() yields a boxed Boolean which is null when the field is unset;
+      // compare against TRUE instead of auto-unboxing to avoid a NullPointerException.
+      return Boolean.TRUE.equals(header.getDurable());
+   }
+
+   /**
+    * Sets a given application property on an outbound message, creating the
+    * application properties section on first use.
+    *
+    * @param key   the name to assign the new property.
+    * @param value the value to set for the named property.
+    * @throws IllegalStateException if the message is read only.
+    */
+   public void setApplicationProperty(String key, Object value) {
+      checkReadOnly();
+      lazyCreateApplicationProperties();
+
+      final Map<String, Object> properties = applicationPropertiesMap;
+      properties.put(key, value);
+   }
+
+   /**
+    * Gets the application property that is mapped to the given name or null
+    * if no property has been set with that name.
+    *
+    * @param key the name used to lookup the property in the application properties.
+    * @return the property value or null if not set.
+    */
+   public Object getApplicationProperty(String key) {
+      return applicationPropertiesMap == null ? null : applicationPropertiesMap.get(key);
+   }
+
+   /**
+    * Sets a message annotation on the AMQP Message.  The String key is converted
+    * to a Symbol and the message annotations section is created on first use.
+    *
+    * @param key   The name of the Symbol whose value is being set.
+    * @param value The new value to set in the annotations of this message.
+    * @throws IllegalStateException if the message is read only.
+    */
+   public void setMessageAnnotation(String key, Object value) {
+      checkReadOnly();
+      lazyCreateMessageAnnotations();
+
+      final Symbol annotationName = Symbol.valueOf(key);
+      messageAnnotationsMap.put(annotationName, value);
+   }
+
+   /**
+    * Given a message annotation name, lookup and return the value associated with
+    * that annotation name.  If the message annotations have not been created yet
+    * then this method will always return null.
+    *
+    * @param key the Symbol name that should be looked up in the message annotations.
+    * @return the value of the annotation if it exists, or null if not set or not accessible.
+    */
+   public Object getMessageAnnotation(String key) {
+      return messageAnnotationsMap == null ? null : messageAnnotationsMap.get(Symbol.valueOf(key));
+   }
+
+   /**
+    * Sets a delivery annotation on the AMQP Message.  The String key is converted
+    * to a Symbol and the delivery annotations section is created on first use.
+    *
+    * @param key   The name of the Symbol whose value is being set.
+    * @param value The new value to set in the delivery annotations of this message.
+    * @throws IllegalStateException if the message is read only.
+    */
+   public void setDeliveryAnnotation(String key, Object value) {
+      checkReadOnly();
+      lazyCreateDeliveryAnnotations();
+
+      final Symbol annotationName = Symbol.valueOf(key);
+      deliveryAnnotationsMap.put(annotationName, value);
+   }
+
+   /**
+    * Given a delivery annotation name, lookup and return the value associated with
+    * that annotation name.  If the delivery annotations have not been created yet
+    * then this method will always return null.
+    *
+    * @param key the Symbol name that should be looked up in the delivery annotations.
+    * @return the value of the annotation if it exists, or null if not set or not accessible.
+    */
+   public Object getDeliveryAnnotation(String key) {
+      return deliveryAnnotationsMap == null ? null : deliveryAnnotationsMap.get(Symbol.valueOf(key));
+   }
+
+   //----- Methods for manipulating the Message body ------------------------//
+
+   /**
+    * Stores a String in the body of an outgoing Message as an AmqpValue section,
+    * throws an exception if this is an incoming message instance.
+    *
+    * @param value the String value to store in the Message body.
+    * @throws IllegalStateException if the message is read only.
+    */
+   public void setText(String value) throws IllegalStateException {
+      checkReadOnly();
+      getWrappedMessage().setBody(new AmqpValue(value));
+   }
+
+   /**
+    * Stores a byte array in the body of an outgoing Message as a Data section
+    * wrapping a Binary, throws an exception if this is an incoming message instance.
+    *
+    * @param bytes the byte array value to store in the Message body.
+    * @throws IllegalStateException if the message is read only.
+    */
+   public void setBytes(byte[] bytes) throws IllegalStateException {
+      checkReadOnly();
+      getWrappedMessage().setBody(new Data(new Binary(bytes)));
+   }
+
+   /**
+    * Stores a DescribedType in the body of an outgoing Message as an AmqpValue
+    * section, throws an exception if this is an incoming message instance.
+    *
+    * @param described the DescribedType value to store in the Message body.
+    * @throws IllegalStateException if the message is read only.
+    */
+   public void setDescribedType(DescribedType described) throws IllegalStateException {
+      checkReadOnly();
+      getWrappedMessage().setBody(new AmqpValue(described));
+   }
+
+   /**
+    * Attempts to retrieve the message body as an DescribedType instance.
+    *
+    * @return an DescribedType instance if one is stored in the message body.
+    * @throws NoSuchElementException if the body does not contain a DescribedType.
+    */
+   public DescribedType getDescribedType() throws NoSuchElementException {
+      DescribedType result = null;
+
+      if (getWrappedMessage().getBody() == null) {
+         return null;
+      }
+      else {
+         if (getWrappedMessage().getBody() instanceof AmqpValue) {
+            AmqpValue value = (AmqpValue) getWrappedMessage().getBody();
+
+            if (value.getValue() == null) {
+               result = null;
+            }
+            else if (value.getValue() instanceof DescribedType) {
+               result = (DescribedType) value.getValue();
+            }
+            else {
+               throw new NoSuchElementException("Message does not contain a DescribedType body");
+            }
+         }
+      }
+
+      return result;
+   }
+
+   //----- Internal implementation ------------------------------------------//
+
+   // A message that carries a Delivery was received from the remote and must not be mutated.
+   private void checkReadOnly() throws IllegalStateException {
+      if (delivery == null) {
+         return;
+      }
+
+      throw new IllegalStateException("Message is read only.");
+   }
+
+   // Creates the backing map and installs a MessageAnnotations section on first use.
+   private void lazyCreateMessageAnnotations() {
+      if (messageAnnotationsMap != null) {
+         return;
+      }
+
+      messageAnnotationsMap = new HashMap<>();
+      message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
+   }
+
+   // Creates the backing map and installs a DeliveryAnnotations section on first use.
+   private void lazyCreateDeliveryAnnotations() {
+      if (deliveryAnnotationsMap != null) {
+         return;
+      }
+
+      deliveryAnnotationsMap = new HashMap<>();
+      message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap));
+   }
+
+   // Creates the backing map and installs an ApplicationProperties section on first use.
+   private void lazyCreateApplicationProperties() {
+      if (applicationPropertiesMap != null) {
+         return;
+      }
+
+      applicationPropertiesMap = new HashMap<>();
+      message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap));
+   }
+
+   // Installs an empty Header section if the message does not have one yet.
+   private void lazyCreateHeader() {
+      if (message.getHeader() != null) {
+         return;
+      }
+
+      message.setHeader(new Header());
+   }
+
+   // Installs an empty Properties section if the message does not have one yet.
+   private void lazyCreateProperties() {
+      if (message.getProperties() != null) {
+         return;
+      }
+
+      message.setProperties(new Properties());
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java
new file mode 100644
index 0000000..2e36e84
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_CODE;
+
+/**
+ * A DescribedType wrapper for the JMS no local option of a MessageConsumer.
+ * The descriptor is the NO_LOCAL_CODE constant from AmqpSupport and the
+ * described value is a fixed String.
+ */
+public class AmqpNoLocalFilter implements DescribedType {
+
+   /** Shared instance for callers that do not need their own filter object. */
+   public static final AmqpNoLocalFilter NO_LOCAL = new AmqpNoLocalFilter();
+
+   private final String noLocal = "NoLocalFilter{}";
+
+   /**
+    * Creates a new no local filter instance.
+    */
+   public AmqpNoLocalFilter() {
+   }
+
+   /**
+    * @return the descriptor that identifies this filter, NO_LOCAL_CODE.
+    */
+   @Override
+   public Object getDescriptor() {
+      return NO_LOCAL_CODE;
+   }
+
+   /**
+    * @return the fixed String value described by this type.
+    */
+   @Override
+   public Object getDescribed() {
+      return noLocal;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
new file mode 100644
index 0000000..9f3bff2
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -0,0 +1,946 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import javax.jms.InvalidDestinationException;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+import org.apache.activemq.transport.amqp.client.util.ClientFuture;
+import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
+
+/**
+ * Receiver class that manages a Proton receiver endpoint.
+ */
+public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class);
+
+   private final AtomicBoolean closed = new AtomicBoolean();
+   private final BlockingQueue<AmqpMessage> prefetch = new LinkedBlockingDeque<>();
+
+   private final AmqpSession session;
+   private final String address;
+   private final String receiverId;
+   private final Source userSpecifiedSource;
+
+   private String subscriptionName;
+   private String selector;
+   private boolean presettle;
+   private boolean noLocal;
+
+   private AsyncResult pullRequest;
+   private AsyncResult stopRequest;
+
+   /**
+    * Create a new receiver instance.
+    *
+    * @param session    The parent session that created the receiver.
+    * @param address    The address that this receiver should listen on.
+    * @param receiverId The unique ID assigned to this receiver.
+    */
+   public AmqpReceiver(AmqpSession session, String address, String receiverId) {
+
+      if (address != null && address.isEmpty()) {
+         throw new IllegalArgumentException("Address cannot be empty.");
+      }
+
+      this.userSpecifiedSource = null;
+      this.session = session;
+      this.address = address;
+      this.receiverId = receiverId;
+   }
+
+   /**
+    * Create a new receiver instance from a caller supplied Source.  The receiver
+    * address is taken from the Source.
+    *
+    * @param session    The parent session that created the receiver.
+    * @param source     The Source instance to use instead of creating and configuring one.
+    * @param receiverId The unique ID assigned to this receiver.
+    * @throws IllegalArgumentException if the source is null.
+    */
+   public AmqpReceiver(AmqpSession session, Source source, String receiverId) {
+
+      if (source == null) {
+         throw new IllegalArgumentException("User specified Source cannot be null");
+      }
+
+      this.userSpecifiedSource = source;
+      this.address = source.getAddress();
+      this.session = session;
+      this.receiverId = receiverId;
+   }
+
+   /**
+    * Close the receiver.  Only the first call to either close or detach has any
+    * effect; the close work is run on the session's scheduler and this method
+    * blocks until that request completes.
+    *
+    * @throws IOException if an error occurs while closing the receiver.
+    */
+   public void close() throws IOException {
+      if (!closed.compareAndSet(false, true)) {
+         return;
+      }
+
+      final ClientFuture closeRequest = new ClientFuture();
+      final Runnable closeTask = new Runnable() {
+
+         @Override
+         public void run() {
+            // NOTE(review): if checkClosed() throws here the request is never completed
+            // and sync() below would not return - confirm against checkClosed().
+            checkClosed();
+            close(closeRequest);
+            session.pumpToProtonTransport(closeRequest);
+         }
+      };
+
+      session.getScheduler().execute(closeTask);
+      closeRequest.sync();
+   }
+
+   /**
+    * Detach the receiver.  Only the first call to either close or detach has any
+    * effect; the detach work is run on the session's scheduler and this method
+    * blocks until that request completes.
+    *
+    * @throws IOException if an error occurs while detaching the receiver.
+    */
+   public void detach() throws IOException {
+      if (!closed.compareAndSet(false, true)) {
+         return;
+      }
+
+      final ClientFuture detachRequest = new ClientFuture();
+      final Runnable detachTask = new Runnable() {
+
+         @Override
+         public void run() {
+            // NOTE(review): if checkClosed() throws here the request is never completed
+            // and sync() below would not return - confirm against checkClosed().
+            checkClosed();
+            detach(detachRequest);
+            session.pumpToProtonTransport(detachRequest);
+         }
+      };
+
+      session.getScheduler().execute(detachTask);
+      detachRequest.sync();
+   }
+
+   /**
+    * @return this receiver's parent AmqpSession.
+    */
+   public AmqpSession getSession() {
+      return this.session;
+   }
+
+   /**
+    * @return the address that this receiver has been configured to listen on,
+    *         which may be null.
+    */
+   public String getAddress() {
+      return this.address;
+   }
+
+   /**
+    * Waits for a message to be delivered to this receiver.  The call blocks
+    * indefinitely until a message is available in the prefetch queue.
+    *
+    * @return a newly received message sent to this receiver.
+    * @throws Exception if an error occurs during the receive attempt.
+    */
+   public AmqpMessage receive() throws Exception {
+      checkClosed();
+
+      final AmqpMessage delivered = prefetch.take();
+      return delivered;
+   }
+
+   /**
+    * Attempts to receive a message sent to this receiver, waiting up to the given
+    * timeout for one to appear in the prefetch queue before giving up.
+    *
+    * @param timeout the time to wait for a new message to arrive.
+    * @param unit    the unit of time that the timeout value represents.
+    * @return a newly received message or null if the time to wait period expires.
+    * @throws Exception if an error occurs during the receive attempt.
+    */
+   public AmqpMessage receive(long timeout, TimeUnit unit) throws Exception {
+      checkClosed();
+
+      final AmqpMessage delivered = prefetch.poll(timeout, unit);
+      return delivered;
+   }
+
+   /**
+    * Returns a message immediately if one is already available in this receiver's
+    * prefetch queue, otherwise returns null without waiting.
+    *
+    * @return a newly received message or null if there is no currently available message.
+    * @throws Exception if an error occurs during the receive attempt.
+    */
+   public AmqpMessage receiveNoWait() throws Exception {
+      checkClosed();
+
+      final AmqpMessage delivered = prefetch.poll();
+      return delivered;
+   }
+
+   /**
+    * Request a remote peer send a Message to this client waiting until one arrives.
+    * <p>
+    * Delegates to {@code pull(long, TimeUnit)} with a timeout of -1 milliseconds,
+    * the negative value selecting the wait-until-a-message-arrives mode.
+    *
+    * @return the pulled AmqpMessage or null if none was pulled from the remote.
+    * @throws IOException if an error occurs
+    */
+   public AmqpMessage pull() throws IOException {
+      return pull(-1, TimeUnit.MILLISECONDS);
+   }
+
+   /**
+    * Request a remote peer send a Message to this client using an immediate drain request.
+    * <p>
+    * Delegates to {@code pull(long, TimeUnit)} with a timeout of 0 milliseconds.
+    *
+    * @return the pulled AmqpMessage or null if none was pulled from the remote.
+    * @throws IOException if an error occurs
+    */
+   public AmqpMessage pullImmediate() throws IOException {
+      return pull(0, TimeUnit.MILLISECONDS);
+   }
+
+   /**
+    * Request a remote peer send a Message to this client.
+    *
+    * {@literal timeout < 0} then it should remain open until a message is received.
+    * {@literal timeout = 0} then it returns a message or null if none available
+    * {@literal timeout > 0} then it should remain open for timeout amount of time.
+    *
+    * The timeout is converted to milliseconds using the supplied unit before
+    * its sign is evaluated.
+    * <p>
+    * The request is carried out on the session's scheduler while the calling
+    * thread blocks until it completes.  Any failure raised while the request is
+    * processed, including the closed check and the timeout conversion, fails
+    * the request so the caller is released instead of waiting forever.
+    *
+    * @param timeout the amount of time to tell the remote peer to keep this pull request valid.
+    * @param unit    the unit of measure that the timeout represents.
+    * @return the pulled AmqpMessage or null if none was pulled from the remote.
+    * @throws IOException if an error occurs
+    */
+   public AmqpMessage pull(final long timeout, final TimeUnit unit) throws IOException {
+      checkClosed();
+      final ClientFuture request = new ClientFuture();
+      session.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            try {
+               // Kept inside the try: an exception escaping this Runnable would leave
+               // the request uncompleted and the caller blocked in request.sync().
+               checkClosed();
+
+               final long timeoutMills = unit.toMillis(timeout);
+
+               LOG.trace("Pull on Receiver {} with timeout = {}", getSubscriptionName(), timeoutMills);
+
+               // Every mode needs credit outstanding so that the remote is able to
+               // send something, grant one if the link currently has none.
+               if (getEndpoint().getCredit() == 0) {
+                  LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName());
+                  getEndpoint().flow(1);
+               }
+
+               if (timeoutMills < 0) {
+                  // Await the message arrival
+                  pullRequest = request;
+               }
+               else if (timeoutMills == 0) {
+                  // Drain immediately and wait for the message(s) to arrive,
+                  // or a flow indicating removal of the remaining credit.
+                  stop(request);
+               }
+               else {
+                  // Wait for the timeout for the message(s) to arrive, then drain if required
+                  // and wait for remaining message(s) to arrive or a flow indicating
+                  // removal of the remaining credit.
+                  stopOnSchedule(timeoutMills, request);
+               }
+
+               session.pumpToProtonTransport(request);
+            }
+            catch (Exception e) {
+               request.onFailure(e);
+            }
+         }
+      });
+
+      request.sync();
+
+      return prefetch.poll();
+   }
+
+   /**
+    * Controls the amount of credit given to the receiver link.
+    *
+    * The flow is issued from the session's scheduler and this call waits on the
+    * request before returning.
+    *
+    * @param credit the amount of credit to grant.
+    * @throws IOException if an error occurs while sending the flow.
+    */
+   public void flow(final int credit) throws IOException {
+      checkClosed();
+      final ClientFuture request = new ClientFuture();
+      session.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            try {
+               // Checked inside the try so that a receiver closed in the meantime fails
+               // the request instead of throwing out of run() and never completing it.
+               checkClosed();
+               getEndpoint().flow(credit);
+               session.pumpToProtonTransport(request);
+               request.onSuccess();
+            }
+            catch (Exception e) {
+               request.onFailure(e);
+            }
+         }
+      });
+
+      request.sync();
+   }
+
+   /**
+    * Attempts to drain a given amount of credit from the link.
+    *
+    * The drain is issued from the session's scheduler and this call waits on the
+    * request before returning.
+    *
+    * @param credit the amount of credit to drain.
+    * @throws IOException if an error occurs while sending the drain.
+    */
+   public void drain(final int credit) throws IOException {
+      checkClosed();
+      final ClientFuture request = new ClientFuture();
+      session.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            try {
+               // Checked inside the try so that a receiver closed in the meantime fails
+               // the request instead of throwing out of run() and never completing it.
+               checkClosed();
+               getEndpoint().drain(credit);
+               session.pumpToProtonTransport(request);
+               request.onSuccess();
+            }
+            catch (Exception e) {
+               request.onFailure(e);
+            }
+         }
+      });
+
+      request.sync();
+   }
+
+   /**
+    * Stops the receiver, using all link credit and waiting for in-flight messages to arrive.
+    *
+    * The request is not completed here on success; that is left to the internal stop
+    * handling once the credit is gone and queued deliveries have been processed.
+    *
+    * @throws IOException if an error occurs while sending the drain.
+    */
+   public void stop() throws IOException {
+      checkClosed();
+      final ClientFuture request = new ClientFuture();
+      session.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            try {
+               // Checked inside the try so that a receiver closed in the meantime fails
+               // the request instead of throwing out of run() and never completing it.
+               checkClosed();
+               stop(request);
+               session.pumpToProtonTransport(request);
+            }
+            catch (Exception e) {
+               request.onFailure(e);
+            }
+         }
+      });
+
+      request.sync();
+   }
+
+   /**
+    * Accepts a message that was dispatched under the given Delivery instance.
+    *
+    * An already settled delivery is left untouched.  Inside a transaction the accepted
+    * outcome is wrapped in a TransactionalState carrying the remote transaction id.
+    *
+    * @param delivery the Delivery instance to accept.
+    * @throws IOException if an error occurs while sending the accept.
+    */
+   public void accept(final Delivery delivery) throws IOException {
+      checkClosed();
+
+      if (delivery == null) {
+         throw new IllegalArgumentException("Delivery to accept cannot be null");
+      }
+
+      final ClientFuture request = new ClientFuture();
+      session.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            try {
+               // Checked inside the try so that a receiver closed in the meantime fails
+               // the request instead of throwing out of run() and never completing it.
+               checkClosed();
+               if (!delivery.isSettled()) {
+                  if (session.isInTransaction()) {
+                     // NOTE(review): when no remote transaction id is available the delivery
+                     // is neither dispositioned nor settled, yet the request still succeeds.
+                     Binary txnId = session.getTransactionId().getRemoteTxId();
+                     if (txnId != null) {
+                        TransactionalState txState = new TransactionalState();
+                        txState.setOutcome(Accepted.getInstance());
+                        txState.setTxnId(txnId);
+                        delivery.disposition(txState);
+                        delivery.settle();
+                        session.getTransactionContext().registerTxConsumer(AmqpReceiver.this);
+                     }
+                  }
+                  else {
+                     delivery.disposition(Accepted.getInstance());
+                     delivery.settle();
+                  }
+               }
+               session.pumpToProtonTransport(request);
+               request.onSuccess();
+            }
+            catch (Exception e) {
+               request.onFailure(e);
+            }
+         }
+      });
+
+      request.sync();
+   }
+
+   /**
+    * Mark a message that was dispatched under the given Delivery instance as Modified.
+    *
+    * An already settled delivery is left untouched.
+    *
+    * @param delivery          the Delivery instance to mark modified.
+    * @param deliveryFailed    indicates that the delivery failed for some reason.
+    * @param undeliverableHere marks the delivery as not able to be processed by the link it was sent to.
+    * @throws IOException if an error occurs while sending the modified disposition.
+    */
+   public void modified(final Delivery delivery,
+                        final Boolean deliveryFailed,
+                        final Boolean undeliverableHere) throws IOException {
+      checkClosed();
+
+      if (delivery == null) {
+         throw new IllegalArgumentException("Delivery to modify cannot be null");
+      }
+
+      final ClientFuture request = new ClientFuture();
+      session.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            try {
+               // Checked inside the try so that a receiver closed in the meantime fails
+               // the request instead of throwing out of run() and never completing it.
+               checkClosed();
+               if (!delivery.isSettled()) {
+                  Modified disposition = new Modified();
+                  disposition.setUndeliverableHere(undeliverableHere);
+                  disposition.setDeliveryFailed(deliveryFailed);
+                  delivery.disposition(disposition);
+                  delivery.settle();
+                  session.pumpToProtonTransport(request);
+               }
+               request.onSuccess();
+            }
+            catch (Exception e) {
+               request.onFailure(e);
+            }
+         }
+      });
+
+      request.sync();
+   }
+
+   /**
+    * Release a message that was dispatched under the given Delivery instance.
+    *
+    * An already settled delivery is left untouched.
+    *
+    * @param delivery the Delivery instance to release.
+    * @throws IOException if an error occurs while sending the release.
+    */
+   public void release(final Delivery delivery) throws IOException {
+      checkClosed();
+
+      if (delivery == null) {
+         throw new IllegalArgumentException("Delivery to release cannot be null");
+      }
+
+      final ClientFuture request = new ClientFuture();
+      session.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            try {
+               // Checked inside the try so that a receiver closed in the meantime fails
+               // the request instead of throwing out of run() and never completing it.
+               checkClosed();
+               if (!delivery.isSettled()) {
+                  delivery.disposition(Released.getInstance());
+                  delivery.settle();
+                  session.pumpToProtonTransport(request);
+               }
+               request.onSuccess();
+            }
+            catch (Exception e) {
+               request.onFailure(e);
+            }
+         }
+      });
+
+      request.sync();
+   }
+
+   /**
+    * @return a new unmodifiable wrapper around the underlying Receiver endpoint.
+    */
+   public Receiver getReceiver() {
+      final Receiver endpoint = getEndpoint();
+      return new UnmodifiableReceiver(endpoint);
+   }
+
+   //----- Receiver configuration properties --------------------------------//
+
+   /**
+    * @return the configured presettle flag.
+    */
+   public boolean isPresettle() {
+      return this.presettle;
+   }
+
+   /**
+    * @param presettle the new value of the presettle flag.
+    */
+   public void setPresettle(boolean presettle) {
+      this.presettle = presettle;
+   }
+
+   /**
+    * @return true when a subscription name has been set on this receiver.
+    */
+   public boolean isDurable() {
+      return this.subscriptionName != null;
+   }
+
+   /**
+    * @return the configured subscription name, or null if none was set.
+    */
+   public String getSubscriptionName() {
+      return this.subscriptionName;
+   }
+
+   /**
+    * @param subscriptionName the subscription name to use for this receiver.
+    */
+   public void setSubscriptionName(String subscriptionName) {
+      this.subscriptionName = subscriptionName;
+   }
+
+   /**
+    * @return the configured selector, or null if none was set.
+    */
+   public String getSelector() {
+      return this.selector;
+   }
+
+   /**
+    * @param selector the selector to use for this receiver.
+    */
+   public void setSelector(String selector) {
+      this.selector = selector;
+   }
+
+   /**
+    * @return the configured no-local flag.
+    */
+   public boolean isNoLocal() {
+      return this.noLocal;
+   }
+
+   /**
+    * @param noLocal the new value of the no-local flag.
+    */
+   public void setNoLocal(boolean noLocal) {
+      this.noLocal = noLocal;
+   }
+
+   /**
+    * @return the drain timeout configured on the connection that owns this receiver's session.
+    */
+   public long getDrainTimeout() {
+      final AmqpConnection owner = session.getConnection();
+      return owner.getDrainTimeout();
+   }
+
+   //----- Internal implementation ------------------------------------------//
+
+   @Override
+   protected void doOpen() {
+
+      Source source = userSpecifiedSource;
+      Target target = new Target();
+
+      if (source == null && address != null) {
+         source = new Source();
+         source.setAddress(address);
+         configureSource(source);
+      }
+
+      String receiverName = receiverId + ":" + address;
+
+      if (getSubscriptionName() != null && !getSubscriptionName().isEmpty()) {
+         // In the case of Durable Topic Subscriptions the client must use the same
+         // receiver name which is derived from the subscription name property.
+         receiverName = getSubscriptionName();
+      }
+
+      Receiver receiver = session.getEndpoint().receiver(receiverName);
+      receiver.setSource(source);
+      receiver.setTarget(target);
+      if (isPresettle()) {
+         receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
+      }
+      else {
+         receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+      }
+      receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+      setEndpoint(receiver);
+
+      super.doOpen();
+   }
+
+   @Override
+   protected void doOpenCompletion() {
+      // Verify the attach response contained a non-null Source
+      org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource();
+      if (s != null) {
+         super.doOpenCompletion();
+      }
+      else {
+         // No link terminus was created, the peer will now detach/close us.
+      }
+   }
+
+   @Override
+   protected void doClose() {
+      getEndpoint().close();
+   }
+
+   @Override
+   protected void doDetach() {
+      getEndpoint().detach();
+   }
+
+   @Override
+   protected Exception getOpenAbortException() {
+      // Verify the attach response contained a non-null Source
+      org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource();
+      if (s != null) {
+         return super.getOpenAbortException();
+      }
+      else {
+         // No link terminus was created, the peer has detach/closed us, create IDE.
+         return new InvalidDestinationException("Link creation was refused");
+      }
+   }
+
+   @Override
+   protected void doOpenInspection() {
+      try {
+         getStateInspector().inspectOpenedResource(getReceiver());
+      }
+      catch (Throwable error) {
+         getStateInspector().markAsInvalid(error.getMessage());
+      }
+   }
+
+   @Override
+   protected void doClosedInspection() {
+      try {
+         getStateInspector().inspectClosedResource(getReceiver());
+      }
+      catch (Throwable error) {
+         getStateInspector().markAsInvalid(error.getMessage());
+      }
+   }
+
+   @Override
+   protected void doDetachedInspection() {
+      try {
+         getStateInspector().inspectDetachedResource(getReceiver());
+      }
+      catch (Throwable error) {
+         getStateInspector().markAsInvalid(error.getMessage());
+      }
+   }
+
+   protected void configureSource(Source source) {
+      Map<Symbol, DescribedType> filters = new HashMap<>();
+      Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};
+
+      if (getSubscriptionName() != null && !getSubscriptionName().isEmpty()) {
+         source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+         source.setDurable(TerminusDurability.UNSETTLED_STATE);
+         source.setDistributionMode(COPY);
+      }
+      else {
+         source.setDurable(TerminusDurability.NONE);
+         source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+      }
+
+      source.setOutcomes(outcomes);
+
+      Modified modified = new Modified();
+      modified.setDeliveryFailed(true);
+      modified.setUndeliverableHere(false);
+
+      source.setDefaultOutcome(modified);
+
+      if (isNoLocal()) {
+         filters.put(NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
+      }
+
+      if (getSelector() != null && !getSelector().trim().equals("")) {
+         filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(getSelector()));
+      }
+
+      if (!filters.isEmpty()) {
+         source.setFilter(filters);
+      }
+   }
+
+   @Override
+   public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
+      Delivery incoming = null;
+      do {
+         incoming = getEndpoint().current(); // null once nothing more is queued locally (see else branch)
+         if (incoming != null) {
+            if (incoming.isReadable() && !incoming.isPartial()) {
+               LOG.trace("{} has incoming Message(s).", this);
+               try {
+                  processDelivery(incoming);
+               }
+               catch (Exception e) {
+                  throw IOExceptionSupport.create(e); // surface any processing failure as an IOException
+               }
+               getEndpoint().advance(); // move on to the next delivery on the link
+            }
+            else {
+               LOG.trace("{} has a partial incoming Message(s), deferring.", this);
+               incoming = null; // ends the loop; the delivery is looked at again on a later update
+            }
+         }
+         else {
+            // We have exhausted the locally queued messages on this link.
+            // If a stop is pending and no remote credit remains, the stop is now complete.
+            if (getEndpoint().getRemoteCredit() <= 0) {
+               if (stopRequest != null) {
+                  stopRequest.onSuccess();
+                  stopRequest = null; // cleared so the same request is not signalled twice
+               }
+            }
+         }
+      } while (incoming != null);
+
+      super.processDeliveryUpdates(connection);
+   }
+
+   private void processDelivery(Delivery incoming) throws Exception {
+      Message message = null;
+      try {
+         message = decodeIncomingMessage(incoming);
+      }
+      catch (Exception e) {
+         LOG.warn("Error on transform: {}", e.getMessage());
+         deliveryFailed(incoming, true); // settle as Modified (failed, undeliverable here) and grant one credit
+         return;
+      }
+
+      AmqpMessage amqpMessage = new AmqpMessage(this, message, incoming);
+      // Keep the message wrapper in the delivery context so it can be recovered from the delivery
+      incoming.setContext(amqpMessage);
+      prefetch.add(amqpMessage); // queued here until a caller polls it
+
+      // A message was processed, so complete the pending
+      // message pull request if there is one.
+      if (pullRequest != null) {
+         pullRequest.onSuccess();
+         pullRequest = null;
+      }
+   }
+
+   @Override
+   public void processFlowUpdates(AmqpConnection connection) throws IOException {
+      final boolean awaitingCompletion = pullRequest != null || stopRequest != null;
+      if (awaitingCompletion) {
+         final Receiver link = getEndpoint();
+         // With no remote credit left and nothing queued, no further message can
+         // arrive for a pending pull or stop, so complete whichever is waiting.
+         final boolean exhausted = link.getRemoteCredit() <= 0 && link.getQueued() == 0;
+
+         if (exhausted && pullRequest != null) {
+            pullRequest.onSuccess();
+            pullRequest = null;
+         }
+
+         if (exhausted && stopRequest != null) {
+            stopRequest.onSuccess();
+            stopRequest = null;
+         }
+      }
+
+      LOG.trace("Consumer {} flow updated, remote credit = {}", getSubscriptionName(), getEndpoint().getRemoteCredit());
+
+      super.processFlowUpdates(connection);
+   }
+
+   protected Message decodeIncomingMessage(Delivery incoming) {
+      int count;
+
+      byte[] chunk = new byte[2048];
+      ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+      while ((count = getEndpoint().recv(chunk, 0, chunk.length)) > 0) {
+         stream.write(chunk, 0, count);
+      }
+
+      byte[] messageBytes = stream.toByteArray();
+
+      try {
+         Message protonMessage = Message.Factory.create();
+         protonMessage.decode(messageBytes, 0, messageBytes.length);
+         return protonMessage;
+      }
+      finally {
+         try {
+            stream.close();
+         }
+         catch (IOException e) {
+         }
+      }
+   }
+
+   protected void deliveryFailed(Delivery incoming, boolean expandCredit) {
+      Modified disposition = new Modified();
+      disposition.setUndeliverableHere(true);
+      disposition.setDeliveryFailed(true);
+      incoming.disposition(disposition);
+      incoming.settle();
+      if (expandCredit) {
+         getEndpoint().flow(1);
+      }
+   }
+
+   /*
+    * Completes the given request once the link has no remote credit and no queued
+    * deliveries; otherwise stores it as the pending stop request and, when remote
+    * credit is outstanding, drains the link (guarded by the drain timeout if set).
+    */
+   private void stop(final AsyncResult request) {
+      Receiver receiver = getEndpoint();
+      if (receiver.getRemoteCredit() <= 0) {
+         if (receiver.getQueued() == 0) {
+            // We have no remote credit and all the deliveries have been processed.
+            request.onSuccess();
+         }
+         else {
+            // There are still deliveries to process, wait for them to be.
+            stopRequest = request;
+         }
+      }
+      else {
+         // TODO: We don't actually want the additional messages that could be sent while
+         // draining. We could explicitly reduce credit first, or possibly use 'echo' instead
+         // of drain if it was supported. We would first need to understand what happens
+         // if we reduce credit below the number of messages already in-flight before
+         // the peer sees the update.
+         stopRequest = request;
+         receiver.drain(0);
+
+         if (getDrainTimeout() > 0) {
+            // If the remote doesn't respond we will close the consumer and break any
+            // blocked receive or stop calls that are waiting.
+            final ScheduledFuture<?> future = getSession().getScheduler().schedule(new Runnable() {
+               @Override
+               public void run() {
+                  // Qualified 'this': a bare 'this' here is the anonymous Runnable, which
+                  // would log its default identity instead of the receiver.
+                  LOG.trace("Consumer {} drain request timed out", AmqpReceiver.this);
+                  Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
+                  locallyClosed(session.getConnection(), cause);
+                  stopRequest.onFailure(cause);
+                  session.pumpToProtonTransport(stopRequest);
+               }
+            }, getDrainTimeout(), TimeUnit.MILLISECONDS);
+
+            // Wrap the request so that completing it also cancels the timeout task.
+            stopRequest = new ScheduledRequest(future, stopRequest);
+         }
+      }
+   }
+
+   /*
+    * Schedules a stop of the receiver after the given timeout (milliseconds) and stores
+    * the request, wrapped so that completing it cancels the scheduled task.
+    */
+   private void stopOnSchedule(long timeout, final AsyncResult request) {
+      LOG.trace("Receiver {} scheduling stop", this);
+      // We need to drain the credit if no message(s) arrive to use it.
+      final ScheduledFuture<?> future = getSession().getScheduler().schedule(new Runnable() {
+         @Override
+         public void run() {
+            // Qualified 'this': a bare 'this' here is the anonymous Runnable, which
+            // would log its default identity instead of the receiver.
+            LOG.trace("Receiver {} running scheduled stop", AmqpReceiver.this);
+            if (getEndpoint().getRemoteCredit() != 0) {
+               stop(request);
+               session.pumpToProtonTransport(request);
+            }
+         }
+      }, timeout, TimeUnit.MILLISECONDS);
+
+      stopRequest = new ScheduledRequest(future, request);
+   }
+
+   @Override
+   public String toString() {
+      // Simple class name followed by the address this receiver was created for.
+      final StringBuilder text = new StringBuilder(getClass().getSimpleName());
+      text.append("{ address = ").append(address).append("}");
+      return text.toString();
+   }
+
+   // Rejects any operation attempted on a receiver that reports itself closed.
+   private void checkClosed() {
+      if (!isClosed()) {
+         return;
+      }
+
+      throw new IllegalStateException("Receiver is already closed");
+   }
+
+   //----- Internal Transaction state callbacks -----------------------------//
+
+   void preCommit() { // nothing to do here before a commit
+   }
+
+   void preRollback() { // nothing to do here before a rollback
+   }
+
+   void postCommit() { // nothing to do here after a commit
+   }
+
+   void postRollback() { // nothing to do here after a rollback
+   }
+
+   //----- Inner classes used in message pull operations --------------------//
+
+   /**
+    * AsyncResult wrapper that pairs a request with a scheduled task and cancels
+    * that task whenever the request is signalled.
+    */
+   protected static final class ScheduledRequest implements AsyncResult {
+
+      private final ScheduledFuture<?> scheduledTask;
+      private final AsyncResult origRequest;
+
+      public ScheduledRequest(ScheduledFuture<?> completionTask, AsyncResult origRequest) {
+         this.origRequest = origRequest;
+         this.scheduledTask = completionTask;
+      }
+
+      @Override
+      public void onFailure(Throwable cause) {
+         // Failure is always passed on, whether or not the task could still be cancelled.
+         scheduledTask.cancel(false);
+         origRequest.onFailure(cause);
+      }
+
+      @Override
+      public void onSuccess() {
+         // Signal completion only if the task was cancelled here; otherwise it is
+         // left to the scheduled task to finish the request.
+         if (scheduledTask.cancel(false)) {
+            origRequest.onSuccess();
+         }
+      }
+
+      @Override
+      public boolean isComplete() {
+         return origRequest.isComplete();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpRedirectedException.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpRedirectedException.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpRedirectedException.java
new file mode 100644
index 0000000..0c9bb81
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpRedirectedException.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+
+/**
+ * {@link IOException} derivative signalling that the remote peer has requested that this
+ * connection be redirected to an alternative peer; it carries the redirect target details.
+ */
+public class AmqpRedirectedException extends IOException {
+
+   private static final long serialVersionUID = 5872211116061710369L;
+
+   private final String hostname;
+   private final String networkHost;
+   private final int port;
+
+   public AmqpRedirectedException(String reason, String hostname, String networkHost, int port) {
+      super(reason); // the reason becomes the exception message
+
+      this.hostname = hostname;
+      this.networkHost = networkHost;
+      this.port = port;
+   }
+
+   /**
+    * @return the host name of the container being redirected to.
+    */
+   public String getHostname() {
+      return hostname;
+   }
+
+   /**
+    * @return the DNS host name or IP address of the peer this connection is being redirected to.
+    */
+   public String getNetworkHost() {
+      return networkHost;
+   }
+
+   /**
+    * @return the port number on the peer this connection is being redirected to.
+    */
+   public int getPort() {
+      return port;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpResource.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
new file mode 100644
index 0000000..bd66659
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+
+/**
+ * AmqpResource specification.
+ *
+ * All AMQP types should implement this interface to allow for control of state
+ * and configuration details.
+ */
+public interface AmqpResource extends AmqpEventSink {
+
+   /**
+    * Perform all the work needed to open this resource and store the request
+    * until such time as the remote peer indicates the resource has become active.
+    *
+    * @param request The initiating request that triggered this open call.
+    */
+   void open(AsyncResult request);
+
+   /**
+    * @return true if the resource has moved to the opened state on the remote.
+    */
+   boolean isOpen();
+
+   /**
+    * Called to indicate that this resource is now remotely opened.  Once opened a
+    * resource can start accepting incoming requests.
+    */
+   void opened();
+
+   /**
+    * Perform all work needed to close this resource and store the request
+    * until such time as the remote peer indicates the resource has been closed.
+    *
+    * @param request The initiating request that triggered this close call.
+    */
+   void close(AsyncResult request);
+
+   /**
+    * Perform all work needed to detach this resource and store the request
+    * until such time as the remote peer indicates the resource has been detached.
+    *
+    * @param request The initiating request that triggered this detach call.
+    */
+   void detach(AsyncResult request);
+
+   /**
+    * @return true if the resource has moved to the closed state on the remote.
+    */
+   boolean isClosed();
+
+   /**
+    * Called to indicate that this resource is now remotely closed.  Once closed a
+    * resource can not accept any incoming requests.
+    */
+   void closed();
+
+   /**
+    * Sets the failed state for this Resource and triggers a failure signal for
+    * any pending ProducerRequest.
+    */
+   void failed();
+
+   /**
+    * Called to indicate that the remote end has become closed but the resource
+    * was not awaiting a close.  This could happen during an open request where
+    * the remote does not set an error condition or during normal operation.
+    *
+    * @param connection The connection that owns this resource.
+    */
+   void remotelyClosed(AmqpConnection connection);
+
+   /**
+    * Called to indicate that the local end has become closed but the resource
+    * was not awaiting a close.  This could happen during an open request where
+    * the remote does not set an error condition or during normal operation.
+    *
+    * @param connection The connection that owns this resource.
+    * @param error      The error that triggered the local close of this resource.
+    */
+   void locallyClosed(AmqpConnection connection, Exception error);
+
+   /**
+    * Sets the failed state for this Resource and triggers a failure signal for
+    * any pending ProducerRequest.
+    *
+    * @param cause The Exception that triggered the failure.
+    */
+   void failed(Exception cause);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
new file mode 100644
index 0000000..404b943
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import javax.jms.InvalidDestinationException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+import org.apache.activemq.transport.amqp.client.util.ClientFuture;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Outcome;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sender class that manages a Proton sender endpoint.
+ */
+public class AmqpSender extends AmqpAbstractResource<Sender> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class);
+   // Zero-length delivery tag used when creating presettled deliveries.
+   private static final byte[] EMPTY_BYTE_ARRAY = new byte[]{};
+
+   /** Default send timeout, in milliseconds. */
+   public static final long DEFAULT_SEND_TIMEOUT = 15000;
+
+   // Supplies delivery tags for non-presettled sends; tags are handed back once
+   // the delivery has been settled.
+   private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true);
+   // Flipped by the first close() call so that only one close request is issued.
+   private final AtomicBoolean closed = new AtomicBoolean();
+
+   private final AmqpSession session;
+   private final String address;
+   private final String senderId;
+   // Null unless the Target based constructor was used.
+   private final Target userSpecifiedTarget;
+
+   private boolean presettle;
+   private long sendTimeout = DEFAULT_SEND_TIMEOUT;
+
+   // Unsettled deliveries still waiting for a remote delivery state, kept in the
+   // order they were sent.
+   private final Set<Delivery> pending = new LinkedHashSet<>();
+   // Reusable encoding buffer; replaced by one twice the size whenever a message
+   // does not fit.
+   private byte[] encodeBuffer = new byte[1024 * 8];
+
+   /**
+    * Create a new sender instance.
+    *
+    * @param session  The parent session of this sender.
+    * @param address  The address that this sender produces to; may be null but not empty.
+    * @param senderId The unique ID assigned to this sender.
+    * @throws IllegalArgumentException if the address is non-null but empty.
+    */
+   public AmqpSender(AmqpSession session, String address, String senderId) {
+
+      if (address != null && address.isEmpty()) {
+         throw new IllegalArgumentException("Address cannot be empty.");
+      }
+
+      this.session = session;
+      this.address = address;
+      this.senderId = senderId;
+      this.userSpecifiedTarget = null;
+   }
+
+   /**
+    * Create a new sender instance using the given Target when creating the link.
+    *
+    * @param session  The parent session of this sender.
+    * @param target   The user specified Target to use for the link; its address
+    *                 becomes the address of this sender.
+    * @param senderId The unique ID assigned to this sender.
+    * @throws IllegalArgumentException if the target is null.
+    */
+   public AmqpSender(AmqpSession session, Target target, String senderId) {
+
+      if (target == null) {
+         throw new IllegalArgumentException("User specified Target cannot be null");
+      }
+
+      this.session = session;
+      this.userSpecifiedTarget = target;
+      this.address = target.getAddress();
+      this.senderId = senderId;
+   }
+
+   /**
+    * Sends the given message to this sender's assigned address.  The send itself
+    * is handed to the session's scheduler; this method then waits on the send
+    * request via {@code sync}, bounded by the send timeout when it is positive.
+    *
+    * @param message the message to send.
+    * @throws IOException if an error occurs during the send.
+    */
+   public void send(final AmqpMessage message) throws IOException {
+      checkClosed();
+
+      final ClientFuture future = new ClientFuture();
+      final Runnable sendTask = new Runnable() {
+
+         @Override
+         public void run() {
+            try {
+               doSend(message, future);
+               session.pumpToProtonTransport(future);
+            }
+            catch (Exception error) {
+               future.onFailure(error);
+               session.getConnection().fireClientException(error);
+            }
+         }
+      };
+      session.getScheduler().execute(sendTask);
+
+      // A timeout of zero or less selects the untimed sync().
+      if (sendTimeout > 0) {
+         future.sync(sendTimeout, TimeUnit.MILLISECONDS);
+      }
+      else {
+         future.sync();
+      }
+   }
+
+   /**
+    * Close the sender, a closed sender will throw exceptions if any further send
+    * calls are made.  Only the first call issues a close request; later calls
+    * return immediately.
+    *
+    * @throws IOException if an error occurs while closing the sender.
+    */
+   public void close() throws IOException {
+      if (closed.compareAndSet(false, true)) {
+         final ClientFuture request = new ClientFuture();
+         session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+               try {
+                  checkClosed();
+                  close(request);
+                  session.pumpToProtonTransport(request);
+               }
+               catch (Exception e) {
+                  // Complete the request with the failure; otherwise the caller
+                  // stays blocked in request.sync() with nothing left to signal it.
+                  request.onFailure(e);
+               }
+            }
+         });
+
+         request.sync();
+      }
+   }
+
+   /**
+    * @return this sender's parent AmqpSession.
+    */
+   public AmqpSession getSession() {
+      return session;
+   }
+
+   /**
+    * @return an unmodifiable view of the underlying Sender instance; a new
+    *         wrapper is created on every call.
+    */
+   public Sender getSender() {
+      return new UnmodifiableSender(getEndpoint());
+   }
+
+   /**
+    * @return the assigned address of this sender; when the sender was created
+    *         from a Target this is that Target's address.
+    */
+   public String getAddress() {
+      return address;
+   }
+
+   //----- Sender configuration ---------------------------------------------//
+
+   /**
+    * @return true if messages are settled on send.
+    */
+   public boolean isPresettle() {
+      return presettle;
+   }
+
+   /**
+    * Configure if sent messages are marked as settled on send, defaults to false.
+    * The flag is read when the link is opened (to choose the sender settle mode)
+    * and again on every send.
+    *
+    * @param presettle configure if this sender will presettle all sent messages.
+    */
+   public void setPresettle(boolean presettle) {
+      this.presettle = presettle;
+   }
+
+   /**
+    * @return the currently configured send timeout, in milliseconds.
+    */
+   public long getSendTimeout() {
+      return sendTimeout;
+   }
+
+   /**
+    * Sets the amount of time the sender will block on a send before failing.
+    * A value of zero or less makes send use the untimed wait instead.
+    *
+    * @param sendTimeout time in milliseconds to wait.
+    */
+   public void setSendTimeout(long sendTimeout) {
+      this.sendTimeout = sendTimeout;
+   }
+
+   //----- Private Sender implementation ------------------------------------//
+
+   /**
+    * Guards operations that need a sender that is not closed.  The check relies
+    * on the inherited isClosed(), not on this class's own closed flag.
+    *
+    * @throws IllegalStateException if isClosed() reports true.
+    */
+   private void checkClosed() {
+      if (isClosed()) {
+         throw new IllegalStateException("Sender is already closed");
+      }
+   }
+
+   /**
+    * Creates the Proton sender link on the parent session, configures its
+    * source, target and settle modes, installs it as this resource's endpoint
+    * and then delegates to the superclass open handling.
+    */
+   @Override
+   protected void doOpen() {
+
+      // The source carries this sender's ID and the outcomes it supports.
+      final Source source = new Source();
+      source.setAddress(senderId);
+      source.setOutcomes(new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL});
+
+      // Prefer the caller supplied Target, otherwise build one from the address.
+      final Target target;
+      if (userSpecifiedTarget != null) {
+         target = userSpecifiedTarget;
+      }
+      else {
+         target = new Target();
+         target.setAddress(address);
+      }
+
+      final Sender sender = session.getEndpoint().sender(senderId + ":" + address);
+      sender.setSource(source);
+      sender.setTarget(target);
+      sender.setSenderSettleMode(presettle ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED);
+      sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+      setEndpoint(sender);
+
+      super.doOpen();
+   }
+
+   /**
+    * Completes the open only when the attach response carried a remote target.
+    */
+   @Override
+   protected void doOpenCompletion() {
+      // A null remote target means no link terminus was created; the peer will
+      // now detach/close us, so the open is deliberately not completed here.
+      if (getEndpoint().getRemoteTarget() == null) {
+         return;
+      }
+
+      super.doOpenCompletion();
+   }
+
+   /**
+    * Hands an unmodifiable view of the sender to the state inspector's opened
+    * resource check.  Anything thrown by the inspector is not propagated; only
+    * its message is recorded through markAsInvalid.
+    */
+   @Override
+   protected void doOpenInspection() {
+      try {
+         getStateInspector().inspectOpenedResource(getSender());
+      }
+      catch (Throwable error) {
+         getStateInspector().markAsInvalid(error.getMessage());
+      }
+   }
+
+   /**
+    * Hands an unmodifiable view of the sender to the state inspector's closed
+    * resource check.  Anything thrown by the inspector is not propagated; only
+    * its message is recorded through markAsInvalid.
+    */
+   @Override
+   protected void doClosedInspection() {
+      try {
+         getStateInspector().inspectClosedResource(getSender());
+      }
+      catch (Throwable error) {
+         getStateInspector().markAsInvalid(error.getMessage());
+      }
+   }
+
+   /**
+    * Hands an unmodifiable view of the sender to the state inspector's detached
+    * resource check.  Anything thrown by the inspector is not propagated; only
+    * its message is recorded through markAsInvalid.
+    */
+   @Override
+   protected void doDetachedInspection() {
+      try {
+         getStateInspector().inspectDetachedResource(getSender());
+      }
+      catch (Throwable error) {
+         getStateInspector().markAsInvalid(error.getMessage());
+      }
+   }
+
+   /**
+    * @return an InvalidDestinationException when the attach response carried no
+    *         remote target, otherwise whatever the superclass reports.
+    */
+   @Override
+   protected Exception getOpenAbortException() {
+      // A null remote target means no link terminus was created and the peer
+      // has detached/closed us, so report the destination as invalid.
+      if (getEndpoint().getRemoteTarget() == null) {
+         return new InvalidDestinationException("Link creation was refused");
+      }
+
+      return super.getOpenAbortException();
+   }
+
+   /**
+    * Creates a delivery for the message, attaches the request to it as context,
+    * applies the transactional state when the session is in a transaction and
+    * writes the encoded message.  Presettled deliveries are settled and their
+    * request completed right away; all others are added to the pending set and
+    * the link is advanced.
+    *
+    * @param message the message to send.
+    * @param request the request to signal with the result of the send.
+    * @throws Exception if the send cannot be performed.
+    */
+   private void doSend(AmqpMessage message, AsyncResult request) throws Exception {
+      LOG.trace("Producer sending message: {}", message);
+
+      // Presettled deliveries use the shared empty tag, all others a generated one.
+      final byte[] tag = presettle ? EMPTY_BYTE_ARRAY : tagGenerator.getNextTag();
+      final Delivery delivery = getEndpoint().delivery(tag, 0, tag.length);
+      delivery.setContext(request);
+
+      if (session.isInTransaction()) {
+         final TransactionalState txState = new TransactionalState();
+         txState.setTxnId(session.getTransactionId().getRemoteTxId());
+         delivery.disposition(txState);
+      }
+
+      encodeAndSend(message.getWrappedMessage(), delivery);
+
+      if (!presettle) {
+         pending.add(delivery);
+         getEndpoint().advance();
+         return;
+      }
+
+      delivery.settle();
+      request.onSuccess();
+   }
+
+   /**
+    * Encodes the message into the reusable encode buffer and writes the encoded
+    * bytes to the sender endpoint.
+    *
+    * @param message  the proton message to encode.
+    * @param delivery the delivery created for this send.
+    * @throws IOException if the endpoint accepts no bytes while data remains.
+    */
+   private void encodeAndSend(Message message, Delivery delivery) throws IOException {
+
+      int encodedSize;
+      while (true) {
+         try {
+            encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
+            break;
+         }
+         catch (java.nio.BufferOverflowException e) {
+            // The message did not fit, retry with a buffer twice the size.
+            encodeBuffer = new byte[encodeBuffer.length * 2];
+         }
+      }
+
+      int sentSoFar = 0;
+
+      // Loop on the remaining byte count so that a zero-length encoding simply
+      // has nothing to write.
+      while (sentSoFar < encodedSize) {
+         int sent = getEndpoint().send(encodeBuffer, sentSoFar, encodedSize - sentSoFar);
+         if (sent <= 0) {
+            // No progress was made; fail the send instead of spinning and
+            // logging the same warning indefinitely.
+            throw new IOException(this + " failed to send any data from current Message.");
+         }
+         sentSoFar += sent;
+      }
+   }
+
+   /**
+    * Examines every pending delivery and, for each one that has a remote state,
+    * completes or fails the send request stored as the delivery's context, then
+    * returns the delivery tag to the generator, settles the delivery and drops
+    * it from the pending set.  Deliveries without a remote state stay pending.
+    *
+    * @param connection the connection used to report a delivery error when there
+    *                   is no incomplete request left to fail.
+    * @throws IOException as declared by the overridden method.
+    */
+   @Override
+   public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
+      // Removals are collected and applied after the loop so the pending set is
+      // not modified while it is being iterated.
+      List<Delivery> toRemove = new ArrayList<>();
+
+      for (Delivery delivery : pending) {
+         DeliveryState state = delivery.getRemoteState();
+         if (state == null) {
+            // No update from the remote yet, keep waiting.
+            continue;
+         }
+
+         // A transactional state wraps the real outcome, unwrap it first.
+         Outcome outcome = null;
+         if (state instanceof TransactionalState) {
+            LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state);
+            outcome = ((TransactionalState) state).getOutcome();
+         }
+         else if (state instanceof Outcome) {
+            outcome = (Outcome) state;
+         }
+         else {
+            // NOTE(review): outcome stays null here, so the request below is neither
+            // completed nor failed while the delivery is still settled and removed;
+            // confirm this is intended.
+            LOG.warn("Message send updated with unsupported state: {}", state);
+            outcome = null;
+         }
+
+         AsyncResult request = (AsyncResult) delivery.getContext();
+         Exception deliveryError = null;
+
+         if (outcome instanceof Accepted) {
+            LOG.trace("Outcome of delivery was accepted: {}", delivery);
+            if (request != null && !request.isComplete()) {
+               request.onSuccess();
+            }
+         }
+         else if (outcome instanceof Rejected) {
+            LOG.trace("Outcome of delivery was rejected: {}", delivery);
+            ErrorCondition remoteError = ((Rejected) outcome).getError();
+            if (remoteError == null) {
+               // The rejection carried no error, fall back to the link's remote condition.
+               remoteError = getEndpoint().getRemoteCondition();
+            }
+
+            deliveryError = AmqpSupport.convertToException(remoteError);
+         }
+         else if (outcome instanceof Released) {
+            LOG.trace("Outcome of delivery was released: {}", delivery);
+            deliveryError = new IOException("Delivery failed: released by receiver");
+         }
+         else if (outcome instanceof Modified) {
+            LOG.trace("Outcome of delivery was modified: {}", delivery);
+            deliveryError = new IOException("Delivery failed: failure at remote");
+         }
+
+         if (deliveryError != null) {
+            if (request != null && !request.isComplete()) {
+               request.onFailure(deliveryError);
+            }
+            else {
+               // Nobody is waiting on this delivery any more, surface the error
+               // through the connection instead.
+               connection.fireClientException(deliveryError);
+            }
+         }
+
+         tagGenerator.returnTag(delivery.getTag());
+         delivery.settle();
+         toRemove.add(delivery);
+      }
+
+      pending.removeAll(toRemove);
+   }
+
+   /**
+    * @return the simple class name followed by this sender's address.
+    */
+   @Override
+   public String toString() {
+      return getClass().getSimpleName() + "{ address = " + address + "}";
+   }
+}


Mime
View raw message