activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5591
Date Mon, 30 Mar 2015 20:41:51 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 0fd174b92 -> e333fd957


https://issues.apache.org/jira/browse/AMQ-5591

Clean up SASL authentication code to make it easier to add new
mechanisms.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e333fd95
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e333fd95
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e333fd95

Branch: refs/heads/master
Commit: e333fd957b117282dd3fca8c7237fa6c82e3c77a
Parents: 0fd174b
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Mar 30 16:41:34 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Mar 30 16:41:34 2015 -0400

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpConnection.java | 203 ++++++-------------
 .../amqp/sasl/AbstractSaslMechanism.java        |  36 ++++
 .../transport/amqp/sasl/AmqpAuthenticator.java  | 147 ++++++++++++++
 .../transport/amqp/sasl/AnonymousMechanism.java |  44 ++++
 .../transport/amqp/sasl/PlainMechanism.java     |  46 +++++
 .../transport/amqp/sasl/SaslMechanism.java      |  51 +++++
 .../transport/amqp/JMSClientSimpleAuthTest.java |  41 ++--
 7 files changed, 418 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e333fd95/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index a902315..0edc62f 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -26,11 +26,8 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.security.cert.X509Certificate;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -57,8 +54,6 @@ import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.security.AuthenticationBroker;
-import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.transport.amqp.AmqpHeader;
 import org.apache.activemq.transport.amqp.AmqpInactivityMonitor;
 import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
@@ -67,6 +62,7 @@ import org.apache.activemq.transport.amqp.AmqpTransport;
 import org.apache.activemq.transport.amqp.AmqpTransportFilter;
 import org.apache.activemq.transport.amqp.AmqpWireFormat;
 import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.activemq.transport.amqp.sasl.AmqpAuthenticator;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.qpid.proton.Proton;
@@ -80,7 +76,6 @@ import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.engine.Sasl;
 import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.engine.Transport;
@@ -108,27 +103,28 @@ public class AmqpConnection implements AmqpProtocolConverter {
     private final AmqpTransport amqpTransport;
     private final AmqpWireFormat amqpWireFormat;
     private final BrokerService brokerService;
-    private AuthenticationBroker authenticator;
-    private Sasl sasl;
 
     private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
     private final AtomicInteger lastCommandId = new AtomicInteger();
     private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
     private final ConnectionInfo connectionInfo = new ConnectionInfo();
-    private long nextSessionId = 0;
-    private long nextTempDestinationId = 0;
-    private boolean closing = false;
-    private boolean closedSocket = false;
+    private long nextSessionId;
+    private long nextTempDestinationId;
+    private boolean closing;
+    private boolean closedSocket;
+    private AmqpAuthenticator authenticator;
 
     private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer,
ResponseHandler>();
     private final ConcurrentMap<ConsumerId, AmqpSender> subscriptionsByConsumerId =
new ConcurrentHashMap<ConsumerId, AmqpSender>();
 
     public AmqpConnection(AmqpTransport transport, BrokerService brokerService) {
         this.amqpTransport = transport;
+
         AmqpInactivityMonitor monitor = transport.getInactivityMonitor();
         if (monitor != null) {
             monitor.setProtocolConverter(this);
         }
+
         this.amqpWireFormat = transport.getWireFormat();
         this.brokerService = brokerService;
 
@@ -272,11 +268,10 @@ public class AmqpConnection implements AmqpProtocolConverter {
 
             switch (header.getProtocolId()) {
                 case 0:
+                    authenticator = null;
                     break; // nothing to do..
                 case 3: // Client will be using SASL for auth..
-                    sasl = protonTransport.sasl();
-                    sasl.setMechanisms(new String[] { "ANONYMOUS", "PLAIN" });
-                    sasl.server();
+                    authenticator = new AmqpAuthenticator(amqpTransport, protonTransport.sasl(),
brokerService);
                     break;
                 default:
             }
@@ -285,10 +280,6 @@ public class AmqpConnection implements AmqpProtocolConverter {
             frame = (Buffer) command;
         }
 
-        onFrame(frame);
-    }
-
-    public void onFrame(Buffer frame) throws Exception {
         while (frame.length > 0) {
             try {
                 int count = protonTransport.input(frame.data, frame.offset, frame.length);
@@ -298,89 +289,69 @@ public class AmqpConnection implements AmqpProtocolConverter {
                 return;
             }
 
-            try {
-                if (sasl != null) {
-                    // Lets try to complete the sasl handshake.
-                    if (sasl.getRemoteMechanisms().length > 0) {
-                        if ("PLAIN".equals(sasl.getRemoteMechanisms()[0])) {
-                            byte[] data = new byte[sasl.pending()];
-                            sasl.recv(data, 0, data.length);
-                            Buffer[] parts = new Buffer(data).split((byte) 0);
-                            if (parts.length > 0) {
-                                connectionInfo.setUserName(parts[0].utf8().toString());
-                            }
-                            if (parts.length > 1) {
-                                connectionInfo.setPassword(parts[1].utf8().toString());
-                            }
-
-                            if (tryAuthenticate(connectionInfo, amqpTransport.getPeerCertificates()))
{
-                                sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
-                            } else {
-                                sasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
-                            }
-
-                            amqpTransport.getWireFormat().resetMagicRead();
-                            sasl = null;
-                            LOG.debug("SASL [PLAIN] Handshake complete.");
-                        } else if ("ANONYMOUS".equals(sasl.getRemoteMechanisms()[0])) {
-                            if (tryAuthenticate(connectionInfo, amqpTransport.getPeerCertificates()))
{
-                                sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
-                            } else {
-                                sasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
-                            }
-                            amqpTransport.getWireFormat().resetMagicRead();
-                            sasl = null;
-                            LOG.debug("SASL [ANONYMOUS] Handshake complete.");
-                        }
-                    }
-                }
+            if (authenticator != null) {
+                processSaslExchange();
+            } else {
+                processProtonEvents();
+            }
+        }
+    }
 
-                Event event = null;
-                while ((event = eventCollector.peek()) != null) {
-                    if (amqpTransport.isTrace()) {
-                        LOG.trace("Processing event: {}", event.getType());
-                    }
-                    switch (event.getType()) {
-                        case CONNECTION_REMOTE_OPEN:
-                            processConnectionOpen(event.getConnection());
-                            break;
-                        case CONNECTION_REMOTE_CLOSE:
-                            processConnectionClose(event.getConnection());
-                            break;
-                        case SESSION_REMOTE_OPEN:
-                            processSessionOpen(event.getSession());
-                            break;
-                        case SESSION_REMOTE_CLOSE:
-                            processSessionClose(event.getSession());
-                            break;
-                        case LINK_REMOTE_OPEN:
-                            processLinkOpen(event.getLink());
-                            break;
-                        case LINK_REMOTE_DETACH:
-                            processLinkDetach(event.getLink());
-                            break;
-                        case LINK_REMOTE_CLOSE:
-                            processLinkClose(event.getLink());
-                            break;
-                        case LINK_FLOW:
-                            processLinkFlow(event.getLink());
-                            break;
-                        case DELIVERY:
-                            processDelivery(event.getDelivery());
-                            break;
-                        default:
-                            break;
-                    }
+    private void processSaslExchange() throws Exception {
+        authenticator.processSaslExchange(connectionInfo);
+        if (authenticator.isDone()) {
+            amqpTransport.getWireFormat().resetMagicRead();
+        }
+        pumpProtonToSocket();
+    }
 
-                    eventCollector.pop();
+    private void processProtonEvents() throws Exception {
+        try {
+            Event event = null;
+            while ((event = eventCollector.peek()) != null) {
+                if (amqpTransport.isTrace()) {
+                    LOG.trace("Processing event: {}", event.getType());
+                }
+                switch (event.getType()) {
+                    case CONNECTION_REMOTE_OPEN:
+                        processConnectionOpen(event.getConnection());
+                        break;
+                    case CONNECTION_REMOTE_CLOSE:
+                        processConnectionClose(event.getConnection());
+                        break;
+                    case SESSION_REMOTE_OPEN:
+                        processSessionOpen(event.getSession());
+                        break;
+                    case SESSION_REMOTE_CLOSE:
+                        processSessionClose(event.getSession());
+                        break;
+                    case LINK_REMOTE_OPEN:
+                        processLinkOpen(event.getLink());
+                        break;
+                    case LINK_REMOTE_DETACH:
+                        processLinkDetach(event.getLink());
+                        break;
+                    case LINK_REMOTE_CLOSE:
+                        processLinkClose(event.getLink());
+                        break;
+                    case LINK_FLOW:
+                        processLinkFlow(event.getLink());
+                        break;
+                    case DELIVERY:
+                        processDelivery(event.getDelivery());
+                        break;
+                    default:
+                        break;
                 }
 
-            } catch (Throwable e) {
-                handleException(new AmqpProtocolException("Could not process AMQP commands",
true, e));
+                eventCollector.pop();
             }
 
-            pumpProtonToSocket();
+        } catch (Throwable e) {
+            handleException(new AmqpProtocolException("Could not process AMQP commands",
true, e));
         }
+
+        pumpProtonToSocket();
     }
 
     protected void processConnectionOpen(Connection connection) throws Exception {
@@ -697,46 +668,4 @@ public class AmqpConnection implements AmqpProtocolConverter {
 
         monitor.stopConnectChecker();
     }
-
-    private boolean tryAuthenticate(ConnectionInfo info, X509Certificate[] peerCertificates)
{
-        try {
-            if (getAuthenticator().authenticate(info.getUserName(), info.getPassword(), peerCertificates)
!= null) {
-                return true;
-            }
-
-            return false;
-        } catch (Throwable error) {
-            return false;
-        }
-    }
-
-    private AuthenticationBroker getAuthenticator() {
-        if (authenticator == null) {
-            try {
-                authenticator = (AuthenticationBroker) brokerService.getBroker().getAdaptor(AuthenticationBroker.class);
-            } catch (Exception e) {
-                LOG.debug("Failed to lookup AuthenticationBroker from Broker, will use a
default Noop version.");
-            }
-
-            if (authenticator == null) {
-                authenticator = new DefaultAuthenticationBroker();
-            }
-        }
-
-        return authenticator;
-    }
-
-    private class DefaultAuthenticationBroker implements AuthenticationBroker {
-
-        @Override
-        public SecurityContext authenticate(String username, String password, X509Certificate[]
peerCertificates) throws SecurityException {
-            return new SecurityContext(username) {
-
-                @Override
-                public Set<Principal> getPrincipals() {
-                    return null;
-                }
-            };
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e333fd95/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AbstractSaslMechanism.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AbstractSaslMechanism.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AbstractSaslMechanism.java
new file mode 100644
index 0000000..6f2e97e
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AbstractSaslMechanism.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.sasl;
+
+/**
+ * Base class for SASL Mechanisms that provides common functionality.
+ */
+public abstract class AbstractSaslMechanism implements SaslMechanism {
+
+    protected String username;
+    protected String password;
+
+    @Override
+    public String getUsername() {
+        return username;
+    }
+
+    @Override
+    public String getPassword() {
+        return password;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e333fd95/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java
new file mode 100644
index 0000000..82e5eab
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.sasl;
+
+import java.security.Principal;
+import java.security.cert.X509Certificate;
+import java.util.Set;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.security.AuthenticationBroker;
+import org.apache.activemq.security.SecurityContext;
+import org.apache.activemq.transport.amqp.AmqpTransport;
+import org.apache.qpid.proton.engine.Sasl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SASL Authenitcation engine.
+ */
+public class AmqpAuthenticator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpAuthenticator.class);
+
+    private static final String[] mechanisms = new String[] { "ANONYMOUS", "PLAIN" };
+
+    private final BrokerService brokerService;
+    private final AmqpTransport transport;
+    private final Sasl sasl;
+
+    private AuthenticationBroker authenticator;
+
+    public AmqpAuthenticator(AmqpTransport transport, Sasl sasl, BrokerService brokerService)
{
+        this.brokerService = brokerService;
+        this.transport = transport;
+        this.sasl = sasl;
+
+        sasl.setMechanisms(mechanisms);
+        sasl.server();
+    }
+
+    /**
+     * @return true if the SASL exchange has conpleted, regardless of success.
+     */
+    public boolean isDone() {
+        return sasl.getOutcome() != Sasl.SaslOutcome.PN_SASL_NONE;
+    }
+
+    /**
+     * @return the list of all SASL mechanisms that are supported curretnly.
+     */
+    public String[] getSupportedMechanisms() {
+        return mechanisms;
+    }
+
+    public void processSaslExchange(ConnectionInfo connectionInfo) {
+        if (sasl.getRemoteMechanisms().length > 0) {
+
+            SaslMechanism mechanism = getSaslMechanism(sasl.getRemoteMechanisms());
+            if (mechanism != null) {
+                LOG.debug("SASL [{}} Handshake started.", mechanism.getMechanismName());
+
+                mechanism.processSaslStep(sasl);
+
+                connectionInfo.setUserName(mechanism.getUsername());
+                connectionInfo.setPassword(mechanism.getPassword());
+
+                if (tryAuthenticate(connectionInfo, transport.getPeerCertificates())) {
+                    sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+                } else {
+                    sasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
+                }
+
+                LOG.debug("SASL [{}} Handshake complete.", mechanism.getMechanismName());
+            } else {
+                LOG.info("SASL: could not find supported mechanism");
+                sasl.done(Sasl.SaslOutcome.PN_SASL_PERM);
+            }
+        }
+    }
+
+    //----- Internal implementation ------------------------------------------//
+
+    private SaslMechanism getSaslMechanism(String[] remoteMechanisms) {
+        String primary = remoteMechanisms[0];
+
+        if (primary.equalsIgnoreCase("PLAIN")) {
+            return new PlainMechanism();
+        } else if (primary.equalsIgnoreCase("ANONYMOUS")) {
+            return new AnonymousMechanism();
+        }
+
+        return null;
+    }
+
+    private boolean tryAuthenticate(ConnectionInfo info, X509Certificate[] peerCertificates)
{
+        try {
+            return getAuthenticator().authenticate(info.getUserName(), info.getPassword(),
peerCertificates) != null;
+        } catch (Throwable error) {
+            return false;
+        }
+    }
+
+    private AuthenticationBroker getAuthenticator() {
+        if (authenticator == null) {
+            try {
+                authenticator = (AuthenticationBroker) brokerService.getBroker().getAdaptor(AuthenticationBroker.class);
+            } catch (Exception e) {
+                LOG.debug("Failed to lookup AuthenticationBroker from Broker, will use a
default Noop version.");
+            }
+
+            if (authenticator == null) {
+                authenticator = new DefaultAuthenticationBroker();
+            }
+        }
+
+        return authenticator;
+    }
+
+    private class DefaultAuthenticationBroker implements AuthenticationBroker {
+
+        @Override
+        public SecurityContext authenticate(String username, String password, X509Certificate[]
peerCertificates) throws SecurityException {
+            return new SecurityContext(username) {
+
+                @Override
+                public Set<Principal> getPrincipals() {
+                    return null;
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e333fd95/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AnonymousMechanism.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AnonymousMechanism.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AnonymousMechanism.java
new file mode 100644
index 0000000..012e8fd
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AnonymousMechanism.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.sasl;
+
+import org.apache.qpid.proton.engine.Sasl;
+
+/**
+ * SASL Anonymous mechanism implementation.
+ */
+public class AnonymousMechanism implements SaslMechanism {
+
+    @Override
+    public void processSaslStep(Sasl sasl) {
+    }
+
+    @Override
+    public String getMechanismName() {
+        return "ANONYMOUS";
+    }
+
+    @Override
+    public String getUsername() {
+        return null;
+    }
+
+    @Override
+    public String getPassword() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e333fd95/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/PlainMechanism.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/PlainMechanism.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/PlainMechanism.java
new file mode 100644
index 0000000..cf2948f
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/PlainMechanism.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.sasl;
+
+import org.apache.qpid.proton.engine.Sasl;
+import org.fusesource.hawtbuf.Buffer;
+
+/**
+ * Implements the SASL Plain mechanism.
+ */
+public class PlainMechanism extends AbstractSaslMechanism {
+
+    @Override
+    public void processSaslStep(Sasl sasl) {
+        byte[] data = new byte[sasl.pending()];
+        sasl.recv(data, 0, data.length);
+        Buffer[] parts = new Buffer(data).split((byte) 0);
+
+        if (parts.length > 0) {
+            username = parts[0].utf8().toString();
+        }
+
+        if (parts.length > 1) {
+            password = parts[1].utf8().toString();
+        }
+    }
+
+    @Override
+    public String getMechanismName() {
+        return "PLAIN";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e333fd95/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/SaslMechanism.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/SaslMechanism.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/SaslMechanism.java
new file mode 100644
index 0000000..95daa24
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/SaslMechanism.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.sasl;
+
+import org.apache.qpid.proton.engine.Sasl;
+
+/**
+ * A SASL Mechanism implements this interface in order to provide the
+ * AmqpAuthenticator with the means of providing authentication services
+ * in the SASL handshake step.
+ */
+public interface SaslMechanism {
+
+    /**
+     * Perform the SASL processing for this mechanism type.
+     *
+     * @param sasl
+     *        the SASL server that has read the incoming SASL exchange.
+     */
+    void processSaslStep(Sasl sasl);
+
+    /**
+     * @return the User Name extracted from the SASL echange or null if none.
+     */
+    String getUsername();
+
+    /**
+     * @return the Password extracted from the SASL echange or null if none.
+     */
+    String getPassword();
+
+    /**
+     * @return the name of the implemented SASL mechanism.
+     */
+    String getMechanismName();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e333fd95/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java
index a7f02f7..d9c7ffb 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java
@@ -37,36 +37,55 @@ import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class JMSClientSimpleAuthTest {
 
+    @Rule public TestName name = new TestName();
+
     private static final Logger LOG = LoggerFactory.getLogger(JMSClientSimpleAuthTest.class);
 
     private final String SIMPLE_AUTH_AMQP_BROKER_XML =
         "org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml";
     private BrokerService brokerService;
+    private Connection connection;
     private URI amqpURI;
 
     @Before
     public void setUp() throws Exception {
+        LOG.info("========== starting: " + getTestName() + " ==========");
         startBroker();
     }
 
     @After
     public void stopBroker() throws Exception {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Exception ex) {}
+            connection = null;
+        }
+
         if (brokerService != null) {
             brokerService.stop();
             brokerService = null;
         }
+
+        LOG.info("========== finished: " + getTestName() + " ==========");
+    }
+
+    public String getTestName() {
+        return name.getMethodName();
     }
 
     @Test(timeout = 10000)
     public void testNoUserOrPassword() throws Exception {
         try {
-            Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "",
"");
+            connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "", "");
             connection.start();
             fail("Expected JMSException");
         } catch (JMSSecurityException ex) {
@@ -77,22 +96,22 @@ public class JMSClientSimpleAuthTest {
     @Test(timeout = 10000)
     public void testUnknownUser() throws Exception {
         try {
-            Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "nosuchuser",
"blah");
+            connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "nosuchuser",
"blah");
             connection.start();
             fail("Expected JMSException");
         } catch (JMSSecurityException ex) {
-            LOG.debug("Failed to authenticate connection with no user / password.");
+            LOG.debug("Failed to authenticate connection with unknown user ID");
         }
     }
 
     @Test(timeout = 10000)
     public void testKnownUserWrongPassword() throws Exception {
         try {
-            Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user",
"wrongPassword");
+            connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "wrongPassword");
             connection.start();
             fail("Expected JMSException");
         } catch (JMSSecurityException ex) {
-            LOG.debug("Failed to authenticate connection with no user / password.");
+            LOG.debug("Failed to authenticate connection with incorrect password.");
         }
     }
 
@@ -105,7 +124,7 @@ public class JMSClientSimpleAuthTest {
                 connection.start();
                 fail("Expected JMSException");
             } catch (JMSSecurityException ex) {
-                LOG.debug("Failed to authenticate connection with no user / password.");
+                LOG.debug("Failed to authenticate connection with incorrect password.");
             } finally {
                 if (connection != null) {
                     connection.close();
@@ -116,7 +135,7 @@ public class JMSClientSimpleAuthTest {
 
     @Test(timeout = 30000)
     public void testSendReceive() throws Exception {
-        Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user",
"userPassword");
+        connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Queue queue = session.createQueue("USERS.txQueue");
         MessageProducer p = session.createProducer(queue);
@@ -139,7 +158,7 @@ public class JMSClientSimpleAuthTest {
 
     @Test(timeout = 30000)
     public void testCreateTemporaryQueueNotAuthorized() throws JMSException {
-        Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user",
"userPassword");
+        connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         try {
@@ -151,13 +170,11 @@ public class JMSClientSimpleAuthTest {
 
         // Should not be fatal
         assertNotNull(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
-
-        session.close();
     }
 
     @Test(timeout = 30000)
     public void testCreateTemporaryTopicNotAuthorized() throws JMSException {
-        Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user",
"userPassword");
+        connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         try {
@@ -169,8 +186,6 @@ public class JMSClientSimpleAuthTest {
 
         // Should not be fatal
         assertNotNull(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
-
-        session.close();
     }
 
     protected BrokerService createBroker() throws Exception {


Mime
View raw message