Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7E606200D1A for ; Mon, 9 Oct 2017 14:30:16 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7CC441609E0; Mon, 9 Oct 2017 12:30:16 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2175C1609BB for ; Mon, 9 Oct 2017 14:30:14 +0200 (CEST) Received: (qmail 17697 invoked by uid 500); 9 Oct 2017 12:30:14 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 17688 invoked by uid 99); 9 Oct 2017 12:30:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Oct 2017 12:30:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1B034F5732; Mon, 9 Oct 2017 12:30:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: martyntaylor@apache.org To: commits@activemq.apache.org Date: Mon, 09 Oct 2017 12:30:14 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] activemq-artemis git commit: ARTEMIS-1454: Support SASL in outgoing AMQP archived-at: Mon, 09 Oct 2017 12:30:16 -0000 Repository: activemq-artemis Updated Branches: refs/heads/master 30ba65a08 -> 88e1fdc78 ARTEMIS-1454: Support SASL in outgoing AMQP Update ProtonHandler to allow for both client and server side SASL and other related changes to allow for setting of client side mechanism Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/cc8a0cb9 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/cc8a0cb9 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/cc8a0cb9 Branch: refs/heads/master Commit: cc8a0cb90eba71aa595f0ed15814ae3bec326af2 Parents: 30ba65a Author: Robert Godfrey Authored: Wed Sep 27 23:45:14 2017 +0200 Committer: rgodfrey Committed: Mon Oct 9 10:05:35 2017 +0200 ---------------------------------------------------------------------- .../ActiveMQProtonRemotingConnection.java | 4 + .../amqp/broker/ProtonProtocolManager.java | 2 +- .../client/AMQPClientConnectionFactory.java | 6 +- .../client/ProtonClientConnectionManager.java | 16 +- .../amqp/proton/AMQPConnectionContext.java | 106 ++++++------ .../amqp/proton/handler/EventHandler.java | 62 ++++--- .../amqp/proton/handler/ProtonHandler.java | 169 +++++++++++++++---- .../artemis/protocol/amqp/sasl/ClientSASL.java | 23 +++ .../protocol/amqp/sasl/ClientSASLFactory.java | 21 +++ .../amqp/AmqpOutboundConnectionTest.java | 91 +++++++++- 10 files changed, 367 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java index c3533eb..fb6ca0a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java @@ -175,4 +175,8 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection public String getClientID() { return amqpConnection.getContainer(); } + + public void open() { + amqpConnection.open(); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index 8f88d8f..cd35664 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -124,7 +124,7 @@ public class ProtonProtocolManager extends AbstractProtocolManager eventHandler) { + public ActiveMQProtonRemotingConnection createConnection(ProtonProtocolManager protocolManager, Connection connection, Optional eventHandler, ClientSASLFactory clientSASLFactory) { AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(protocolManager, connection, server.getExecutorFactory().getExecutor(), server); Executor executor = server.getExecutorFactory().getExecutor(); - AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool()); + AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool(), false, clientSASLFactory, connectionProperties); eventHandler.ifPresent(amqpConnection::addEventHandler); ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor); connectionCallback.setProtonConnectionDelegate(delegate); - amqpConnection.open(connectionProperties); return delegate; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java index ec9136f..df0de77 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection; import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler; +import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; @@ -40,16 +41,19 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis private static final Logger log = Logger.getLogger(ProtonClientConnectionManager.class); private final AMQPClientConnectionFactory connectionFactory; private final Optional eventHandler; + private final ClientSASLFactory clientSASLFactory; - public ProtonClientConnectionManager(AMQPClientConnectionFactory connectionFactory, Optional eventHandler) { + public ProtonClientConnectionManager(AMQPClientConnectionFactory connectionFactory, Optional eventHandler, ClientSASLFactory clientSASLFactory) { this.connectionFactory = connectionFactory; this.eventHandler = eventHandler; + this.clientSASLFactory = clientSASLFactory; } @Override public void connectionCreated(ActiveMQComponent component, Connection connection, ProtonProtocolManager protocolManager) { - ActiveMQProtonRemotingConnection amqpConnection = connectionFactory.createConnection(protocolManager, connection, eventHandler); + ActiveMQProtonRemotingConnection amqpConnection = connectionFactory.createConnection(protocolManager, connection, eventHandler, clientSASLFactory); connectionMap.put(connection.getID(), amqpConnection); + amqpConnection.open(); log.info("Connection " + amqpConnection.getRemoteAddress() + " created"); } @@ -60,6 +64,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis if (connection != null) { log.info("Connection " + connection.getRemoteAddress() + " destroyed"); connection.disconnect(false); + } else { + log.error("Connection with id " + connectionID + " not found in connectionDestroyed"); } } @@ -69,6 +75,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis if (connection != null) { log.info("Connection " + connection.getRemoteAddress() + " exception: " + me.getMessage()); connection.fail(me); + } else { + log.error("Connection with id " + connectionID + " not found in connectionException"); } } @@ -78,6 +86,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis if (connection != null) { log.info("Connection " + connection.getRemoteAddress() + " ready"); connection.getTransportConnection().fireReady(true); + } else { + log.error("Connection with id " + connectionID + " not found in connectionReadyForWrites()!"); } } @@ -92,6 +102,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis RemotingConnection connection = connectionMap.get(connectionID); if (connection != null) { connection.bufferReceived(connectionID, buffer); + } else { + log.error("Connection with id " + connectionID + " not found in bufferReceived()!"); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 680111a..5d376f1 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler; import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability; import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler; import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL; +import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ByteUtil; @@ -68,6 +69,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH protected AMQPConnectionCallback connectionCallback; private final String containerId; + private final boolean isIncomingConnection; + private final ClientSASLFactory saslClientFactory; private final Map connectionProperties = new HashMap<>(); private final ScheduledExecutorService scheduledPool; @@ -84,19 +87,28 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH int maxFrameSize, int channelMax, boolean useCoreSubscriptionNaming, - ScheduledExecutorService scheduledPool) { + ScheduledExecutorService scheduledPool, + boolean isIncomingConnection, + ClientSASLFactory saslClientFactory, + Map connectionProperties) { this.protocolManager = protocolManager; this.connectionCallback = connectionSP; this.useCoreSubscriptionNaming = useCoreSubscriptionNaming; this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString(); + this.isIncomingConnection = isIncomingConnection; + this.saslClientFactory = saslClientFactory; - connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis"); - connectionProperties.put(AmqpSupport.VERSION, VersionLoader.getVersion().getFullVersion()); + this.connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis"); + this.connectionProperties.put(AmqpSupport.VERSION, VersionLoader.getVersion().getFullVersion()); + + if (connectionProperties != null) { + this.connectionProperties.putAll(connectionProperties); + } this.scheduledPool = scheduledPool; connectionCallback.setConnection(this); - this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor()); + this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection); handler.addEventHandler(this); Transport transport = handler.getTransport(); transport.setEmitFlowEventOnSend(false); @@ -106,6 +118,17 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH transport.setChannelMax(channelMax); transport.setInitialRemoteMaxFrameSize(protocolManager.getInitialRemoteMaxFrameSize()); transport.setMaxFrameSize(maxFrameSize); + if (!isIncomingConnection && saslClientFactory != null) { + handler.createClientSASL(); + } + } + + public boolean isIncomingConnection() { + return isIncomingConnection; + } + + public ClientSASLFactory getSaslClientFactory() { + return saslClientFactory; } protected AMQPSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException { @@ -232,7 +255,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH return ExtCapability.getCapabilities(); } - public void open(Map connectionProperties) { + public void open() { handler.open(containerId, connectionProperties); } @@ -271,56 +294,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH } @Override - public void onInit(Connection connection) throws Exception { - - } - - @Override - public void onLocalOpen(Connection connection) throws Exception { - - } - - @Override - public void onLocalClose(Connection connection) throws Exception { - - } - - @Override - public void onFinal(Connection connection) throws Exception { - - } - - @Override - public void onInit(Session session) throws Exception { - - } - - @Override - public void onFinal(Session session) throws Exception { - - } - - @Override - public void onInit(Link link) throws Exception { - - } - - @Override - public void onLocalOpen(Link link) throws Exception { - - } - - @Override - public void onLocalClose(Link link) throws Exception { - - } - - @Override - public void onFinal(Link link) throws Exception { - - } - - @Override public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) { if (sasl) { // configured mech in decreasing order of preference @@ -344,6 +317,25 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH } @Override + public void onSaslMechanismsOffered(final ProtonHandler handler, final String[] mechanisms) { + if (saslClientFactory != null) { + handler.setClientMechanism(saslClientFactory.chooseMechanism(mechanisms)); + } + } + + @Override + public void onAuthFailed(final ProtonHandler protonHandler, final Connection connection) { + connectionCallback.close(); + handler.close(null); + } + + @Override + public void onAuthSuccess(final ProtonHandler protonHandler, final Connection connection) { + connection.open(); + flush(); + } + + @Override public void onTransport(Transport transport) { handler.flushBytes(); } @@ -438,10 +430,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH } @Override - public void onLocalClose(Session session) throws Exception { - } - - @Override public void onRemoteClose(Session session) throws Exception { lock(); try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java index 8b99284..34fba0c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java @@ -29,58 +29,66 @@ import org.apache.qpid.proton.engine.Transport; */ public interface EventHandler { - void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl); + default void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) { } - void onSaslRemoteMechanismChosen(ProtonHandler handler, String mech); + default void onSaslRemoteMechanismChosen(ProtonHandler handler, String mech) { } - void onInit(Connection connection) throws Exception; + default void onAuthFailed(ProtonHandler protonHandler, Connection connection) { } - void onLocalOpen(Connection connection) throws Exception; + default void onAuthSuccess(ProtonHandler protonHandler, Connection connection) { } - void onRemoteOpen(Connection connection) throws Exception; + default void onSaslMechanismsOffered(ProtonHandler handler, String[] mechanisms) { } - void onLocalClose(Connection connection) throws Exception; + default void onInit(Connection connection) throws Exception { } - void onRemoteClose(Connection connection) throws Exception; + default void onLocalOpen(Connection connection) throws Exception { } - void onFinal(Connection connection) throws Exception; + default void onRemoteOpen(Connection connection) throws Exception { } - void onInit(Session session) throws Exception; + default void onLocalClose(Connection connection) throws Exception { } - void onLocalOpen(Session session) throws Exception; + default void onRemoteClose(Connection connection) throws Exception { } - void onRemoteOpen(Session session) throws Exception; + default void onFinal(Connection connection) throws Exception { } - void onLocalClose(Session session) throws Exception; + default void onInit(Session session) throws Exception { } - void onRemoteClose(Session session) throws Exception; + default void onLocalOpen(Session session) throws Exception { } - void onFinal(Session session) throws Exception; + default void onRemoteOpen(Session session) throws Exception { } - void onInit(Link link) throws Exception; + default void onLocalClose(Session session) throws Exception { } - void onLocalOpen(Link link) throws Exception; + default void onRemoteClose(Session session) throws Exception { } - void onRemoteOpen(Link link) throws Exception; + default void onFinal(Session session) throws Exception { } - void onLocalClose(Link link) throws Exception; + default void onInit(Link link) throws Exception { } - void onRemoteClose(Link link) throws Exception; + default void onLocalOpen(Link link) throws Exception { } - void onFlow(Link link) throws Exception; + default void onRemoteOpen(Link link) throws Exception { } - void onFinal(Link link) throws Exception; + default void onLocalClose(Link link) throws Exception { } - void onRemoteDetach(Link link) throws Exception; + default void onRemoteClose(Link link) throws Exception { } - void onLocalDetach(Link link) throws Exception; + default void onFlow(Link link) throws Exception { } - void onDelivery(Delivery delivery) throws Exception; + default void onFinal(Link link) throws Exception { } - void onTransport(Transport transport) throws Exception; + default void onRemoteDetach(Link link) throws Exception { } - void pushBytes(ByteBuf bytes); + default void onLocalDetach(Link link) throws Exception { } - boolean flowControl(ReadyListener readyListener); + default void onDelivery(Delivery delivery) throws Exception { } + + default void onTransport(Transport transport) throws Exception { } + + default void pushBytes(ByteBuf bytes) { } + + default boolean flowControl(ReadyListener readyListener) { + return true; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index 918b383..54201ea 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -16,8 +16,10 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton.handler; +import javax.security.auth.Subject; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -25,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable; +import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; @@ -60,14 +63,17 @@ public class ProtonHandler extends ProtonInitializable { private List handlers = new ArrayList<>(); - private Sasl serverSasl; + private Sasl sasl; private ServerSASL chosenMechanism; + private ClientSASL clientSASLMechanism; private final ReentrantLock lock = new ReentrantLock(); private final long creationTime; + private final boolean isServer; + private SASLResult saslResult; protected volatile boolean dataReceived; @@ -80,12 +86,13 @@ public class ProtonHandler extends ProtonInitializable { boolean inDispatch = false; - public ProtonHandler(Executor flushExecutor) { + public ProtonHandler(Executor flushExecutor, boolean isServer) { this.flushExecutor = flushExecutor; this.readyListener = () -> flushExecutor.execute(() -> { flush(); }); this.creationTime = System.currentTimeMillis(); + this.isServer = isServer; transport.bind(connection); connection.collect(collector); } @@ -157,9 +164,9 @@ public class ProtonHandler extends ProtonInitializable { } public void createServerSASL(String[] mechanisms) { - this.serverSasl = transport.sasl(); - this.serverSasl.server(); - serverSasl.setMechanisms(mechanisms); + this.sasl = transport.sasl(); + this.sasl.server(); + sasl.setMechanisms(mechanisms); } public void flushBytes() { @@ -210,7 +217,11 @@ public class ProtonHandler extends ProtonInitializable { try { byte auth = buffer.getByte(4); if (auth == SASL || auth == BARE) { - dispatchAuth(auth == SASL); + if (isServer) { + dispatchAuth(auth == SASL); + } else if (auth == BARE && clientSASLMechanism == null) { + dispatchAuthSuccess(); + } /* * there is a chance that if SASL Handshake has been carried out that the capacity may change. * */ @@ -260,7 +271,7 @@ public class ProtonHandler extends ProtonInitializable { lock.lock(); try { transport.process(); - checkServerSASL(); + checkSASL(); } finally { lock.unlock(); } @@ -282,52 +293,131 @@ public class ProtonHandler extends ProtonInitializable { flush(); } - protected void checkServerSASL() { - if (serverSasl != null && serverSasl.getRemoteMechanisms().length > 0) { + protected void checkSASL() { + if (isServer) { + if (sasl != null && sasl.getRemoteMechanisms().length > 0) { - if (chosenMechanism == null) { - if (log.isTraceEnabled()) { - log.trace("SASL chosenMechanism: " + serverSasl.getRemoteMechanisms()[0]); + if (chosenMechanism == null) { + if (log.isTraceEnabled()) { + log.trace("SASL chosenMechanism: " + sasl.getRemoteMechanisms()[0]); + } + dispatchRemoteMechanismChosen(sasl.getRemoteMechanisms()[0]); } - dispatchRemoteMechanismChosen(serverSasl.getRemoteMechanisms()[0]); - } - if (chosenMechanism != null) { + if (chosenMechanism != null) { - byte[] dataSASL = new byte[serverSasl.pending()]; - serverSasl.recv(dataSASL, 0, dataSASL.length); + byte[] dataSASL = new byte[sasl.pending()]; + sasl.recv(dataSASL, 0, dataSASL.length); - if (log.isTraceEnabled()) { - log.trace("Working on sasl::" + (dataSASL != null && dataSASL.length > 0 ? ByteUtil.bytesToHex(dataSASL, 2) : "Anonymous")); - } + if (log.isTraceEnabled()) { + log.trace("Working on sasl::" + (dataSASL != null && dataSASL.length > 0 ? ByteUtil.bytesToHex(dataSASL, 2) : "Anonymous")); + } - byte[] response = chosenMechanism.processSASL(dataSASL); - if (response != null) { - serverSasl.send(response, 0, response.length); - } - saslResult = chosenMechanism.result(); + byte[] response = chosenMechanism.processSASL(dataSASL); + if (response != null) { + sasl.send(response, 0, response.length); + } + saslResult = chosenMechanism.result(); - if (saslResult != null) { - if (saslResult.isSuccess()) { - saslComplete(Sasl.SaslOutcome.PN_SASL_OK); - } else { - saslComplete(Sasl.SaslOutcome.PN_SASL_AUTH); + if (saslResult != null) { + if (saslResult.isSuccess()) { + saslComplete(Sasl.SaslOutcome.PN_SASL_OK); + } else { + saslComplete(Sasl.SaslOutcome.PN_SASL_AUTH); + } } + } else { + // no auth available, system error + saslComplete(Sasl.SaslOutcome.PN_SASL_SYS); + } + } + } else { + if (sasl != null) { + switch (sasl.getState()) { + case PN_SASL_IDLE: + if (sasl.getRemoteMechanisms().length != 0) { + dispatchMechanismsOffered(sasl.getRemoteMechanisms()); + + if (clientSASLMechanism == null) { + log.infof("Outbound connection failed - unknown mechanism, offered mechanisms: %s", + Arrays.asList(sasl.getRemoteMechanisms())); + sasl = null; + dispatchAuthFailed(); + } else { + sasl.setMechanisms(clientSASLMechanism.getName()); + byte[] initialResponse = clientSASLMechanism.getInitialResponse(); + if (initialResponse != null) { + sasl.send(initialResponse, 0, initialResponse.length); + } + } + } + break; + case PN_SASL_STEP: + int challengeSize = sasl.pending(); + byte[] challenge = new byte[challengeSize]; + sasl.recv(challenge, 0, challengeSize); + byte[] response = clientSASLMechanism.getResponse(challenge); + sasl.send(response, 0, response.length); + break; + case PN_SASL_FAIL: + log.info("Outbound connection failed, authentication failure"); + sasl = null; + dispatchAuthFailed(); + break; + case PN_SASL_PASS: + log.debug("Outbound connection succeeded"); + saslResult = new SASLResult() { + @Override + public String getUser() { + return null; + } + + @Override + public Subject getSubject() { + return null; + } + + @Override + public boolean isSuccess() { + return true; + } + }; + sasl = null; + + dispatchAuthSuccess(); + break; + case PN_SASL_CONF: + // do nothing + break; } - } else { - // no auth available, system error - saslComplete(Sasl.SaslOutcome.PN_SASL_SYS); } } } private void saslComplete(Sasl.SaslOutcome saslOutcome) { - serverSasl.done(saslOutcome); - serverSasl = null; + sasl.done(saslOutcome); + sasl = null; if (chosenMechanism != null) { chosenMechanism.done(); } } + private void dispatchAuthFailed() { + for (EventHandler h : handlers) { + h.onAuthFailed(this, getConnection()); + } + } + + private void dispatchAuthSuccess() { + for (EventHandler h : handlers) { + h.onAuthSuccess(this, getConnection()); + } + } + + private void dispatchMechanismsOffered(final String[] mechs) { + for (EventHandler h : handlers) { + h.onSaslMechanismsOffered(this, mechs); + } + } private void dispatchAuth(boolean sasl) { for (EventHandler h : handlers) { h.onAuthInit(this, getConnection(), sasl); @@ -393,4 +483,13 @@ public class ProtonHandler extends ProtonInitializable { public void setChosenMechanism(ServerSASL chosenMechanism) { this.chosenMechanism = chosenMechanism; } + + public void setClientMechanism(final ClientSASL saslClientMech) { + this.clientSASLMechanism = saslClientMech; + } + + public void createClientSASL() { + this.sasl = transport.sasl(); + this.sasl.client(); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASL.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASL.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASL.java new file mode 100644 index 0000000..cd132a9 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASL.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.sasl; + +public interface ClientSASL { + String getName(); + byte[] getInitialResponse(); + byte[] getResponse(byte[] challenge); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLFactory.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLFactory.java new file mode 100644 index 0000000..f6e9e25 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLFactory.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.sasl; + +public interface ClientSASLFactory { + ClientSASL chooseMechanism(String[] availableMechanims); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cc8a0cb9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java index 3d8be49..16589fc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java @@ -16,10 +16,13 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; @@ -28,16 +31,37 @@ import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFac import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager; +import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler; +import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL; +import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.engine.Connection; import org.junit.Test; public class AmqpOutboundConnectionTest extends AmqpClientTestSupport { + private boolean securityEnabled; + @Test(timeout = 60000) public void testOutboundConnection() throws Throwable { - final ActiveMQServer remote = createServer(AMQP_PORT + 1); - remote.start(); + runOutboundConnectionTest(false); + } + + @Test(timeout = 60000) + public void testOutboundConnectionWithSecurity() throws Throwable { + runOutboundConnectionTest(true); + } + + + private void runOutboundConnectionTest(boolean withSecurity) throws Exception { + final ActiveMQServer remote; + try { + securityEnabled = withSecurity; + remote = createServer(AMQP_PORT + 1); + } finally { + securityEnabled = false; + } try { Wait.waitFor(remote::isActive); } catch (Exception e) { @@ -45,10 +69,30 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport { throw e; } - final Map config = new LinkedHashMap<>(); - config.put(TransportConstants.HOST_PROP_NAME, "localhost"); + final Map config = new LinkedHashMap<>(); config.put(TransportConstants.HOST_PROP_NAME, "localhost"); config.put(TransportConstants.PORT_PROP_NAME, String.valueOf(AMQP_PORT + 1)); - ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, "myid", Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000), Optional.empty()); + final ClientSASLFactory clientSASLFactory; + if (withSecurity) { + clientSASLFactory = availableMechanims -> { + if (availableMechanims != null && Arrays.asList(availableMechanims).contains("PLAIN")) { + return new PlainSASLMechanism(fullUser, fullPass); + } else { + return null; + } + }; + } else { + clientSASLFactory = null; + } + final AtomicBoolean connectionOpened = new AtomicBoolean(); + + EventHandler eventHandler = new EventHandler() { + @Override + public void onRemoteOpen(Connection connection) throws Exception { + connectionOpened.set(true); + } + }; + + ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, "myid", Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000), Optional.of(eventHandler), clientSASLFactory); ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server); NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager); connector.start(); @@ -57,7 +101,8 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport { try { Wait.waitFor(() -> remote.getConnectionCount() > 0); assertEquals(1, remote.getConnectionCount()); - + Wait.waitFor(connectionOpened::get); + assertTrue("Remote connection was not opened - authentication error?", connectionOpened.get()); lifeCycleListener.stop(); Wait.waitFor(() -> remote.getConnectionCount() == 0); @@ -67,4 +112,38 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport { remote.stop(); } } + + @Override + protected boolean isSecurityEnabled() { + return securityEnabled; + } + + private static class PlainSASLMechanism implements ClientSASL { + + private final byte[] initialResponse; + + PlainSASLMechanism(String username, String password) { + byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8); + byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8); + byte[] encoded = new byte[usernameBytes.length + passwordBytes.length + 2]; + System.arraycopy(usernameBytes, 0, encoded, 1, usernameBytes.length); + System.arraycopy(passwordBytes, 0, encoded, usernameBytes.length + 2, passwordBytes.length); + initialResponse = encoded; + } + + @Override + public String getName() { + return "PLAIN"; + } + + @Override + public byte[] getInitialResponse() { + return initialResponse; + } + + @Override + public byte[] getResponse(byte[] challenge) { + return new byte[0]; + } + } }