activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1408852 [2/2] - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/ apollo-amqp/src/main/scala/org/apache/activemq/apollo/...
Date Tue, 13 Nov 2012 17:41:05 GMT
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/QoS.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/QoS.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/QoS.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/QoS.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+/**
+ * Quality-of-service levels exposed by the hawtdispatch AMQP API.
+ *
+ * NOTE(review): the constant names suggest message delivery guarantees; how
+ * each level maps onto AMQP settlement is not shown in this file - confirm
+ * against the code that consumes this enum.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public enum QoS {
+    AT_MOST_ONCE,
+    AT_LEAST_ONCE,
+    EXACTLY_ONCE
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/TransportState.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/TransportState.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/TransportState.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/TransportState.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+/**
+* Lifecycle states of an AMQP transport.
+*
+* As used by AmqpTransport: a transport starts in CREATED, moves to CONNECTING
+* while an outbound connection is being set up, becomes CONNECTED once the
+* underlying transport reports it is connected (or immediately for accepted
+* transports), switches to DISCONNECTING when disconnect() is requested and
+* ends in DISCONNECTED once the underlying transport has stopped.
+*
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public enum TransportState {
+    CREATED,
+    CONNECTING,
+    CONNECTED,
+    DISCONNECTING,
+    DISCONNECTED
+}

Copied: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpHeader.java (from r1406782, activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpHeader.java?p2=activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpHeader.java&p1=activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java&r1=1406782&r2=1408852&rev=1408852&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpHeader.java Tue Nov 13 17:41:01 2012
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.amqp.hawtdispatch;
+package org.apache.activemq.apollo.amqp.hawtdispatch.impl;
 
 import org.fusesource.hawtbuf.Buffer;
 

Copied: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpListener.java (from r1406782, activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpListener.java?p2=activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpListener.java&p1=activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java&r1=1406782&r2=1408852&rev=1408852&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpListener.java Tue Nov 13 17:41:01 2012
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.apollo.amqp.hawtdispatch;
+package org.apache.activemq.apollo.amqp.hawtdispatch.impl;
 
 import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.engine.impl.EndpointImpl;
 import org.apache.qpid.proton.engine.impl.TransportImpl;
 import org.apache.qpid.proton.type.messaging.Accepted;
 import org.fusesource.hawtdispatch.Task;
@@ -29,7 +30,7 @@ import java.io.IOException;
 */
 public class AmqpListener {
 
-    public Sasl processSaslConnect(TransportImpl protonTransport) {
+    public Sasl processSaslConnect(TransportImpl transport) {
         return null;
     }
 
@@ -37,46 +38,25 @@ public class AmqpListener {
         return sasl;
     }
 
-    public void processConnectionOpen(Connection conn, Task onComplete) {
-        conn.open();
-        onComplete.run();
-    }
-    public void processConnectionClose(Connection conn, Task onComplete){
-        conn.close();
-        onComplete.run();
-    }
-
-    public void proccessSessionOpen(Session session, Task onComplete){
-        session.open();
-        onComplete.run();
-    }
-    public void processSessionClose(Session session, Task onComplete){
-        session.close();
+    public void processRemoteOpen(Endpoint endpoint, Task onComplete) {
+        ((EndpointImpl)endpoint).setLocalError(new EndpointError("error", "Not supported"));
+        endpoint.close();
         onComplete.run();
     }
 
-    public void processSenderOpen(Sender sender, Task onComplete) {
-        sender.close();
-        onComplete.run();
-    }
-    public void processSenderClose(Sender sender, Task onComplete){
-        sender.close();
+    public void processRemoteClose(Endpoint endpoint, Task onComplete) {
+        endpoint.close();
         onComplete.run();
     }
 
-    public void processReceiverOpen(Receiver receiver, Task onComplete) {
-        receiver.open();
-        onComplete.run();
-    }
-    public void processReceiverClose(Receiver receiver, Task onComplete) {
-        receiver.close();
-        onComplete.run();
+    public void processDelivery(Delivery delivery){
     }
 
-    public void processDelivery(Receiver receiver, Delivery delivery){
+    public void processTransportConnected() {
     }
 
-    public void processDelivery(Sender sender, Delivery delivery) {
+    public void processTransportFailure(IOException e) {
+        this.processFailure(e);
     }
 
     public void processFailure(Throwable e) {
@@ -86,10 +66,4 @@ public class AmqpListener {
     public void processRefill() {
     }
 
-    public void processTransportConnected() {
-    }
-
-    public void processTransportFailure(IOException e) {
-        e.printStackTrace();
-    }
 }

Copied: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java (from r1406782, activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java?p2=activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java&p1=activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java&r1=1406782&r2=1408852&rev=1408852&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java Tue Nov 13 17:41:01 2012
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.apollo.amqp.hawtdispatch;
+package org.apache.activemq.apollo.amqp.hawtdispatch.impl;
 
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec;
@@ -29,6 +29,8 @@ import java.io.IOException;
  */
 public class AmqpProtocolCodec extends AbstractProtocolCodec {
 
+    int maxFrameSize = 4*1024*1024;
+
     @Override
     protected void encode(Object object) throws IOException {
         nextWriteBuffer.write((Buffer) object);
@@ -55,8 +57,12 @@ public class AmqpProtocolCodec extends A
             if (sizeBytes != null) {
                 int size = sizeBytes.bigEndianEditor().readInt();
                 if (size < 8) {
-                    throw new IOException(String.format("specified frame size %d smaller than minimum frame size", size));
+                    throw new IOException(String.format("specified frame size %d is smaller than minimum frame size", size));
+                }
+                if( size > maxFrameSize ) {
+                    throw new IOException(String.format("specified frame size %d is larger than maximum frame size", size));
                 }
+
                 // TODO: check frame min and max size..
                 nextDecodeAction = readFrame(size);
                 return nextDecodeAction.apply();

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,574 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch.impl;
+
+import org.apache.activemq.apollo.amqp.hawtdispatch.api.AmqpConnectOptions;
+import org.apache.activemq.apollo.amqp.hawtdispatch.api.Callback;
+import org.apache.activemq.apollo.amqp.hawtdispatch.api.ChainedCallback;
+import org.apache.activemq.apollo.amqp.hawtdispatch.api.TransportState;
+import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.apache.qpid.proton.type.transport.Flow;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.hawtdispatch.*;
+import org.fusesource.hawtdispatch.transport.DefaultTransportListener;
+import org.fusesource.hawtdispatch.transport.SslTransport;
+import org.fusesource.hawtdispatch.transport.TcpTransport;
+import org.fusesource.hawtdispatch.transport.Transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedList;
+
+import static org.apache.activemq.apollo.amqp.hawtdispatch.api.TransportState.*;
+import static org.fusesource.hawtdispatch.Dispatch.NOOP;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpTransport extends WatchBase {
+
+    private TransportState state = CREATED;
+
+    final DispatchQueue queue;
+    final ConnectionImpl connection = new ConnectionImpl();
+    Transport hawtdispatchTransport;
+    TransportImpl protonTransport;
+    Throwable failure;
+    CustomDispatchSource<Defer,LinkedList<Defer>> defers;
+
+    public static final EnumSet<EndpointState> ALL_SET = EnumSet.allOf(EndpointState.class);
+
+    /**
+     * Creates a transport bound to the given dispatch queue.
+     *
+     * Sets up the custom dispatch source that collects {@link Defer} tasks
+     * merged via {@link #defer(Defer)} and runs them on the queue.
+     *
+     * @param queue the dispatch queue the deferred tasks are dispatched on
+     */
+    private AmqpTransport(DispatchQueue queue) {
+        this.queue = queue;
+        defers = Dispatch.createSource(EventAggregators.<Defer>linkedList(), this.queue);
+        defers.setEventHandler(new Task(){
+            public void run() {
+                for( Defer defer: defers.getData() ) {
+                    // Check (do not assign) the flag: defer() sets it before
+                    // merging the task into the source.
+                    assert defer.defered : "Defer executed without being scheduled through defer()";
+                    // Clear the flag before running so the task can be re-scheduled.
+                    defer.defered = false;
+                    defer.run();
+                }
+            }
+        });
+        defers.resume();
+    }
+
+    /**
+     * Starts an outbound AMQP connection.
+     *
+     * The options are cloned, so the caller's instance is left untouched.
+     * A new dispatch queue and the shared blocking thread pool are filled
+     * in when the caller did not configure them.
+     *
+     * @param options the connect settings
+     * @return the transport, already in the process of connecting
+     */
+    static public AmqpTransport connect(AmqpConnectOptions options) {
+        final AmqpConnectOptions settings = options.clone();
+
+        // Fill in defaults for anything the caller left unset.
+        if( null == settings.getDispatchQueue() ) {
+            settings.setDispatchQueue(Dispatch.createQueue());
+        }
+        if( null == settings.getBlockingExecutor() ) {
+            settings.setBlockingExecutor(AmqpConnectOptions.getBlockingThreadPool());
+        }
+
+        final AmqpTransport created = new AmqpTransport(settings.getDispatchQueue());
+        return created.connecting(settings);
+    }
+
+    /**
+     * Moves the transport from CREATED to CONNECTING and starts the connect.
+     *
+     * Copies the container ids and hostname onto the proton connection,
+     * builds the connect callback (wrapped in a {@link SaslClientHandler}
+     * when a user is configured) and creates the underlying transport.
+     * Anything thrown during setup is recorded in {@link #failure} rather
+     * than propagated; watches are fired before returning.
+     *
+     * @param options the (already cloned) connect settings
+     * @return this transport
+     */
+    private AmqpTransport connecting(final AmqpConnectOptions options) {
+        assert state == CREATED;
+        try {
+            state = CONNECTING;
+            if( options.getLocalContainerId()!=null ) {
+                connection.setLocalContainerId(options.getLocalContainerId());
+            }
+            if( options.getRemoteContainerId()!=null ) {
+                connection.setContainer(options.getRemoteContainerId());
+            }
+            connection.setHostname(options.getHost().getHost());
+            Callback<Void> onConnect = new Callback<Void>() {
+                @Override
+                public void onSuccess(Void value) {
+                    if( state == CONNECTED ) {
+                        hawtdispatchTransport.setTransportListener(new AmqpTransportListener());
+                        fireWatches();
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable value) {
+                    // Failures arrive both while the socket is still connecting
+                    // (reported by the listener installed in createTransport) and
+                    // after it connected (e.g. during the SASL handshake).  Both
+                    // must be recorded, otherwise a failed connect is silently lost.
+                    if( state == CONNECTING || state == CONNECTED ) {
+                        failure = value;
+                        disconnect();
+                        fireWatches();
+                    }
+                }
+            };
+            if( options.getUser()!=null ) {
+                onConnect = new SaslClientHandler(options, onConnect);
+            }
+            createTransport(options, onConnect);
+        } catch (Throwable e) {
+            failure = e;
+        }
+        fireWatches();
+        return this;
+    }
+
+    /**
+     * @return the current lifecycle state of this transport
+     */
+    public TransportState getState() {
+        return state;
+    }
+
+    /**
+     * Creates and starts a transport to the AMQP server.  Notifies onConnect
+     * once the transport is connected, or when connecting fails.
+     *
+     * When the host URI carries no port, the IANA-registered defaults are
+     * used: 5671 (amqps) when an SSL context is configured, 5672 (amqp)
+     * otherwise.
+     *
+     * @param options connection settings (host, SSL context, socket tuning)
+     * @param onConnect callback notified of the connect outcome
+     * @throws Exception if the host URI cannot be built or the transport
+     *                   cannot be configured
+     */
+    void createTransport(AmqpConnectOptions options, final Callback<Void> onConnect) throws Exception {
+        final TcpTransport transport;
+        if( options.getSslContext() !=null ) {
+            SslTransport ssl = new SslTransport();
+            ssl.setSSLContext(options.getSslContext());
+            transport = ssl;
+        } else {
+            transport = new TcpTransport();
+        }
+
+        URI host = options.getHost();
+        if( host.getPort() == -1 ) {
+            if( options.getSslContext()!=null ) {
+                // amqps: AMQP over TLS
+                host = new URI(host.getScheme()+"://"+host.getHost()+":5671");
+            } else {
+                // amqp: plain AMQP
+                host = new URI(host.getScheme()+"://"+host.getHost()+":5672");
+            }
+        }
+
+
+        transport.setBlockingExecutor(options.getBlockingExecutor());
+        transport.setDispatchQueue(options.getDispatchQueue());
+
+        transport.setMaxReadRate(options.getMaxReadRate());
+        transport.setMaxWriteRate(options.getMaxWriteRate());
+        transport.setReceiveBufferSize(options.getReceiveBufferSize());
+        transport.setSendBufferSize(options.getSendBufferSize());
+        transport.setTrafficClass(options.getTrafficClass());
+        transport.setUseLocalHost(options.isUseLocalHost());
+
+        transport.setTransportListener(new DefaultTransportListener(){
+            public void onTransportConnected() {
+                if(state==CONNECTING) {
+                    state = CONNECTED;
+                    onConnect.onSuccess(null);
+                    transport.resumeRead();
+                }
+            }
+
+            public void onTransportFailure(final IOException error) {
+                if(state==CONNECTING) {
+                    onConnect.onFailure(error);
+                }
+            }
+
+        });
+        // Initiate the connect exactly once, after all socket options and the
+        // listener are in place; a second call would discard the first setup.
+        transport.connecting(host, options.getLocalAddress());
+        bind(transport);
+        transport.start(NOOP);
+    }
+
+    /**
+     * Connect callback that runs the client side of the SASL handshake
+     * before notifying the next callback in the chain.  Only the PLAIN
+     * mechanism is attempted, using the user and password taken from the
+     * connect options.
+     */
+    class SaslClientHandler extends ChainedCallback<Void, Void> {
+
+        private final AmqpConnectOptions options;
+
+        /**
+         * @param options connect settings supplying the user and password
+         * @param next callback notified with the outcome of the handshake
+         */
+        public SaslClientHandler(AmqpConnectOptions options, Callback<Void> next) {
+            super(next);
+            this.options = options;
+        }
+
+        /**
+         * Invoked once the transport is connected: obtains the proton SASL
+         * layer, calls client() on it, schedules a pump of pending output
+         * and installs a listener that drives the handshake on each inbound
+         * command.
+         */
+        public void onSuccess(final Void value) {
+            final Sasl s = protonTransport.sasl();
+            s.client();
+            pumpOut();
+            hawtdispatchTransport.setTransportListener(new AmqpTransportListener() {
+
+                // Non-null while the handshake is in progress.
+                Sasl sasl = s;
+
+                @Override
+                void process() {
+                    if (sasl != null) {
+                        sasl = processSaslEvent(sasl);
+                        if (sasl == null) {
+                            // once sasl handshake is done.. we need to read the protocol header again.
+                            ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+                        }
+                    }
+                }
+
+                @Override
+                public void onTransportFailure(IOException error) {
+                    next.onFailure(error);
+                }
+
+                @Override
+                void onFailure(Throwable error) {
+                    next.onFailure(error);
+                }
+
+                // Ensures the credentials are sent at most once.
+                boolean authSent = false;
+
+                /**
+                 * Advances the handshake by one step.
+                 *
+                 * @return the sasl object while the handshake is still in
+                 *         progress, or null once it succeeded or was aborted
+                 */
+                private Sasl processSaslEvent(Sasl sasl) {
+                    if (sasl.getOutcome() == Sasl.SaslOutcome.PN_SASL_OK) {
+                        next.onSuccess(null);
+                        return null;
+                    }
+                    HashSet<String> mechanisims = new HashSet<String>(Arrays.asList(sasl.getRemoteMechanisms()));
+                    if (!authSent && !mechanisims.isEmpty()) {
+                        if (!mechanisims.contains("PLAIN")) {
+                            // NOTE(review): returning null here makes process() re-read the
+                            // protocol header even though the handshake failed - confirm.
+                            next.onFailure(Support.illegalState("Remote does not support plain password authentication."));
+                            return null;
+                        }
+                        authSent = true;
+                        DataByteArrayOutputStream os = new DataByteArrayOutputStream();
+                        try {
+                            // Payload layout written here: user NUL [password NUL].
+                            // NOTE(review): RFC 4616 defines PLAIN as [authzid] NUL authcid NUL passwd;
+                            // confirm the peer expects the layout produced below.
+                            os.write(new UTF8Buffer(options.getUser()));
+                            os.writeByte(0);
+                            if (options.getPassword() != null) {
+                                os.write(new UTF8Buffer(options.getPassword()));
+                                os.writeByte(0);
+                            }
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                        Buffer buffer = os.toBuffer();
+                        sasl.setMechanisms(new String[]{"PLAIN"});
+                        sasl.send(buffer.data, buffer.offset, buffer.length);
+                    }
+                    return sasl;
+                }
+            });
+        }
+    }
+
+    /**
+     * Listener installed on accepted transports.  It inspects an incoming
+     * protocol header to decide whether the peer wants a SASL handshake and,
+     * if so, drives that handshake through the configured AmqpListener
+     * before switching to a plain {@link AmqpTransportListener}.
+     */
+    class SaslServerListener extends AmqpTransportListener {
+        // Non-null while a SASL handshake is in progress.
+        Sasl sasl;
+
+        @Override
+        public void onTransportCommand(Object command) {
+            try {
+                if (command.getClass() == AmqpHeader.class) {
+                    AmqpHeader header = (AmqpHeader)command;
+                    switch( header.getProtocolId() ) {
+                        case 3: // Client will be using SASL for auth..
+                            // 'listener' here is the enclosing transport's AmqpListener field;
+                            // when it is null, control deliberately falls through to default.
+                            if( listener!=null ) {
+                                sasl = listener.processSaslConnect(protonTransport);
+                                break;
+                            }
+                        default:
+                            // No SASL: hand this and all later commands to a plain listener.
+                            // This local shadows the AmqpListener field for the rest of the switch.
+                            AmqpTransportListener listener = new AmqpTransportListener();
+                            hawtdispatchTransport.setTransportListener(listener);
+                            listener.onTransportCommand(command);
+                            return;
+                    }
+                    // Feed the raw header bytes to the proton transport via the superclass.
+                    command = header.getBuffer();
+                }
+            } catch (Exception e) {
+                // NOTE(review): after reporting the failure, execution still continues
+                // into super.onTransportCommand(command) below - confirm this is intended.
+                onFailure(e);
+            }
+            super.onTransportCommand(command);
+        }
+
+        @Override
+        void process() {
+            if (sasl != null) {
+                sasl = listener.processSaslEvent(sasl);
+            }
+            if (sasl == null) {
+                // once sasl handshake is done.. we need to read the protocol header again.
+                ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+                hawtdispatchTransport.setTransportListener(new AmqpTransportListener());
+            }
+        }
+    }
+
+    /**
+     * Wraps an already accepted transport in an AmqpTransport that shares
+     * the transport's dispatch queue.
+     *
+     * @param transport the accepted hawtdispatch transport
+     * @return the new AMQP transport, in the CONNECTED state
+     */
+    static public AmqpTransport accept(Transport transport) {
+        return new AmqpTransport(transport.getDispatchQueue()).accepted(transport);
+    }
+
+    /**
+     * Marks this transport as CONNECTED, binds it to the accepted transport
+     * and installs the server-side SASL listener.
+     *
+     * @param transport the accepted hawtdispatch transport
+     * @return this transport
+     */
+    private AmqpTransport accepted(final Transport transport) {
+        state = CONNECTED;
+        bind(transport);
+        hawtdispatchTransport.setTransportListener(new SaslServerListener());
+        return this;
+    }
+
+    private void bind(final Transport transport) {
+        this.hawtdispatchTransport = transport;
+        this.protonTransport = new TransportImpl();
+        this.protonTransport.bind(connection);
+        if( transport.getProtocolCodec()==null ) {
+            try {
+                transport.setProtocolCodec(new AmqpProtocolCodec());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Schedules the task to run later on the dispatch queue.  A task that
+     * is already scheduled and has not yet run is not scheduled again.
+     *
+     * @param defer the task to schedule
+     */
+    public void defer(Defer defer) {
+        if( defer.defered ) {
+            // Already pending; nothing to do.
+            return;
+        }
+        defer.defered = true;
+        defers.merge(defer);
+    }
+
+    /**
+     * Requests that pending proton output be written to the underlying
+     * transport.  The actual work is deferred, so several requests made
+     * before it runs result in a single pump.  Asserts that the caller is
+     * executing on this transport's dispatch queue.
+     */
+    public void pumpOut() {
+        assertExecuting();
+        defer(deferedPumpOut);
+    }
+
+    // Reusable deferred task that performs the output pump requested by pumpOut().
+    private Defer deferedPumpOut = new Defer() {
+        public void run() {
+            doPumpOut();
+        }
+    };
+
+    private void doPumpOut() {
+        switch(state) {
+            case CONNECTING:
+            case CONNECTED:
+                break;
+            default:
+                return;
+        }
+
+        int size = hawtdispatchTransport.getProtocolCodec().getWriteBufferSize();
+        byte data[] = new byte[size];
+        boolean done = false;
+        int pumped = 0;
+        while( !done && !hawtdispatchTransport.full() ) {
+            int count = protonTransport.output(data, 0, size);
+            if( count > 0 ) {
+                pumped += count;
+                boolean accepted = hawtdispatchTransport.offer(new Buffer(data, 0, count));
+                assert accepted: "Should be accepted since the transport was not full";
+            } else {
+                done = true;
+            }
+        }
+        if( pumped > 0 && !hawtdispatchTransport.full() ) {
+            listener.processRefill();
+        }
+    }
+
+    public Sasl sasl;
+    /**
+     * Fires the watches, advances any SASL handshake tracked in the sasl
+     * field, and then reports events to the listener for the connection,
+     * every session, every link and every delivery on the connection's work
+     * list, finishing with a refill notification.
+     */
+    public void fireListenerEvents() {
+        fireWatches();
+
+        if( sasl!=null ) {
+            sasl = listener.processSaslEvent(sasl);
+            if( sasl==null ) {
+                // once sasl handshake is done.. we need to read the protocol header again.
+                AmqpProtocolCodec codec = (AmqpProtocolCodec) this.hawtdispatchTransport.getProtocolCodec();
+                codec.readProtocolHeader();
+            }
+        }
+
+        context(connection).fireListenerEvents(listener);
+
+        for( Session current = connection.sessionHead(ALL_SET, ALL_SET);
+             current != null;
+             current = current.next(ALL_SET, ALL_SET) ) {
+            context(current).fireListenerEvents(listener);
+        }
+
+        for( Link current = connection.linkHead(ALL_SET, ALL_SET);
+             current != null;
+             current = current.next(ALL_SET, ALL_SET) ) {
+            context(current).fireListenerEvents(listener);
+        }
+
+        for( Delivery current = connection.getWorkHead();
+             current != null;
+             current = current.getWorkNext() ) {
+            listener.processDelivery(current);
+        }
+
+        listener.processRefill();
+    }
+
+
+    /**
+     * @return the proton connection owned by this transport
+     */
+    public ConnectionImpl connection() {
+        return connection;
+    }
+
+    AmqpListener listener = new AmqpListener();
+    /**
+     * @return the listener that receives this transport's AMQP events
+     */
+    public AmqpListener getListener() {
+        return listener;
+    }
+
+    /**
+     * Replaces the listener that receives this transport's AMQP events.
+     *
+     * NOTE(review): some call sites null-check the listener and others do
+     * not, so passing null may lead to a NullPointerException - confirm.
+     *
+     * @param listener the new listener
+     */
+    public void setListener(AmqpListener listener) {
+        this.listener = listener;
+    }
+
+    /**
+     * Returns the EndpointContext attached to the endpoint, creating and
+     * attaching a new one when the endpoint has none yet.
+     *
+     * @param endpoint the proton endpoint
+     * @return the context associated with the endpoint
+     */
+    public EndpointContext context(Endpoint endpoint) {
+        final Object attached = endpoint.getContext();
+        if( attached != null ) {
+            return (EndpointContext) attached;
+        }
+        final EndpointContext created = new EndpointContext(this, endpoint);
+        endpoint.setContext(created);
+        return created;
+    }
+
+    /**
+     * Transport listener used while the connection is established.  It
+     * feeds inbound bytes into the proton transport, fires the resulting
+     * listener events, pumps pending output and reports failures.
+     */
+    class AmqpTransportListener extends DefaultTransportListener {
+
+        @Override
+        public void onTransportConnected() {
+            if( listener!=null ) {
+                listener.processTransportConnected();
+            }
+        }
+
+        @Override
+        public void onRefill() {
+            if( listener!=null ) {
+                listener.processRefill();
+            }
+        }
+
+        /**
+         * Handles an inbound command, which is either an AmqpHeader or a
+         * Buffer.  Commands received while not CONNECTED are dropped.
+         */
+        @Override
+        public void onTransportCommand(Object command) {
+            if( state != CONNECTED ) {
+                return;
+            }
+            try {
+                Buffer buffer;
+                if (command.getClass() == AmqpHeader.class) {
+                    buffer = ((AmqpHeader) command).getBuffer();
+                } else {
+                    buffer = (Buffer) command;
+                }
+                protonTransport.input(buffer.data, buffer.offset, buffer.length);
+                process();
+                pumpOut();
+            } catch (Exception e) {
+                onFailure(e);
+            }
+        }
+
+        /** Hook run after each chunk of input; subclasses override it during SASL. */
+        void process() {
+            fireListenerEvents();
+        }
+
+        /**
+         * Records an I/O failure of the live connection and notifies the
+         * listener and the watches.
+         */
+        @Override
+        public void onTransportFailure(IOException error) {
+            // Only a connected transport can fail here; the previous
+            // "state != CONNECTED" guard ignored exactly those failures.
+            if( state == CONNECTED ) {
+                failure = error;
+                if( listener!=null ) {
+                    listener.processTransportFailure(error);
+                }
+                fireWatches();
+            }
+        }
+
+        /** Records a processing failure and notifies the listener and the watches. */
+        void onFailure(Throwable error) {
+            if( listener!=null ) {
+                failure = error;
+                listener.processFailure(error);
+                fireWatches();
+            }
+        }
+    }
+
+    /**
+     * Starts an orderly shutdown.  Has no effect unless the transport is
+     * CONNECTING or CONNECTED.  The state becomes DISCONNECTING right away
+     * and DISCONNECTED once the underlying transport has stopped, at which
+     * point the watches are fired so that state observers get notified.
+     * Must be called on this transport's dispatch queue.
+     */
+    public void disconnect() {
+        assertExecuting();
+        if( state == CONNECTING || state==CONNECTED) {
+            state = DISCONNECTING;
+            if( hawtdispatchTransport!=null ) {
+                hawtdispatchTransport.stop(new Task(){
+                    public void run() {
+                        state = DISCONNECTED;
+                        hawtdispatchTransport = null;
+                        protonTransport = null;
+                        // Let watches observe the DISCONNECTED state.
+                        fireWatches();
+                    }
+                });
+            } else {
+                // No transport was ever bound (setup failed early): there is
+                // nothing to stop, so do not stay in DISCONNECTING forever.
+                state = DISCONNECTED;
+                fireWatches();
+            }
+        }
+    }
+
+    /**
+     * @return the dispatch queue this transport was created with
+     */
+    public DispatchQueue queue() {
+        return queue;
+    }
+
+    /**
+     * Delegates to the dispatch queue's assertExecuting() check.
+     */
+    public void assertExecuting() {
+        queue().assertExecuting();
+    }
+
+    public void onTransportConnected(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( failure !=null ) {
+                    cb.onFailure(failure);
+                    return true;
+                }
+                if( state!=CONNECTING ) {
+                    cb.onSuccess(null);
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    /**
+     * Registers a one-shot watch that notifies the callback once the
+     * transport has reached the DISCONNECTED state.
+     *
+     * @param cb the callback to notify
+     */
+    public void onTransportDisconnected(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                // Complete only when actually disconnected; the previous
+                // "!=" test reported success while still connected.
+                if( state==DISCONNECTED ) {
+                    cb.onSuccess(null);
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    /**
+     * Registers a one-shot watch that hands the recorded failure to the
+     * callback's onSuccess once a failure is present.
+     *
+     * @param cb the callback receiving the failure
+     */
+    public void onTransportFailure(final Callback<Throwable> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( failure == null ) {
+                    // nothing has failed yet; keep watching
+                    return false;
+                }
+                cb.onSuccess(failure);
+                return true;
+            }
+        });
+    }
+
+    /**
+     * @return the recorded failure, or null when none has been recorded
+     */
+    public Throwable getFailure() {
+        return failure;
+    }
+
+    /**
+     * Installs a protocol tracer on the proton transport.
+     *
+     * NOTE(review): protonTransport is only assigned in bind() and is reset
+     * to null after a disconnect, so calling this outside that window looks
+     * like it would throw a NullPointerException - confirm.
+     *
+     * @param protocolTracer the tracer to install
+     */
+    public void setProtocolTracer(ProtocolTracer protocolTracer) {
+        protonTransport.setProtocolTracer(protocolTracer);
+    }
+
+    /**
+     * NOTE(review): dereferences protonTransport, which is null before
+     * bind() and after a disconnect - confirm callers respect that.
+     *
+     * @return the protocol tracer installed on the proton transport
+     */
+    public ProtocolTracer getProtocolTracer() {
+        return protonTransport.getProtocolTracer();
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Defer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Defer.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Defer.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Defer.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.impl;
+
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * A HawtDispatch {@link Task} that additionally carries a {@code defered}
+ * flag.  The class adds no behaviour of its own.
+ * NOTE(review): the flag presumably records whether the task has already
+ * been scheduled for deferred execution -- confirm against the code that
+ * sets it.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class Defer extends Task {
+    // Package-private field; no accessor is declared in this class.
+    boolean defered;
+}

Copied: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/DroppingWritableBuffer.java (from r1406782, activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/DroppingWritableBuffer.java?p2=activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/DroppingWritableBuffer.java&p1=activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java&r1=1406782&r2=1408852&rev=1408852&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/DroppingWritableBuffer.java Tue Nov 13 17:41:01 2012
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.apollo.amqp.hawtdispatch;
+package org.apache.activemq.apollo.amqp.hawtdispatch.impl;
 
 import org.apache.qpid.proton.codec.WritableBuffer;
 

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/EndpointContext.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/EndpointContext.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/EndpointContext.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/EndpointContext.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.impl;
+
+import org.apache.activemq.apollo.amqp.hawtdispatch.api.Callback;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointError;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.fusesource.hawtdispatch.Task;
+
+import java.util.LinkedList;
+
+/**
+ * Pairs a proton {@link Endpoint} with the {@link AmqpTransport} it belongs
+ * to.  It tracks whether a listener notification for the endpoint is still
+ * outstanding and carries an arbitrary attachment object.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class EndpointContext {
+
+    private final AmqpTransport transport;
+    private final Endpoint endpoint;
+    private Object attachment;
+    // Set when a remote open/close is handed to the listener and cleared
+    // again when the ProcessedTask given to the listener is run.
+    boolean listenerProcessing;
+
+    /**
+     * @param transport the transport that is pumped once a listener
+     *                  notification has been processed
+     * @param endpoint  the endpoint whose state is inspected
+     */
+    public EndpointContext(AmqpTransport transport, Endpoint endpoint) {
+        this.transport = transport;
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Completion task passed to the listener: clears the in-progress flag
+     * and pumps the transport output.
+     */
+    class ProcessedTask extends Task {
+        @Override
+        public void run() {
+            transport.assertExecuting();
+            listenerProcessing = false;
+            transport.pumpOut();
+        }
+    }
+
+    /**
+     * Notifies the listener about a remote open or a remote close of the
+     * endpoint, then runs the attachment if it is a {@link Task}.
+     *
+     * @param listener the listener to notify; when {@code null}, or while a
+     *                 previous notification is still being processed, only
+     *                 the attachment step is performed
+     */
+    public void fireListenerEvents(AmqpListener listener) {
+        if( listener!=null && !listenerProcessing ) {
+            // Read each state once so both branches test the same snapshot.
+            final EndpointState local = endpoint.getLocalState();
+            final EndpointState remote = endpoint.getRemoteState();
+            if( local == EndpointState.UNINITIALIZED &&
+                remote != EndpointState.UNINITIALIZED ) {
+                // The peer has opened the endpoint but we have not yet.
+                listenerProcessing = true;
+                listener.processRemoteOpen(endpoint, new ProcessedTask());
+            } else if( local == EndpointState.ACTIVE &&
+                remote == EndpointState.CLOSED ) {
+                // The peer has closed an endpoint that is still open locally.
+                listenerProcessing = true;
+                listener.processRemoteClose(endpoint, new ProcessedTask());
+            }
+        }
+        // instanceof is already false for null, so no separate null check.
+        if( attachment instanceof Task ) {
+            ((Task) attachment).run();
+        }
+    }
+
+    /**
+     * @return the attachment, or {@code null} when none has been set
+     */
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    /**
+     * @param clazz the type the attachment is expected to have
+     * @return the attachment cast to {@code clazz}, or {@code null} when
+     *         none has been set
+     * @throws ClassCastException if the attachment is not an instance of
+     *         {@code clazz}
+     */
+    public <T> T getAttachment(Class<T> clazz) {
+        return clazz.cast(getAttachment());
+    }
+
+    /**
+     * @param attachment the object to associate with this endpoint; a
+     *                   {@link Task} is run on every
+     *                   {@link #fireListenerEvents(AmqpListener)} call
+     */
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Support.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Support.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Support.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Support.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.impl;
+
+/**
+ * Factory methods for the {@link IllegalStateException}s used by the AMQP
+ * hawtdispatch implementation.  Every method creates and returns the
+ * exception; none of them throws it.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Support {
+
+    /**
+     * @param msg the detail message of the exception
+     * @return a new IllegalStateException carrying {@code msg}
+     */
+    public static IllegalStateException illegalState(String msg) {
+        // The constructor already records the stack trace, so neither an
+        // explicit fillInStackTrace() call nor a down-cast is needed.
+        return new IllegalStateException(msg);
+    }
+
+    /**
+     * @return the exception used when an event was not handled
+     */
+    public static IllegalStateException createUnhandledEventError() {
+        return illegalState("Unhandled event.");
+    }
+
+    /**
+     * @return the exception used when no connection listener has been set
+     */
+    public static IllegalStateException createListenerNotSetError() {
+        return illegalState("No connection listener set to handle message received from the server.");
+    }
+
+    /**
+     * @return the exception used when the connection is disconnected
+     */
+    public static IllegalStateException createDisconnectedError() {
+        return illegalState("Disconnected");
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Watch.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Watch.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Watch.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Watch.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.impl;
+
+/**
+ * A condition check that is evaluated repeatedly until it reports that it
+ * has been triggered.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class Watch {
+    /**
+     * Evaluates the watched condition.
+     *
+     * @return true if the watch has been triggered, false if it should be
+     *         evaluated again later
+     */
+    public abstract boolean execute();
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/WatchBase.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/WatchBase.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/WatchBase.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/WatchBase.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.impl;
+
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.Watch;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+
+import java.util.LinkedList;
+
+/**
+ * Base class that keeps a list of pending {@link Watch} instances and
+ * evaluates them in a task submitted to the current dispatch queue.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class WatchBase {
+
+    private LinkedList<Watch> watches = new LinkedList<Watch>();
+
+    /**
+     * Adds a watch to the pending list and schedules an evaluation pass.
+     *
+     * @param task the watch to register
+     */
+    protected void addWatch(final Watch task) {
+        watches.add(task);
+        fireWatches();
+    }
+
+    /**
+     * Schedules one evaluation pass over the pending watches.  Does nothing
+     * when no watch is pending.  Watches that report true are dropped; the
+     * others stay pending.
+     */
+    protected void fireWatches() {
+        if( this.watches.isEmpty() ) {
+            return;
+        }
+        Dispatch.getCurrentQueue().execute(new Task(){
+            @Override
+            public void run() {
+                // Swap in a fresh list before evaluating, so a watch added
+                // while this pass runs lands in the new list rather than in
+                // the one being iterated.
+                final LinkedList<Watch> pending = watches;
+                watches = new LinkedList<Watch>();
+                for (Watch candidate : pending) {
+                    final boolean triggered = candidate.execute();
+                    if( !triggered ) {
+                        watches.add(candidate);
+                    }
+                }
+            }
+        });
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/jul.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/jul.properties?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/jul.properties (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/jul.properties Tue Nov 13 17:41:01 2012
@@ -0,0 +1,45 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+# This file configures how Java Util Logging is handled.
+#
+#
+handlers=java.util.logging.ConsoleHandler
+
+# Default global logging level.
+# Loggers and Handlers may override this level
+#.level=ALL
+#RAW.level=ALL
+FRM.level=ALL
+
+## Loggers
+## ------------------------------------------
+## Loggers are usually attached to packages.
+## Here, the level for each package is specified.
+## The global level is used by default, so levels
+## specified here simply act as an override.
+#myapp.ui.level=ALL
+#myapp.business.level=CONFIG
+#myapp.data.level=SEVERE
+
+# Handlers
+# -----------------------------------------
+
+# --- ConsoleHandler ---
+# Override of global logging level
+java.util.logging.ConsoleHandler.level=ALL
+java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala Tue Nov 13 17:41:01 2012
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.test
+
+import org.apache.activemq.apollo.amqp.hawtdispatch.api._
+import org.apache.qpid.proton.`type`.messaging.{AmqpValue, Source, Target}
+import java.util.concurrent.CountDownLatch
+import org.fusesource.hawtdispatch._
+
+/**
+ * Sends a text message to a queue through an AmqpConnection and reads it
+ * back with a receiver created on the same session.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class AmqpConnectionTest extends AmqpTestSupport {
+
+  /**
+   * Builds a callback that prints the outcome of `action` and, on success,
+   * runs the continuation.  The parameter keeps its name `then`; it is
+   * back-quoted because `then` is a reserved word in newer Scala versions.
+   */
+  def print_result[T](action: String)(`then`: => Unit): Callback[T] = new Callback[T] {
+    def onSuccess(value: T): Unit = {
+      println(action + " completed")
+      `then`
+    }
+
+    def onFailure(value: Throwable): Unit = {
+      println(action + " failed: " + value)
+      value.printStackTrace()
+    }
+  }
+
+  /**
+   * Adapts a function to a Callback: the function receives the success
+   * value, and a failure only has its stack trace printed.
+   */
+  def `then`[T](func: (T) => Unit): Callback[T] = new Callback[T] {
+    def onSuccess(value: T): Unit = {
+      func(value)
+    }
+
+    def onFailure(value: Throwable): Unit = {
+      value.printStackTrace()
+    }
+  }
+
+  test("Sender Open") {
+    val amqp = new AmqpConnectOptions()
+    amqp.setHost("localhost", port)
+    amqp.setUser("admin")
+    amqp.setPassword("password")
+
+    val done = new CountDownLatch(1)
+    val connection = AmqpConnection.connect(amqp)
+    connection.queue() {
+      val session = connection.createSession()
+      val target = new Target
+      target.setAddress("/queue/FOO")
+      val sender = session.createSender(target)
+      val md = sender.send(session.createTextMessage("Hello World"))
+      // Only start receiving once the send has been settled.
+      md.onSettle(print_result("message sent") {
+        println("========================================================")
+        println("========================================================")
+        val source = new Source
+        source.setAddress("/queue/FOO")
+        val receiver = session.createReceiver(source)
+        receiver.resume()
+        receiver.setDeliveryListener(new AmqpDeliveryListener {
+          def onMessageDelivery(delivery: MessageDelivery): Unit = {
+            println("Received: " + delivery.getMessage().getBody().asInstanceOf[AmqpValue].getValue)
+            delivery.settle()
+            done.countDown()
+          }
+        })
+      })
+    }
+
+    // Bounded wait: the failure callbacks above never release the latch, so
+    // an unbounded await would hang the build instead of failing the test.
+    assert(done.await(30, java.util.concurrent.TimeUnit.SECONDS), "message was not received within 30 seconds")
+    connection.waitForDisconnected()
+  }
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala?rev=1408852&r1=1408851&r2=1408852&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala Tue Nov 13 17:41:01 2012
@@ -20,10 +20,37 @@ package org.apache.activemq.apollo.amqp.
 import org.apache.qpid.amqp_1_0.jms.impl.{ConnectionFactoryImpl, QueueImpl}
 import javax.jms._
 
+
+
+
+
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+
+/**
+ * Logging helper that accompanies the QpidJmsTest class.
+ */
+object QpidJmsTest {
+  /**
+   * Attaches a handler to the "FRM" java.util.logging logger that prints
+   * every record to standard output as "loggerName:message", and sets that
+   * logger's level to ALL.
+   */
+  def enableJMSFrameTracing: Unit = {
+    val out = System.out // new PrintStream(new FileOutputStream(new File("/tmp/amqp-trace.txt")))
+    val tracer = new Handler {
+      setLevel(Level.ALL)
+      def publish(r: LogRecord): Unit = out.println(String.format("%s:%s", r.getLoggerName, r.getMessage))
+      def flush(): Unit = out.flush()
+      def close(): Unit = {}
+    }
+    traceLogger("FRM", tracer)
+
+//    traceLogger("RAW", tracer)
+  }
+
+  /** Sets the named logger to level ALL and adds the handler to it. */
+  private def traceLogger(name: String, handler: Handler): Unit = {
+    val log = Logger.getLogger(name)
+    log.setLevel(Level.ALL)
+    log.addHandler(handler)
+  }
+}
+
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-
 class QpidJmsTest extends AmqpTestSupport {
 
   def createConnection: Connection = {
@@ -56,6 +83,7 @@ class QpidJmsTest extends AmqpTestSuppor
 //  }
 
   test("Send Nack Receive") {
+    // enableJMSFrameTracing
     val queue = new QueueImpl("/queue/testqueue")
     val nMsgs = 1
     val dataFormat: String = "%01024d"

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1408852&r1=1408851&r2=1408852&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Tue Nov 13 17:41:01 2012
@@ -263,4 +263,6 @@ class Delivery {
     record
   }
 
+  /**
+   * Records one more redelivery: raises `redeliveries` by one, saturating
+   * at Short.MaxValue instead of wrapping around.
+   */
+  def redelivered: Unit = {
+    val bumped = math.min(redeliveries + 1, Short.MaxValue)
+    redeliveries = bumped.toShort
+  }
+
 }



Mime
View raw message