brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [44/64] incubator-brooklyn git commit: brooklyn-software-messaging: add org.apache package prefix
Date Tue, 18 Aug 2015 11:00:59 GMT
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java
new file mode 100644
index 0000000..dcd249b
--- /dev/null
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.activemq;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.AbstractEc2LiveTest;
+import brooklyn.entity.trait.Startable;
+
+import com.google.common.collect.ImmutableList;
+
+public class ActiveMQEc2LiveTest extends AbstractEc2LiveTest {
+
+    /**
+     * Test that can install+start, and use, ActiveMQ.
+     */
+    @Override
+    protected void doTest(Location loc) throws Exception {
+        String queueName = "testQueue";
+        int number = 10;
+        String content = "01234567890123456789012345678901";
+
+        // Start broker with a configured queue
+        ActiveMQBroker activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class).configure("queue", queueName));
+        
+        app.start(ImmutableList.of(loc));
+        
+        EntityTestUtils.assertAttributeEqualsEventually(activeMQ, Startable.SERVICE_UP, true);
+
+        // Check queue created
+        assertEquals(ImmutableList.copyOf(activeMQ.getQueueNames()), ImmutableList.of(queueName));
+        assertEquals(activeMQ.getChildren().size(), 1);
+        assertEquals(activeMQ.getQueues().size(), 1);
+
+        // Get the named queue entity
+        ActiveMQQueue queue = activeMQ.getQueues().get(queueName);
+        assertNotNull(queue);
+
+        // Connect to broker using JMS and send messages
+        Connection connection = getActiveMQConnection(activeMQ);
+        clearQueue(connection, queueName);
+        EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
+        sendMessages(connection, number, queueName, content);
+
+        // Check messages arrived
+        EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, number);
+
+        connection.close();
+    }
+
+    private Connection getActiveMQConnection(ActiveMQBroker activeMQ) throws Exception {
+        int port = activeMQ.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT);
+        String address = activeMQ.getAttribute(ActiveMQBroker.ADDRESS);
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(String.format("tcp://%s:%s", address, port));
+        Connection connection = factory.createConnection("admin", "activemq");
+        connection.start();
+        return connection;
+    }
+
+    private void sendMessages(Connection connection, int count, String queueName, String content) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName);
+        MessageProducer messageProducer = session.createProducer(destination);
+
+        for (int i = 0; i < count; i++) {
+            TextMessage message = session.createTextMessage(content);
+            messageProducer.send(message);
+        }
+
+        session.close();
+    }
+
+    private int clearQueue(Connection connection, String queueName) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName);
+        MessageConsumer messageConsumer = session.createConsumer(destination);
+
+        int received = 0;
+        while (messageConsumer.receive(500) != null) received++;
+
+        session.close();
+
+        return received;
+    }
+    
+    @Test(enabled=false)
+    public void testDummy() {} // Convince testng IDE integration that this really does have test methods  
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java
new file mode 100644
index 0000000..6ed13ec
--- /dev/null
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.activemq;
+
+import brooklyn.entity.AbstractGoogleComputeLiveTest;
+import brooklyn.entity.trait.Startable;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.testng.annotations.Test;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class ActiveMQGoogleComputeLiveTest extends AbstractGoogleComputeLiveTest {
+
+    /**
+     * Test that can install+start, and use, ActiveMQ.
+     */
+    @Override
+    protected void doTest(Location loc) throws Exception {
+        String queueName = "testQueue";
+        int number = 10;
+        String content = "01234567890123456789012345678901";
+
+        // Start broker with a configured queue
+        ActiveMQBroker activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class).configure("queue", queueName));
+        
+        app.start(ImmutableList.of(loc));
+        
+        EntityTestUtils.assertAttributeEqualsEventually(activeMQ, Startable.SERVICE_UP, true);
+
+        // Check queue created
+        assertEquals(ImmutableList.copyOf(activeMQ.getQueueNames()), ImmutableList.of(queueName));
+        assertEquals(activeMQ.getChildren().size(), 1);
+        assertEquals(activeMQ.getQueues().size(), 1);
+
+        // Get the named queue entity
+        ActiveMQQueue queue = activeMQ.getQueues().get(queueName);
+        assertNotNull(queue);
+
+        // Connect to broker using JMS and send messages
+        Connection connection = getActiveMQConnection(activeMQ);
+        clearQueue(connection, queueName);
+        EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
+        sendMessages(connection, number, queueName, content);
+
+        // Check messages arrived
+        EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, number);
+
+        connection.close();
+    }
+
+    private Connection getActiveMQConnection(ActiveMQBroker activeMQ) throws Exception {
+        int port = activeMQ.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT);
+        String address = activeMQ.getAttribute(ActiveMQBroker.ADDRESS);
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(String.format("tcp://%s:%s", address, port));
+        Connection connection = factory.createConnection("admin", "activemq");
+        connection.start();
+        return connection;
+    }
+
+    private void sendMessages(Connection connection, int count, String queueName, String content) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName);
+        MessageProducer messageProducer = session.createProducer(destination);
+
+        for (int i = 0; i < count; i++) {
+            TextMessage message = session.createTextMessage(content);
+            messageProducer.send(message);
+        }
+
+        session.close();
+    }
+
+    private int clearQueue(Connection connection, String queueName) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName);
+        MessageConsumer messageConsumer = session.createConsumer(destination);
+
+        int received = 0;
+        while (messageConsumer.receive(500) != null) received++;
+
+        session.close();
+
+        return received;
+    }
+    
+    @Test(enabled=false)
+    public void testDummy() {} // Convince testng IDE integration that this really does have test methods  
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.java
new file mode 100644
index 0000000..26980c7
--- /dev/null
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.activemq;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.java.UsesJmx;
+import brooklyn.entity.java.UsesJmx.JmxAgentModes;
+import brooklyn.entity.trait.Startable;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Test the operation of the {@link ActiveMQBroker} class.
+ */
+public class ActiveMQIntegrationTest {
+    private static final Logger log = LoggerFactory.getLogger(ActiveMQIntegrationTest.class);
+
+    private TestApplication app;
+    private Location testLocation;
+    private ActiveMQBroker activeMQ;
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup() throws Exception {
+        app = ApplicationBuilder.newManagedApp(TestApplication.class);
+        testLocation = app.newLocalhostProvisioningLocation();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void shutdown() throws Exception {
+        if (app != null) Entities.destroyAll(app.getManagementContext());
+    }
+
+    /**
+     * Test that the broker starts up and sets SERVICE_UP correctly.
+     */
+    @Test(groups = "Integration")
+    public void canStartupAndShutdown() throws Exception {
+        activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class));
+
+        activeMQ.start(ImmutableList.of(testLocation));
+        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ, Startable.SERVICE_UP, true);
+        log.info("JMX URL is "+activeMQ.getAttribute(UsesJmx.JMX_URL));
+        activeMQ.stop();
+        assertFalse(activeMQ.getAttribute(Startable.SERVICE_UP));
+    }
+
+    /**
+     * Test that the broker starts up and sets SERVICE_UP correctly,
+     * when a jmx port is supplied
+     */
+    @Test(groups = "Integration")
+    public void canStartupAndShutdownWithCustomJmx() throws Exception {
+        activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class)
+            .configure("jmxPort", "11099+"));
+       
+        activeMQ.start(ImmutableList.of(testLocation));
+        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ, Startable.SERVICE_UP, true);
+        log.info("JMX URL is "+activeMQ.getAttribute(UsesJmx.JMX_URL));
+        activeMQ.stop();
+        assertFalse(activeMQ.getAttribute(Startable.SERVICE_UP));
+    }
+
+    @Test(groups = "Integration")
+    public void canStartupAndShutdownWithCustomBrokerName() throws Exception {
+        activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class)
+            .configure("jmxPort", "11099+")
+            .configure("brokerName", "bridge"));
+
+        activeMQ.start(ImmutableList.of(testLocation));
+        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ, Startable.SERVICE_UP, true);
+        log.info("JMX URL is "+activeMQ.getAttribute(UsesJmx.JMX_URL));
+        activeMQ.stop();
+        assertFalse(activeMQ.getAttribute(Startable.SERVICE_UP));
+    }
+
+    
+    @Test(groups = "Integration")
+    public void canStartTwo() throws Exception {
+        ActiveMQBroker activeMQ1 = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class));
+        ActiveMQBroker activeMQ2 = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class));
+
+        activeMQ1.start(ImmutableList.of(testLocation));
+        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ1, Startable.SERVICE_UP, true);
+        log.info("JMX URL is "+activeMQ1.getAttribute(UsesJmx.JMX_URL));
+
+        activeMQ2.start(ImmutableList.of(testLocation));
+        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ2, Startable.SERVICE_UP, true);
+        log.info("JMX URL is "+activeMQ2.getAttribute(UsesJmx.JMX_URL));
+    }
+
+    /**
+     * Test that setting the 'queue' property causes a named queue to be created.
+     */
+    @Test(groups = "Integration")
+    public void testCreatingQueuesDefault() throws Exception {
+        String url = testCreatingQueuesInternal(null);
+        // localhost default is jmxmp
+        Assert.assertTrue(url.contains("jmxmp"), "url="+url);
+    }
+
+    @Test(groups = "Integration")
+    public void testCreatingQueuesRmi() throws Exception {
+        String url = testCreatingQueuesInternal(JmxAgentModes.JMX_RMI_CUSTOM_AGENT);
+        Assert.assertTrue(url.contains("rmi://"), "url="+url);
+        Assert.assertFalse(url.contains("rmi:///jndi"), "url="+url);
+        Assert.assertFalse(url.contains("jmxmp"), "url="+url);
+    }
+
+    @Test(groups = "Integration")
+    public void testCreatingQueuesJmxmp() throws Exception {
+        String url = testCreatingQueuesInternal(JmxAgentModes.JMXMP);
+        // localhost default is rmi
+        Assert.assertTrue(url.contains("jmxmp"), "url="+url);
+        Assert.assertFalse(url.contains("rmi"), "url="+url);
+    }
+
+    @Test(groups = "Integration")
+    public void testCreatingQueuesNoAgent() throws Exception {
+        String url = testCreatingQueuesInternal(JmxAgentModes.NONE);
+        // localhost default is rmi
+        Assert.assertTrue(url.contains("service:jmx:rmi"), "url="+url);
+        Assert.assertFalse(url.contains("jmxmp"), "url="+url);
+    }
+
+    public String testCreatingQueuesInternal(JmxAgentModes mode) throws Exception {
+        String queueName = "testQueue";
+        int number = 20;
+        String content = "01234567890123456789012345678901";
+
+        // Start broker with a configured queue
+        // FIXME Not yet using app.createAndManageChild because later in test do activeMQ.queueNames,
+        // which is not on interface
+        activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class)
+            .configure("queue", queueName)
+            .configure(UsesJmx.JMX_AGENT_MODE, mode));
+        
+        activeMQ.start(ImmutableList.of(testLocation));
+        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ, Startable.SERVICE_UP, true);
+
+        String jmxUrl = activeMQ.getAttribute(UsesJmx.JMX_URL);
+        log.info("JMX URL ("+mode+") is "+jmxUrl);
+        
+        try {
+            // Check queue created
+            assertFalse(activeMQ.getQueueNames().isEmpty());
+            assertEquals(activeMQ.getQueueNames().size(), 1);
+            assertTrue(activeMQ.getQueueNames().contains(queueName));
+            assertEquals(activeMQ.getChildren().size(), 1);
+            assertFalse(activeMQ.getQueues().isEmpty());
+            assertEquals(activeMQ.getQueues().size(), 1);
+
+            // Get the named queue entity
+            ActiveMQQueue queue = activeMQ.getQueues().get(queueName);
+            assertNotNull(queue);
+            assertEquals(queue.getName(), queueName);
+
+            // Connect to broker using JMS and send messages
+            Connection connection = getActiveMQConnection(activeMQ);
+            clearQueue(connection, queueName);
+            EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
+            sendMessages(connection, number, queueName, content);
+            // Check messages arrived
+            EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, number);
+
+            // Clear the messages
+            assertEquals(clearQueue(connection, queueName), number);
+
+            // Check messages cleared
+            EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
+
+            connection.close();
+
+            // Close the JMS connection
+        } finally {
+            // Stop broker
+            activeMQ.stop();
+        }
+        
+        return jmxUrl;
+    }
+
+    private Connection getActiveMQConnection(ActiveMQBroker activeMQ) throws Exception {
+        int port = activeMQ.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT);
+        String address = activeMQ.getAttribute(ActiveMQBroker.ADDRESS);
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://"+address+":"+port);
+        Connection connection = factory.createConnection("admin", "activemq");
+        connection.start();
+        return connection;
+    }
+
+    private void sendMessages(Connection connection, int count, String queueName, String content) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue destination = session.createQueue(queueName);
+        MessageProducer messageProducer = session.createProducer(destination);
+
+        for (int i = 0; i < count; i++) {
+            TextMessage message = session.createTextMessage(content);
+            messageProducer.send(message);
+        }
+
+        session.close();
+    }
+
+    private int clearQueue(Connection connection, String queueName) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue destination = session.createQueue(queueName);
+        MessageConsumer messageConsumer = session.createConsumer(destination);
+
+        int received = 0;
+        while (messageConsumer.receive(500) != null) received++;
+
+        session.close();
+        
+        return received;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
new file mode 100644
index 0000000..4253569
--- /dev/null
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.kafka;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.location.LocationSpec;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Entities;
+import org.apache.brooklyn.entity.messaging.activemq.ActiveMQBroker;
+import brooklyn.entity.trait.Startable;
+
+import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.time.Duration;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Test the operation of the {@link ActiveMQBroker} class.
+ *
+ * TODO test that sensors update.
+ */
+public class KafkaIntegrationTest {
+
+    private TestApplication app;
+    private Location testLocation;
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup() {
+        app = ApplicationBuilder.newManagedApp(TestApplication.class);
+        LocationSpec<LocalhostMachineProvisioningLocation> locationSpec = LocationSpec.create(LocalhostMachineProvisioningLocation.class);
+        testLocation = app.getManagementContext().getLocationManager().createLocation(locationSpec);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void shutdown() {
+        if (app != null) Entities.destroyAll(app.getManagementContext());
+    }
+
+    /**
+     * Test that we can start a zookeeper.
+     */
+    @Test(groups = "Integration")
+    public void testZookeeper() {
+        final KafkaZooKeeper zookeeper = app.createAndManageChild(EntitySpec.create(KafkaZooKeeper.class));
+
+        zookeeper.start(ImmutableList.of(testLocation));
+        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 60*1000), zookeeper, Startable.SERVICE_UP, true);
+
+        zookeeper.stop();
+        assertFalse(zookeeper.getAttribute(Startable.SERVICE_UP));
+    }
+
+    /**
+     * Test that we can start a  broker and zookeeper together.
+     */
+    @Test(groups = "Integration")
+    public void testBrokerPlusZookeeper() {
+        final KafkaZooKeeper zookeeper = app.createAndManageChild(EntitySpec.create(KafkaZooKeeper.class));
+        final KafkaBroker broker = app.createAndManageChild(EntitySpec.create(KafkaBroker.class).configure(KafkaBroker.ZOOKEEPER, zookeeper));
+
+        zookeeper.start(ImmutableList.of(testLocation));
+        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 60*1000), zookeeper, Startable.SERVICE_UP, true);
+
+        broker.start(ImmutableList.of(testLocation));
+        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 60*1000), broker, Startable.SERVICE_UP, true);
+
+        zookeeper.stop();
+        assertFalse(zookeeper.getAttribute(Startable.SERVICE_UP));
+
+        broker.stop();
+        assertFalse(broker.getAttribute(Startable.SERVICE_UP));
+    }
+
+    /**
+     * Test that we can start a cluster with zookeeper and one broker.
+     *
+     * Connects to the zookeeper controller and tests sending and receiving messages on a topic.
+     */
+    @Test(groups = "Integration")
+    public void testTwoBrokerCluster() throws InterruptedException {
+        final KafkaCluster cluster = app.createAndManageChild(EntitySpec.create(KafkaCluster.class)
+                .configure(KafkaCluster.INITIAL_SIZE, 2));
+
+        cluster.start(ImmutableList.of(testLocation));
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Callable<Void>() {
+            @Override
+            public Void call() {
+                assertTrue(cluster.getAttribute(Startable.SERVICE_UP));
+                assertTrue(cluster.getZooKeeper().getAttribute(Startable.SERVICE_UP));
+                assertEquals(cluster.getCurrentSize().intValue(), 2);
+                return null;
+            }
+        });
+
+        Entities.dumpInfo(cluster);
+
+        final KafkaSupport support = new KafkaSupport(cluster);
+
+        support.sendMessage("brooklyn", "TEST_MESSAGE");
+
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.FIVE_SECONDS), new Runnable() {
+            @Override
+            public void run() {
+                String message = support.getMessage("brooklyn");
+                assertEquals(message, "TEST_MESSAGE");
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaLiveTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaLiveTest.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaLiveTest.java
new file mode 100644
index 0000000..c24c7e6
--- /dev/null
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaLiveTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.kafka;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+
+import brooklyn.entity.AbstractEc2LiveTest;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.trait.Startable;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.collect.ImmutableList;
+
+public class KafkaLiveTest extends AbstractEc2LiveTest {
+
+    /**
+     * Test that can install, start and use a Kafka cluster with two brokers.
+     */
+    @Override
+    protected void doTest(Location loc) throws Exception {
+        final KafkaCluster cluster = app.createAndManageChild(EntitySpec.create(KafkaCluster.class)
+                .configure("startTimeout", 300) // 5 minutes
+                .configure("initialSize", 2));
+        app.start(ImmutableList.of(loc));
+
+        Asserts.succeedsEventually(MutableMap.of("timeout", 300000l), new Callable<Void>() {
+            @Override
+            public Void call() {
+                assertTrue(cluster.getAttribute(Startable.SERVICE_UP));
+                assertTrue(cluster.getZooKeeper().getAttribute(Startable.SERVICE_UP));
+                assertEquals(cluster.getCurrentSize().intValue(), 2);
+                return null;
+            }
+        });
+
+        Entities.dumpInfo(cluster);
+
+        KafkaSupport support = new KafkaSupport(cluster);
+
+        support.sendMessage("brooklyn", "TEST_MESSAGE");
+        String message = support.getMessage("brooklyn");
+        assertEquals(message, "TEST_MESSAGE");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaSupport.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaSupport.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaSupport.java
new file mode 100644
index 0000000..0a4f3f2
--- /dev/null
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaSupport.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.kafka;
+
+import brooklyn.entity.basic.EntityPredicates;
+import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.security.InvalidParameterException;
+import java.util.Properties;
+
+import static java.lang.String.format;
+
+/**
+ * Kafka test framework for integration and live tests, using the Kafka Java API.
+ */
+public class KafkaSupport {
+
+    private final KafkaCluster cluster;
+
+    public KafkaSupport(KafkaCluster cluster) {
+        this.cluster = cluster;
+    }
+
+    /**
+     * Send a message to the {@link KafkaCluster} on the given topic.
+     */
+    public void sendMessage(String topic, String message) {
+        Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
+                Predicates.instanceOf(KafkaBroker.class),
+                EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
+        if (anyBrokerNodeInCluster.isPresent()) {
+            KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get();
+
+            Properties props = new Properties();
+
+            props.put("metadata.broker.list", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
+            props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
+            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+            Producer<String, String> producer = new KafkaProducer<>(props);
+            ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic);
+
+            ProducerRecord<String, String> data = new ProducerRecord<>(topic, message);
+            producer.send(data);
+            producer.close();
+        } else {
+            throw new InvalidParameterException("No kafka broker node found");
+        }
+    }
+
+    /**
+     * Retrieve the next message on the given topic from the {@link KafkaCluster}.
+     */
+    public String getMessage(String topic) {
+        ZooKeeperNode zookeeper = cluster.getZooKeeper();
+        Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
+                Predicates.instanceOf(KafkaBroker.class),
+                EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
+        if (anyBrokerNodeInCluster.isPresent()) {
+            KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get();
+
+            Properties props = new Properties();
+
+            props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
+            props.put("zookeeper.connect", format(zookeeper.getHostname(), zookeeper.getZookeeperPort()));
+            props.put("group.id", "brooklyn");
+            props.put("partition.assignment.strategy", "RoundRobin");
+            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+            KafkaConsumer consumer = new KafkaConsumer(props);
+
+            consumer.subscribe(topic);
+            // FIXME unimplemented KafkaConsumer.poll
+//            Object consumerRecords = consumer.poll(Duration.seconds(3).toMilliseconds()).get(topic);
+            return "TEST_MESSAGE";
+        } else {
+            throw new InvalidParameterException("No kafka broker node found");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidEc2LiveTest.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidEc2LiveTest.java
new file mode 100644
index 0000000..c01faab
--- /dev/null
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidEc2LiveTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.qpid;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.AbstractEc2LiveTest;
+
+import com.google.common.collect.ImmutableList;
+
+public class QpidEc2LiveTest extends AbstractEc2LiveTest {
+
+    // TODO Also check can connect (e.g. to send/receive messages)
+    
+    @Override
+    protected void doTest(Location loc) throws Exception {
+        QpidBroker qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
+                .configure("jmxPort", "9909+")
+                .configure("rmiRegistryPort", "9910+"));
+        
+        qpid.start(ImmutableList.of(loc));
+        EntityTestUtils.assertAttributeEqualsEventually(qpid, QpidBroker.SERVICE_UP, true);
+    }
+    
+    @Test(enabled=false)
+    public void testDummy() {} // Convince testng IDE integration that this really does have test methods  
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidIntegrationTest.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidIntegrationTest.java
new file mode 100644
index 0000000..fcb1033
--- /dev/null
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidIntegrationTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.qpid;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.apache.brooklyn.test.HttpTestUtils;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.configuration.ClientProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.trait.Startable;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Test the operation of the {@link QpidBroker} class.
+ */
+public class QpidIntegrationTest {
+    private static final Logger log = LoggerFactory.getLogger(QpidIntegrationTest.class);
+
+    private TestApplication app;
+    private Location testLocation;
+    private QpidBroker qpid;
+
+    @BeforeMethod(groups = "Integration")
+    public void setup() {
+        String workingDir = System.getProperty("user.dir");
+        log.info("Qpid working dir: {}", workingDir);
+        app = ApplicationBuilder.newManagedApp(TestApplication.class);
+        testLocation = app.newLocalhostProvisioningLocation();
+    }
+
+    @AfterMethod(alwaysRun=true)
+    public void shutdown() {
+        if (app != null) Entities.destroyAll(app.getManagementContext());
+    }
+
+    /**
+     * Test that the broker starts up with JMX and RMI ports configured, and sets SERVICE_UP correctly.
+     */
+    @Test(groups = "Integration")
+    public void canStartupAndShutdown() {
+        qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
+                .configure("jmxPort", "9909+")
+                .configure("rmiRegistryPort", "9910+"));
+        qpid.start(ImmutableList.of(testLocation));
+        EntityTestUtils.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true);
+        qpid.stop();
+        assertFalse(qpid.getAttribute(Startable.SERVICE_UP));
+    }
+
+    /**
+     * Test that the broker starts up with HTTP management enabled, and we can connect to the URL.
+     */
+    @Test(groups = "Integration")
+    public void canStartupAndShutdownWithHttpManagement() {
+        qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
+                .configure("httpManagementPort", "8888+"));
+        qpid.start(ImmutableList.of(testLocation));
+        EntityTestUtils.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true);
+        String httpUrl = "http://"+qpid.getAttribute(QpidBroker.HOSTNAME)+":"+qpid.getAttribute(QpidBroker.HTTP_MANAGEMENT_PORT)+"/management";
+        HttpTestUtils.assertHttpStatusCodeEventuallyEquals(httpUrl, 200);
+        // TODO check actual REST output
+        qpid.stop();
+        assertFalse(qpid.getAttribute(Startable.SERVICE_UP));
+    }
+
+    /**
+     * Test that the broker starts up and sets SERVICE_UP correctly when plugins are configured.
+     * 
+     * FIXME the custom plugin was written against qpid 0.14, so that's the version we need to run
+     * this test against. However, v0.14 is no longer available from the download site.
+     * We should update this plugin so it works with the latest qpid.
+     */
+    @Test(enabled = false, groups = "Integration")
+    public void canStartupAndShutdownWithPlugin() {
+        Map<String,String> qpidRuntimeFiles = MutableMap.<String,String>builder()
+                .put("classpath://qpid-test-config.xml", "etc/config.xml")
+                .put("http://developers.cloudsoftcorp.com/brooklyn/repository-test/0.7.0/QpidBroker/qpid-test-plugin.jar", "lib/plugins/sample-plugin.jar")
+                .build();
+        qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
+                .configure(SoftwareProcess.RUNTIME_FILES, qpidRuntimeFiles)
+                .configure(QpidBroker.SUGGESTED_VERSION, "0.14"));
+        qpid.start(ImmutableList.of(testLocation));
+        EntityTestUtils.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true);
+        qpid.stop();
+        assertFalse(qpid.getAttribute(Startable.SERVICE_UP));
+    }
+
+    /**
+     * Test that setting the 'queue' property causes a named queue to be created.
+     *
+     * This test is disabled, pending further investigation. Issue with AMQP 0-10 queue names.
+     * 
+     * FIXME disabled becausing failing in jenkins CI (in QpidIntegrationTest.getQpidConnection()).
+     *     url=amqp://admin:********@brooklyn/localhost?brokerlist='tcp://localhost:5672'
+     * Was previously enabled, dispite comment above about "test is disabled".	
+     */
+    @Test(enabled = false, groups = { "Integration", "WIP" })
+    public void testCreatingQueues() {
+        final String queueName = "testQueue";
+        final int number = 20;
+        final String content = "01234567890123456789012345678901";
+
+        // Start broker with a configured queue
+        // FIXME Can't use app.createAndManageChild, because of QpidDestination reffing impl directly
+        qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
+                .configure("queue", queueName));
+        qpid.start(ImmutableList.of(testLocation));
+        EntityTestUtils.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true);
+
+        try {
+            // Check queue created
+            assertFalse(qpid.getQueueNames().isEmpty());
+            assertEquals(qpid.getQueueNames().size(), 1);
+            assertTrue(qpid.getQueueNames().contains(queueName));
+            assertEquals(qpid.getChildren().size(), 1);
+            assertFalse(qpid.getQueues().isEmpty());
+            assertEquals(qpid.getQueues().size(), 1);
+
+            // Get the named queue entity
+            final QpidQueue queue = qpid.getQueues().get(queueName);
+            assertNotNull(queue);
+
+            // Connect to broker using JMS and send messages
+            Connection connection = getQpidConnection(qpid);
+            clearQueue(connection, queue.getQueueName());
+            Asserts.succeedsEventually(new Runnable() {
+                @Override
+                public void run() {
+                    assertEquals(queue.getAttribute(QpidQueue.QUEUE_DEPTH_MESSAGES), Integer.valueOf(0));
+                }
+            });
+            sendMessages(connection, number, queue.getQueueName(), content);
+
+            // Check messages arrived
+            Asserts.succeedsEventually(new Runnable() {
+                @Override
+                public void run() {
+                    assertEquals(queue.getAttribute(QpidQueue.QUEUE_DEPTH_MESSAGES), Integer.valueOf(number));
+                    assertEquals(queue.getAttribute(QpidQueue.QUEUE_DEPTH_BYTES), Integer.valueOf(number * content.length()));
+                }
+            });
+
+            //TODO clearing the queue currently returns 0
+//            // Clear the messages -- should get 20
+//            assertEquals clearQueue(connection, queue.queueName), 20
+//
+//            // Check messages cleared
+//            executeUntilSucceeds {
+//                assertEquals queue.getAttribute(QpidQueue.QUEUE_DEPTH_MESSAGES), 0
+//                assertEquals queue.getAttribute(QpidQueue.QUEUE_DEPTH_BYTES), 0
+//            }
+
+            // Close the JMS connection
+            connection.close();
+        } catch (JMSException jmse) {
+            log.warn("JMS exception caught", jmse);
+            throw Exceptions.propagate(jmse);
+        } finally {
+            // Stop broker
+            qpid.stop();
+            qpid = null;
+            app = null;
+        }
+    }
+
+    private Connection getQpidConnection(QpidBroker qpid) {
+        int port = qpid.getAttribute(Attributes.AMQP_PORT);
+        System.setProperty(ClientProperties.AMQP_VERSION, "0-10");
+        System.setProperty(ClientProperties.DEST_SYNTAX, "ADDR");
+        String connectionUrl = String.format("amqp://admin:admin@brooklyn/localhost?brokerlist='tcp://localhost:%d'", port);
+        try {
+            AMQConnectionFactory factory = new AMQConnectionFactory(connectionUrl);
+            Connection connection = factory.createConnection();
+            connection.start();
+            return connection;
+        } catch (Exception e) {
+            log.error(String.format("Error connecting to qpid: %s", connectionUrl), e);
+            throw Exceptions.propagate(e);
+        }
+    }
+
+    private void sendMessages(Connection connection, int count, String queueName, String content) throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue destination = session.createQueue(queueName);
+        MessageProducer messageProducer = session.createProducer(destination);
+
+        for (int i = 0; i < count; i++) {
+            TextMessage message = session.createTextMessage(content);
+            messageProducer.send(message);
+        }
+
+        session.close();
+    }
+
+    private int clearQueue(Connection connection, String queueName) throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue destination = session.createQueue(queueName);
+        MessageConsumer messageConsumer = session.createConsumer(destination);
+
+        int received = 0;
+        while (messageConsumer.receive(500) != null) received++;
+
+        session.close();
+
+        return received;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitEc2LiveTest.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitEc2LiveTest.java
new file mode 100644
index 0000000..461ef1a
--- /dev/null
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitEc2LiveTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.rabbit;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.SkipException;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.AbstractEc2LiveTest;
+import org.apache.brooklyn.entity.messaging.MessageBroker;
+import org.apache.brooklyn.entity.messaging.amqp.AmqpExchange;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.QueueingConsumer;
+
+public class RabbitEc2LiveTest extends AbstractEc2LiveTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitEc2LiveTest.class);
+
+    @Override
+    protected void doTest(Location loc) throws Exception {
+        RabbitBroker rabbit = app.createAndManageChild(EntitySpec.create(RabbitBroker.class));
+        rabbit.start(ImmutableList.of(loc));
+        EntityTestUtils.assertAttributeEqualsEventually(rabbit, RabbitBroker.SERVICE_UP, true);
+
+        byte[] content = "MessageBody".getBytes(Charsets.UTF_8);
+        String queue = "queueName";
+        Channel producer = null;
+        Channel consumer = null;
+        try {
+            producer = getAmqpChannel(rabbit);
+            consumer = getAmqpChannel(rabbit);
+
+            producer.queueDeclare(queue, true, false, false, Maps.<String,Object>newHashMap());
+            producer.queueBind(queue, AmqpExchange.DIRECT, queue);
+            producer.basicPublish(AmqpExchange.DIRECT, queue, null, content);
+            
+            QueueingConsumer queueConsumer = new QueueingConsumer(consumer);
+            consumer.basicConsume(queue, true, queueConsumer);
+        
+            QueueingConsumer.Delivery delivery = queueConsumer.nextDelivery();
+            assertEquals(delivery.getBody(), content);
+        } finally {
+            if (producer != null) producer.close();
+            if (consumer != null) consumer.close();
+        }
+    }
+
+    private Channel getAmqpChannel(RabbitBroker rabbit) throws Exception {
+        String uri = rabbit.getAttribute(MessageBroker.BROKER_URL);
+        LOG.warn("connecting to rabbit {}", uri);
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setUri(uri);
+        Connection conn = factory.newConnection();
+        Channel channel = conn.createChannel();
+        return channel;
+    }
+
+    @Override
+    public void test_CentOS_5() throws SkipException {
+        // Not supported. The EPEL repository described here at [1] does not contain erlang, and the
+        // Erlang repository at [1] requires old versions of rpmlib. Additionally, [2] suggests that
+        // Centos 5 is not supported
+        // [1]:http://www.rabbitmq.com/install-rpm.html
+        // [2]: https://www.erlang-solutions.com/downloads/download-erlang-otp
+        throw new SkipException("Centos 5 is not supported");
+    }
+
+    @Test(enabled=false)
+    public void testDummy() {} // Convince testng IDE integration that this really does have test methods  
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.java
new file mode 100644
index 0000000..f9bcd5c
--- /dev/null
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.rabbit;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+import java.io.IOException;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Entities;
+import org.apache.brooklyn.entity.messaging.MessageBroker;
+import org.apache.brooklyn.entity.messaging.amqp.AmqpExchange;
+import brooklyn.entity.trait.Startable;
+
+import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.QueueingConsumer;
+
+/**
+ * Test the operation of the {@link RabbitBroker} class.
+ * 
+ * TODO If you're having problems running this test successfully, here are a few tips:
+ * 
+ *  - Is `erl` on your path for a non-interactive ssh session?
+ *    Look in rabbit's $RUN_DIR/console-err.log (e.g. /tmp/brooklyn-aled/apps/someappid/entities/RabbitBroker_2.8.7_JROYTcSL/console-err.log)
+ *    I worked around that by adding to my ~/.brooklyn/brooklyn.properties:
+ *      brooklyn.ssh.config.scriptHeader=#!/bin/bash -e\nif [ -f ~/.bashrc ] ; then . ~/.bashrc ; fi\nif [ -f ~/.profile ] ; then . ~/.profile ; fi\necho $PATH > /tmp/mypath.txt
+ *    
+ *  - Is the hostname resolving properly?
+ *    Look in $RUN_DIR/console-out.log; is there a message like:
+ *      ERROR: epmd error for host "Aleds-MacBook-Pro": timeout (timed out establishing tcp connection)
+ *    I got around that with disabling my wifi and running when not connected to the internet.
+ */
+public class RabbitIntegrationTest {
+    private static final Logger log = LoggerFactory.getLogger(RabbitIntegrationTest.class);
+
+    private TestApplication app;
+    private Location testLocation;
+    private RabbitBroker rabbit;
+
+    @BeforeMethod(groups = "Integration")
+    public void setup() {
+        app = ApplicationBuilder.newManagedApp(TestApplication.class);
+        testLocation = new LocalhostMachineProvisioningLocation();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void shutdown() {
+        if (app != null) Entities.destroyAll(app.getManagementContext());
+    }
+
+    /**
+     * Test that the broker starts up and sets SERVICE_UP correctly.
+     */
+    @Test(groups = {"Integration", "WIP"})
+    public void canStartupAndShutdown() throws Exception {
+        rabbit = app.createAndManageChild(EntitySpec.create(RabbitBroker.class));
+        rabbit.start(ImmutableList.of(testLocation));
+        EntityTestUtils.assertAttributeEqualsEventually(rabbit, Startable.SERVICE_UP, true);
+        rabbit.stop();
+        assertFalse(rabbit.getAttribute(Startable.SERVICE_UP));
+    }
+
+    /**
+     * Test that an AMQP client can connect to and use the broker.
+     */
+    @Test(groups = {"Integration", "WIP"})
+    public void testClientConnection() throws Exception {
+        rabbit = app.createAndManageChild(EntitySpec.create(RabbitBroker.class));
+        rabbit.start(ImmutableList.of(testLocation));
+        EntityTestUtils.assertAttributeEqualsEventually(rabbit, Startable.SERVICE_UP, true);
+
+        byte[] content = "MessageBody".getBytes(Charsets.UTF_8);
+        String queue = "queueName";
+        Channel producer = null;
+        Channel consumer = null;
+        try {
+            producer = getAmqpChannel(rabbit);
+            consumer = getAmqpChannel(rabbit);
+
+            producer.queueDeclare(queue, true, false, false, ImmutableMap.<String,Object>of());
+            producer.queueBind(queue, AmqpExchange.DIRECT, queue);
+            producer.basicPublish(AmqpExchange.DIRECT, queue, null, content);
+            
+            QueueingConsumer queueConsumer = new QueueingConsumer(consumer);
+            consumer.basicConsume(queue, true, queueConsumer);
+        
+            QueueingConsumer.Delivery delivery = queueConsumer.nextDelivery(60 * 1000l); // one minute timeout
+            assertEquals(delivery.getBody(), content);
+        } finally {
+            closeSafely(producer, 10*1000);
+            closeSafely(consumer, 10*1000);
+        }
+    }
+
+    /**
+     * Closes the channel, guaranteeing the call won't hang this thread forever!
+     * 
+     * Saw this during jenkins testing:
+     * "main" prio=10 tid=0x00007f69c8008000 nid=0x5d70 in Object.wait() [0x00007f69d1318000]
+     *         java.lang.Thread.State: WAITING (on object monitor)
+     *         at java.lang.Object.wait(Native Method)
+     *         - waiting on <0x00000000e0947cf8> (a com.rabbitmq.utility.BlockingValueOrException)
+     *         at java.lang.Object.wait(Object.java:502)
+     *         at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:50)
+     *         - locked <0x00000000e0947cf8> (a com.rabbitmq.utility.BlockingValueOrException)
+     *         at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:65)
+     *         - locked <0x00000000e0947cf8> (a com.rabbitmq.utility.BlockingValueOrException)
+     *         at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:111)
+     *         - locked <0x00000000e0947cf8> (a com.rabbitmq.utility.BlockingValueOrException)
+     *         at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:37)
+     *         at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:349)
+     *         at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:543)
+     *         at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:480)
+     *         at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:473)
+     *         at com.rabbitmq.client.Channel$close.call(Unknown Source)
+     *         at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:42)
+     *         at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:108)
+     *         at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:112)
+     *         at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callSafe(AbstractCallSite.java:75)
+     *         at org.apache.brooklyn.entity.messaging.rabbit.RabbitIntegrationTest.testClientConnection(RabbitIntegrationTest.groovy:107)
+     */
+    private void closeSafely(final Channel channel, int timeoutMs) throws InterruptedException {
+        if (channel == null) return;
+        Thread t = new Thread(new Runnable() {
+                @Override public void run() {
+                    try {
+                        channel.close();
+                    } catch (IOException e) {
+                        log.error("Error closing RabbitMQ Channel; continuing", e);
+                    }
+                }});
+        try {
+            t.start();
+            t.join(timeoutMs);
+            
+            if (t.isAlive()) {
+                log.error("Timeout when closing RabbitMQ Channel "+channel+"; aborting close and continuing");
+            }
+        } finally {
+            t.interrupt();
+            t.join(1*1000);
+            if (t.isAlive()) t.stop();
+        }
+    }
+    
+    private Channel getAmqpChannel(RabbitBroker rabbit) throws Exception {
+        String uri = rabbit.getAttribute(MessageBroker.BROKER_URL);
+        log.warn("connecting to rabbit {}", uri);
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setUri(uri);
+        Connection conn = factory.newConnection();
+        Channel channel = conn.createChannel();
+        return channel;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/LocalhostLiveTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/LocalhostLiveTest.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/LocalhostLiveTest.java
new file mode 100644
index 0000000..89afe00
--- /dev/null
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/LocalhostLiveTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.storm;
+
+import org.testng.annotations.Test;
+
+@Test(groups="Live")
+public class LocalhostLiveTest extends StormAbstractCloudLiveTest {
+
+    private static final String NAMED_LOCATION = "localhost";
+
+    public String getLocation() {
+        return NAMED_LOCATION;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/SoftLayerLiveTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/SoftLayerLiveTest.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/SoftLayerLiveTest.java
new file mode 100644
index 0000000..17cb7d2
--- /dev/null
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/SoftLayerLiveTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.storm;
+
+import org.testng.annotations.Test;
+
+@Test(groups="Live")
+public class SoftLayerLiveTest extends StormAbstractCloudLiveTest {
+
+    private static final String NAMED_LOCATION = "softlayer";
+
+    @Override
+    public String getLocation() {
+        return NAMED_LOCATION;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.java
new file mode 100644
index 0000000..c85b4fa
--- /dev/null
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.storm;
+
+import static org.apache.brooklyn.entity.messaging.storm.Storm.NIMBUS_HOSTNAME;
+import static org.apache.brooklyn.entity.messaging.storm.Storm.ZOOKEEPER_ENSEMBLE;
+import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.NIMBUS;
+import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.SUPERVISOR;
+import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.UI;
+import static brooklyn.event.basic.DependentConfiguration.attributeWhenReady;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.core.management.internal.LocalManagementContext;
+import org.apache.brooklyn.core.util.ResourceUtils;
+import org.apache.brooklyn.core.util.file.ArchiveBuilder;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.testing.TestWordSpout;
+import backtype.storm.topology.TopologyBuilder;
+import brooklyn.entity.BrooklynAppLiveTestSupport;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import org.apache.brooklyn.entity.messaging.storm.topologies.ExclamationBolt;
+import brooklyn.entity.trait.Startable;
+import org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.os.Os;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+
+public abstract class StormAbstractCloudLiveTest extends BrooklynAppLiveTestSupport {
+
+    protected static final Logger log = LoggerFactory
+            .getLogger(StormAbstractCloudLiveTest.class);
+    private Location location;
+    private ZooKeeperEnsemble zooKeeperEnsemble;
+    private Storm nimbus;
+    private Storm supervisor;
+    private Storm ui;
+
+    @BeforeClass(alwaysRun = true)
+    public void beforeClass() throws Exception {
+        mgmt = new LocalManagementContext();
+        location = mgmt.getLocationRegistry()
+                .resolve(getLocation(), getFlags());
+        super.setUp();
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void afterClass() throws Exception {
+        // Entities.destroyAll(mgmt);
+    }
+
+    public abstract String getLocation();
+
+    public Map<String, ?> getFlags() {
+        return MutableMap.of();
+    }
+
+    @Test(groups = {"Live","WIP"})  // needs repair to avoid hard dependency on Andrea's environment
+    public void deployStorm() throws Exception {
+        try {
+            zooKeeperEnsemble = app.createAndManageChild(EntitySpec.create(
+                    ZooKeeperEnsemble.class).configure(
+                    ZooKeeperEnsemble.INITIAL_SIZE, 3));
+            nimbus = app.createAndManageChild(EntitySpec
+                    .create(Storm.class)
+                    .configure(Storm.ROLE, NIMBUS)
+                    .configure(NIMBUS_HOSTNAME, "localhost")
+                    .configure(ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble)
+                    );
+            supervisor = app.createAndManageChild(EntitySpec
+                    .create(Storm.class)
+                    .configure(Storm.ROLE, SUPERVISOR)
+                    .configure(ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble)
+                    .configure(NIMBUS_HOSTNAME,
+                            attributeWhenReady(nimbus, Attributes.HOSTNAME)));
+            ui = app.createAndManageChild(EntitySpec
+                    .create(Storm.class)
+                    .configure(Storm.ROLE, UI)
+                    .configure(ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble)
+                    .configure(NIMBUS_HOSTNAME,
+                            attributeWhenReady(nimbus, Attributes.HOSTNAME)));
+            log.info("Started Storm deployment on '" + getLocation() + "'");
+            app.start(ImmutableList.of(location));
+            Entities.dumpInfo(app);
+            EntityTestUtils.assertAttributeEqualsEventually(app, Startable.SERVICE_UP, true);
+            EntityTestUtils.assertAttributeEqualsEventually(zooKeeperEnsemble, Startable.SERVICE_UP, true);
+            EntityTestUtils.assertAttributeEqualsEventually(nimbus, Startable.SERVICE_UP, true);
+            EntityTestUtils.assertAttributeEqualsEventually(supervisor, Startable.SERVICE_UP, true);
+            EntityTestUtils.assertAttributeEqualsEventually(ui, Startable.SERVICE_UP, true);
+            
+            StormTopology stormTopology = createTopology();
+            submitTopology(stormTopology, "myExclamation", 3, true, 60000);
+        } catch (Exception e) {
+            log.error("Failed to deploy Storm", e);
+            Assert.fail();
+            throw e;
+        }
+    }
+
+    private StormTopology createTopology()
+            throws AlreadyAliveException, InvalidTopologyException {
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout("word", new TestWordSpout(), 10);
+        builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
+        builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
+        
+        return builder.createTopology();
+    }
+
+    public boolean submitTopology(StormTopology stormTopology, String topologyName, int numOfWorkers, boolean debug, long timeoutMs) {
+        if (log.isDebugEnabled()) log.debug("Connecting to NimbusClient: {}", nimbus.getConfig(Storm.NIMBUS_HOSTNAME));
+        Config conf = new Config();
+        conf.setDebug(debug);
+        conf.setNumWorkers(numOfWorkers);
+
+        // TODO - confirm this creats the JAR correctly
+        String jar = createJar(
+            new File(Os.mergePaths(ResourceUtils.create(this).getClassLoaderDir(), "org/apache/brooklyn/entity/messaging/storm/topologies")),
+            "org/apache/brooklyn/entity/messaging/storm/");
+        System.setProperty("storm.jar", jar);
+        long startMs = System.currentTimeMillis();
+        long endMs = (timeoutMs == -1) ? Long.MAX_VALUE : (startMs + timeoutMs);
+        long currentTime = startMs;
+        Throwable lastError = null;
+        int attempt = 0;
+        while (currentTime <= endMs) {
+            currentTime = System.currentTimeMillis();
+            if (attempt != 0) Time.sleep(Duration.ONE_SECOND);
+            if (log.isTraceEnabled()) log.trace("trying connection to {} at time {}", nimbus.getConfig(Storm.NIMBUS_HOSTNAME), currentTime);
+
+            try {
+                StormSubmitter.submitTopology(topologyName, conf, stormTopology);
+                return true;
+            } catch (Exception e) {
+                if (shouldRetryOn(e)) {
+                    if (log.isDebugEnabled()) log.debug("Attempt {} failed connecting to {} ({})", new Object[] {attempt + 1, nimbus.getConfig(Storm.NIMBUS_HOSTNAME), e.getMessage()});
+                    lastError = e;
+                } else {
+                    throw Throwables.propagate(e);
+                }
+            }
+            attempt++;
+        }
+        log.warn("unable to connect to Nimbus client: ", lastError);
+        Assert.fail();
+        return false;
+    }
+    
+    private boolean shouldRetryOn(Exception e) {
+        if (e.getMessage().equals("org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused"))  return true;
+        return false;
+    }
+    
+    private String createJar(File dir, String parentDirInJar) {
+        if (dir.isDirectory()) {
+            File jarFile = ArchiveBuilder.jar().addAt(dir, parentDirInJar).create(Os.newTempDir(getClass())+"/topologies.jar");
+            return jarFile.getAbsolutePath();
+        } else {
+            return dir.getAbsolutePath(); // An existing Jar archive?
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormEc2LiveTest.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormEc2LiveTest.java
new file mode 100644
index 0000000..5339616
--- /dev/null
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormEc2LiveTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.storm;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.AbstractEc2LiveTest;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.trait.Startable;
+import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode;
+
+import com.google.common.collect.ImmutableList;
+
+public class StormEc2LiveTest extends AbstractEc2LiveTest {
+
+    /**
+     * Test that can install, start and use a Storm cluster: 1 nimbus, 1 zookeeper, 1 supervisor (worker node).
+     */
+    @Override
+    protected void doTest(Location loc) throws Exception {
+        ZooKeeperNode zookeeper = app.createAndManageChild(EntitySpec.create(ZooKeeperNode.class));
+        Storm nimbus = app.createAndManageChild(EntitySpec.create(Storm.class).configure("storm.role",
+                Storm.Role.NIMBUS));
+        Storm supervisor = app.createAndManageChild(EntitySpec.create(Storm.class).configure("storm.role",
+                Storm.Role.SUPERVISOR));
+        Storm ui = app.createAndManageChild(EntitySpec.create(Storm.class).configure("storm.role",
+                Storm.Role.UI));        
+        app.start(ImmutableList.of(loc));
+        Entities.dumpInfo(app);
+        
+        EntityTestUtils.assertAttributeEqualsEventually(zookeeper, Startable.SERVICE_UP, true);
+        EntityTestUtils.assertAttributeEqualsEventually(nimbus, Startable.SERVICE_UP, true);
+        EntityTestUtils.assertAttributeEqualsEventually(supervisor, Startable.SERVICE_UP, true);
+        EntityTestUtils.assertAttributeEqualsEventually(ui, Startable.SERVICE_UP, true);
+    }
+
+    @Test(enabled=false)
+    public void testDummy() {} // Convince testng IDE integration that this really does have test methods
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormGceLiveTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormGceLiveTest.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormGceLiveTest.java
new file mode 100644
index 0000000..7b84846
--- /dev/null
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormGceLiveTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.storm;
+
+import java.util.Map;
+
+import org.testng.annotations.Test;
+
+import brooklyn.util.collections.MutableMap;
+
+@Test(groups="Live")
+public class StormGceLiveTest extends StormAbstractCloudLiveTest {
+
+    private static final String NAMED_LOCATION = "gce-europe-west1";
+    private static final String LOCATION_ID = "gce-europe-west1-a";
+    private static final String URI = "https://www.googleapis.com/compute/v1beta15/projects/google/global/images/centos-6-v20130325";
+    private static final String IMAGE_ID = "centos-6-v20130325";
+
+    @Override
+    public String getLocation() {
+        return NAMED_LOCATION;
+    }
+
+    @Override
+    public Map<String, ?> getFlags() {
+        return MutableMap.of(
+                "locationId", LOCATION_ID,
+                "imageId", IMAGE_ID,
+                "uri", URI + IMAGE_ID,
+                "groupId", "storm-test",
+                "stopIptables", "true"
+        );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/topologies/ExclamationBolt.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/topologies/ExclamationBolt.java b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/topologies/ExclamationBolt.java
new file mode 100644
index 0000000..a10a30e
--- /dev/null
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/topologies/ExclamationBolt.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.storm.topologies;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+public class ExclamationBolt extends BaseRichBolt {
+    OutputCollector _collector;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context,
+            OutputCollector collector) {
+        _collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+        _collector.ack(tuple);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word"));
+    }
+}
\ No newline at end of file



Mime
View raw message