activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1454: Support SASL in outgoing AMQP
Date Mon, 09 Oct 2017 12:30:14 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 30ba65a08 -> 88e1fdc78


ARTEMIS-1454: Support SASL in outgoing AMQP

Update ProtonHandler to allow for both client and server side SASL
and other related changes to allow for setting of client side mechanism


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/cc8a0cb9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/cc8a0cb9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/cc8a0cb9

Branch: refs/heads/master
Commit: cc8a0cb90eba71aa595f0ed15814ae3bec326af2
Parents: 30ba65a
Author: Robert Godfrey <rgodfrey@apache.org>
Authored: Wed Sep 27 23:45:14 2017 +0200
Committer: rgodfrey <rgodfrey@apache.org>
Committed: Mon Oct 9 10:05:35 2017 +0200

----------------------------------------------------------------------
 .../ActiveMQProtonRemotingConnection.java       |   4 +
 .../amqp/broker/ProtonProtocolManager.java      |   2 +-
 .../client/AMQPClientConnectionFactory.java     |   6 +-
 .../client/ProtonClientConnectionManager.java   |  16 +-
 .../amqp/proton/AMQPConnectionContext.java      | 106 ++++++------
 .../amqp/proton/handler/EventHandler.java       |  62 ++++---
 .../amqp/proton/handler/ProtonHandler.java      | 169 +++++++++++++++----
 .../artemis/protocol/amqp/sasl/ClientSASL.java  |  23 +++
 .../protocol/amqp/sasl/ClientSASLFactory.java   |  21 +++
 .../amqp/AmqpOutboundConnectionTest.java        |  91 +++++++++-
 10 files changed, 367 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
index c3533eb..fb6ca0a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
@@ -175,4 +175,8 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
    public String getClientID() {
       return amqpConnection.getContainer();
    }
+
+   public void open() {
+      amqpConnection.open();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index 8f88d8f..cd35664 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -124,7 +124,7 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
 
       String id = server.getConfiguration().getName();
       boolean useCoreSubscriptionNaming = server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
-      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool());
+      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool(), true, null, null);
 
       Executor executor = server.getExecutorFactory().getExecutor();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
index 4e532bb..c633db8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.qpid.proton.amqp.Symbol;
 
@@ -49,19 +50,18 @@ public class AMQPClientConnectionFactory {
       this.useCoreSubscriptionNaming = false;
    }
 
-   public ActiveMQProtonRemotingConnection createConnection(ProtonProtocolManager protocolManager, Connection connection, Optional<EventHandler> eventHandler) {
+   public ActiveMQProtonRemotingConnection createConnection(ProtonProtocolManager protocolManager, Connection connection, Optional<EventHandler> eventHandler, ClientSASLFactory clientSASLFactory) {
       AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(protocolManager, connection, server.getExecutorFactory().getExecutor(), server);
 
       Executor executor = server.getExecutorFactory().getExecutor();
 
-      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool());
+      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool(), false, clientSASLFactory, connectionProperties);
       eventHandler.ifPresent(amqpConnection::addEventHandler);
 
       ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
 
       connectionCallback.setProtonConnectionDelegate(delegate);
 
-      amqpConnection.open(connectionProperties);
       return delegate;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
index ec9136f..df0de77 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
 import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
 import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
@@ -40,16 +41,19 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
    private static final Logger log = Logger.getLogger(ProtonClientConnectionManager.class);
    private final AMQPClientConnectionFactory connectionFactory;
    private final Optional<EventHandler> eventHandler;
+   private final ClientSASLFactory clientSASLFactory;
 
-   public ProtonClientConnectionManager(AMQPClientConnectionFactory connectionFactory, Optional<EventHandler> eventHandler) {
+   public ProtonClientConnectionManager(AMQPClientConnectionFactory connectionFactory, Optional<EventHandler> eventHandler, ClientSASLFactory clientSASLFactory) {
       this.connectionFactory = connectionFactory;
       this.eventHandler = eventHandler;
+      this.clientSASLFactory = clientSASLFactory;
    }
 
    @Override
    public void connectionCreated(ActiveMQComponent component, Connection connection, ProtonProtocolManager protocolManager) {
-      ActiveMQProtonRemotingConnection amqpConnection = connectionFactory.createConnection(protocolManager, connection, eventHandler);
+      ActiveMQProtonRemotingConnection amqpConnection = connectionFactory.createConnection(protocolManager, connection, eventHandler, clientSASLFactory);
       connectionMap.put(connection.getID(), amqpConnection);
+      amqpConnection.open();
 
       log.info("Connection " + amqpConnection.getRemoteAddress() + " created");
    }
@@ -60,6 +64,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
       if (connection != null) {
          log.info("Connection " + connection.getRemoteAddress() + " destroyed");
          connection.disconnect(false);
+      } else {
+         log.error("Connection with id " + connectionID + " not found in connectionDestroyed");
       }
    }
 
@@ -69,6 +75,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
       if (connection != null) {
          log.info("Connection " + connection.getRemoteAddress() + " exception: " + me.getMessage());
          connection.fail(me);
+      } else {
+         log.error("Connection with id " + connectionID + " not found in connectionException");
       }
    }
 
@@ -78,6 +86,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
       if (connection != null) {
          log.info("Connection " + connection.getRemoteAddress() + " ready");
          connection.getTransportConnection().fireReady(true);
+      } else {
+         log.error("Connection with id " + connectionID + " not found in connectionReadyForWrites()!");
       }
    }
 
@@ -92,6 +102,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
       RemotingConnection connection = connectionMap.get(connectionID);
       if (connection != null) {
          connection.bufferReceived(connectionID, buffer);
+      } else {
+         log.error("Connection with id " + connectionID + " not found in bufferReceived()!");
       }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 680111a..5d376f1 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
 import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ByteUtil;
@@ -68,6 +69,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
 
    protected AMQPConnectionCallback connectionCallback;
    private final String containerId;
+   private final boolean isIncomingConnection;
+   private final ClientSASLFactory saslClientFactory;
    private final Map<Symbol, Object> connectionProperties = new HashMap<>();
    private final ScheduledExecutorService scheduledPool;
 
@@ -84,19 +87,28 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
                                 int maxFrameSize,
                                 int channelMax,
                                 boolean useCoreSubscriptionNaming,
-                                ScheduledExecutorService scheduledPool) {
+                                ScheduledExecutorService scheduledPool,
+                                boolean isIncomingConnection,
+                                ClientSASLFactory saslClientFactory,
+                                Map<Symbol, Object> connectionProperties) {
 
       this.protocolManager = protocolManager;
       this.connectionCallback = connectionSP;
       this.useCoreSubscriptionNaming = useCoreSubscriptionNaming;
       this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
+      this.isIncomingConnection = isIncomingConnection;
+      this.saslClientFactory = saslClientFactory;
 
-      connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis");
-      connectionProperties.put(AmqpSupport.VERSION, VersionLoader.getVersion().getFullVersion());
+      this.connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis");
+      this.connectionProperties.put(AmqpSupport.VERSION, VersionLoader.getVersion().getFullVersion());
+
+      if (connectionProperties != null) {
+         this.connectionProperties.putAll(connectionProperties);
+      }
 
       this.scheduledPool = scheduledPool;
       connectionCallback.setConnection(this);
-      this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor());
+      this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection);
       handler.addEventHandler(this);
       Transport transport = handler.getTransport();
       transport.setEmitFlowEventOnSend(false);
@@ -106,6 +118,17 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
       transport.setChannelMax(channelMax);
       transport.setInitialRemoteMaxFrameSize(protocolManager.getInitialRemoteMaxFrameSize());
       transport.setMaxFrameSize(maxFrameSize);
+      if (!isIncomingConnection && saslClientFactory != null) {
+         handler.createClientSASL();
+      }
+   }
+
+   public boolean isIncomingConnection() {
+      return isIncomingConnection;
+   }
+
+   public ClientSASLFactory getSaslClientFactory() {
+      return saslClientFactory;
    }
 
    protected AMQPSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException {
@@ -232,7 +255,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
       return ExtCapability.getCapabilities();
    }
 
-   public void open(Map<Symbol, Object> connectionProperties) {
+   public void open() {
       handler.open(containerId, connectionProperties);
    }
 
@@ -271,56 +294,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
    }
 
    @Override
-   public void onInit(Connection connection) throws Exception {
-
-   }
-
-   @Override
-   public void onLocalOpen(Connection connection) throws Exception {
-
-   }
-
-   @Override
-   public void onLocalClose(Connection connection) throws Exception {
-
-   }
-
-   @Override
-   public void onFinal(Connection connection) throws Exception {
-
-   }
-
-   @Override
-   public void onInit(Session session) throws Exception {
-
-   }
-
-   @Override
-   public void onFinal(Session session) throws Exception {
-
-   }
-
-   @Override
-   public void onInit(Link link) throws Exception {
-
-   }
-
-   @Override
-   public void onLocalOpen(Link link) throws Exception {
-
-   }
-
-   @Override
-   public void onLocalClose(Link link) throws Exception {
-
-   }
-
-   @Override
-   public void onFinal(Link link) throws Exception {
-
-   }
-
-   @Override
    public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) {
       if (sasl) {
          // configured mech in decreasing order of preference
@@ -344,6 +317,25 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
    }
 
    @Override
+   public void onSaslMechanismsOffered(final ProtonHandler handler, final String[] mechanisms) {
+      if (saslClientFactory != null) {
+         handler.setClientMechanism(saslClientFactory.chooseMechanism(mechanisms));
+      }
+   }
+
+   @Override
+   public void onAuthFailed(final ProtonHandler protonHandler, final Connection connection) {
+      connectionCallback.close();
+      handler.close(null);
+   }
+
+   @Override
+   public void onAuthSuccess(final ProtonHandler protonHandler, final Connection connection) {
+      connection.open();
+      flush();
+   }
+
+   @Override
    public void onTransport(Transport transport) {
       handler.flushBytes();
    }
@@ -438,10 +430,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
    }
 
    @Override
-   public void onLocalClose(Session session) throws Exception {
-   }
-
-   @Override
    public void onRemoteClose(Session session) throws Exception {
       lock();
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
index 8b99284..34fba0c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
@@ -29,58 +29,66 @@ import org.apache.qpid.proton.engine.Transport;
  */
 public interface EventHandler {
 
-   void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl);
+   default void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) { }
 
-   void onSaslRemoteMechanismChosen(ProtonHandler handler, String mech);
+   default void onSaslRemoteMechanismChosen(ProtonHandler handler, String mech) { }
 
-   void onInit(Connection connection) throws Exception;
+   default void onAuthFailed(ProtonHandler protonHandler, Connection connection) { }
 
-   void onLocalOpen(Connection connection) throws Exception;
+   default void onAuthSuccess(ProtonHandler protonHandler, Connection connection) { }
 
-   void onRemoteOpen(Connection connection) throws Exception;
+   default void onSaslMechanismsOffered(ProtonHandler handler, String[] mechanisms) { }
 
-   void onLocalClose(Connection connection) throws Exception;
+   default void onInit(Connection connection) throws Exception { }
 
-   void onRemoteClose(Connection connection) throws Exception;
+   default void onLocalOpen(Connection connection) throws Exception { }
 
-   void onFinal(Connection connection) throws Exception;
+   default void onRemoteOpen(Connection connection) throws Exception { }
 
-   void onInit(Session session) throws Exception;
+   default void onLocalClose(Connection connection) throws Exception { }
 
-   void onLocalOpen(Session session) throws Exception;
+   default void onRemoteClose(Connection connection) throws Exception { }
 
-   void onRemoteOpen(Session session) throws Exception;
+   default void onFinal(Connection connection) throws Exception { }
 
-   void onLocalClose(Session session) throws Exception;
+   default void onInit(Session session) throws Exception { }
 
-   void onRemoteClose(Session session) throws Exception;
+   default void onLocalOpen(Session session) throws Exception { }
 
-   void onFinal(Session session) throws Exception;
+   default void onRemoteOpen(Session session) throws Exception { }
 
-   void onInit(Link link) throws Exception;
+   default void onLocalClose(Session session) throws Exception { }
 
-   void onLocalOpen(Link link) throws Exception;
+   default void onRemoteClose(Session session) throws Exception { }
 
-   void onRemoteOpen(Link link) throws Exception;
+   default void onFinal(Session session) throws Exception { }
 
-   void onLocalClose(Link link) throws Exception;
+   default void onInit(Link link) throws Exception { }
 
-   void onRemoteClose(Link link) throws Exception;
+   default void onLocalOpen(Link link) throws Exception { }
 
-   void onFlow(Link link) throws Exception;
+   default void onRemoteOpen(Link link) throws Exception { }
 
-   void onFinal(Link link) throws Exception;
+   default void onLocalClose(Link link) throws Exception { }
 
-   void onRemoteDetach(Link link) throws Exception;
+   default void onRemoteClose(Link link) throws Exception { }
 
-   void onLocalDetach(Link link) throws Exception;
+   default void onFlow(Link link) throws Exception { }
 
-   void onDelivery(Delivery delivery) throws Exception;
+   default void onFinal(Link link) throws Exception { }
 
-   void onTransport(Transport transport) throws Exception;
+   default void onRemoteDetach(Link link) throws Exception { }
 
-   void pushBytes(ByteBuf bytes);
+   default void onLocalDetach(Link link) throws Exception { }
 
-   boolean flowControl(ReadyListener readyListener);
+   default void onDelivery(Delivery delivery) throws Exception { }
+
+   default void onTransport(Transport transport) throws Exception { }
+
+   default void pushBytes(ByteBuf bytes) { }
+
+   default boolean flowControl(ReadyListener readyListener) {
+      return true;
+   }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 918b383..54201ea 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -16,8 +16,10 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton.handler;
 
+import javax.security.auth.Subject;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
@@ -25,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -60,14 +63,17 @@ public class ProtonHandler extends ProtonInitializable {
 
    private List<EventHandler> handlers = new ArrayList<>();
 
-   private Sasl serverSasl;
+   private Sasl sasl;
 
    private ServerSASL chosenMechanism;
+   private ClientSASL clientSASLMechanism;
 
    private final ReentrantLock lock = new ReentrantLock();
 
    private final long creationTime;
 
+   private final boolean isServer;
+
    private SASLResult saslResult;
 
    protected volatile boolean dataReceived;
@@ -80,12 +86,13 @@ public class ProtonHandler extends ProtonInitializable {
 
    boolean inDispatch = false;
 
-   public ProtonHandler(Executor flushExecutor) {
+   public ProtonHandler(Executor flushExecutor, boolean isServer) {
       this.flushExecutor = flushExecutor;
       this.readyListener = () -> flushExecutor.execute(() -> {
          flush();
       });
       this.creationTime = System.currentTimeMillis();
+      this.isServer = isServer;
       transport.bind(connection);
       connection.collect(collector);
    }
@@ -157,9 +164,9 @@ public class ProtonHandler extends ProtonInitializable {
    }
 
    public void createServerSASL(String[] mechanisms) {
-      this.serverSasl = transport.sasl();
-      this.serverSasl.server();
-      serverSasl.setMechanisms(mechanisms);
+      this.sasl = transport.sasl();
+      this.sasl.server();
+      sasl.setMechanisms(mechanisms);
    }
 
    public void flushBytes() {
@@ -210,7 +217,11 @@ public class ProtonHandler extends ProtonInitializable {
                try {
                   byte auth = buffer.getByte(4);
                   if (auth == SASL || auth == BARE) {
-                     dispatchAuth(auth == SASL);
+                     if (isServer) {
+                        dispatchAuth(auth == SASL);
+                     } else if (auth == BARE && clientSASLMechanism == null) {
+                        dispatchAuthSuccess();
+                     }
                      /*
                      * there is a chance that if SASL Handshake has been carried out that the capacity may change.
                      * */
@@ -260,7 +271,7 @@ public class ProtonHandler extends ProtonInitializable {
       lock.lock();
       try {
          transport.process();
-         checkServerSASL();
+         checkSASL();
       } finally {
          lock.unlock();
       }
@@ -282,52 +293,131 @@ public class ProtonHandler extends ProtonInitializable {
       flush();
    }
 
-   protected void checkServerSASL() {
-      if (serverSasl != null && serverSasl.getRemoteMechanisms().length > 0) {
+   protected void checkSASL() {
+      if (isServer) {
+         if (sasl != null && sasl.getRemoteMechanisms().length > 0) {
 
-         if (chosenMechanism == null) {
-            if (log.isTraceEnabled()) {
-               log.trace("SASL chosenMechanism: " + serverSasl.getRemoteMechanisms()[0]);
+            if (chosenMechanism == null) {
+               if (log.isTraceEnabled()) {
+                  log.trace("SASL chosenMechanism: " + sasl.getRemoteMechanisms()[0]);
+               }
+               dispatchRemoteMechanismChosen(sasl.getRemoteMechanisms()[0]);
             }
-            dispatchRemoteMechanismChosen(serverSasl.getRemoteMechanisms()[0]);
-         }
-         if (chosenMechanism != null) {
+            if (chosenMechanism != null) {
 
-            byte[] dataSASL = new byte[serverSasl.pending()];
-            serverSasl.recv(dataSASL, 0, dataSASL.length);
+               byte[] dataSASL = new byte[sasl.pending()];
+               sasl.recv(dataSASL, 0, dataSASL.length);
 
-            if (log.isTraceEnabled()) {
-               log.trace("Working on sasl::" + (dataSASL != null && dataSASL.length > 0 ? ByteUtil.bytesToHex(dataSASL, 2) : "Anonymous"));
-            }
+               if (log.isTraceEnabled()) {
+                  log.trace("Working on sasl::" + (dataSASL != null && dataSASL.length > 0 ? ByteUtil.bytesToHex(dataSASL, 2) : "Anonymous"));
+               }
 
-            byte[] response = chosenMechanism.processSASL(dataSASL);
-            if (response != null) {
-               serverSasl.send(response, 0, response.length);
-            }
-            saslResult = chosenMechanism.result();
+               byte[] response = chosenMechanism.processSASL(dataSASL);
+               if (response != null) {
+                  sasl.send(response, 0, response.length);
+               }
+               saslResult = chosenMechanism.result();
 
-            if (saslResult != null) {
-               if (saslResult.isSuccess()) {
-                  saslComplete(Sasl.SaslOutcome.PN_SASL_OK);
-               } else {
-                  saslComplete(Sasl.SaslOutcome.PN_SASL_AUTH);
+               if (saslResult != null) {
+                  if (saslResult.isSuccess()) {
+                     saslComplete(Sasl.SaslOutcome.PN_SASL_OK);
+                  } else {
+                     saslComplete(Sasl.SaslOutcome.PN_SASL_AUTH);
+                  }
                }
+            } else {
+               // no auth available, system error
+               saslComplete(Sasl.SaslOutcome.PN_SASL_SYS);
+            }
+         }
+      } else {
+         if (sasl != null) {
+            switch (sasl.getState()) {
+               case PN_SASL_IDLE:
+                  if (sasl.getRemoteMechanisms().length != 0) {
+                     dispatchMechanismsOffered(sasl.getRemoteMechanisms());
+
+                     if (clientSASLMechanism == null) {
+                        log.infof("Outbound connection failed - unknown mechanism, offered mechanisms: %s",
+                                  Arrays.asList(sasl.getRemoteMechanisms()));
+                        sasl = null;
+                        dispatchAuthFailed();
+                     } else {
+                        sasl.setMechanisms(clientSASLMechanism.getName());
+                        byte[] initialResponse = clientSASLMechanism.getInitialResponse();
+                        if (initialResponse != null) {
+                           sasl.send(initialResponse, 0, initialResponse.length);
+                        }
+                     }
+                  }
+                  break;
+               case PN_SASL_STEP:
+                  int challengeSize = sasl.pending();
+                  byte[] challenge = new byte[challengeSize];
+                  sasl.recv(challenge, 0, challengeSize);
+                  byte[] response = clientSASLMechanism.getResponse(challenge);
+                  sasl.send(response, 0, response.length);
+                  break;
+               case PN_SASL_FAIL:
+                  log.info("Outbound connection failed, authentication failure");
+                  sasl = null;
+                  dispatchAuthFailed();
+                  break;
+               case PN_SASL_PASS:
+                  log.debug("Outbound connection succeeded");
+                  saslResult = new SASLResult() {
+                     @Override
+                     public String getUser() {
+                        return null;
+                     }
+
+                     @Override
+                     public Subject getSubject() {
+                        return null;
+                     }
+
+                     @Override
+                     public boolean isSuccess() {
+                        return true;
+                     }
+                  };
+                  sasl = null;
+
+                  dispatchAuthSuccess();
+                  break;
+               case PN_SASL_CONF:
+                  // do nothing
+                  break;
             }
-         } else {
-            // no auth available, system error
-            saslComplete(Sasl.SaslOutcome.PN_SASL_SYS);
          }
       }
    }
 
    private void saslComplete(Sasl.SaslOutcome saslOutcome) {
-      serverSasl.done(saslOutcome);
-      serverSasl = null;
+      sasl.done(saslOutcome);
+      sasl = null;
       if (chosenMechanism != null) {
          chosenMechanism.done();
       }
    }
 
+   private void dispatchAuthFailed() {
+      for (EventHandler h : handlers) {
+         h.onAuthFailed(this, getConnection());
+      }
+   }
+
+   private void dispatchAuthSuccess() {
+      for (EventHandler h : handlers) {
+         h.onAuthSuccess(this, getConnection());
+      }
+   }
+
+   private void dispatchMechanismsOffered(final String[] mechs) {
+      for (EventHandler h : handlers) {
+         h.onSaslMechanismsOffered(this, mechs);
+      }
+   }
    private void dispatchAuth(boolean sasl) {
       for (EventHandler h : handlers) {
          h.onAuthInit(this, getConnection(), sasl);
@@ -393,4 +483,13 @@ public class ProtonHandler extends ProtonInitializable {
    public void setChosenMechanism(ServerSASL chosenMechanism) {
       this.chosenMechanism = chosenMechanism;
    }
+
+   public void setClientMechanism(final ClientSASL saslClientMech) {
+      this.clientSASLMechanism = saslClientMech;
+   }
+
+   public void createClientSASL() {
+      this.sasl = transport.sasl();
+      this.sasl.client();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASL.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASL.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASL.java
new file mode 100644
index 0000000..cd132a9
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASL.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.sasl;
+
+public interface ClientSASL {
+   String getName();
+   byte[] getInitialResponse();
+   byte[] getResponse(byte[] challenge);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLFactory.java
new file mode 100644
index 0000000..f6e9e25
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLFactory.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.sasl;
+
+public interface ClientSASLFactory {
+   ClientSASL chooseMechanism(String[] availableMechanims);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
index 3d8be49..16589fc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
@@ -28,16 +31,37 @@ import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFac
 import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory;
 import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
 import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
+import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.engine.Connection;
 import org.junit.Test;
 
 public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
 
+   private boolean securityEnabled;
+
    @Test(timeout = 60000)
    public void testOutboundConnection() throws Throwable {
-      final ActiveMQServer remote = createServer(AMQP_PORT + 1);
-      remote.start();
+      runOutboundConnectionTest(false);
+   }
+
+   @Test(timeout = 60000)
+   public void testOutboundConnectionWithSecurity() throws Throwable {
+      runOutboundConnectionTest(true);
+   }
+
+
+   private void runOutboundConnectionTest(boolean withSecurity) throws Exception {
+      final ActiveMQServer remote;
+      try {
+         securityEnabled = withSecurity;
+         remote = createServer(AMQP_PORT + 1);
+      } finally {
+         securityEnabled = false;
+      }
       try {
          Wait.waitFor(remote::isActive);
       } catch (Exception e) {
@@ -45,10 +69,30 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
          throw e;
       }
 
-      final Map<String, Object> config = new LinkedHashMap<>();
-      config.put(TransportConstants.HOST_PROP_NAME, "localhost");
+      final Map<String, Object> config = new LinkedHashMap<>(); config.put(TransportConstants.HOST_PROP_NAME, "localhost");
       config.put(TransportConstants.PORT_PROP_NAME, String.valueOf(AMQP_PORT + 1));
-      ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, "myid", Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000), Optional.empty());
+      final ClientSASLFactory clientSASLFactory;
+      if (withSecurity) {
+         clientSASLFactory = availableMechanims -> {
+            if (availableMechanims != null && Arrays.asList(availableMechanims).contains("PLAIN")) {
+               return new PlainSASLMechanism(fullUser, fullPass);
+            } else {
+               return null;
+            }
+         };
+      } else {
+         clientSASLFactory = null;
+      }
+      final AtomicBoolean connectionOpened = new AtomicBoolean();
+
+      EventHandler eventHandler = new EventHandler() {
+         @Override
+         public void onRemoteOpen(Connection connection) throws Exception {
+            connectionOpened.set(true);
+         }
+      };
+
+      ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, "myid", Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000), Optional.of(eventHandler), clientSASLFactory);
       ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
       NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager);
       connector.start();
@@ -57,7 +101,8 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
       try {
          Wait.waitFor(() -> remote.getConnectionCount() > 0);
          assertEquals(1, remote.getConnectionCount());
-
+         Wait.waitFor(connectionOpened::get);
+         assertTrue("Remote connection was not opened - authentication error?", connectionOpened.get());
          lifeCycleListener.stop();
 
          Wait.waitFor(() -> remote.getConnectionCount() == 0);
@@ -67,4 +112,38 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
          remote.stop();
       }
    }
+
+   @Override
+   protected boolean isSecurityEnabled() {
+      return securityEnabled;
+   }
+
+   private static class PlainSASLMechanism implements ClientSASL {
+
+      private final byte[] initialResponse;
+
+      PlainSASLMechanism(String username, String password) {
+         byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
+         byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
+         byte[] encoded = new byte[usernameBytes.length + passwordBytes.length + 2];
+         System.arraycopy(usernameBytes, 0, encoded, 1, usernameBytes.length);
+         System.arraycopy(passwordBytes, 0, encoded, usernameBytes.length + 2, passwordBytes.length);
+         initialResponse = encoded;
+      }
+
+      @Override
+      public String getName() {
+         return "PLAIN";
+      }
+
+      @Override
+      public byte[] getInitialResponse() {
+         return initialResponse;
+      }
+
+      @Override
+      public byte[] getResponse(byte[] challenge) {
+         return new byte[0];
+      }
+   }
 }


Mime
View raw message