activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: Fix and tests for: https://issues.apache.org/jira/browse/AMQ-4596
Date Wed, 04 Sep 2013 14:26:51 GMT
Updated Branches:
  refs/heads/trunk 74b35bc5d -> ebe54c46b


Fix and tests for: https://issues.apache.org/jira/browse/AMQ-4596

Updates to proton 0.5

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ebe54c46
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ebe54c46
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ebe54c46

Branch: refs/heads/trunk
Commit: ebe54c46b3103b40b1428dd18df08fecc3d72386
Parents: 74b35bc
Author: Timothy Bish <tabish121@gmai.com>
Authored: Wed Sep 4 10:26:44 2013 -0400
Committer: Timothy Bish <tabish121@gmai.com>
Committed: Wed Sep 4 10:26:44 2013 -0400

----------------------------------------------------------------------
 activemq-amqp/pom.xml                           |   5 +
 .../transport/amqp/AmqpProtocolConverter.java   |  41 +++--
 .../transport/amqp/SimpleAMQPAuthTest.java      | 168 +++++++++++++++++++
 .../transport/amqp/simple-auth-amqp-broker.xml  |  85 ++++++++++
 pom.xml                                         |   2 +-
 5 files changed, 284 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ebe54c46/activemq-amqp/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml
index 4e7d65b..b5678a9 100644
--- a/activemq-amqp/pom.xml
+++ b/activemq-amqp/pom.xml
@@ -63,6 +63,11 @@
       <artifactId>activemq-kahadb-store</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>activemq-jaas</artifactId>
+      <scope>test</scope>
+    </dependency>
 
     <!--  Joram JMS conformance tests -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebe54c46/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 20f7594..ff81fb7 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -72,19 +72,22 @@ import org.apache.qpid.proton.amqp.transaction.Declare;
 import org.apache.qpid.proton.amqp.transaction.Declared;
 import org.apache.qpid.proton.amqp.transaction.Discharge;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.EndpointError;
 import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.EngineFactory;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.engine.Sasl;
 import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.impl.ConnectionImpl;
-import org.apache.qpid.proton.engine.impl.LinkImpl;
+import org.apache.qpid.proton.engine.impl.EngineFactoryImpl;
 import org.apache.qpid.proton.engine.impl.ProtocolTracer;
 import org.apache.qpid.proton.engine.impl.TransportImpl;
 import org.apache.qpid.proton.framing.TransportFrame;
@@ -120,8 +123,9 @@ class AmqpProtocolConverter {
     int prefetch = 100;
 
     ReentrantLock lock = new ReentrantLock();
-    TransportImpl protonTransport = new TransportImpl();
-    ConnectionImpl protonConnection = new ConnectionImpl();
+    EngineFactory engineFactory = new EngineFactoryImpl();
+    Transport protonTransport = engineFactory.createTransport();
+    Connection protonConnection = engineFactory.createConnection();
 
     public AmqpProtocolConverter(AmqpTransport transport, BrokerContext brokerContext) {
         this.amqpTransport = transport;
@@ -131,7 +135,7 @@ class AmqpProtocolConverter {
 
     void updateTracer() {
         if (amqpTransport.isTrace()) {
-            this.protonTransport.setProtocolTracer(new ProtocolTracer() {
+            ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
                 @Override
                 public void receivedFrame(TransportFrame transportFrame) {
                     if (TRACE_FRAMES.isTraceEnabled()) {
@@ -415,9 +419,7 @@ class AmqpProtocolConverter {
 
                 if (response.isException()) {
                     Throwable exception = ((ExceptionResponse) response).getException();
-                    // TODO: figure out how to close /w an error.
-                    // protonConnection.setLocalError(new EndpointError(exception.getClass().getName(),
-                    // exception.getMessage()));
+                    protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
                     protonConnection.close();
                     pumpProtonToSocket();
                     amqpTransport.onException(IOExceptionSupport.create(exception));
@@ -729,7 +731,7 @@ class AmqpProtocolConverter {
                         if (response.isException()) {
                             receiver.setTarget(null);
                             Throwable exception = ((ExceptionResponse) response).getException();
-                            ((LinkImpl) receiver).setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
+                            receiver.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
                             receiver.close();
                         } else {
                             receiver.open();
@@ -740,7 +742,7 @@ class AmqpProtocolConverter {
             }
         } catch (AmqpProtocolException exception) {
             receiver.setTarget(null);
-            ((LinkImpl) receiver).setLocalError(new EndpointError(exception.getSymbolicName(), exception.getMessage()));
+            receiver.setCondition(new ErrorCondition(Symbol.getSymbol(exception.getSymbolicName()), exception.getMessage()));
             receiver.close();
         }
     }
@@ -840,6 +842,7 @@ class AmqpProtocolConverter {
 
         Buffer currentBuffer;
         Delivery currentDelivery;
+        final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
 
         public void pumpOutbound() throws Exception {
             while (!closed) {
@@ -868,6 +871,12 @@ class AmqpProtocolConverter {
 
                 final MessageDispatch md = outbound.removeFirst();
                 try {
+                    if (md.getMessage() != null) {
+                        org.apache.activemq.command.Message message = md.getMessage();
+                        if (!message.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
+                            message.setProperty(MESSAGE_FORMAT_KEY, 0);
+                        }
+                    }
                     final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
                     if (jms == null) {
                         // It's the end of browse signal.
@@ -1102,7 +1111,7 @@ class AmqpProtocolConverter {
                             SelectorParser.parse(selector);
                         } catch (InvalidSelectorException e) {
                             sender.setSource(null);
-                            ((LinkImpl) sender).setLocalError(new EndpointError("amqp:invalid-field", e.getMessage()));
+                            sender.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
                             sender.close();
                             consumerContext.closed = true;
                             return;
@@ -1133,7 +1142,7 @@ class AmqpProtocolConverter {
                             sender.setSource(null);
                             Throwable exception = ((ExceptionResponse) response).getException();
                             String name = exception.getClass().getName();
-                            ((LinkImpl) sender).setLocalError(new EndpointError(name, exception.getMessage()));
+                            sender.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
                         }
                         sender.open();
                         pumpProtonToSocket();
@@ -1185,11 +1194,11 @@ class AmqpProtocolConverter {
                     if (response.isException()) {
                         sender.setSource(null);
                         Throwable exception = ((ExceptionResponse) response).getException();
-                        String name = exception.getClass().getName();
+                        Symbol condition = AmqpError.INTERNAL_ERROR;
                         if (exception instanceof InvalidSelectorException) {
-                            name = "amqp:invalid-field";
+                            condition = AmqpError.INVALID_FIELD;
                         }
-                        ((LinkImpl) sender).setLocalError(new EndpointError(name, exception.getMessage()));
+                        sender.setCondition(new ErrorCondition(condition, exception.getMessage()));
                         subscriptionsByConsumerId.remove(id);
                         sender.close();
                     } else {
@@ -1201,7 +1210,7 @@ class AmqpProtocolConverter {
             });
         } catch (AmqpProtocolException e) {
             sender.setSource(null);
-            ((LinkImpl) sender).setLocalError(new EndpointError(e.getSymbolicName(), e.getMessage()));
+            sender.setCondition(new ErrorCondition(Symbol.getSymbol(e.getSymbolicName()), e.getMessage()));
             sender.close();
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebe54c46/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java
new file mode 100644
index 0000000..f3bc037
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java
@@ -0,0 +1,168 @@
+package org.apache.activemq.transport.amqp;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
+import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.net.URI;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Kevin Earls
+ */
+public class SimpleAMQPAuthTest {
+    public static final String SIMPLE_AUTH_AMQP_BROKER_XML = "org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml";
+    public BrokerService brokerService;
+    protected static final Logger LOG = LoggerFactory.getLogger(SimpleAMQPAuthTest.class);
+    protected int port = 5672;
+
+    @Before
+    public void setUp() throws Exception {
+        startBroker();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService = null;
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testNoUserOrPassword() throws Exception {
+        try {
+            ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "", "");
+            Connection connection = factory.createConnection();
+            connection.setExceptionListener(new ExceptionListener() {
+                @Override
+                public void onException(JMSException exception) {
+                    LOG.error("Unexpected exception ", exception);
+                    exception.printStackTrace();
+                }
+            });
+            connection.start();
+            Thread.sleep(1000);
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            fail("Expected JMSException");
+        } catch (JMSException e) {
+            Exception linkedException = e.getLinkedException();
+            if (linkedException != null && linkedException instanceof ConnectionClosedException) {
+                ConnectionClosedException cce = (ConnectionClosedException) linkedException;
+                assertEquals("Error{condition=unauthorized-access,description=User name [null] or password is invalid.}", cce.getRemoteError().toString());
+            } else {
+                LOG.error("Unexpected Exception", e);
+                fail("Unexpected exception: " + e.getMessage());
+            }
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testUnknownUser() throws Exception {
+        try {
+            ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
+            Connection connection = factory.createConnection("nosuchuser", "blah");
+            connection.start();
+            Thread.sleep(500);
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            fail("Expected JMSException");
+        } catch (JMSException e)  {
+            Exception linkedException = e.getLinkedException();
+            if (linkedException != null && linkedException instanceof ConnectionClosedException) {
+                ConnectionClosedException cce = (ConnectionClosedException) linkedException;
+                assertEquals("Error{condition=unauthorized-access,description=User name [nosuchuser] or password is invalid.}", cce.getRemoteError().toString());
+            } else {
+                LOG.error("Unexpected Exception", e);
+                fail("Unexpected exception: " + e.getMessage());
+            }
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testKnownUserWrongPassword() throws Exception {
+        try {
+            ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
+            Connection connection = factory.createConnection("user", "wrongPassword");
+            connection.start();
+            Thread.sleep(500);
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            fail("Expected JMSException");
+        } catch (JMSException e) {
+            Exception linkedException = e.getLinkedException();
+            if (linkedException != null && linkedException instanceof ConnectionClosedException) {
+                ConnectionClosedException cce = (ConnectionClosedException) linkedException;
+                assertEquals("Error{condition=unauthorized-access,description=User name [user] or password is invalid.}", cce.getRemoteError().toString());
+            } else {
+                LOG.error("Unexpected Exception", e);
+                fail("Unexpected exception: " + e.getMessage());
+            }
+        }
+    }
+
+
+    @Test(timeout = 30000)
+    public void testSendReceive() throws Exception {
+        ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
+        Connection connection = factory.createConnection("user", "userPassword");
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        QueueImpl queue = new QueueImpl("queue://txqueue");
+        MessageProducer p = session.createProducer(queue);
+        TextMessage message = null;
+        message = session.createTextMessage();
+        String messageText = "hello  sent at " + new java.util.Date().toString();
+        message.setText(messageText);
+        p.send(message);
+
+        // Get the message we just sent
+        MessageConsumer consumer = session.createConsumer(queue);
+        connection.start();
+        Message msg = consumer.receive(5000);
+        assertNotNull(msg);
+        assertTrue(msg instanceof TextMessage);
+        TextMessage textMessage = (TextMessage) msg;
+        assertEquals(messageText, textMessage.getText());
+        connection.close();
+    }
+
+
+    protected BrokerService createBroker() throws Exception {
+        return createBroker(SIMPLE_AUTH_AMQP_BROKER_XML);
+    }
+
+    protected BrokerService createBroker(String uri) throws Exception {
+        LOG.debug(">>>>> Loading broker configuration from the classpath with URI: " + uri);
+        return BrokerFactory.createBroker(new URI("xbean:" +  uri));
+    }
+
+    public void startBroker() throws Exception {
+        brokerService = createBroker();
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebe54c46/activemq-amqp/src/test/resources/org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/resources/org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml b/activemq-amqp/src/test/resources/org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml
new file mode 100644
index 0000000..624c649
--- /dev/null
+++ b/activemq-amqp/src/test/resources/org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: example -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+
+  <broker useJmx="true" persistent="false" xmlns="http://activemq.apache.org/schema/core" populateJMSXUserID="true" schedulePeriodForDestinationPurge="2000">
+
+    <destinations>
+      <queue physicalName="TEST.Q" />      
+    </destinations> 
+
+    <!-- Use a non-default port in case the default port is in use -->
+    <managementContext>
+      <managementContext connectorPort="1199"/>
+    </managementContext>
+
+    <transportConnectors>
+      <transportConnector name="openwire" uri="vm://localhost" />
+      <transportConnector name="amqp" uri="amqp://0.0.0.0:5672"/>
+    </transportConnectors>
+
+    <plugins>
+
+		<simpleAuthenticationPlugin>
+			<users>
+				<authenticationUser username="system" password="systemPassword" groups="users,admins"/>
+				<authenticationUser username="user" password="userPassword" groups="users"/>
+				<authenticationUser username="guest" password="guestPassword" groups="guests"/>
+			</users>
+		</simpleAuthenticationPlugin>
+
+
+
+      <!--  lets configure a destination based authorization mechanism -->
+        <!--
+      <authorizationPlugin>
+        <map>
+          <authorizationMap>
+            <authorizationEntries>
+              <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
+              <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
+              <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
+              
+              <authorizationEntry queue="TEST.Q" read="guests" write="guests" />
+              
+              <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
+              <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
+              <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
+              
+              <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
+            </authorizationEntries>
+            <tempDestinationAuthorizationEntry>  
+                <tempDestinationAuthorizationEntry read="admins" write="admins" admin="admins"/>
+            </tempDestinationAuthorizationEntry>     
+          </authorizationMap>
+        </map>
+      </authorizationPlugin>
+      -->
+    </plugins>
+  </broker>
+
+</beans>

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebe54c46/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5e726fb..1564e9e 100755
--- a/pom.xml
+++ b/pom.xml
@@ -97,7 +97,7 @@
     <p2psockets-version>1.1.2</p2psockets-version>
     <linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
     <zookeeper-version>3.4.3</zookeeper-version>
-    <qpid-proton-version>0.3.0-fuse-4</qpid-proton-version>
+    <qpid-proton-version>0.5</qpid-proton-version>
     <qpid-jms-version>0.22</qpid-jms-version>
     <regexp-version>1.3</regexp-version>
     <rome-version>1.0</rome-version>


Mime
View raw message