brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [30/50] brooklyn-library git commit: Changes based on review comments, including: - Updating to use latest 0.5.0 APIs - Adding general Zookeeper entity interface - Make KafkaCluster implement Group
Date Mon, 01 Feb 2016 17:46:48 GMT
Changes based on review comments, including:
- Updating to use latest 0.5.0 APIs
- Adding general Zookeeper entity interface
- Make KafkaCluster implement Group


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/64486e44
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/64486e44
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/64486e44

Branch: refs/heads/0.5.0
Commit: 64486e44f0cf633f7306718ec85cdaebcb3a2435
Parents: 9825780
Author: Andrew Kennedy <andrew.kennedy@cloudsoftcorp.com>
Authored: Mon Apr 1 19:00:44 2013 +0100
Committer: Andrew Kennedy <andrew.kennedy@cloudsoftcorp.com>
Committed: Fri Apr 19 10:36:07 2013 +0100

----------------------------------------------------------------------
 .../java/brooklyn/demo/KafkaClusterExample.java |  10 +-
 .../brooklyn/demo/SimpleCassandraCluster.java   |   2 +-
 .../brooklyn/demo/SimpleCouchDBCluster.java     |   2 +-
 .../java/brooklyn/demo/SimpleRedisCluster.java  |   2 +-
 .../kafka/AbstractfKafkaSshDriver.java          |   2 +-
 .../entity/messaging/kafka/KafkaBroker.java     |  12 +-
 .../entity/messaging/kafka/KafkaBrokerImpl.java |  78 ++++------
 .../messaging/kafka/KafkaBrokerSshDriver.java   |   2 +-
 .../entity/messaging/kafka/KafkaCluster.java    |  83 +++--------
 .../messaging/kafka/KafkaClusterImpl.java       | 122 ++++++++--------
 .../entity/messaging/kafka/KafkaZookeeper.java  |  28 ++--
 .../messaging/kafka/KafkaZookeeperImpl.java     | 100 +------------
 .../kafka/KafkaZookeeperSshDriver.java          |   2 +-
 .../entity/zookeeper/AbstractZookeeperImpl.java | 122 ++++++++++++++++
 .../brooklyn/entity/zookeeper/Zookeeper.java    |  50 +++++++
 .../activemq/ActiveMQIntegrationTest.groovy     |  10 +-
 .../messaging/kafka/KafkaIntegrationTest.groovy | 126 ----------------
 .../messaging/kafka/KafkaIntegrationTest.java   | 144 +++++++++++++++++++
 .../entity/messaging/kafka/KafkaSupport.java    |  24 +++-
 19 files changed, 487 insertions(+), 434 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java
----------------------------------------------------------------------
diff --git a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java
index fae6bb6..06bbbed 100644
--- a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java
+++ b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java
@@ -5,7 +5,7 @@ import java.util.List;
 import brooklyn.entity.basic.ApplicationBuilder;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.messaging.kafka.KafkaCluster;
-import brooklyn.entity.proxying.BasicEntitySpec;
+import brooklyn.entity.proxying.EntitySpecs;
 import brooklyn.launcher.BrooklynLauncher;
 import brooklyn.util.CommandLineUtil;
 
@@ -18,10 +18,10 @@ public class KafkaClusterExample extends ApplicationBuilder {
 
     /** Configure the application. */
     protected void doBuild() {
-        createChild(BasicEntitySpec.newInstance(KafkaCluster.class)
+        addChild(EntitySpecs.spec(KafkaCluster.class)
+                .configure("startTimeout", 300) // 5 minutes
                 .configure("initialSize", 2));
-
-        appDisplayName("Kafka cluster application");
+        // TODO set application display name?
     }
 
     public static void main(String[] argv) {
@@ -30,7 +30,7 @@ public class KafkaClusterExample extends ApplicationBuilder {
         String location = CommandLineUtil.getCommandLineOption(args, "--location", DEFAULT_LOCATION);
 
         BrooklynLauncher launcher = BrooklynLauncher.newInstance()
-                .application(new KafkaClusterExample())
+                .application(new KafkaClusterExample().appDisplayName("Kafka cluster application"))
                 .webconsolePort(port)
                 .location(location)
                 .start();

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCassandraCluster.java
----------------------------------------------------------------------
diff --git a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCassandraCluster.java b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCassandraCluster.java
index 50c62a8..b538ec7 100644
--- a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCassandraCluster.java
+++ b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCassandraCluster.java
@@ -24,7 +24,7 @@ public class SimpleCassandraCluster extends ApplicationBuilder {
 
     /** Create entities. */
     protected void doBuild() {
-        createChild(EntitySpecs.spec(CassandraCluster.class)
+        addChild(EntitySpecs.spec(CassandraCluster.class)
                 .configure("initialSize", "2")
                 .configure("clusterName", "Brooklyn")
                 .configure("jmxPort", "11099+")

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCouchDBCluster.java
----------------------------------------------------------------------
diff --git a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCouchDBCluster.java b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCouchDBCluster.java
index 179443e..5de676b 100644
--- a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCouchDBCluster.java
+++ b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCouchDBCluster.java
@@ -24,7 +24,7 @@ public class SimpleCouchDBCluster extends ApplicationBuilder {
 
     /** Create entities. */
     protected void doBuild() {
-        createChild(EntitySpecs.spec(CouchDBCluster.class)
+        addChild(EntitySpecs.spec(CouchDBCluster.class)
                 .configure("initialSize", "2")
                 .configure("clusterName", "Brooklyn")
                 .configure("httpPort", "8000+"));

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleRedisCluster.java
----------------------------------------------------------------------
diff --git a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleRedisCluster.java b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleRedisCluster.java
index da80e39..0f818f5 100644
--- a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleRedisCluster.java
+++ b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleRedisCluster.java
@@ -24,7 +24,7 @@ public class SimpleRedisCluster extends ApplicationBuilder {
 
     /** Create entities. */
     protected void doBuild() {
-        createChild(EntitySpecs.spec(RedisCluster.class)
+        addChild(EntitySpecs.spec(RedisCluster.class)
                 .configure("initialSize", "2")
                 .configure("clusterName", "Brooklyn"));
     }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
index f6c7c8d..21e7092 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
 import brooklyn.BrooklynVersion;
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.basic.EntityLocal;
-import brooklyn.entity.basic.lifecycle.CommonCommands;
+import brooklyn.util.ssh.CommonCommands;
 import brooklyn.entity.drivers.downloads.DownloadResolver;
 import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
 import brooklyn.location.basic.SshMachineLocation;

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
index 2a82b13..c2d7632 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
@@ -20,6 +20,7 @@ import brooklyn.entity.basic.SoftwareProcess;
 import brooklyn.entity.java.UsesJmx;
 import brooklyn.entity.messaging.MessageBroker;
 import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.entity.zookeeper.Zookeeper;
 import brooklyn.event.AttributeSensor;
 import brooklyn.event.basic.BasicAttributeSensor;
 import brooklyn.event.basic.BasicConfigKey;
@@ -42,12 +43,13 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka
     PortAttributeSensorAndConfigKey KAFKA_PORT = new PortAttributeSensorAndConfigKey("kafka.port", "Kafka port", "9092+");
 
     /** Location of the configuration file template to be copied to the server.*/
-    @SetFromFlag("serverConfig")
-    ConfigKey<String> SERVER_CONFIG_TEMPLATE = new BasicConfigKey<String>(
-            String.class, "kafka.broker.configTemplate", "Server configuration template (in freemarker format)", "classpath://brooklyn/entity/messaging/kafka/server.properties");
+    @SetFromFlag("kafkaServerConfig")
+    ConfigKey<String> KAFKA_BROKER_CONFIG_TEMPLATE = new BasicConfigKey<String>(String.class,
+            "kafka.broker.configTemplate", "Kafka broker configuration template (in freemarker format)",
+            "classpath://brooklyn/entity/messaging/kafka/server.properties");
 
     @SetFromFlag("zookeeper")
-    ConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicConfigKey<KafkaZookeeper>(KafkaZookeeper.class, "kafka.broker.zookeeper", "Kafka zookeeper entity");
+    ConfigKey<Zookeeper> ZOOKEEPER = new BasicConfigKey<Zookeeper>(Zookeeper.class, "kafka.broker.zookeeper", "Kafka zookeeper entity");
 
     AttributeSensor<Integer> BROKER_ID = new BasicAttributeSensor<Integer>(Integer.class, "kafka.broker.id", "Kafka unique broker ID");
 
@@ -66,6 +68,6 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka
 
     Integer getBrokerId();
 
-    KafkaZookeeper getZookeeper();
+    Zookeeper getZookeeper();
 
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
index 0dedf9c..5f8add8 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
@@ -15,11 +15,7 @@
  */
 package brooklyn.entity.messaging.kafka;
 
-import java.io.IOException;
-import java.util.Collection;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import javax.management.ObjectName;
@@ -31,23 +27,24 @@ import brooklyn.entity.Entity;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.SoftwareProcessImpl;
 import brooklyn.entity.messaging.MessageBroker;
-import brooklyn.event.feed.function.FunctionFeed;
-import brooklyn.event.feed.function.FunctionPollConfig;
+import brooklyn.entity.zookeeper.Zookeeper;
 import brooklyn.event.feed.jmx.JmxAttributePollConfig;
 import brooklyn.event.feed.jmx.JmxFeed;
 import brooklyn.event.feed.jmx.JmxHelper;
 import brooklyn.util.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
 
 import com.google.common.base.Functions;
 import com.google.common.base.Objects.ToStringHelper;
-import com.google.common.collect.Sets;
 
 /**
  * An {@link brooklyn.entity.Entity} that represents a single Kafka broker instance.
  */
 public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroker, KafkaBroker {
+
     private static final Logger log = LoggerFactory.getLogger(KafkaBrokerImpl.class);
+    private static final ObjectName SOCKET_SERVER_STATS_MBEAN = JmxHelper.createObjectName("kafka:type=kafka.SocketServerStats");
+
+    private volatile JmxFeed jmxFeed;
 
     public KafkaBrokerImpl() {
         super();
@@ -63,7 +60,7 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke
     }
 
     @Override
-    public void postConstruct() {
+    public void init() {
         setAttribute(BROKER_ID, Math.abs(hashCode())); // Must be positive for partitioning to work
     }
 
@@ -74,7 +71,7 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke
     public Integer getBrokerId() { return getAttribute(BROKER_ID); }
 
     @Override
-    public KafkaZookeeper getZookeeper() { return getConfig(ZOOKEEPER); }
+    public Zookeeper getZookeeper() { return getConfig(ZOOKEEPER); }
 
     public KafkaTopic createTopic(Map<?, ?> properties) {
         KafkaTopic result = new KafkaTopic(properties, this);
@@ -88,98 +85,85 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke
         return KafkaBrokerDriver.class;
     }
 
-    private ObjectName socketServerStatsMbean = JmxHelper.createObjectName("kafka:type=kafka.SocketServerStats");
-    private volatile FunctionFeed functionFeed;
-    private volatile JmxFeed jmxFeed;
-
-    /** Wait for five minutes to start. */
-    @Override
-    public void waitForServiceUp() { waitForServiceUp(5, TimeUnit.MINUTES); }
-
     @Override
     public void waitForServiceUp(long duration, TimeUnit units) {
         super.waitForServiceUp(duration, units);
 
         // Wait for the MBean to exist
-        JmxHelper helper = null;
+        JmxHelper helper = new JmxHelper(this);
         try {
-            helper = new JmxHelper(this);
-            helper.connect();
-            helper.assertMBeanExistsEventually(socketServerStatsMbean, units.toMillis(duration));
-        } catch (IOException e) {
-            throw Exceptions.propagate(e);
+            helper.assertMBeanExistsEventually(SOCKET_SERVER_STATS_MBEAN, units.toMillis(duration));
         } finally {
-            if (helper != null) helper.disconnect();
+            helper.disconnect();
         }
     }
 
     @Override
     protected void connectSensors() {
-        functionFeed = FunctionFeed.builder()
-                .entity(this)
-                .poll(new FunctionPollConfig<Object, Boolean>(SERVICE_UP)
-                        .period(500, TimeUnit.MILLISECONDS)
-                        .callable(new Callable<Boolean>() {
-                            public Boolean call() throws Exception {
-                                return getDriver().isRunning();
-                            }
-                        })
-                        .onError(Functions.constant(Boolean.FALSE)))
-                .build();
+        connectServiceUpIsRunning();
 
         jmxFeed = JmxFeed.builder()
                 .entity(this)
                 .period(500, TimeUnit.MILLISECONDS)
                 .pollAttribute(new JmxAttributePollConfig<Long>(FETCH_REQUEST_COUNT)
-                        .objectName(socketServerStatsMbean)
+                        .objectName(SOCKET_SERVER_STATS_MBEAN)
                         .attributeName("NumFetchRequests")
                         .onError(Functions.constant(-1l)))
                 .pollAttribute(new JmxAttributePollConfig<Long>(TOTAL_FETCH_TIME)
-                        .objectName(socketServerStatsMbean)
+                        .objectName(SOCKET_SERVER_STATS_MBEAN)
                         .attributeName("TotalFetchRequestMs")
                         .onError(Functions.constant(-1l)))
                 .pollAttribute(new JmxAttributePollConfig<Double>(MAX_FETCH_TIME)
-                        .objectName(socketServerStatsMbean)
+                        .objectName(SOCKET_SERVER_STATS_MBEAN)
                         .attributeName("MaxFetchRequestMs")
                         .onError(Functions.constant(-1.0d)))
                 .pollAttribute(new JmxAttributePollConfig<Long>(PRODUCE_REQUEST_COUNT)
-                        .objectName(socketServerStatsMbean)
+                        .objectName(SOCKET_SERVER_STATS_MBEAN)
                         .attributeName("NumProduceRequests")
                         .onError(Functions.constant(-1l)))
                 .pollAttribute(new JmxAttributePollConfig<Long>(TOTAL_PRODUCE_TIME)
-                        .objectName(socketServerStatsMbean)
+                        .objectName(SOCKET_SERVER_STATS_MBEAN)
                         .attributeName("TotalProduceRequestMs")
                         .onError(Functions.constant(-1l)))
                 .pollAttribute(new JmxAttributePollConfig<Double>(MAX_PRODUCE_TIME)
-                        .objectName(socketServerStatsMbean)
+                        .objectName(SOCKET_SERVER_STATS_MBEAN)
                         .attributeName("MaxProduceRequestMs")
                         .onError(Functions.constant(-1.0d)))
                 .pollAttribute(new JmxAttributePollConfig<Long>(BYTES_RECEIVED)
-                        .objectName(socketServerStatsMbean)
+                        .objectName(SOCKET_SERVER_STATS_MBEAN)
                         .attributeName("TotalBytesRead")
                         .onError(Functions.constant(-1l)))
                 .pollAttribute(new JmxAttributePollConfig<Long>(BYTES_SENT)
-                        .objectName(socketServerStatsMbean)
+                        .objectName(SOCKET_SERVER_STATS_MBEAN)
                         .attributeName("TotalBytesWritten")
                         .onError(Functions.constant(-1l)))
                 .build();
+
+        setBrokerUrl();
     }
 
     @Override
     public void disconnectSensors() {
         super.disconnectSensors();
-        if (functionFeed != null) functionFeed.stop();
+        disconnectServiceUpIsRunning();
         if (jmxFeed != null) jmxFeed.stop();
     }
 
     @Override
     protected ToStringHelper toStringHelper() {
-        return super.toStringHelper().add("kafkaPort", getKafkaPort());
+        return super.toStringHelper()
+                .add("kafkaPort", getKafkaPort());
     }
 
+    /** Use the {@link #getZookeeper() zookeeper} details if available, otherwise use our own host and port. */
     @Override
     public void setBrokerUrl() {
-        // TODO
+        Zookeeper zookeeper = getZookeeper();
+        if (zookeeper != null) {
+            setAttribute(BROKER_URL, String.format("zookeeper://%s:%d", zookeeper.getAttribute(HOSTNAME), zookeeper.getZookeeperPort()));
+        } else {
+            setAttribute(BROKER_URL, String.format("kafka://%s:%d", getAttribute(HOSTNAME), getKafkaPort()));
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
index 40e7234..40df6b4 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
@@ -34,7 +34,7 @@ public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements Kaf
 
     @Override
     protected ConfigKey<String> getConfigTemplateKey() {
-        return KafkaBroker.SERVER_CONFIG_TEMPLATE;
+        return KafkaBroker.KAFKA_BROKER_CONFIG_TEMPLATE;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
index 96e46ff..d1e123a 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
@@ -18,16 +18,15 @@ package brooklyn.entity.messaging.kafka;
 import brooklyn.catalog.Catalog;
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.Entity;
-import brooklyn.entity.basic.Attributes;
-import brooklyn.entity.basic.ConfigurableEntityFactory;
+import brooklyn.entity.Group;
 import brooklyn.entity.basic.ConfigKeys;
 import brooklyn.entity.group.Cluster;
 import brooklyn.entity.group.DynamicCluster;
-import brooklyn.entity.proxying.BasicEntitySpec;
 import brooklyn.entity.proxying.EntitySpec;
 import brooklyn.entity.proxying.ImplementedBy;
 import brooklyn.entity.trait.Resizable;
 import brooklyn.entity.trait.Startable;
+import brooklyn.entity.zookeeper.Zookeeper;
 import brooklyn.event.AttributeSensor;
 import brooklyn.event.basic.BasicAttributeSensor;
 import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
@@ -35,93 +34,53 @@ import brooklyn.event.basic.BasicConfigKey;
 import brooklyn.util.flags.SetFromFlag;
 
 /**
- * This entity contains the sub-groups and entities that go in to a single location (e.g. datacenter)
- * to provide Kafka cluster functionality.
+ * Provides Kafka cluster functionality through a group of {@link KafkaBroker brokers} controlled
+ * by a single {@link KafkaZookeeper zookeeper} entity.
  * <p>
- * You can customise the broker by customising the factory (by reference in calling code)
- * or supplying your own factory (as a config flag).
+ * You can customise the Kafka zookeeper and brokers by supplying {@link EntitySpec entity specifications}
+ * to be used when creating them. An existing {@link Zookeeper} entity may also be provided instead of the
+ * Kafka zookeeper.
  * <p>
- * The contents of this group entity are:
+ * The contents of this entity are:
  * <ul>
  * <li>a {@link brooklyn.entity.group.DynamicCluster} of {@link KafkaBroker}s
- * <li>a {@link KafkaZookeeper}
- * <li>a {@link brooklyn.policy.Policy} to resize the DynamicCluster
+ * <li>a {@link KafkaZookeeper} or {@link Zookeeper}
+ * <li>a {@link brooklyn.policy.Policy} to resize the broker cluster
  * </ul>
+ * The {@link Group group} and {@link Resizable} interface methods are delegated to the broker cluster, so calling
+ * {@link Resizable#resize(Integer) resize} will change the number of brokers.
  */
 @SuppressWarnings({ "unchecked", "rawtypes" })
 @Catalog(name="Kafka", description="Apache Kafka is a distributed publish-subscribe messaging system")
 @ImplementedBy(KafkaClusterImpl.class)
-public interface KafkaCluster extends Entity, Startable, Resizable  {
-
-    class Spec<T extends KafkaCluster, S extends Spec<T,S>> extends BasicEntitySpec<T,S> {
-
-        private static class ConcreteSpec extends Spec<KafkaCluster, ConcreteSpec> {
-            ConcreteSpec() {
-                super(KafkaCluster.class);
-            }
-        }
-
-        public static Spec<KafkaCluster, ?> newInstance() {
-            return new ConcreteSpec();
-        }
-
-        protected Spec(Class<T> type) {
-            super(type);
-        }
-
-        public S initialSize(int val) {
-            configure(INITIAL_SIZE, val);
-            return self();
-        }
-
-        public S zookeeper(KafkaZookeeper val) {
-            configure(ZOOKEEPER, val);
-            return self();
-        }
-
-        public S brokerSpec(EntitySpec<KafkaBroker> val) {
-            configure(BROKER_SPEC, val);
-            return self();
-        }
-
-        public S brokerFactory(ConfigurableEntityFactory<KafkaBroker> val) {
-            configure(BROKER_FACTORY, val);
-            return self();
-        }
-    }
+public interface KafkaCluster extends Entity, Startable, Resizable, Group  {
 
     @SetFromFlag("startTimeout")
-    public static final ConfigKey<Integer> START_TIMEOUT = ConfigKeys.START_TIMEOUT;
+    ConfigKey<Integer> START_TIMEOUT = ConfigKeys.START_TIMEOUT;
 
     @SetFromFlag("initialSize")
     ConfigKey<Integer> INITIAL_SIZE = new BasicConfigKey<Integer>(Cluster.INITIAL_SIZE, 1);
 
+    /** Zookeeper for the cluster. If null a default be will created. */
     @SetFromFlag("zookeeper")
-    BasicAttributeSensorAndConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicAttributeSensorAndConfigKey<KafkaZookeeper>(
-            KafkaZookeeper.class, "kafka.cluster.zookeeper", "Kafka zookeeper for the cluster; if null a default will created");
+    BasicAttributeSensorAndConfigKey<Zookeeper> ZOOKEEPER = new BasicAttributeSensorAndConfigKey<Zookeeper>(
+            Zookeeper.class, "kafka.cluster.zookeeper", "The zookeeper for the cluster; if null a default be will created");
 
+    /** Spec for creating the default Kafka zookeeper entity. */
     @SetFromFlag("zookeeperSpec")
     BasicAttributeSensorAndConfigKey<EntitySpec<KafkaZookeeper>> ZOOKEEPER_SPEC = new BasicAttributeSensorAndConfigKey(
             EntitySpec.class, "kafka.cluster.zookeeperSpec", "Spec for creating the kafka zookeeper");
 
-    /** Factory to create a Kafka broker, given flags */
-    @SetFromFlag("brokerFactory")
-    BasicAttributeSensorAndConfigKey<ConfigurableEntityFactory<KafkaBroker>> BROKER_FACTORY = new BasicAttributeSensorAndConfigKey(
-            ConfigurableEntityFactory.class, "kafka.cluster.brokerFactory", "Factory to create a Kafka broker");
-
-    /** Spec for Kafka broker entiites to be created */
+    /** Spec for Kafka broker entities to be created. */
     @SetFromFlag("brokerSpec")
     BasicAttributeSensorAndConfigKey<EntitySpec<KafkaBroker>> BROKER_SPEC = new BasicAttributeSensorAndConfigKey(
             EntitySpec.class, "kafka.cluster.brokerSpec", "Spec for Kafka broker entiites to be created");
 
+    /** Underlying Kafka broker cluster. */
     AttributeSensor<DynamicCluster> CLUSTER = new BasicAttributeSensor<DynamicCluster>(
             DynamicCluster.class, "kafka.cluster.brokerCluster", "Underlying Kafka broker cluster");
 
-    AttributeSensor<String> HOSTNAME = Attributes.HOSTNAME;
-
-    KafkaZookeeper getZookeeper();
-
-    ConfigurableEntityFactory<KafkaBroker> getBrokerFactory();
+    Zookeeper getZookeeper();
 
     DynamicCluster getCluster();
 

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
index efc14fc..1938efa 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
@@ -18,7 +18,6 @@ package brooklyn.entity.messaging.kafka;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,19 +25,19 @@ import org.slf4j.LoggerFactory;
 import brooklyn.enricher.basic.SensorPropagatingEnricher;
 import brooklyn.entity.Entity;
 import brooklyn.entity.basic.AbstractEntity;
-import brooklyn.entity.basic.ConfigurableEntityFactory;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.group.DynamicCluster;
-import brooklyn.entity.proxying.BasicEntitySpec;
 import brooklyn.entity.proxying.EntitySpec;
-import brooklyn.entity.proxying.WrappingEntitySpec;
+import brooklyn.entity.proxying.EntitySpecs;
 import brooklyn.entity.trait.Startable;
+import brooklyn.entity.zookeeper.Zookeeper;
 import brooklyn.event.feed.ConfigToAttributes;
 import brooklyn.location.Location;
 import brooklyn.util.MutableList;
 import brooklyn.util.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.CompoundRuntimeException;
 
+import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -65,104 +64,97 @@ public class KafkaClusterImpl extends AbstractEntity implements KafkaCluster {
     }
 
     @Override
-    public void postConstruct() {
-        ConfigToAttributes.apply(this, BROKER_FACTORY);
+    public void init() {
         ConfigToAttributes.apply(this, BROKER_SPEC);
         ConfigToAttributes.apply(this, ZOOKEEPER);
         ConfigToAttributes.apply(this, ZOOKEEPER_SPEC);
 
         log.debug("creating zookeeper child for {}", this);
-        KafkaZookeeper zookeeper = getAttribute(ZOOKEEPER);
+        Zookeeper zookeeper = getAttribute(ZOOKEEPER);
         if (zookeeper == null) {
             EntitySpec<KafkaZookeeper> zookeeperSpec = getAttribute(ZOOKEEPER_SPEC);
             if (zookeeperSpec == null) {
                 log.debug("creating zookeeper using default spec for {}", this);
-                zookeeperSpec = BasicEntitySpec.newInstance(KafkaZookeeper.class);
+                zookeeperSpec = EntitySpecs.spec(KafkaZookeeper.class);
                 setAttribute(ZOOKEEPER_SPEC, zookeeperSpec);
             } else {
                 log.debug("creating zookeeper using custom spec for {}", this);
             }
-            zookeeper = getEntityManager().createEntity(WrappingEntitySpec.newInstance(zookeeperSpec).parent(this));
+            zookeeper = addChild(zookeeperSpec);
             if (Entities.isManaged(this)) Entities.manage(zookeeper);
             setAttribute(ZOOKEEPER, zookeeper);
         }
 
         log.debug("creating cluster child for {}", this);
-        ConfigurableEntityFactory<KafkaBroker> brokerFactory = getAttribute(BROKER_FACTORY);
         EntitySpec<KafkaBroker> brokerSpec = getAttribute(BROKER_SPEC);
-        if (brokerFactory == null && brokerSpec == null) {
+        if (brokerSpec == null) {
             log.debug("creating default broker spec for {}", this);
-            brokerSpec = BasicEntitySpec.newInstance(KafkaBroker.class);
+            brokerSpec = EntitySpecs.spec(KafkaBroker.class);
             setAttribute(BROKER_SPEC, brokerSpec);
         }
-        // Note relies on initial_size being inherited by DynamicCluster, because key id is identical
-        // We add the zookeeper configuration to the KafkaBroker specification or factory here
-        Map<String,Object> flags;
-        if (brokerSpec != null) {
-            flags = MutableMap.<String, Object>of("memberSpec", WrappingEntitySpec.newInstance(brokerSpec).configure(KafkaBroker.ZOOKEEPER, zookeeper));
-        } else {
-            brokerFactory.configure(KafkaBroker.ZOOKEEPER, zookeeper);
-            flags = MutableMap.<String, Object>of("factory", brokerFactory);
-        }
-        DynamicCluster cluster = getEntityManager().createEntity(BasicEntitySpec.newInstance(DynamicCluster.class)
-                .parent(this)
-                .configure(flags));
+        // Relies on initialSize being inherited by DynamicCluster, because key id is identical
+        // We add the zookeeper configuration to the KafkaBroker specification here
+        DynamicCluster cluster = addChild(EntitySpecs.spec(DynamicCluster.class)
+                .configure("memberSpec", EntitySpecs.wrapSpec(brokerSpec).configure(KafkaBroker.ZOOKEEPER, zookeeper)));
         if (Entities.isManaged(this)) Entities.manage(cluster);
         setAttribute(CLUSTER, cluster);
     }
 
     @Override
-    public KafkaZookeeper getZookeeper() {
+    public Zookeeper getZookeeper() {
         return getAttribute(ZOOKEEPER);
     }
 
     @Override
-    public synchronized ConfigurableEntityFactory<KafkaBroker> getBrokerFactory() {
-        return (ConfigurableEntityFactory<KafkaBroker>) getAttribute(BROKER_FACTORY);
-    }
-
-    @Override
-    public synchronized DynamicCluster getCluster() {
+    public DynamicCluster getCluster() {
         return getAttribute(CLUSTER);
     }
 
     @Override
     public void start(Collection<? extends Location> locations) {
         if (isLegacyConstruction()) {
-            postConstruct();
+            init();
         }
 
-        if (locations.isEmpty()) locations = this.getLocations();
-        Iterables.getOnlyElement(locations); //assert just one
+        if (locations.isEmpty()) locations = getLocations();
+        Iterables.getOnlyElement(locations); // Assert just one
         addLocations(locations);
 
         List<Entity> childrenToStart = MutableList.<Entity>of(getCluster());
         // Set the KafkaZookeeper entity as child of cluster, if it does not already have a parent
         if (getZookeeper().getParent() == null) {
             addChild(getZookeeper());
-        }
-        // And only start zookeeper if we are parent
-        if (this.equals(getZookeeper().getParent())) childrenToStart.add(getZookeeper());
-        try {
-            Entities.invokeEffectorList(this, childrenToStart, Startable.START, ImmutableMap.of("locations", locations)).get();
-        } catch (InterruptedException e) {
-            throw Exceptions.propagate(e);
-        } catch (ExecutionException e) {
-            throw Exceptions.propagate(e);
-        }
+        } // And only start zookeeper if we are parent
+        if (Objects.equal(this, getZookeeper().getParent())) childrenToStart.add(getZookeeper());
+        Entities.invokeEffectorList(this, childrenToStart, Startable.START, ImmutableMap.of("locations", locations)).getUnchecked();
 
         connectSensors();
     }
 
     @Override
     public void stop() {
-        if (this.equals(getZookeeper().getParent())) {
-            getZookeeper().stop();
+        List<Exception> errors = Lists.newArrayList();
+        if (getZookeeper() != null && Objects.equal(this, getZookeeper().getParent())) {
+            try {
+                getZookeeper().stop();
+            } catch (Exception e) {
+                errors.add(e);
+            }
+        }
+        if (getCurrentSize() > 0) {
+            try {
+                getCluster().stop();
+            } catch (Exception e) {
+                errors.add(e);
+            }
         }
-        getCluster().stop();
 
-        super.getLocations().clear();
+        getLocations().clear();
         setAttribute(SERVICE_UP, false);
+
+        if (errors.size() != 0) {
+            throw new CompoundRuntimeException("Error stopping Kafka cluster", errors);
+        }
     }
 
     @Override
@@ -181,14 +173,32 @@ public class KafkaClusterImpl extends AbstractEntity implements KafkaCluster {
                 .addToEntityAndEmitAll(this);
     }
 
+    /*
+     * All Group and Resizable interface methods are delegated to the broker cluster.
+     */
+
+    /** {@inheritDoc} */
     @Override
-    public Integer resize(Integer desiredSize) {
-        return getCluster().resize(desiredSize);
-    }
+    public Collection<Entity> getMembers() { return getCluster().getMembers(); }
 
-    /** @return the current size of the group. */
-    public Integer getCurrentSize() {
-        return getCluster().getCurrentSize();
-    }
+    /** {@inheritDoc} */
+    @Override
+    public boolean hasMember(Entity member) { return getCluster().hasMember(member); }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean addMember(Entity member) { return getCluster().addMember(member); }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean removeMember(Entity member) { return getCluster().removeMember(member); }
+
+    /** {@inheritDoc} */
+    @Override
+    public Integer getCurrentSize() { return getCluster().getCurrentSize(); }
+
+    /** {@inheritDoc} */
+    @Override
+    public Integer resize(Integer desiredSize) { return getCluster().resize(desiredSize); }
 
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
index a001a29..a0d7a46 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
@@ -17,40 +17,28 @@ package brooklyn.entity.messaging.kafka;
 
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.basic.SoftwareProcess;
-import brooklyn.entity.java.UsesJmx;
 import brooklyn.entity.proxying.ImplementedBy;
-import brooklyn.event.AttributeSensor;
-import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.entity.zookeeper.Zookeeper;
 import brooklyn.event.basic.BasicConfigKey;
-import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
 import brooklyn.util.flags.SetFromFlag;
 
 /**
  * An {@link brooklyn.entity.Entity} that represents a single Kafka zookeeper instance.
  */
 @ImplementedBy(KafkaZookeeperImpl.class)
-public interface KafkaZookeeper extends SoftwareProcess, UsesJmx, Kafka {
+public interface KafkaZookeeper extends Zookeeper, Kafka {
 
     @SetFromFlag("startTimeout")
     public static final ConfigKey<Integer> START_TIMEOUT = SoftwareProcess.START_TIMEOUT;
 
+    /** The Kafka version, not the Zookeeper version. */
     @SetFromFlag("version")
     ConfigKey<String> SUGGESTED_VERSION = Kafka.SUGGESTED_VERSION;
 
-    @SetFromFlag("zookeeperPort")
-    PortAttributeSensorAndConfigKey ZOOKEEPER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.port", "Zookeeper port", "2181+");
-
-    /** Location of the configuration file template to be copied to the server. */
-    @SetFromFlag("zookeeperConfig")
-    ConfigKey<String> ZOOKEEPER_CONFIG_TEMPLATE = new BasicConfigKey<String>(
-            String.class, "kafka.zookeeper.configTemplate", "Zookeeper configuration template (in freemarker format)", "classpath://brooklyn/entity/messaging/kafka/zookeeper.properties");
-
-    AttributeSensor<Long> OUTSTANDING_REQUESTS = new BasicAttributeSensor<Long>(Long.class, "kafka.zookeeper.outstandingRequests", "Outstanding request count");
-    AttributeSensor<Long> PACKETS_RECEIVED = new BasicAttributeSensor<Long>(Long.class, "kafka.zookeeper.packets.received", "Total packets received");
-    AttributeSensor<Long> PACKETS_SENT = new BasicAttributeSensor<Long>(Long.class, "kafka.zookeeper.packets.sent", "Total packets sent");
-
-    Integer getZookeeperPort();
-
-    String getHostname();
+    /** Location of the kafka configuration file template to be copied to the server. */
+    @SetFromFlag("kafkaZookeeperConfig")
+    ConfigKey<String> KAFKA_ZOOKEEPER_CONFIG_TEMPLATE = new BasicConfigKey<String>(String.class,
+            "kafka.zookeeper.configTemplate", "Kafka zookeeper configuration template (in freemarker format)",
+            "classpath://brooklyn/entity/messaging/kafka/zookeeper.properties");
 
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java
index 00f892b..79a6cf6 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java
@@ -15,37 +15,22 @@
  */
 package brooklyn.entity.messaging.kafka;
 
-import java.io.IOException;
-import java.util.Collection;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.ObjectName;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects.ToStringHelper;
+
 import brooklyn.entity.Entity;
-import brooklyn.entity.basic.SoftwareProcessImpl;
-import brooklyn.event.feed.function.FunctionFeed;
-import brooklyn.event.feed.function.FunctionPollConfig;
-import brooklyn.event.feed.jmx.JmxAttributePollConfig;
-import brooklyn.event.feed.jmx.JmxFeed;
-import brooklyn.event.feed.jmx.JmxHelper;
+import brooklyn.entity.zookeeper.AbstractZookeeperImpl;
 import brooklyn.util.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
-
-import com.google.common.base.Functions;
-import com.google.common.base.Objects.ToStringHelper;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
 
 /**
  * An {@link brooklyn.entity.Entity} that represents a single Kafka zookeeper instance.
  */
-public class KafkaZookeeperImpl extends SoftwareProcessImpl implements KafkaZookeeper {
+public class KafkaZookeeperImpl extends AbstractZookeeperImpl implements KafkaZookeeper {
+
     private static final Logger log = LoggerFactory.getLogger(KafkaZookeeperImpl.class);
 
     public KafkaZookeeperImpl() {
@@ -62,83 +47,8 @@ public class KafkaZookeeperImpl extends SoftwareProcessImpl implements KafkaZook
     }
 
     @Override
-    public Integer getZookeeperPort() { return getAttribute(ZOOKEEPER_PORT); }
-
-    @Override
-    public String getHostname() { return getAttribute(HOSTNAME); }
-
-    @Override
     public Class<?> getDriverInterface() {
         return KafkaZookeeperDriver.class;
     }
 
-    private ObjectName zookeeperMbean = JmxHelper.createObjectName("org.apache.ZooKeeperService:name0=StandaloneServer_port-1");
-    private volatile FunctionFeed functionFeed;
-    private volatile JmxFeed jmxFeed;
-
-    /** Wait for five minutes to start. */
-    @Override
-    public void waitForServiceUp() { waitForServiceUp(5, TimeUnit.MINUTES); }
-
-    @Override
-    public void waitForServiceUp(long duration, TimeUnit units) {
-        super.waitForServiceUp(duration, units);
-
-        // Wait for the MBean to exist
-        JmxHelper helper = null;
-        try {
-            helper = new JmxHelper(this);
-            helper.connect();
-            helper.assertMBeanExistsEventually(zookeeperMbean, units.toMillis(duration));
-        } catch (IOException e) {
-            throw Exceptions.propagate(e);
-        } finally {
-            if (helper != null) helper.disconnect();
-        }
-    }
-
-    @Override
-    protected void connectSensors() {
-        functionFeed = FunctionFeed.builder()
-                .entity(this)
-                .poll(new FunctionPollConfig<Object, Boolean>(SERVICE_UP)
-                        .period(500, TimeUnit.MILLISECONDS)
-                        .callable(new Callable<Boolean>() {
-                            public Boolean call() throws Exception {
-                                return getDriver().isRunning();
-                            }
-                        })
-                        .onError(Functions.constant(Boolean.FALSE)))
-                .build();
-
-        jmxFeed = JmxFeed.builder()
-                .entity(this)
-                .period(500, TimeUnit.MILLISECONDS)
-                .pollAttribute(new JmxAttributePollConfig<Long>(OUTSTANDING_REQUESTS)
-                        .objectName(zookeeperMbean)
-                        .attributeName("OutstandingRequests")
-                        .onError(Functions.constant(-1l)))
-                .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_RECEIVED)
-                        .objectName(zookeeperMbean)
-                        .attributeName("PacketsReceived")
-                        .onError(Functions.constant(-1l)))
-                .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_SENT)
-                        .objectName(zookeeperMbean)
-                        .attributeName("PacketsSent")
-                        .onError(Functions.constant(-1l)))
-                .build();
-    }
-
-    @Override
-    public void disconnectSensors() {
-        super.disconnectSensors();
-        if (functionFeed != null) functionFeed.stop();
-        if (jmxFeed != null) jmxFeed.stop();
-    }
-
-    @Override
-    protected ToStringHelper toStringHelper() {
-        return super.toStringHelper().add("zookeeperPort", getZookeeperPort());
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
index a35aab6..df417e0 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
@@ -34,7 +34,7 @@ public class KafkaZookeeperSshDriver extends AbstractfKafkaSshDriver implements
 
     @Override
     protected ConfigKey<String> getConfigTemplateKey() {
-        return KafkaZookeeper.ZOOKEEPER_CONFIG_TEMPLATE;
+        return KafkaZookeeper.KAFKA_ZOOKEEPER_CONFIG_TEMPLATE;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java
new file mode 100644
index 0000000..2e2fc73
--- /dev/null
+++ b/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.zookeeper;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.event.feed.function.FunctionFeed;
+import brooklyn.event.feed.function.FunctionPollConfig;
+import brooklyn.event.feed.jmx.JmxAttributePollConfig;
+import brooklyn.event.feed.jmx.JmxFeed;
+import brooklyn.event.feed.jmx.JmxHelper;
+import brooklyn.util.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Objects.ToStringHelper;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+
+/**
+ * An {@link brooklyn.entity.Entity} that represents a single Apache Zookeeper instance.
+ */
+public abstract class AbstractZookeeperImpl extends SoftwareProcessImpl implements Zookeeper {
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractZookeeperImpl.class);
+    private static final ObjectName ZOOKEEPER_MBEAN = JmxHelper.createObjectName("org.apache.ZooKeeperService:name0=StandaloneServer_port-1");
+
+    private volatile JmxFeed jmxFeed;
+
+    public AbstractZookeeperImpl() {
+        super();
+    }
+    public AbstractZookeeperImpl(Map<?, ?> properties) {
+        this(properties, null);
+    }
+    public AbstractZookeeperImpl(Entity parent) {
+        this(MutableMap.of(), parent);
+    }
+    public AbstractZookeeperImpl(Map<?, ?> properties, Entity parent) {
+        super(properties, parent);
+    }
+
+    @Override
+    public Integer getZookeeperPort() { return getAttribute(ZOOKEEPER_PORT); }
+
+    @Override
+    public String getHostname() { return getAttribute(HOSTNAME); }
+
+    @Override
+    public void waitForServiceUp(long duration, TimeUnit units) {
+        super.waitForServiceUp(duration, units);
+
+        // Wait for the MBean to exist
+        JmxHelper helper = new JmxHelper(this);
+        try {
+            helper.assertMBeanExistsEventually(ZOOKEEPER_MBEAN, units.toMillis(duration));
+        } finally {
+            helper.disconnect();
+        }
+    }
+
+    @Override
+    protected void connectSensors() {
+        connectServiceUpIsRunning();
+
+        jmxFeed = JmxFeed.builder()
+                .entity(this)
+                .period(500, TimeUnit.MILLISECONDS)
+                .pollAttribute(new JmxAttributePollConfig<Long>(OUTSTANDING_REQUESTS)
+                        .objectName(ZOOKEEPER_MBEAN)
+                        .attributeName("OutstandingRequests")
+                        .onError(Functions.constant(-1l)))
+                .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_RECEIVED)
+                        .objectName(ZOOKEEPER_MBEAN)
+                        .attributeName("PacketsReceived")
+                        .onError(Functions.constant(-1l)))
+                .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_SENT)
+                        .objectName(ZOOKEEPER_MBEAN)
+                        .attributeName("PacketsSent")
+                        .onError(Functions.constant(-1l)))
+                .build();
+    }
+
+    @Override
+    public void disconnectSensors() {
+        super.disconnectSensors();
+        disconnectServiceUpIsRunning();
+        if (jmxFeed != null) jmxFeed.stop();
+    }
+
+    @Override
+    protected ToStringHelper toStringHelper() {
+        return super.toStringHelper()
+                .add("zookeeperPort", getZookeeperPort());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/zookeeper/Zookeeper.java b/software/messaging/src/main/java/brooklyn/entity/zookeeper/Zookeeper.java
new file mode 100644
index 0000000..369ff61
--- /dev/null
+++ b/software/messaging/src/main/java/brooklyn/entity/zookeeper/Zookeeper.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.zookeeper;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.java.UsesJmx;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.util.flags.SetFromFlag;
+
+/**
+ * An {@link brooklyn.entity.Entity} that represents a single Apache Zookeeper instance.
+ * <p>
+ * Currently {@code abstract} as there is no generic Zookeeper driver.
+ */
+@ImplementedBy(AbstractZookeeperImpl.class)
+public interface Zookeeper extends SoftwareProcess, UsesJmx {
+
+    @SetFromFlag("version")
+    ConfigKey<String> SUGGESTED_VERSION = new BasicConfigKey<String>(SoftwareProcess.SUGGESTED_VERSION, "3.3.3");
+
+    @SetFromFlag("zookeeperPort")
+    PortAttributeSensorAndConfigKey ZOOKEEPER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.port", "Zookeeper port", "2181+");
+
+    AttributeSensor<Long> OUTSTANDING_REQUESTS = new BasicAttributeSensor<Long>(Long.class, "zookeeper.outstandingRequests", "Outstanding request count");
+    AttributeSensor<Long> PACKETS_RECEIVED = new BasicAttributeSensor<Long>(Long.class, "zookeeper.packets.received", "Total packets received");
+    AttributeSensor<Long> PACKETS_SENT = new BasicAttributeSensor<Long>(Long.class, "zookeeper.packets.sent", "Total packets sent");
+
+    Integer getZookeeperPort();
+
+    String getHostname();
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy
index 28ff308..8733cb0 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy
@@ -41,19 +41,15 @@ public class ActiveMQIntegrationTest {
     private Location testLocation
     private ActiveMQBroker activeMQ
 
-    @BeforeMethod(groups = "Integration")
+    @BeforeMethod(alwaysRun = true)
     public void setup() {
         app = ApplicationBuilder.newManagedApp(TestApplication.class);
         testLocation = new LocalhostMachineProvisioningLocation()
     }
 
-    @AfterMethod(groups = "Integration")
+    @AfterMethod(alwaysRun = true)
     public void shutdown() {
-        try {
-            if (app != null) Entities.destroyAll(app);
-        } catch (Exception e) {
-            log.warn("Error stopping entities", e);
-        }
+        if (app != null) Entities.destroyAll(app);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
deleted file mode 100644
index 2ef95c5..0000000
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright 2013 by Cloudsoft Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brooklyn.entity.messaging.kafka;
-
-import static brooklyn.test.TestUtils.*
-import static java.util.concurrent.TimeUnit.*
-import static org.testng.Assert.*
-
-import java.util.concurrent.TimeUnit
-
-import javax.jms.Connection
-import javax.jms.MessageConsumer
-import javax.jms.MessageProducer
-import javax.jms.Queue
-import javax.jms.Session
-import javax.jms.TextMessage
-
-import org.apache.activemq.ActiveMQConnectionFactory
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-import org.testng.annotations.AfterMethod
-import org.testng.annotations.BeforeMethod
-import org.testng.annotations.Test
-
-import brooklyn.entity.basic.ApplicationBuilder
-import brooklyn.entity.basic.Entities
-import brooklyn.entity.proxying.BasicEntitySpec
-import brooklyn.entity.trait.Startable
-import brooklyn.location.Location
-import brooklyn.location.basic.LocalhostMachineProvisioningLocation
-import brooklyn.test.entity.TestApplication
-import brooklyn.util.internal.TimeExtras
-
-/**
- * Test the operation of the {@link ActiveMQBroker} class.
- *
- * TODO test that sensors update.
- */
-public class KafkaIntegrationTest {
-    private static final Logger log = LoggerFactory.getLogger(KafkaIntegrationTest.class)
-
-    static { TimeExtras.init() }
-
-    private TestApplication app
-    private Location testLocation
-
-    @BeforeMethod(groups = "Integration")
-    public void setup() {
-        app = ApplicationBuilder.builder(TestApplication.class).manage();
-        testLocation = new LocalhostMachineProvisioningLocation()
-    }
-
-    @AfterMethod(groups = "Integration")
-    public void shutdown() {
-        if (app != null) Entities.destroyAll(app);
-    }
-
-    /**
-     * Test that we can start a zookeeper.
-     */
-    @Test(groups = "Integration")
-    public void testZookeeper() {
-        KafkaZookeeper zookeeper = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaZookeeper.class));
-
-        zookeeper.start([ testLocation ])
-        executeUntilSucceedsWithShutdown(zookeeper, timeout:600*TimeUnit.SECONDS) {
-            assertTrue zookeeper.getAttribute(Startable.SERVICE_UP)
-        }
-        assertFalse zookeeper.getAttribute(Startable.SERVICE_UP)
-    }
-
-    /**
-     * Test that we can start a  broker and zookeeper together.
-     */
-    @Test(groups = "Integration")
-    public void testBrokerPlusZookeeper() {
-        KafkaZookeeper zookeeper = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaZookeeper.class));
-        KafkaBroker broker = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaBroker.class).configure(KafkaBroker.ZOOKEEPER, zookeeper));
-
-        zookeeper.start([ testLocation ])
-        executeUntilSucceeds(timeout:600*TimeUnit.SECONDS) {
-            assertTrue zookeeper.getAttribute(Startable.SERVICE_UP)
-        }
-    
-        broker.start([ testLocation ])
-        executeUntilSucceeds(timeout:600*TimeUnit.SECONDS) {
-            assertTrue broker.getAttribute(Startable.SERVICE_UP)
-        }
-    }
-
-    /**
-     * Test that we can start a cluster with zookeeper and one broker.
-     *
-     * Connects to the zookeeper controller and tests sending and receiving messages on a topic.
-     */
-    @Test(groups = "Integration")
-    public void testSingleBrokerCluster() {
-        KafkaCluster cluster = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaCluster.class).configure(KafkaCluster.INITIAL_SIZE, 1));
-
-        cluster.start([ testLocation ])
-        executeUntilSucceeds(timeout:600*TimeUnit.SECONDS) {
-            assertTrue cluster.getAttribute(Startable.SERVICE_UP)
-        }
-
-        Entities.dumpInfo(cluster);
-
-        KafkaSupport support = new KafkaSupport(cluster.getZookeeper());
-        support.sendMessage("brooklyn", "TEST_MESSAGE")
-        String message = support.getMessage("brooklyn");
-        assertEquals(message, "TEST_MESSAGE");
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
new file mode 100644
index 0000000..54f698a
--- /dev/null
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import java.util.concurrent.Callable;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.messaging.activemq.ActiveMQBroker;
+import brooklyn.entity.proxying.EntitySpecs;
+import brooklyn.entity.trait.Startable;
+import brooklyn.location.Location;
+import brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+import brooklyn.test.Asserts;
+import brooklyn.test.entity.TestApplication;
+import brooklyn.util.MutableMap;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Test the operation of the {@link ActiveMQBroker} class.
+ *
+ * TODO test that sensors update.
+ */
+public class KafkaIntegrationTest {
+
+    private TestApplication app;
+    private Location testLocation;
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup() {
+        app = ApplicationBuilder.newManagedApp(TestApplication.class);
+        testLocation = new LocalhostMachineProvisioningLocation();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void shutdown() {
+        if (app != null) Entities.destroyAll(app);
+    }
+
+    /**
+     * Test that we can start a zookeeper.
+     */
+    @Test(groups = "Integration")
+    public void testZookeeper() {
+        final KafkaZookeeper zookeeper = app.createAndManageChild(EntitySpecs.spec(KafkaZookeeper.class));
+
+        zookeeper.start(ImmutableList.of(testLocation));
+        Asserts.succeedsEventually(MutableMap.of("timeout", 60000l), new Callable<Void>() {
+            @Override
+            public Void call() {
+                assertTrue(zookeeper.getAttribute(Startable.SERVICE_UP));
+                return null;
+            }
+        });
+
+        zookeeper.stop();
+        assertFalse(zookeeper.getAttribute(Startable.SERVICE_UP));
+    }
+
+    /**
+     * Test that we can start a  broker and zookeeper together.
+     */
+    @Test(groups = "Integration")
+    public void testBrokerPlusZookeeper() {
+        final KafkaZookeeper zookeeper = app.createAndManageChild(EntitySpecs.spec(KafkaZookeeper.class));
+        final KafkaBroker broker = app.createAndManageChild(EntitySpecs.spec(KafkaBroker.class).configure(KafkaBroker.ZOOKEEPER, zookeeper));
+
+        zookeeper.start(ImmutableList.of(testLocation));
+        Asserts.succeedsEventually(MutableMap.of("timeout", 60000l), new Callable<Void>() {
+            @Override
+            public Void call() {
+                assertTrue(zookeeper.getAttribute(Startable.SERVICE_UP));
+                return null;
+            }
+        });
+
+        broker.start(ImmutableList.of(testLocation));
+        Asserts.succeedsEventually(MutableMap.of("timeout", 60000l), new Callable<Void>() {
+            @Override
+            public Void call() {
+                assertTrue(broker.getAttribute(Startable.SERVICE_UP));
+                return null;
+            }
+        });
+
+        zookeeper.stop();
+        assertFalse(zookeeper.getAttribute(Startable.SERVICE_UP));
+
+        broker.stop();
+        assertFalse(broker.getAttribute(Startable.SERVICE_UP));
+    }
+
+    /**
+     * Test that we can start a cluster with zookeeper and one broker.
+     *
+     * Connects to the zookeeper controller and tests sending and receiving messages on a topic.
+     */
+    @Test(groups = "Integration")
+    public void testSingleBrokerCluster() {
+        final KafkaCluster cluster = app.createAndManageChild(EntitySpecs.spec(KafkaCluster.class));
+
+        cluster.start(ImmutableList.of(testLocation));
+        Asserts.succeedsEventually(MutableMap.of("timeout", 60000l), new Callable<Void>() {
+            @Override
+            public Void call() {
+                assertTrue(cluster.getAttribute(Startable.SERVICE_UP));
+                assertTrue(cluster.getZookeeper().getAttribute(Startable.SERVICE_UP));
+                assertEquals(cluster.getCurrentSize().intValue(), 1);
+                return null;
+            }
+        });
+
+        Entities.dumpInfo(cluster);
+
+        KafkaSupport support = new KafkaSupport(cluster);
+
+        support.sendMessage("brooklyn", "TEST_MESSAGE");
+        String message = support.getMessage("brooklyn");
+        assertEquals(message, "TEST_MESSAGE");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
index d9372a9..019a65b 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
@@ -31,40 +31,54 @@ import kafka.javaapi.producer.ProducerData;
 import kafka.message.Message;
 import kafka.producer.ProducerConfig;
 import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.zookeeper.Zookeeper;
 
-import com.beust.jcommander.internal.Lists;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
+/**
+ * Kafka test framework for integration and live tests, using the Kafka Java API.
+ */
 public class KafkaSupport {
 
-    private final KafkaZookeeper zookeeper;
+    private final KafkaCluster cluster;
 
-    public KafkaSupport(KafkaZookeeper zookeeper) {
-        this.zookeeper = zookeeper;
+    public KafkaSupport(KafkaCluster cluster) {
+        this.cluster = cluster;
     }
 
+    /**
+     * Send a message to the {@link KafkaCluster} on the given topic.
+     */
     public void sendMessage(String topic, String message) {
+        Zookeeper zookeeper = cluster.getZookeeper();
         Properties props = new Properties();
         props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
         props.put("serializer.class", "kafka.serializer.StringEncoder");
         ProducerConfig config = new ProducerConfig(props);
+
         Producer<String, String> producer = new Producer<String, String>(config);
         ProducerData<String, String> data = new ProducerData<String, String>(topic, message);
         producer.send(data);
         producer.close();
     }
 
+    /**
+     * Retrieve the next message on the given topic from the {@link KafkaCluster}.
+     */
     public String getMessage(String topic) {
+        Zookeeper zookeeper = cluster.getZookeeper();
         Properties props = new Properties();
         props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
-        props.put("zk.connectiontimeout.ms", "1000000");
+        props.put("zk.connectiontimeout.ms", "120000"); // two minutes
         props.put("groupid", "brooklyn");
         ConsumerConfig consumerConfig = new ConsumerConfig(props);
+
         ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
         List<KafkaMessageStream<Message>> streams = consumer.createMessageStreams(ImmutableMap.of(topic, 1)).get(topic);
         ConsumerIterator<Message> iterator = Iterables.getOnlyElement(streams).iterator();
         Message msg = iterator.next();
+
         assertTrue(msg.isValid());
         ByteBuffer buf = msg.payload();
         byte[] data = new byte[buf.remaining()];


Mime
View raw message