activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5602
Date Fri, 13 Mar 2015 19:48:24 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 10c47d69d -> 72839b78a


http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java
new file mode 100644
index 0000000..7aa8c62
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.SocketFactory;
+
+import org.apache.activemq.transport.tcp.TcpBufferedInputStream;
+import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.InetAddressUtil;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple TCP based transport used by the client.
+ */
+public class ClientTcpTransport implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClientTcpTransport.class);
+
+    public interface TransportListener {
+
+        /**
+         * Called when new incoming data has become available.
+         *
+         * @param incoming
+         *        the next incoming packet of data.
+         */
+        void onData(Buffer incoming);
+
+        /**
+         * Called if the connection state becomes closed.
+         */
+        void onTransportClosed();
+
+        /**
+         * Called when an error occurs during normal Transport operations.
+         *
+         * @param cause
+         *        the error that triggered this event.
+         */
+        void onTransportError(Throwable cause);
+
+    }
+
+    private final URI remoteLocation;
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>();
+
+    private final Socket socket;
+    private DataOutputStream dataOut;
+    private DataInputStream dataIn;
+    private Thread runner;
+    private TransportListener listener;
+
+    private int socketBufferSize = 64 * 1024;
+    private int soTimeout = 0;
+    private int soLinger = Integer.MIN_VALUE;
+    private Boolean keepAlive;
+    private Boolean tcpNoDelay = true;
+    private boolean useLocalHost = false;
+    private int ioBufferSize = 8 * 1024;
+
+    /**
+     * Create a new instance of the transport.
+     *
+     * @param listener
+     *        The TransportListener that will receive data from this Transport instance.
+     * @param remoteLocation
+     *        The remote location where this transport should connection to.
+     */
+    public ClientTcpTransport(URI remoteLocation) {
+        this.remoteLocation = remoteLocation;
+
+        Socket temp = null;
+        try {
+            temp = createSocketFactory().createSocket();
+        } catch (IOException e) {
+            connectionError.set(e);
+        }
+
+        this.socket = temp;
+    }
+
+    public void connect() throws IOException {
+        if (connectionError.get() != null) {
+            throw IOExceptionSupport.create(connectionError.get());
+        }
+
+        if (listener == null) {
+            throw new IllegalStateException("Cannot connect until a listener has been set.");
+        }
+
+        if (socket == null) {
+            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
+        }
+
+        InetSocketAddress remoteAddress = null;
+
+        if (remoteLocation != null) {
+            String host = resolveHostName(remoteLocation.getHost());
+            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+        }
+
+        socket.connect(remoteAddress);
+
+        connected.set(true);
+
+        initialiseSocket(socket);
+        initializeStreams();
+
+        runner = new Thread(null, this, "ClientTcpTransport: " + toString());
+        runner.setDaemon(false);
+        runner.start();
+    }
+
+    public void close() {
+        if (closed.compareAndSet(false, true)) {
+            if (socket == null) {
+                return;
+            }
+
+            // Closing the streams flush the sockets before closing.. if the socket
+            // is hung.. then this hangs the close so we perform an asynchronous close
+            // by default which will timeout if the close doesn't happen after a delay.
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            final ExecutorService closer = Executors.newSingleThreadExecutor();
+            closer.execute(new Runnable() {
+                @Override
+                public void run() {
+                    LOG.trace("Closing socket {}", socket);
+                    try {
+                        socket.close();
+                        LOG.debug("Closed socket {}", socket);
+                    } catch (IOException e) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
+                        }
+                    } finally {
+                        latch.countDown();
+                    }
+                }
+            });
+
+            try {
+                latch.await(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            } finally {
+                closer.shutdownNow();
+            }
+        }
+    }
+
+    public void send(ByteBuffer output) throws IOException {
+        checkConnected();
+        LOG.trace("Client Transport sending packet of size: {}", output.remaining());
+        WritableByteChannel channel = Channels.newChannel(dataOut);
+        channel.write(output);
+        dataOut.flush();
+    }
+
+    public void send(Buffer output) throws IOException {
+        checkConnected();
+        send(output.toByteBuffer());
+    }
+
+    public URI getRemoteURI() {
+        return this.remoteLocation;
+    }
+
+    public boolean isConnected() {
+        return this.connected.get();
+    }
+
+    public TransportListener getTransportListener() {
+        return this.listener;
+    }
+
+    public void setTransportListener(TransportListener listener) {
+        if (listener == null) {
+            throw new IllegalArgumentException("Listener cannot be set to null");
+        }
+
+        this.listener = listener;
+    }
+
+    public int getSocketBufferSize() {
+        return socketBufferSize;
+    }
+
+    public void setSocketBufferSize(int socketBufferSize) {
+        this.socketBufferSize = socketBufferSize;
+    }
+
+    public int getSoTimeout() {
+        return soTimeout;
+    }
+
+    public void setSoTimeout(int soTimeout) {
+        this.soTimeout = soTimeout;
+    }
+
+    public boolean isTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    public void setTcpNoDelay(Boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    public int getSoLinger() {
+        return soLinger;
+    }
+
+    public void setSoLinger(int soLinger) {
+        this.soLinger = soLinger;
+    }
+
+    public boolean isKeepAlive() {
+        return keepAlive;
+    }
+
+    public void setKeepAlive(Boolean keepAlive) {
+        this.keepAlive = keepAlive;
+    }
+
+    public boolean isUseLocalHost() {
+        return useLocalHost;
+    }
+
+    public void setUseLocalHost(boolean useLocalHost) {
+        this.useLocalHost = useLocalHost;
+    }
+
+    public int getIoBufferSize() {
+        return ioBufferSize;
+    }
+
+    public void setIoBufferSize(int ioBufferSize) {
+        this.ioBufferSize = ioBufferSize;
+    }
+
+    //---------- Transport internal implementation ---------------------------//
+
+    @Override
+    public void run() {
+        LOG.trace("TCP consumer thread for {} starting", this);
+        try {
+            while (isConnected()) {
+                doRun();
+            }
+        } catch (IOException e) {
+            connectionError.set(e);
+            onException(e);
+        } catch (Throwable e) {
+            IOException ioe = new IOException("Unexpected error occured: " + e);
+            connectionError.set(ioe);
+            ioe.initCause(e);
+            onException(ioe);
+        }
+    }
+
+    protected void doRun() throws IOException {
+        int size = dataIn.available();
+        if (size <= 0) {
+            try {
+                TimeUnit.NANOSECONDS.sleep(1);
+            } catch (InterruptedException e) {
+            }
+            return;
+        }
+
+        byte[] buffer = new byte[size];
+        dataIn.readFully(buffer);
+        Buffer incoming = new Buffer(buffer);
+        listener.onData(incoming);
+    }
+
+    /**
+     * Passes any IO exceptions into the transport listener
+     */
+    public void onException(IOException e) {
+        if (listener != null) {
+            try {
+                listener.onTransportError(e);
+            } catch (RuntimeException e2) {
+                LOG.debug("Unexpected runtime exception: {}", e2.getMessage(), e2);
+            }
+        }
+    }
+
+    protected SocketFactory createSocketFactory() throws IOException {
+        return SocketFactory.getDefault();
+    }
+
+    protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException {
+        try {
+            sock.setReceiveBufferSize(socketBufferSize);
+            sock.setSendBufferSize(socketBufferSize);
+        } catch (SocketException se) {
+            LOG.warn("Cannot set socket buffer size = {}", socketBufferSize);
+            LOG.debug("Cannot set socket buffer size. Reason: {}. This exception is ignored.", se.getMessage(), se);
+        }
+
+        sock.setSoTimeout(soTimeout);
+
+        if (keepAlive != null) {
+            sock.setKeepAlive(keepAlive.booleanValue());
+        }
+
+        if (soLinger > -1) {
+            sock.setSoLinger(true, soLinger);
+        } else if (soLinger == -1) {
+            sock.setSoLinger(false, 0);
+        }
+
+        if (tcpNoDelay != null) {
+            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
+        }
+    }
+
+    protected void initializeStreams() throws IOException {
+        try {
+            TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
+            this.dataIn = new DataInputStream(buffIn);
+            TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
+            this.dataOut = new DataOutputStream(outputStream);
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    protected String resolveHostName(String host) throws UnknownHostException {
+        if (isUseLocalHost()) {
+            String localName = InetAddressUtil.getLocalHostName();
+            if (localName != null && localName.equals(host)) {
+                return "localhost";
+            }
+        }
+        return host;
+    }
+
+    private void checkConnected() throws IOException {
+        if (!connected.get()) {
+            throw new IOException("Cannot send to a non-connected transport.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java
new file mode 100644
index 0000000..b67b305
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Session;
+
+/**
+ * Unmodifiable Connection wrapper used to prevent test code from accidentally
+ * modifying Connection state.
+ */
+public class UnmodifiableConnection implements Connection {
+
+    private final Connection connection;
+
+    public UnmodifiableConnection(Connection connection) {
+        this.connection = connection;
+    }
+
+    @Override
+    public EndpointState getLocalState() {
+        return connection.getLocalState();
+    }
+
+    @Override
+    public EndpointState getRemoteState() {
+        return connection.getRemoteState();
+    }
+
+    @Override
+    public ErrorCondition getCondition() {
+        return connection.getCondition();
+    }
+
+    @Override
+    public void setCondition(ErrorCondition condition) {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public ErrorCondition getRemoteCondition() {
+        return connection.getRemoteCondition();
+    }
+
+    @Override
+    public void free() {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public void open() {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public void close() {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public Session session() {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public Session sessionHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
+        Session head = connection.sessionHead(local, remote);
+        if (head != null) {
+            head = new UnmodifiableSession(head);
+        }
+
+        return head;
+    }
+
+    @Override
+    public Link linkHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
+        // TODO - If implemented this method should return an unmodifiable link isntance.
+        return null;
+    }
+
+    @Override
+    public Delivery getWorkHead() {
+        // TODO - If implemented this method should return an unmodifiable delivery isntance.
+        return null;
+    }
+
+    @Override
+    public void setContainer(String container) {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public void setHostname(String hostname) {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public String getHostname() {
+        return connection.getHostname();
+    }
+
+    @Override
+    public String getRemoteContainer() {
+        return connection.getRemoteContainer();
+    }
+
+    @Override
+    public String getRemoteHostname() {
+        return connection.getRemoteHostname();
+    }
+
+    @Override
+    public void setOfferedCapabilities(Symbol[] capabilities) {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public void setDesiredCapabilities(Symbol[] capabilities) {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public Symbol[] getRemoteOfferedCapabilities() {
+        return connection.getRemoteOfferedCapabilities();
+    }
+
+    @Override
+    public Symbol[] getRemoteDesiredCapabilities() {
+        return connection.getRemoteDesiredCapabilities();
+    }
+
+    @Override
+    public Map<Symbol, Object> getRemoteProperties() {
+        return connection.getRemoteProperties();
+    }
+
+    @Override
+    public void setProperties(Map<Symbol, Object> properties) {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public Object getContext() {
+        return connection.getContext();
+    }
+
+    @Override
+    public void setContext(Object context) {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public void collect(Collector collector) {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
new file mode 100644
index 0000000..fd99665
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+
+/**
+ * Unmodifiable Delivery wrapper used to prevent test code from accidentally
+ * modifying Delivery state.
+ */
+public class UnmodifiableDelivery implements Delivery {
+
+    private final Delivery delivery;
+
+    public UnmodifiableDelivery(Delivery delivery) {
+        this.delivery = delivery;
+    }
+
+    @Override
+    public byte[] getTag() {
+        return delivery.getTag();
+    }
+
+    @Override
+    public Link getLink() {
+        if (delivery.getLink() instanceof Sender) {
+            return new UnmodifiableSender((Sender) delivery.getLink());
+        } else if (delivery.getLink() instanceof Receiver) {
+            return new UnmodifiableReceiver((Receiver) delivery.getLink());
+        } else {
+            throw new IllegalStateException("Delivery has unknown link type");
+        }
+    }
+
+    @Override
+    public DeliveryState getLocalState() {
+        return delivery.getLocalState();
+    }
+
+    @Override
+    public DeliveryState getRemoteState() {
+        return delivery.getRemoteState();
+    }
+
+    @Override
+    public int getMessageFormat() {
+        return delivery.getMessageFormat();
+    }
+
+    @Override
+    public void disposition(DeliveryState state) {
+        throw new UnsupportedOperationException("Cannot alter the Delivery state");
+    }
+
+    @Override
+    public void settle() {
+        throw new UnsupportedOperationException("Cannot alter the Delivery state");
+    }
+
+    @Override
+    public boolean isSettled() {
+        return delivery.isSettled();
+    }
+
+    @Override
+    public boolean remotelySettled() {
+        return delivery.remotelySettled();
+    }
+
+    @Override
+    public void free() {
+        throw new UnsupportedOperationException("Cannot alter the Delivery state");
+    }
+
+    @Override
+    public Delivery getWorkNext() {
+        return new UnmodifiableDelivery(delivery.getWorkNext());
+    }
+
+    @Override
+    public Delivery next() {
+        return new UnmodifiableDelivery(delivery.next());
+    }
+
+    @Override
+    public boolean isWritable() {
+        return delivery.isWritable();
+    }
+
+    @Override
+    public boolean isReadable() {
+        return delivery.isReadable();
+    }
+
+    @Override
+    public void setContext(Object o) {
+        throw new UnsupportedOperationException("Cannot alter the Delivery state");
+    }
+
+    @Override
+    public Object getContext() {
+        return delivery.getContext();
+    }
+
+    @Override
+    public boolean isUpdated() {
+        return delivery.isUpdated();
+    }
+
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException("Cannot alter the Delivery state");
+    }
+
+    @Override
+    public boolean isPartial() {
+        return delivery.isPartial();
+    }
+
+    @Override
+    public int pending() {
+        return delivery.pending();
+    }
+
+    @Override
+    public boolean isBuffered() {
+        return delivery.isBuffered();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java
new file mode 100644
index 0000000..70665c0
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import java.util.EnumSet;
+
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.amqp.transport.Source;
+import org.apache.qpid.proton.amqp.transport.Target;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+
+/**
+ * Unmodifiable Session wrapper used to prevent test code from accidentally
+ * modifying Session state.
+ */
+public class UnmodifiableLink implements Link {
+
+    private final Link link;
+
+    public UnmodifiableLink(Link link) {
+        this.link = link;
+    }
+
+    @Override
+    public EndpointState getLocalState() {
+        return link.getLocalState();
+    }
+
+    @Override
+    public EndpointState getRemoteState() {
+        return link.getRemoteState();
+    }
+
+    @Override
+    public ErrorCondition getCondition() {
+        return link.getCondition();
+    }
+
+    @Override
+    public void setCondition(ErrorCondition condition) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public ErrorCondition getRemoteCondition() {
+        return link.getRemoteCondition();
+    }
+
+    @Override
+    public void free() {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public void open() {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public void close() {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public void setContext(Object o) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public Object getContext() {
+        return link.getContext();
+    }
+
+    @Override
+    public String getName() {
+        return link.getName();
+    }
+
+    @Override
+    public Delivery delivery(byte[] tag) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public Delivery delivery(byte[] tag, int offset, int length) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public Delivery head() {
+        return new UnmodifiableDelivery(link.head());
+    }
+
+    @Override
+    public Delivery current() {
+        return new UnmodifiableDelivery(link.current());
+    }
+
+    @Override
+    public boolean advance() {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public Source getSource() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Target getTarget() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void setSource(Source address) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public void setTarget(Target address) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public Source getRemoteSource() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Target getRemoteTarget() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Link next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
+        Link next = link.next(local, remote);
+
+        if (next != null) {
+            if (next instanceof Sender) {
+                next = new UnmodifiableSender((Sender) next);
+            } else {
+                next = new UnmodifiableReceiver((Receiver) next);
+            }
+        }
+
+        return next;
+    }
+
+    @Override
+    public int getCredit() {
+        return link.getCredit();
+    }
+
+    @Override
+    public int getQueued() {
+        return link.getQueued();
+    }
+
+    @Override
+    public int getUnsettled() {
+        return link.getUnsettled();
+    }
+
+    @Override
+    public Session getSession() {
+        return new UnmodifiableSession(link.getSession());
+    }
+
+    @Override
+    public SenderSettleMode getSenderSettleMode() {
+        return link.getSenderSettleMode();
+    }
+
+    @Override
+    public void setSenderSettleMode(SenderSettleMode senderSettleMode) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public SenderSettleMode getRemoteSenderSettleMode() {
+        return link.getRemoteSenderSettleMode();
+    }
+
+    @Override
+    public ReceiverSettleMode getReceiverSettleMode() {
+        return link.getReceiverSettleMode();
+    }
+
+    @Override
+    public void setReceiverSettleMode(ReceiverSettleMode receiverSettleMode) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public ReceiverSettleMode getRemoteReceiverSettleMode() {
+        return link.getRemoteReceiverSettleMode();
+    }
+
+    @Override
+    public void setRemoteSenderSettleMode(SenderSettleMode remoteSenderSettleMode) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public int drained() {
+        return link.drained();  // TODO - Is this a mutating call?
+    }
+
+    @Override
+    public int getRemoteCredit() {
+        return link.getRemoteCredit();
+    }
+
+    @Override
+    public boolean getDrain() {
+        return link.getDrain();
+    }
+
+    @Override
+    public void detach() {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java
new file mode 100644
index 0000000..1b07ed0
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import org.apache.qpid.proton.engine.Receiver;
+
+/**
+ * Unmodifiable Receiver wrapper used to prevent test code from accidentally
+ * modifying Receiver state.
+ */
+public class UnmodifiableReceiver extends UnmodifiableLink implements Receiver {
+
+    private final Receiver receiver;
+
+    public UnmodifiableReceiver(Receiver receiver) {
+        super(receiver);
+
+        this.receiver = receiver;
+    }
+
+    @Override
+    public void flow(int credits) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public int recv(byte[] bytes, int offset, int size) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public void drain(int credit) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public boolean draining() {
+        return receiver.draining();
+    }
+
+    @Override
+    public void setDrain(boolean drain) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java
new file mode 100644
index 0000000..1517a93
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import org.apache.qpid.proton.engine.Sender;
+
+/**
+ * Unmodifiable Sender wrapper used to prevent test code from accidentally
+ * modifying Sender state.
+ */
+public class UnmodifiableSender extends UnmodifiableLink implements Sender {
+
+    public UnmodifiableSender(Sender sender) {
+        super(sender);
+    }
+
+    @Override
+    public void offer(int credits) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public int send(byte[] bytes, int offset, int length) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public void abort() {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java
new file mode 100644
index 0000000..6a73e0f
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import java.util.EnumSet;
+
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+
+/**
+ * Unmodifiable Session wrapper used to prevent test code from accidentally
+ * modifying Session state.
+ */
+public class UnmodifiableSession implements Session {
+
+    private final Session session;
+
+    public UnmodifiableSession(Session session) {
+        this.session = session;
+    }
+
+    @Override
+    public EndpointState getLocalState() {
+        return session.getLocalState();
+    }
+
+    @Override
+    public EndpointState getRemoteState() {
+        return session.getRemoteState();
+    }
+
+    @Override
+    public ErrorCondition getCondition() {
+        return session.getCondition();
+    }
+
+    @Override
+    public void setCondition(ErrorCondition condition) {
+        throw new UnsupportedOperationException("Cannot alter the Session");
+    }
+
+    @Override
+    public ErrorCondition getRemoteCondition() {
+        return session.getRemoteCondition();
+    }
+
+    @Override
+    public void free() {
+        throw new UnsupportedOperationException("Cannot alter the Session");
+    }
+
+    @Override
+    public void open() {
+        throw new UnsupportedOperationException("Cannot alter the Session");
+    }
+
+    @Override
+    public void close() {
+        throw new UnsupportedOperationException("Cannot alter the Session");
+    }
+
+    @Override
+    public void setContext(Object o) {
+        throw new UnsupportedOperationException("Cannot alter the Session");
+    }
+
+    @Override
+    public Object getContext() {
+        return session.getContext();
+    }
+
+    @Override
+    public Sender sender(String name) {
+        throw new UnsupportedOperationException("Cannot alter the Session");
+    }
+
+    @Override
+    public Receiver receiver(String name) {
+        throw new UnsupportedOperationException("Cannot alter the Session");
+    }
+
+    @Override
+    public Session next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
+        Session next = session.next(local, remote);
+        if (next != null) {
+            next = new UnmodifiableSession(next);
+        }
+
+        return next;
+    }
+
+    @Override
+    public Connection getConnection() {
+        return new UnmodifiableConnection(session.getConnection());
+    }
+
+    @Override
+    public int getIncomingCapacity() {
+        return session.getIncomingCapacity();
+    }
+
+    @Override
+    public void setIncomingCapacity(int bytes) {
+        throw new UnsupportedOperationException("Cannot alter the Session");
+    }
+
+    @Override
+    public int getIncomingBytes() {
+        return session.getIncomingBytes();
+    }
+
+    @Override
+    public int getOutgoingBytes() {
+        return session.getOutgoingBytes();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/WrappedAsyncResult.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/WrappedAsyncResult.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/WrappedAsyncResult.java
new file mode 100644
index 0000000..bb2d983
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/WrappedAsyncResult.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+/**
+ * Base class used to wrap one AsyncResult with another.
+ */
+public abstract class WrappedAsyncResult implements AsyncResult {
+
+    protected final AsyncResult wrapped;
+
+    /**
+     * Create a new WrappedAsyncResult for the target AsyncResult
+     */
+    public WrappedAsyncResult(AsyncResult wrapped) {
+        this.wrapped = wrapped;
+    }
+
+    @Override
+    public void onFailure(Throwable result) {
+        if (wrapped != null) {
+            wrapped.onFailure(result);
+        }
+    }
+
+    @Override
+    public void onSuccess() {
+        if (wrapped != null) {
+            wrapped.onSuccess();
+        }
+    }
+
+    @Override
+    public boolean isComplete() {
+        if (wrapped != null) {
+            return wrapped.isComplete();
+        }
+
+        return false;
+    }
+
+    public AsyncResult getWrappedRequest() {
+        return wrapped;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
new file mode 100644
index 0000000..a35709d
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
+import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Map;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpStateInspector;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Connection;
+import org.junit.Test;
+
+/**
+ * Test broker handling of AMQP connections with various configurations.
+ */
+public class AmqpConnectionsTest extends AmqpClientTestSupport {
+
+    private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
+    private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
+    private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+
+    @Test(timeout = 60000)
+    public void testCanConnect() throws Exception {
+        AmqpClient client = createAmqpClient();
+        assertNotNull(client);
+
+        AmqpConnection connection = client.connect();
+        assertNotNull(connection);
+
+        assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
+
+        connection.close();
+
+        assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
+    }
+
+    @Test(timeout = 60000)
+    public void testConnectionCarriesExpectedCapabilities() throws Exception {
+        AmqpClient client = createAmqpClient();
+        assertNotNull(client);
+
+        client.setStateInspector(new AmqpStateInspector() {
+
+            @Override
+            public void inspectOpenedResource(Connection connection) {
+
+                Symbol[] offered = connection.getRemoteOfferedCapabilities();
+                if (!contains(offered, ANONYMOUS_RELAY)) {
+                    markAsInvalid("Broker did not indicate it support anonymous relay");
+                }
+
+                Map<Symbol, Object> properties = connection.getRemoteProperties();
+                if (!properties.containsKey(QUEUE_PREFIX)) {
+                    markAsInvalid("Broker did not send a queue prefix value");
+                }
+
+                if (!properties.containsKey(TOPIC_PREFIX)) {
+                    markAsInvalid("Broker did not send a queue prefix value");
+                }
+            }
+        });
+
+        AmqpConnection connection = client.connect();
+        assertNotNull(connection);
+
+        assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
+
+        connection.close();
+
+        assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
+    }
+
+    @Test(timeout = 60000)
+    public void testCanConnectWithDifferentContainerIds() throws Exception {
+        AmqpClient client = createAmqpClient();
+        assertNotNull(client);
+
+        AmqpConnection connection1 = client.createConnection();
+        AmqpConnection connection2 = client.createConnection();
+
+        connection1.setContainerId(getTestName() + "-Client:1");
+        connection2.setContainerId(getTestName() + "-Client:2");
+
+        connection1.connect();
+        assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
+
+        connection2.connect();
+        assertEquals(2, getProxyToBroker().getCurrentConnectionsCount());
+
+        connection1.close();
+        assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
+
+        connection2.close();
+        assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
+    }
+
+    @Test(timeout = 60000)
+    public void testCannotConnectWithSameContainerId() throws Exception {
+        AmqpClient client = createAmqpClient();
+        assertNotNull(client);
+
+        AmqpConnection connection1 = client.createConnection();
+        AmqpConnection connection2 = client.createConnection();
+
+        connection1.setContainerId(getTestName());
+        connection2.setContainerId(getTestName());
+
+        connection1.connect();
+        assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
+
+        connection2.setStateInspector(new AmqpStateInspector() {
+
+            @Override
+            public void inspectOpenedResource(Connection connection) {
+                if (!connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
+                    markAsInvalid("Broker did not set connection establishment failed property");
+                }
+            }
+
+            @Override
+            public void inspectClosedResource(Connection connection) {
+                ErrorCondition remoteError = connection.getRemoteCondition();
+                if (remoteError == null) {
+                    markAsInvalid("Broker dd not add error condition for duplicate client ID");
+                }
+
+                if (!remoteError.getCondition().equals(AmqpError.INVALID_FIELD)) {
+                    markAsInvalid("Broker dd not set condition to " + AmqpError.INVALID_FIELD);
+                }
+            }
+        });
+
+        try {
+            connection2.connect();
+            //fail("Should not be able to connect with same container Id.");
+        } catch (Exception ex) {
+            LOG.info("Second connection with same container Id failed as expected.");
+        }
+
+        connection2.getStateInspector().assertIfStateChecksFailed();
+
+        assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
+
+        connection1.close();
+        assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
new file mode 100644
index 0000000..1245811
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.util.Wait;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverTest extends AmqpClientTestSupport {
+
+    @Override
+    protected boolean isUseOpenWireConnector() {
+        return true;
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateQueueReceiver() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getQueues().length);
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        assertEquals(1, brokerService.getAdminView().getQueues().length);
+        assertNotNull(getProxyToQueue(getTestName()));
+        assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
+        receiver.close();
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateTopicReceiver() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getTopics().length);
+
+        AmqpReceiver receiver = session.createReceiver("topic://" + getTestName());
+
+        assertEquals(1, brokerService.getAdminView().getTopics().length);
+        assertNotNull(getProxyToTopic(getTestName()));
+        assertEquals(1, brokerService.getAdminView().getTopicSubscribers().length);
+        receiver.close();
+        assertEquals(0, brokerService.getAdminView().getTopicSubscribers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testQueueReceiverReadMessage() throws Exception {
+        sendMessages(getTestName(), 1, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(1, queueView.getQueueSize());
+        assertEquals(0, queueView.getDispatchCount());
+
+        receiver.flow(1);
+        assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+        receiver.close();
+
+        assertEquals(1, queueView.getQueueSize());
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
+        int MSG_COUNT = 4;
+        sendMessages(getTestName(), MSG_COUNT, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
+
+        QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(MSG_COUNT, queueView.getQueueSize());
+
+        receiver1.flow(2);
+        assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
+        assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
+
+        AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName());
+
+        assertEquals(2, brokerService.getAdminView().getQueueSubscribers().length);
+
+        receiver2.flow(2);
+        assertNotNull(receiver2.receive(5, TimeUnit.SECONDS));
+        assertNotNull(receiver2.receive(5, TimeUnit.SECONDS));
+
+        assertEquals(MSG_COUNT, queueView.getDispatchCount());
+        assertEquals(0, queueView.getDequeueCount());
+
+        receiver1.close();
+        receiver2.close();
+
+        assertEquals(MSG_COUNT, queueView.getQueueSize());
+
+        connection.close();
+    }
+
+    @Ignore("Fails due to issues with accept and no credit")
+    @Test(timeout = 60000)
+    public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception {
+        int MSG_COUNT = 4;
+        sendMessages(getTestName(), MSG_COUNT, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
+
+        final QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(MSG_COUNT, queueView.getQueueSize());
+
+        receiver1.flow(2);
+        AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
+        assertNotNull(message);
+        message.accept();
+        message = receiver1.receive(5, TimeUnit.SECONDS);
+        assertNotNull(message);
+        message.accept();
+
+        assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getDequeueCount() == 2;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
+
+        AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName());
+
+        assertEquals(2, brokerService.getAdminView().getQueueSubscribers().length);
+
+        receiver2.flow(2);
+        message = receiver2.receive(5, TimeUnit.SECONDS);
+        assertNotNull(message);
+        message.accept();
+        message = receiver2.receive(5, TimeUnit.SECONDS);
+        assertNotNull(message);
+        message.accept();
+
+        assertEquals(MSG_COUNT, queueView.getDispatchCount());
+        assertTrue("Queue should be empty now", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getDequeueCount() == 4;
+            }
+        }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(10)));
+
+        receiver1.close();
+        receiver2.close();
+
+        assertEquals(0, queueView.getQueueSize());
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testSecondReceiverOnQueueGetsAllUnconsumedMessages() throws Exception {
+        int MSG_COUNT = 20;
+        sendMessages(getTestName(), MSG_COUNT, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
+
+        final QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(MSG_COUNT, queueView.getQueueSize());
+
+        receiver1.flow(20);
+
+        assertTrue("Should have dispatch to prefetch", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getInFlightCount() >= 2;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
+
+        receiver1.close();
+
+        AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName());
+
+        assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
+
+        receiver2.flow(MSG_COUNT * 2);
+        AmqpMessage message = receiver2.receive(5, TimeUnit.SECONDS);
+        assertNotNull(message);
+        message.accept();
+        message = receiver2.receive(5, TimeUnit.SECONDS);
+        assertNotNull(message);
+        message.accept();
+
+        assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getDequeueCount() == 2;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
+
+        receiver2.close();
+
+        assertEquals(MSG_COUNT - 2, queueView.getQueueSize());
+
+        connection.close();
+    }
+
+    @Ignore("Test fails currently due to improper implementation of drain.")
+    @Test(timeout = 60000)
+    public void testReceiverCanDrainMessages() throws Exception {
+        int MSG_COUNT = 20;
+        sendMessages(getTestName(), MSG_COUNT, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(MSG_COUNT, queueView.getQueueSize());
+        assertEquals(0, queueView.getDispatchCount());
+
+        receiver.drain(MSG_COUNT);
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            message.accept();
+        }
+        receiver.close();
+
+        assertEquals(0, queueView.getQueueSize());
+
+        connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
new file mode 100644
index 0000000..3f6a454
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+/**
+ * Test broker behavior when creating AMQP senders
+ */
+public class AmqpSenderTest extends AmqpClientTestSupport {
+
+    @Test(timeout = 60000)
+    public void testCreateQueueSender() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getQueues().length);
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        assertEquals(1, brokerService.getAdminView().getQueues().length);
+        assertNotNull(getProxyToQueue(getTestName()));
+        assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+        sender.close();
+        assertEquals(0, brokerService.getAdminView().getQueueProducers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateTopicSender() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getTopics().length);
+
+        AmqpSender sender = session.createSender("topic://" + getTestName());
+
+        assertEquals(1, brokerService.getAdminView().getTopics().length);
+        assertNotNull(getProxyToTopic(getTestName()));
+        assertEquals(1, brokerService.getAdminView().getTopicProducers().length);
+        sender.close();
+        assertEquals(0, brokerService.getAdminView().getTopicProducers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testSendMessageToQueue() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        AmqpMessage message = new AmqpMessage();
+
+        message.setText("Test-Message");
+
+        sender.send(message);
+
+        QueueViewMBean queue = getProxyToQueue(getTestName());
+
+        assertEquals(1, queue.getQueueSize());
+
+        sender.close();
+        connection.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSessionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSessionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSessionTest.java
new file mode 100644
index 0000000..b8f456f
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSessionTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+/**
+ * Test for creation and configuration of AMQP sessions.
+ */
+public class AmqpSessionTest extends AmqpClientTestSupport {
+
+    @Test
+    public void testCreateSession() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+        assertNotNull(session);
+        connection.close();
+    }
+}


Mime
View raw message