brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [50/64] incubator-brooklyn git commit: brooklyn-software-messaging: add org.apache package prefix
Date Tue, 18 Aug 2015 11:01:05 GMT
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
deleted file mode 100644
index 7b810ae..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.kafka;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.ObjectName;
-
-import org.apache.brooklyn.api.entity.proxying.EntitySpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.basic.SoftwareProcessImpl;
-import brooklyn.entity.messaging.MessageBroker;
-import brooklyn.entity.zookeeper.ZooKeeperNode;
-import brooklyn.event.feed.jmx.JmxAttributePollConfig;
-import brooklyn.event.feed.jmx.JmxFeed;
-import brooklyn.event.feed.jmx.JmxHelper;
-
-import com.google.common.base.Functions;
-import com.google.common.base.Objects.ToStringHelper;
-
-/**
- * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Kafka broker instance.
- */
-public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroker, KafkaBroker {
-
-    @SuppressWarnings("unused")
-    private static final Logger log = LoggerFactory.getLogger(KafkaBrokerImpl.class);
-    private static final ObjectName SOCKET_SERVER_STATS_MBEAN = JmxHelper.createObjectName("kafka:type=kafka.SocketServerStats");
-
-    private volatile JmxFeed jmxFeed;
-
-    public KafkaBrokerImpl() {
-        super();
-    }
-
-    @Override
-    public void init() {
-        super.init();
-        setAttribute(BROKER_ID, Math.abs(hashCode())); // Must be positive for partitioning to work
-    }
-
-    @Override
-    public Integer getKafkaPort() { return getAttribute(KAFKA_PORT); }
-
-    @Override
-    public Integer getBrokerId() { return getAttribute(BROKER_ID); }
-
-    @Override
-    public ZooKeeperNode getZookeeper() { return getConfig(ZOOKEEPER); }
-
-    @Override
-    public Class<?> getDriverInterface() {
-        return KafkaBrokerDriver.class;
-    }
-
-    @Override
-    public void waitForServiceUp(long duration, TimeUnit units) {
-        super.waitForServiceUp(duration, units);
-
-        if (((KafkaBrokerDriver)getDriver()).isJmxEnabled()) {
-            // Wait for the MBean to exist
-            JmxHelper helper = new JmxHelper(this);
-            try {
-                helper.assertMBeanExistsEventually(SOCKET_SERVER_STATS_MBEAN, units.toMillis(duration));
-            } finally {
-                helper.terminate();
-            }
-        }
-    }
-
-    @Override
-    protected void connectSensors() {
-        connectServiceUpIsRunning();
-        boolean retrieveUsageMetrics = getConfig(RETRIEVE_USAGE_METRICS);
-        
-        if (((KafkaBrokerDriver)getDriver()).isJmxEnabled()) {
-            jmxFeed = JmxFeed.builder()
-                .entity(this)
-                .period(500, TimeUnit.MILLISECONDS)
-                .pollAttribute(new JmxAttributePollConfig<Long>(FETCH_REQUEST_COUNT)
-                        .objectName(SOCKET_SERVER_STATS_MBEAN)
-                        .attributeName("NumFetchRequests")
-                        .onException(Functions.constant(-1l))
-                        .enabled(retrieveUsageMetrics))
-                .pollAttribute(new JmxAttributePollConfig<Long>(TOTAL_FETCH_TIME)
-                        .objectName(SOCKET_SERVER_STATS_MBEAN)
-                        .attributeName("TotalFetchRequestMs")
-                        .onException(Functions.constant(-1l))
-                        .enabled(retrieveUsageMetrics))
-                .pollAttribute(new JmxAttributePollConfig<Double>(MAX_FETCH_TIME)
-                        .objectName(SOCKET_SERVER_STATS_MBEAN)
-                        .attributeName("MaxFetchRequestMs")
-                        .onException(Functions.constant(-1.0d))
-                        .enabled(retrieveUsageMetrics))
-                .pollAttribute(new JmxAttributePollConfig<Long>(PRODUCE_REQUEST_COUNT)
-                        .objectName(SOCKET_SERVER_STATS_MBEAN)
-                        .attributeName("NumProduceRequests")
-                        .onException(Functions.constant(-1l))
-                        .enabled(retrieveUsageMetrics))
-                .pollAttribute(new JmxAttributePollConfig<Long>(TOTAL_PRODUCE_TIME)
-                        .objectName(SOCKET_SERVER_STATS_MBEAN)
-                        .attributeName("TotalProduceRequestMs")
-                        .onException(Functions.constant(-1l))
-                        .enabled(retrieveUsageMetrics))
-                .pollAttribute(new JmxAttributePollConfig<Double>(MAX_PRODUCE_TIME)
-                        .objectName(SOCKET_SERVER_STATS_MBEAN)
-                        .attributeName("MaxProduceRequestMs")
-                        .onException(Functions.constant(-1.0d))
-                        .enabled(retrieveUsageMetrics))
-                .pollAttribute(new JmxAttributePollConfig<Long>(BYTES_RECEIVED)
-                        .objectName(SOCKET_SERVER_STATS_MBEAN)
-                        .attributeName("TotalBytesRead")
-                        .onException(Functions.constant(-1l))
-                        .enabled(retrieveUsageMetrics))
-                .pollAttribute(new JmxAttributePollConfig<Long>(BYTES_SENT)
-                        .objectName(SOCKET_SERVER_STATS_MBEAN)
-                        .attributeName("TotalBytesWritten")
-                        .onException(Functions.constant(-1l))
-                        .enabled(retrieveUsageMetrics))
-                .build();
-        }
-
-        setBrokerUrl();
-    }
-
-    @Override
-    public void disconnectSensors() {
-        super.disconnectSensors();
-        disconnectServiceUpIsRunning();
-        if (jmxFeed != null) jmxFeed.stop();
-    }
-
-    @Override
-    protected ToStringHelper toStringHelper() {
-        return super.toStringHelper()
-                .add("kafkaPort", getKafkaPort());
-    }
-
-    /** Use the {@link #getZookeeper() zookeeper} details if available, otherwise use our own host and port. */
-    @Override
-    public void setBrokerUrl() {
-        ZooKeeperNode zookeeper = getZookeeper();
-        if (zookeeper != null) {
-            setAttribute(BROKER_URL, String.format("zookeeper://%s:%d", zookeeper.getAttribute(HOSTNAME), zookeeper.getZookeeperPort()));
-        } else {
-            setAttribute(BROKER_URL, String.format("kafka://%s:%d", getAttribute(HOSTNAME), getKafkaPort()));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
deleted file mode 100644
index 7892ac5..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.kafka;
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.java.UsesJmx;
-import brooklyn.entity.java.UsesJmx.JmxAgentModes;
-import org.apache.brooklyn.location.basic.SshMachineLocation;
-import brooklyn.util.collections.MutableMap;
-
-public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements KafkaBrokerDriver {
-
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaBrokerSshDriver.class);
-
-    public KafkaBrokerSshDriver(KafkaBrokerImpl entity, SshMachineLocation machine) {
-        super(entity, machine);
-    }
-
-    @Override
-    protected Map<String, Integer> getPortMap() {
-        return MutableMap.of("kafkaPort", getKafkaPort());
-    }
-
-    @Override
-    protected ConfigKey<String> getConfigTemplateKey() {
-        return KafkaBroker.KAFKA_BROKER_CONFIG_TEMPLATE;
-    }
-
-    @Override
-    protected String getConfigFileName() {
-        return "server.properties";
-    }
-
-    @Override
-    protected String getLaunchScriptName() {
-        return "kafka-server-start.sh";
-    }
-
-    @Override
-    public String getTopicsScriptName() {
-        return "kafka-topics.sh";
-    }
-
-    @Override
-    protected String getProcessIdentifier() {
-        return "kafka\\.Kafka";
-    }
-
-    @Override
-    public Integer getKafkaPort() {
-        return getEntity().getAttribute(KafkaBroker.KAFKA_PORT);
-    }
-
-    @Override
-    public Map<String, String> getShellEnvironment() {
-        JmxAgentModes jmxAgentMode = getEntity().getConfig(KafkaBroker.JMX_AGENT_MODE);
-        String jmxPort;
-        if (jmxAgentMode == JmxAgentModes.NONE) {
-            // seems odd to pass RMI port here, as it gets assigned to com.sun.mgmt.jmx.port in kafka-run-class.sh
-            // but RMI server/registry port works, whereas JMX port does not
-            jmxPort = String.valueOf(entity.getAttribute(UsesJmx.JMX_PORT));
-        } else {
-            /*
-             * See ./bin/kafka-server-start.sh  and ./bin/kafka-run-class.sh
-             * Really hard to turn off jmxremote on kafka! And can't use default because
-             * uses 9999, which means could only run one kafka broker per server.
-             */
-            jmxPort = String.valueOf(entity.getAttribute(KafkaBroker.INTERNAL_JMX_PORT));
-        }
-
-        return MutableMap.<String, String> builder()
-                .putAll(super.getShellEnvironment())
-                .put("JMX_PORT", jmxPort)
-                .build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
deleted file mode 100644
index 3a24377..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.kafka;
-
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.Group;
-import org.apache.brooklyn.api.entity.proxying.EntitySpec;
-import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
-import org.apache.brooklyn.api.event.AttributeSensor;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.BrooklynConfigKeys;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.group.Cluster;
-import brooklyn.entity.group.DynamicCluster;
-import brooklyn.entity.trait.Resizable;
-import brooklyn.entity.trait.Startable;
-import brooklyn.entity.zookeeper.ZooKeeperNode;
-import brooklyn.event.basic.BasicAttributeSensor;
-import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
-import brooklyn.util.time.Duration;
-
-/**
- * Provides Kafka cluster functionality through a group of {@link KafkaBroker brokers} controlled
- * by a single {@link KafkaZookeeper zookeeper} entity.
- * <p>
- * You can customise the Kafka zookeeper and brokers by supplying {@link EntitySpec entity specifications}
- * to be used when creating them. An existing {@link Zookeeper} entity may also be provided instead of the
- * Kafka zookeeper.
- * <p>
- * The contents of this entity are:
- * <ul>
- * <li>a {@link brooklyn.entity.group.DynamicCluster} of {@link KafkaBroker}s
- * <li>a {@link KafkaZookeeper} or {@link Zookeeper}
- * <li>a {@link org.apache.brooklyn.api.policy.Policy} to resize the broker cluster
- * </ul>
- * The {@link Group group} and {@link Resizable} interface methods are delegated to the broker cluster, so calling
- * {@link Resizable#resize(Integer) resize} will change the number of brokers.
- */
-@SuppressWarnings({ "unchecked", "rawtypes" })
-@Catalog(name="Kafka", description="Apache Kafka is a distributed publish-subscribe messaging system", iconUrl="classpath://brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg")
-@ImplementedBy(KafkaClusterImpl.class)
-public interface KafkaCluster extends Entity, Startable, Resizable, Group  {
-
-    @SetFromFlag("startTimeout")
-    ConfigKey<Duration> START_TIMEOUT = BrooklynConfigKeys.START_TIMEOUT;
-
-    @SetFromFlag("initialSize")
-    ConfigKey<Integer> INITIAL_SIZE = ConfigKeys.newConfigKeyWithDefault(Cluster.INITIAL_SIZE, 1);
-
-    /** Zookeeper for the cluster. If null a default be will created. */
-    @SetFromFlag("zookeeper")
-    BasicAttributeSensorAndConfigKey<ZooKeeperNode> ZOOKEEPER = new BasicAttributeSensorAndConfigKey<ZooKeeperNode>(
-            ZooKeeperNode.class, "kafka.cluster.zookeeper", "The zookeeper for the cluster; if null a default be will created");
-
-    /** Spec for creating the default Kafka zookeeper entity. */
-    @SetFromFlag("zookeeperSpec")
-    BasicAttributeSensorAndConfigKey<EntitySpec<KafkaZooKeeper>> ZOOKEEPER_SPEC = new BasicAttributeSensorAndConfigKey(
-            EntitySpec.class, "kafka.cluster.zookeeperSpec", "Spec for creating the kafka zookeeper");
-
-    /** Spec for Kafka broker entities to be created. */
-    @SetFromFlag("brokerSpec")
-    BasicAttributeSensorAndConfigKey<EntitySpec<KafkaBroker>> BROKER_SPEC = new BasicAttributeSensorAndConfigKey(
-            EntitySpec.class, "kafka.cluster.brokerSpec", "Spec for Kafka broker entiites to be created");
-
-    /** Underlying Kafka broker cluster. */
-    AttributeSensor<DynamicCluster> CLUSTER = new BasicAttributeSensor<DynamicCluster>(
-            DynamicCluster.class, "kafka.cluster.brokerCluster", "Underlying Kafka broker cluster");
-
-    ZooKeeperNode getZooKeeper();
-
-    DynamicCluster getCluster();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
deleted file mode 100644
index b5b8449..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.kafka;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.proxying.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.enricher.Enrichers;
-import brooklyn.entity.basic.AbstractEntity;
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.group.DynamicCluster;
-import brooklyn.entity.trait.Startable;
-import brooklyn.entity.zookeeper.ZooKeeperNode;
-import brooklyn.event.feed.ConfigToAttributes;
-import brooklyn.util.collections.MutableList;
-import brooklyn.util.exceptions.CompoundRuntimeException;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-/**
- * Implementation of a Kafka cluster containing a {@link KafkaZookeeper} node and a group of {@link KafkaBroker}s.
- */
-public class KafkaClusterImpl extends AbstractEntity implements KafkaCluster {
-
-    public static final Logger log = LoggerFactory.getLogger(KafkaClusterImpl.class);
-
-    public KafkaClusterImpl() {
-    }
-
-    @Override
-    public void init() {
-        super.init();
-        
-        setAttribute(SERVICE_UP, false);
-        ConfigToAttributes.apply(this, BROKER_SPEC);
-        ConfigToAttributes.apply(this, ZOOKEEPER);
-        ConfigToAttributes.apply(this, ZOOKEEPER_SPEC);
-
-        log.debug("creating zookeeper child for {}", this);
-        ZooKeeperNode zookeeper = getAttribute(ZOOKEEPER);
-        if (zookeeper == null) {
-            EntitySpec<KafkaZooKeeper> zookeeperSpec = getAttribute(ZOOKEEPER_SPEC);
-            if (zookeeperSpec == null) {
-                log.debug("creating zookeeper using default spec for {}", this);
-                zookeeperSpec = EntitySpec.create(KafkaZooKeeper.class);
-                setAttribute(ZOOKEEPER_SPEC, zookeeperSpec);
-            } else {
-                log.debug("creating zookeeper using custom spec for {}", this);
-            }
-            zookeeper = addChild(zookeeperSpec);
-            if (Entities.isManaged(this)) Entities.manage(zookeeper);
-            setAttribute(ZOOKEEPER, zookeeper);
-        }
-
-        log.debug("creating cluster child for {}", this);
-        EntitySpec<KafkaBroker> brokerSpec = getAttribute(BROKER_SPEC);
-        if (brokerSpec == null) {
-            log.debug("creating default broker spec for {}", this);
-            brokerSpec = EntitySpec.create(KafkaBroker.class);
-            setAttribute(BROKER_SPEC, brokerSpec);
-        }
-        // Relies on initialSize being inherited by DynamicCluster, because key id is identical
-        // We add the zookeeper configuration to the KafkaBroker specification here
-        DynamicCluster cluster = addChild(EntitySpec.create(DynamicCluster.class)
-                .configure("memberSpec", EntitySpec.create(brokerSpec).configure(KafkaBroker.ZOOKEEPER, zookeeper)));
-        if (Entities.isManaged(this)) Entities.manage(cluster);
-        setAttribute(CLUSTER, cluster);
-        
-        connectSensors();
-    }
-
-    @Override
-    public ZooKeeperNode getZooKeeper() {
-        return getAttribute(ZOOKEEPER);
-    }
-
-    @Override
-    public DynamicCluster getCluster() {
-        return getAttribute(CLUSTER);
-    }
-
-    @Override
-    public void start(Collection<? extends Location> locations) {
-        if (isLegacyConstruction()) {
-            init();
-        }
-
-        if (locations.isEmpty()) locations = getLocations();
-        Iterables.getOnlyElement(locations); // Assert just one
-        addLocations(locations);
-
-        List<Entity> childrenToStart = MutableList.<Entity>of(getCluster());
-        // Set the KafkaZookeeper entity as child of cluster, if it does not already have a parent
-        if (getZooKeeper().getParent() == null) {
-            addChild(getZooKeeper());
-        } // And only start zookeeper if we are parent
-        if (Objects.equal(this, getZooKeeper().getParent())) childrenToStart.add(getZooKeeper());
-        Entities.invokeEffector(this, childrenToStart, Startable.START, ImmutableMap.of("locations", locations)).getUnchecked();
-    }
-
-    @Override
-    public void stop() {
-        List<Exception> errors = Lists.newArrayList();
-        if (getZooKeeper() != null && Objects.equal(this, getZooKeeper().getParent())) {
-            try {
-                getZooKeeper().stop();
-            } catch (Exception e) {
-                errors.add(e);
-            }
-        }
-        if (getCurrentSize() > 0) {
-            try {
-                getCluster().stop();
-            } catch (Exception e) {
-                errors.add(e);
-            }
-        }
-
-        clearLocations();
-        setAttribute(SERVICE_UP, false);
-
-        if (errors.size() != 0) {
-            throw new CompoundRuntimeException("Error stopping Kafka cluster", errors);
-        }
-    }
-
-    @Override
-    public void restart() {
-        // TODO prod the entities themselves to restart, instead?
-        Collection<Location> locations = Lists.newArrayList(getLocations());
-
-        stop();
-        start(locations);
-    }
-
-    void connectSensors() {
-        addEnricher(Enrichers.builder()
-                .propagatingAllBut(SERVICE_UP)
-                .from(getCluster())
-                .build());
-        addEnricher(Enrichers.builder()
-                .propagating(SERVICE_UP)
-                .from(getZooKeeper())
-                .build());
-    }
-
-    /*
-     * All Group and Resizable interface methods are delegated to the broker cluster.
-     */
-
-    /** {@inheritDoc} */
-    @Override
-    public Collection<Entity> getMembers() { return getCluster().getMembers(); }
-
-    /** {@inheritDoc} */
-    @Override
-    public boolean hasMember(Entity member) { return getCluster().hasMember(member); }
-
-    /** {@inheritDoc} */
-    @Override
-    public boolean addMember(Entity member) { return getCluster().addMember(member); }
-
-    /** {@inheritDoc} */
-    @Override
-    public boolean removeMember(Entity member) { return getCluster().removeMember(member); }
-
-    /** {@inheritDoc} */
-    @Override
-    public Integer getCurrentSize() { return getCluster().getCurrentSize(); }
-
-    /** {@inheritDoc} */
-    @Override
-    public Integer resize(Integer desiredSize) { return getCluster().resize(desiredSize); }
-
-    @Override
-    public <T extends Entity> T addMemberChild(EntitySpec<T> spec) { return getCluster().addMemberChild(spec); }
-
-    @Override
-    public <T extends Entity> T addMemberChild(T child) { return getCluster().addMemberChild(child); }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
deleted file mode 100644
index 106690a..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.kafka;
-
-import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.annotation.Effector;
-import brooklyn.entity.annotation.EffectorParam;
-import brooklyn.entity.basic.SoftwareProcess;
-import brooklyn.entity.zookeeper.ZooKeeperNode;
-import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
-import brooklyn.event.basic.BasicConfigKey;
-import brooklyn.util.time.Duration;
-
-/**
- * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Kafka zookeeper instance.
- */
-@ImplementedBy(KafkaZooKeeperImpl.class)
-public interface KafkaZooKeeper extends ZooKeeperNode, Kafka {
-
-    @SetFromFlag("startTimeout")
-    ConfigKey<Duration> START_TIMEOUT = SoftwareProcess.START_TIMEOUT;
-
-    /** The Kafka version, not the Zookeeper version. */
-    @SetFromFlag("version")
-    ConfigKey<String> SUGGESTED_VERSION = Kafka.SUGGESTED_VERSION;
-    
-    /** The Kafka version, not the Zookeeper version. */
-    @SetFromFlag("downloadUrl")
-    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = Kafka.DOWNLOAD_URL;
-
-    /** Location of the kafka configuration file template to be copied to the server. */
-    @SetFromFlag("kafkaZookeeperConfig")
-    ConfigKey<String> KAFKA_ZOOKEEPER_CONFIG_TEMPLATE = new BasicConfigKey<String>(String.class,
-            "kafka.zookeeper.configTemplate", "Kafka zookeeper configuration template (in freemarker format)",
-            "classpath://brooklyn/entity/messaging/kafka/zookeeper.properties");
-
-    @Effector(description = "Create a topic with a single partition and only one replica")
-    void createTopic(@EffectorParam(name = "topic") String topic);
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
deleted file mode 100644
index 97edc8b..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.kafka;
-
-import brooklyn.entity.java.JavaSoftwareProcessDriver;
-
-public interface KafkaZooKeeperDriver extends JavaSoftwareProcessDriver {
-
-    Integer getZookeeperPort();
-
-    void createTopic(String topic);
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
deleted file mode 100644
index c9a1148..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.kafka;
-
-import brooklyn.entity.annotation.EffectorParam;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.zookeeper.AbstractZooKeeperImpl;
-
-/**
- * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Kafka zookeeper instance.
- */
-public class KafkaZooKeeperImpl extends AbstractZooKeeperImpl implements KafkaZooKeeper {
-
-    @SuppressWarnings("unused")
-    private static final Logger log = LoggerFactory.getLogger(KafkaZooKeeperImpl.class);
-
-    public KafkaZooKeeperImpl() {
-    }
-
-    @Override
-    public Class<?> getDriverInterface() {
-        return KafkaZooKeeperDriver.class;
-    }
-
-    @Override
-    public void createTopic(String topic) {
-        ((KafkaZooKeeperDriver)getDriver()).createTopic(topic);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
deleted file mode 100644
index dc7688f..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.kafka;
-
-import java.util.Map;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.Attributes;
-import org.apache.brooklyn.location.basic.SshMachineLocation;
-import brooklyn.util.collections.MutableMap;
-
-import static brooklyn.util.text.StringEscapes.BashStringEscapes.escapeLiteralForDoubleQuotedBash;
-
-public class KafkaZooKeeperSshDriver extends AbstractfKafkaSshDriver implements KafkaZooKeeperDriver {
-
-    public KafkaZooKeeperSshDriver(KafkaZooKeeperImpl entity, SshMachineLocation machine) {
-        super(entity, machine);
-    }
-
-    @Override
-    protected Map<String, Integer> getPortMap() {
-        return MutableMap.of("zookeeperPort", getZookeeperPort());
-    }
-
-    @Override
-    protected ConfigKey<String> getConfigTemplateKey() {
-        return KafkaZooKeeper.KAFKA_ZOOKEEPER_CONFIG_TEMPLATE;
-    }
-
-    @Override
-    protected String getConfigFileName() {
-        return "zookeeper.properties";
-    }
-
-    @Override
-    protected String getLaunchScriptName() {
-        return "zookeeper-server-start.sh";
-    }
-
-    @Override
-    protected String getTopicsScriptName() {
-        return "kafka-topics.sh";
-    }
-
-    @Override
-    protected String getProcessIdentifier() {
-        return "quorum\\.QuorumPeerMain";
-    }
-
-    @Override
-    public Integer getZookeeperPort() {
-        return getEntity().getAttribute(KafkaZooKeeper.ZOOKEEPER_PORT);
-    }
-
-    @Override
-    public void createTopic(String topic) {
-        String zookeeperUrl = getEntity().getAttribute(Attributes.HOSTNAME) + ":" + getZookeeperPort();
-        newScript(CUSTOMIZING)
-                .failOnNonZeroResultCode()
-                .body.append(String.format("./bin/%s  --create --zookeeper \"%s\" --replication-factor 1 --partitions 1 --topic \"%s\"",
-                                           getTopicsScriptName(),
-                                           escapeLiteralForDoubleQuotedBash(zookeeperUrl),
-                                           escapeLiteralForDoubleQuotedBash(topic)))
-                .execute();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBroker.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBroker.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBroker.java
deleted file mode 100644
index a2af8a4..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBroker.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.qpid;
-
-import java.util.Map;
-
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.Attributes;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.SoftwareProcess;
-import brooklyn.entity.java.UsesJmx;
-import brooklyn.entity.messaging.MessageBroker;
-import brooklyn.entity.messaging.amqp.AmqpServer;
-import brooklyn.entity.messaging.jms.JMSBroker;
-import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
-import brooklyn.event.basic.BasicConfigKey;
-import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
-
-/**
- * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Qpid broker instance, using AMQP 0-10.
- */
-@Catalog(name="Qpid Broker", description="Apache Qpid is an open-source messaging system, implementing the Advanced Message Queuing Protocol (AMQP)", iconUrl="classpath:///qpid-logo.jpeg")
-@ImplementedBy(QpidBrokerImpl.class)
-public interface QpidBroker extends SoftwareProcess, MessageBroker, UsesJmx, AmqpServer, JMSBroker<QpidQueue, QpidTopic> {
-
-    /* Qpid runtime file locations for convenience. */
-
-    public static final String CONFIG_XML = "etc/config.xml";
-    public static final String VIRTUALHOSTS_XML = "etc/virtualhosts.xml";
-    public static final String PASSWD = "etc/passwd";
-
-    @SetFromFlag("version")
-    public static final ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "0.20");
-    
-    @SetFromFlag("downloadUrl")
-    public static final BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
-            Attributes.DOWNLOAD_URL, "http://download.nextag.com/apache/qpid/${version}/qpid-java-broker-${version}.tar.gz");
-
-    @SetFromFlag("amqpPort")
-    public static final PortAttributeSensorAndConfigKey AMQP_PORT = AmqpServer.AMQP_PORT;
-
-    @SetFromFlag("virtualHost")
-    public static final BasicAttributeSensorAndConfigKey<String> VIRTUAL_HOST_NAME = AmqpServer.VIRTUAL_HOST_NAME;
-
-    @SetFromFlag("amqpVersion")
-    public static final BasicAttributeSensorAndConfigKey<String> AMQP_VERSION = new BasicAttributeSensorAndConfigKey<String>(
-            AmqpServer.AMQP_VERSION, AmqpServer.AMQP_0_10);
-    
-    @SetFromFlag("httpManagementPort")
-    public static final PortAttributeSensorAndConfigKey HTTP_MANAGEMENT_PORT = new PortAttributeSensorAndConfigKey("qpid.http-management.port", "Qpid HTTP management plugin port");
-
-    @SetFromFlag("jmxUser")
-    public static final BasicAttributeSensorAndConfigKey<String> JMX_USER = new BasicAttributeSensorAndConfigKey<String>(
-            UsesJmx.JMX_USER, "admin");
-    
-    @SetFromFlag("jmxPassword")
-    public static final BasicAttributeSensorAndConfigKey<String> JMX_PASSWORD = new BasicAttributeSensorAndConfigKey<String>(
-            UsesJmx.JMX_PASSWORD, "admin");
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java
deleted file mode 100644
index baf487d..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.qpid;
-
-import static java.lang.String.format;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.brooklyn.api.entity.proxying.EntitySpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.java.JmxSupport;
-import brooklyn.entity.messaging.jms.JMSBrokerImpl;
-import brooklyn.event.feed.jmx.JmxAttributePollConfig;
-import brooklyn.event.feed.jmx.JmxFeed;
-import brooklyn.event.feed.jmx.JmxHelper;
-import brooklyn.util.exceptions.Exceptions;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Objects.ToStringHelper;
-
-/**
- * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Qpid broker instance, using AMQP 0-10.
- */
-public class QpidBrokerImpl extends JMSBrokerImpl<QpidQueue, QpidTopic> implements QpidBroker {
-    private static final Logger log = LoggerFactory.getLogger(QpidBrokerImpl.class);
-
-    private volatile JmxFeed jmxFeed;
-
-    public QpidBrokerImpl() {
-        super();
-    }
-
-    public String getVirtualHost() { return getAttribute(VIRTUAL_HOST_NAME); }
-    public String getAmqpVersion() { return getAttribute(AMQP_VERSION); }
-    public Integer getAmqpPort() { return getAttribute(AMQP_PORT); }
-
-    public void setBrokerUrl() {
-        String urlFormat = "amqp://guest:guest@/%s?brokerlist='tcp://%s:%d'";
-        setAttribute(BROKER_URL, format(urlFormat, getAttribute(VIRTUAL_HOST_NAME), getAttribute(HOSTNAME), getAttribute(AMQP_PORT)));
-    }
-    
-    @Override
-    public void init() {
-        super.init();
-        new JmxSupport(this, null).recommendJmxRmiCustomAgent();
-    }
-
-    public void waitForServiceUp(long duration, TimeUnit units) {
-        super.waitForServiceUp(duration, units);
-
-        // Also wait for the MBean to exist (as used when creating queue/topic)
-        JmxHelper helper = new JmxHelper(this);
-        try {
-            String virtualHost = getConfig(QpidBroker.VIRTUAL_HOST_NAME);
-            ObjectName virtualHostManager = new ObjectName(format("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"%s\"", virtualHost));
-            helper.connect();
-            helper.assertMBeanExistsEventually(virtualHostManager, units.toMillis(duration));
-        } catch (MalformedObjectNameException e) {
-            throw Exceptions.propagate(e);
-        } catch (IOException e) {
-            throw Exceptions.propagate(e);
-        } finally {
-            if (helper != null) helper.terminate();
-        }
-    }
-    
-    public QpidQueue createQueue(Map properties) {
-        QpidQueue result = addChild(EntitySpec.create(QpidQueue.class).configure(properties));
-        Entities.manage(result);
-        result.create();
-        return result;
-    }
-
-    public QpidTopic createTopic(Map properties) {
-        QpidTopic result = addChild(EntitySpec.create(QpidTopic.class).configure(properties));
-        Entities.manage(result);
-        result.create();
-        return result;
-    }
-
-    @Override
-    public Class getDriverInterface() {
-        return QpidDriver.class;
-    }
-
-    @Override
-    protected void connectSensors() {
-        super.connectSensors();
-        String serverInfoMBeanName = "org.apache.qpid:type=ServerInformation,name=ServerInformation";
-
-        jmxFeed = JmxFeed.builder()
-                .entity(this)
-                .period(500, TimeUnit.MILLISECONDS)
-                .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP)
-                        .objectName(serverInfoMBeanName)
-                        .attributeName("ProductVersion")
-                        .onSuccess(new Function<Object,Boolean>() {
-                                private boolean hasWarnedOfVersionMismatch;
-                                @Override public Boolean apply(Object input) {
-                                    if (input == null) return false;
-                                    if (!hasWarnedOfVersionMismatch && !getConfig(QpidBroker.SUGGESTED_VERSION).equals(input)) {
-                                        log.warn("Qpid version mismatch: ProductVersion is {}, requested version is {}", input, getConfig(QpidBroker.SUGGESTED_VERSION));
-                                        hasWarnedOfVersionMismatch = true;
-                                    }
-                                    return true;
-                                }})
-                        .onException(Functions.constant(false))
-                        .suppressDuplicates(true))
-                .build();
-    }
-
-    @Override
-    public void disconnectSensors() {
-        super.disconnectSensors();
-        if (jmxFeed != null) jmxFeed.stop();
-    }
-
-    @Override
-    protected ToStringHelper toStringHelper() {
-        return super.toStringHelper().add("amqpPort", getAmqpPort());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDestination.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDestination.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDestination.java
deleted file mode 100644
index 9250e2a..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDestination.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.qpid;
-
-import brooklyn.entity.messaging.amqp.AmqpExchange;
-import brooklyn.entity.messaging.jms.JMSDestination;
-
-public interface QpidDestination extends JMSDestination, AmqpExchange {
-    
-    public void create();
-    
-    /**
-     * Return the AMQP name for the queue.
-     */
-    public String getQueueName();
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDestinationImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDestinationImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDestinationImpl.java
deleted file mode 100644
index 0dc6390..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDestinationImpl.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.qpid;
-
-import static java.lang.String.format;
-
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.brooklyn.api.entity.basic.EntityLocal;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.java.UsesJmx;
-import brooklyn.entity.messaging.amqp.AmqpServer;
-import brooklyn.entity.messaging.jms.JMSDestinationImpl;
-import brooklyn.event.feed.jmx.JmxFeed;
-import brooklyn.event.feed.jmx.JmxHelper;
-import brooklyn.util.exceptions.Exceptions;
-
-public abstract class QpidDestinationImpl extends JMSDestinationImpl implements QpidDestination {
-    public static final Logger log = LoggerFactory.getLogger(QpidDestination.class);
-    
-    @SetFromFlag
-    String virtualHost;
-
-    protected ObjectName virtualHostManager;
-    protected ObjectName exchange;
-    protected transient JmxHelper jmxHelper;
-    protected volatile JmxFeed jmxFeed;
-
-    public QpidDestinationImpl() {
-    }
-
-    @Override
-    public QpidBroker getParent() {
-        return (QpidBroker) super.getParent();
-    }
-    
-    @Override
-    public void onManagementStarting() {
-        super.onManagementStarting();
-        
-        // TODO Would be nice to share the JmxHelper for all destinations, so just one connection.
-        // But tricky for if brooklyn were distributed
-        try {
-            if (virtualHost == null) virtualHost = getConfig(QpidBroker.VIRTUAL_HOST_NAME);
-            setAttribute(QpidBroker.VIRTUAL_HOST_NAME, virtualHost);
-            virtualHostManager = new ObjectName(format("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"%s\"", virtualHost));
-            jmxHelper = new JmxHelper((EntityLocal)getParent());
-        } catch (MalformedObjectNameException e) {
-            throw Exceptions.propagate(e);
-        }
-    }
-
-    @Override
-    protected void disconnectSensors() {
-        if (jmxFeed != null) jmxFeed.stop();
-    }
-
-    @Override
-    public void create() {
-        jmxHelper.operation(virtualHostManager, "createNewQueue", getName(), getParent().getAttribute(UsesJmx.JMX_USER), true);
-        jmxHelper.operation(exchange, "createNewBinding", getName(), getName());
-        connectSensors();
-    }
-    
-    @Override
-    public void delete() {
-        jmxHelper.operation(exchange, "removeBinding", getName(), getName());
-        jmxHelper.operation(virtualHostManager, "deleteQueue", getName());
-        disconnectSensors();
-    }
-
-    @Override
-    public String getQueueName() {
-
-        if (AmqpServer.AMQP_0_10.equals(getParent().getAmqpVersion())) {
-            return String.format("'%s'/'%s'; { assert: never }", getExchangeName(), getName());
-        } else {
-            return getName();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDriver.java
deleted file mode 100644
index 26d0e5a..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDriver.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.qpid;
-
-import brooklyn.entity.java.JavaSoftwareProcessDriver;
-
-public interface QpidDriver extends JavaSoftwareProcessDriver {
-
-    Integer getAmqpPort();
-
-    String getAmqpVersion();
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidQueue.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidQueue.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidQueue.java
deleted file mode 100644
index 42c3065..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidQueue.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.qpid;
-
-import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
-
-import brooklyn.entity.messaging.Queue;
-
-@ImplementedBy(QpidQueueImpl.class)
-public interface QpidQueue extends QpidDestination, Queue {
-    @Override
-    public String getExchangeName();
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidQueueImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidQueueImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidQueueImpl.java
deleted file mode 100644
index 1774583..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidQueueImpl.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.qpid;
-
-import static java.lang.String.format;
-
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import brooklyn.entity.messaging.amqp.AmqpExchange;
-import brooklyn.event.feed.jmx.JmxAttributePollConfig;
-import brooklyn.event.feed.jmx.JmxFeed;
-import brooklyn.util.exceptions.Exceptions;
-
-public class QpidQueueImpl extends QpidDestinationImpl implements QpidQueue {
-    public QpidQueueImpl() {
-    }
-
-    @Override
-    public void onManagementStarting() {
-        super.onManagementStarting();
-        setAttribute(QUEUE_NAME, getName());
-        try {
-            exchange = new ObjectName(format("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"%s\",name=\"%s\",ExchangeType=direct", virtualHost, getExchangeName()));
-        } catch (MalformedObjectNameException e) {
-            throw Exceptions.propagate(e);
-        }
-    }
-
-    @Override
-    protected void connectSensors() {
-        String queue = format("org.apache.qpid:type=VirtualHost.Queue,VirtualHost=\"%s\",name=\"%s\"", virtualHost, getName());
-        
-        jmxFeed = JmxFeed.builder()
-                .entity(this)
-                .helper(jmxHelper)
-                .pollAttribute(new JmxAttributePollConfig<Integer>(QUEUE_DEPTH_BYTES)
-                        .objectName(queue)
-                        .attributeName("QueueDepth"))
-                .pollAttribute(new JmxAttributePollConfig<Integer>(QUEUE_DEPTH_MESSAGES)
-                        .objectName(queue)
-                        .attributeName("MessageCount"))
-                .build();
-    }
-
-    @Override
-    public String getExchangeName() {
-        return AmqpExchange.DIRECT;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidSshDriver.java
deleted file mode 100644
index 2b1a8ec..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidSshDriver.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.qpid;
-
-import static java.lang.String.format;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
-import org.apache.brooklyn.location.basic.SshMachineLocation;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.net.Networking;
-import brooklyn.util.os.Os;
-import brooklyn.util.ssh.BashCommands;
-
-import com.google.common.collect.ImmutableMap;
-
-public class QpidSshDriver extends JavaSoftwareProcessSshDriver implements QpidDriver{
-
-    private static final Logger log = LoggerFactory.getLogger(QpidSshDriver.class);
-
-    public QpidSshDriver(QpidBrokerImpl entity, SshMachineLocation machine) {
-        super(entity, machine);
-    }
-
-    @Override
-    protected String getLogFileLocation() { return Os.mergePaths(getRunDir(), "log", "qpid.log"); }
-
-    @Override
-    public Integer getAmqpPort() { return entity.getAttribute(QpidBroker.AMQP_PORT); }
-
-    @Override
-    public String getAmqpVersion() { return entity.getAttribute(QpidBroker.AMQP_VERSION); }
-
-    public Integer getHttpManagementPort() { return entity.getAttribute(QpidBroker.HTTP_MANAGEMENT_PORT); }
-
-    @Override
-    public void preInstall() {
-        resolver = Entities.newDownloader(this);
-        setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("qpid-broker-%s", getVersion()))));
-    }
-
-    @Override
-    public void install() {
-        List<String> urls = resolver.getTargets();
-        String saveAs = resolver.getFilename();
-
-        List<String> commands = new LinkedList<String>();
-        commands.addAll( BashCommands.commandsToDownloadUrlsAs(urls, saveAs));
-        commands.add(BashCommands.INSTALL_TAR);
-        commands.add("tar xzfv "+saveAs);
-
-        newScript(INSTALLING)
-                .body.append(commands)
-                .execute();
-    }
-
-    @Override
-    public void customize() {
-        Networking.checkPortsValid(MutableMap.of("jmxPort", getJmxPort(), "amqpPort", getAmqpPort()));
-        newScript(CUSTOMIZING)
-                .body.append(
-                        format("cp -R %s/{bin,etc,lib} .", getExpandedInstallDir()),
-                        "mkdir lib/opt"
-                    )
-                .execute();
-    }
-
-    @Override
-    public void launch() {
-        newScript(ImmutableMap.of(USE_PID_FILE, false), LAUNCHING)
-                .body.append("nohup ./bin/qpid-server -b '*' > qpid-server-launch.log 2>&1 &")
-                .execute();
-    }
-
-    public String getPidFile() { return "qpid-server.pid"; }
-    
-    @Override
-    public boolean isRunning() {
-        return newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0;
-    }
-
-    @Override
-    public void stop() {
-        newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute();
-    }
-
-    @Override
-    public void kill() {
-        newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), KILLING).execute();
-    }
-
-    @Override
-    public Map<String, Object> getCustomJavaSystemProperties() {
-        return MutableMap.<String, Object>builder()
-                .putAll(super.getCustomJavaSystemProperties())
-                .put("connector.port", getAmqpPort())
-                .put("management.enabled", "true")
-                .put("management.jmxport.registryServer", getRmiRegistryPort())
-                .put("management.jmxport.connectorServer", getJmxPort())
-                .put("management.http.enabled", Boolean.toString(getHttpManagementPort() != null))
-                .putIfNotNull("management.http.port", getHttpManagementPort())
-                .build();
-    }
-
-    @Override
-    public Map<String, String> getShellEnvironment() {
-        return MutableMap.<String, String>builder()
-                .putAll(super.getShellEnvironment())
-                .put("QPID_HOME", getRunDir())
-                .put("QPID_WORK", getRunDir())
-                .renameKey("JAVA_OPTS", "QPID_OPTS")
-                .build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidTopic.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidTopic.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidTopic.java
deleted file mode 100644
index 3ec1c06..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidTopic.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.qpid;
-
-import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
-
-import brooklyn.entity.messaging.Topic;
-
-@ImplementedBy(QpidTopicImpl.class)
-public interface QpidTopic extends QpidDestination, Topic {
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidTopicImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidTopicImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidTopicImpl.java
deleted file mode 100644
index 1d8e3c3..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidTopicImpl.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.qpid;
-
-import static java.lang.String.format;
-
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import brooklyn.entity.messaging.amqp.AmqpExchange;
-import brooklyn.util.exceptions.Exceptions;
-
-public class QpidTopicImpl extends QpidDestinationImpl implements QpidTopic {
-
-    public QpidTopicImpl() {
-    }
-
-    @Override
-    public void onManagementStarting() {
-        super.onManagementStarting();
-        setAttribute(TOPIC_NAME, getName());
-        try {
-            String virtualHost = getParent().getVirtualHost();
-            exchange = new ObjectName(format("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"%s\",name=\"%s\",ExchangeType=topic", virtualHost, getExchangeName()));
-        } catch (MalformedObjectNameException e) {
-            throw Exceptions.propagate(e);
-        }
-    }
-
-    // TODO sensors
-    @Override
-    public void connectSensors() {
-    }
-
-    @Override
-    public String getExchangeName() { return AmqpExchange.TOPIC; }
-
-    @Override
-    public String getTopicName() { return getQueueName(); }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitBroker.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitBroker.java b/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitBroker.java
deleted file mode 100644
index a70bfce..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitBroker.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.rabbit;
-
-import java.util.Map;
-
-import com.google.common.annotations.Beta;
-
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
-import org.apache.brooklyn.api.event.AttributeSensor;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.SoftwareProcess;
-import brooklyn.entity.messaging.MessageBroker;
-import brooklyn.entity.messaging.amqp.AmqpServer;
-import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
-import brooklyn.event.basic.BasicConfigKey;
-import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
-import brooklyn.event.basic.Sensors;
-
-/**
- * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Rabbit MQ broker instance, using AMQP 0-9-1.
- */
-@Catalog(name="RabbitMQ Broker", description="RabbitMQ is an open source message broker software (i.e. message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP) standard", iconUrl="classpath:///RabbitMQLogo.png")
-@ImplementedBy(RabbitBrokerImpl.class)
-public interface RabbitBroker extends SoftwareProcess, MessageBroker, AmqpServer {
-
-    @SetFromFlag("version")
-    public static final ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "2.8.7");
-
-    @SetFromFlag("downloadUrl")
-    public static final BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
-            SoftwareProcess.DOWNLOAD_URL, "http://www.rabbitmq.com/releases/rabbitmq-server/v${version}/rabbitmq-server-generic-unix-${version}.tar.gz");
-    
-    @SetFromFlag("erlangVersion")
-    public static final BasicConfigKey<String> ERLANG_VERSION = new BasicConfigKey<String>(String.class, "erlang.version", "Erlang runtime version", "R15B");
-
-    @SetFromFlag("rabbitmqConfigTemplateUrl")
-    ConfigKey<String> CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey(
-            "rabbitmq.templateUrl", "Template file (in freemarker format) for the rabbitmq.config config file",
-            "classpath://brooklyn/entity/messaging/rabbit/rabbitmq.config");
-
-    @SetFromFlag("amqpPort")
-    public static final PortAttributeSensorAndConfigKey AMQP_PORT = AmqpServer.AMQP_PORT;
-
-    @SetFromFlag("virtualHost")
-    public static final BasicAttributeSensorAndConfigKey<String> VIRTUAL_HOST_NAME = AmqpServer.VIRTUAL_HOST_NAME;
-
-    @SetFromFlag("amqpVersion")
-    public static final BasicAttributeSensorAndConfigKey<String> AMQP_VERSION = new BasicAttributeSensorAndConfigKey<String>(
-            AmqpServer.AMQP_VERSION, AmqpServer.AMQP_0_9_1);
-
-    @SetFromFlag("managmentPort")
-    public static final PortAttributeSensorAndConfigKey MANAGEMENT_PORT = new PortAttributeSensorAndConfigKey(
-            "rabbitmq.management.port", "Port on which management interface will be available", "15672+");
-
-    public static AttributeSensor<String> MANAGEMENT_URL = Sensors.newStringSensor(
-            "rabbitmq.management.url", "Management URL is only available if management plugin flag is true");
-
-    @SetFromFlag("enableManagementPlugin")
-    public static final ConfigKey<Boolean> ENABLE_MANAGEMENT_PLUGIN = ConfigKeys.newBooleanConfigKey(
-            "rabbitmq.management.plugin", "Management plugin will be enabled", false);
-
-    RabbitQueue createQueue(Map properties);
-
-    // TODO required by RabbitDestination due to close-coupling between that and RabbitBroker; how best to improve?
-    @Beta
-    Map<String, String> getShellEnvironment();
-    
-    @Beta
-    String getRunDir();
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitBrokerImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitBrokerImpl.java
deleted file mode 100644
index 5d96d92..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitBrokerImpl.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.rabbit;
-
-import static java.lang.String.format;
-
-import java.util.Map;
-
-import org.apache.brooklyn.api.entity.proxying.EntitySpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Objects.ToStringHelper;
-
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.basic.SoftwareProcessImpl;
-
-/**
- * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Rabbit MQ broker instance, using AMQP 0-9-1.
- */
-public class RabbitBrokerImpl extends SoftwareProcessImpl implements RabbitBroker {
-    private static final Logger log = LoggerFactory.getLogger(RabbitBrokerImpl.class);
-
-    public String getVirtualHost() { return getAttribute(VIRTUAL_HOST_NAME); }
-    public String getAmqpVersion() { return getAttribute(AMQP_VERSION); }
-    public Integer getAmqpPort() { return getAttribute(AMQP_PORT); }
-
-    public RabbitBrokerImpl() {
-        super();
-    }
-
-    @Override
-    public RabbitDriver getDriver() {
-        return (RabbitDriver) super.getDriver();
-    }
-
-    @Override
-    public Map<String, String> getShellEnvironment() {
-        return getDriver().getShellEnvironment();
-    }
-    
-    @Override
-    public String getRunDir() {
-        return getDriver().getRunDir();
-    }
-    
-    @Override
-    protected void postStart() {
-        super.postStart();
-
-        getDriver().configure();
-
-        // TODO implement this using AMQP connection, no external mechanism available
-        // queueNames.each { String name -> addQueue(name) }
-    }
-
-    public void setBrokerUrl() {
-        String urlFormat = "amqp://guest:guest@%s:%d/%s";
-        setAttribute(BROKER_URL, format(urlFormat, getAttribute(HOSTNAME), getAttribute(AMQP_PORT), getAttribute(VIRTUAL_HOST_NAME)));
-    }
-
-    public RabbitQueue createQueue(Map properties) {
-        RabbitQueue result = addChild(EntitySpec.create(RabbitQueue.class).configure(properties));
-        Entities.manage(result);
-        result.create();
-        return result;
-    }
-
-    @Override
-    public Class<? extends RabbitDriver> getDriverInterface() {
-        return RabbitDriver.class;
-    }
-
-    @Override
-    protected void connectSensors() {
-        super.connectSensors();
-
-        connectServiceUpIsRunning();
-
-        setBrokerUrl();
-
-        if (getEnableManagementPlugin()) {
-            setAttribute(MANAGEMENT_URL, format("http://%s:%s/", getAttribute(HOSTNAME), getAttribute(MANAGEMENT_PORT)));
-        }
-    }
-
-    @Override
-    public void disconnectSensors() {
-        super.disconnectSensors();
-        disconnectServiceUpIsRunning();
-    }
-
-    public boolean getEnableManagementPlugin() {
-        return Boolean.TRUE.equals(getConfig(ENABLE_MANAGEMENT_PLUGIN));
-    }
-
-    public Integer getManagementPort() {
-        return getAttribute(MANAGEMENT_PORT);
-    }
-
-    @Override
-    protected ToStringHelper toStringHelper() {
-        return super.toStringHelper().add("amqpPort", getAmqpPort());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitDestination.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitDestination.java b/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitDestination.java
deleted file mode 100644
index 14e0e47..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitDestination.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.rabbit;
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.basic.AbstractEntity;
-import brooklyn.entity.messaging.amqp.AmqpExchange;
-import org.apache.brooklyn.location.basic.SshMachineLocation;
-
-import com.google.common.base.Objects.ToStringHelper;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-
-public abstract class RabbitDestination extends AbstractEntity implements AmqpExchange {
-    public static final Logger log = LoggerFactory.getLogger(RabbitDestination.class);
-    
-    private String virtualHost;
-    private String exchange;
-    protected SshMachineLocation machine;
-    protected Map<String,String> shellEnvironment;
-
-    public RabbitDestination() {
-    }
-
-    @Override
-    public void onManagementStarting() {
-        super.onManagementStarting();
-        
-        exchange = (getConfig(EXCHANGE_NAME) != null) ? getConfig(EXCHANGE_NAME) : getDefaultExchangeName();
-        virtualHost = getConfig(RabbitBroker.VIRTUAL_HOST_NAME);
-        setAttribute(RabbitBroker.VIRTUAL_HOST_NAME, virtualHost);
-        
-        machine = (SshMachineLocation) Iterables.find(getParent().getLocations(), Predicates.instanceOf(SshMachineLocation.class));
-        shellEnvironment = getParent().getShellEnvironment();
-    }
-
-    // FIXME Should return RabbitBroker; won't work if gets a proxy rather than "real" entity
-    @Override
-    public RabbitBroker getParent() {
-        return (RabbitBroker) super.getParent();
-    }
-    
-    public void create() {
-        connectSensors();
-    }
-    
-    public void delete() {
-        disconnectSensors();
-    }
-
-    protected void connectSensors() { }
-
-    protected void disconnectSensors() { }
-
-    public String getVirtualHost() {
-        return virtualHost;
-    }
-    
-    @Override
-    public String getExchangeName() { 
-        return exchange;
-    }
-    
-    public String getDefaultExchangeName() {
-        return AmqpExchange.DIRECT;
-    }
-
-    @Override
-    protected ToStringHelper toStringHelper() {
-        return super.toStringHelper().add("virtualHost", getParent().getVirtualHost()).add("exchange", getExchangeName());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitDriver.java
deleted file mode 100644
index da07477..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitDriver.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.rabbit;
-
-import java.util.Map;
-
-import brooklyn.entity.basic.SoftwareProcessDriver;
-
-public interface RabbitDriver extends SoftwareProcessDriver {
-    
-    public void configure();
-    
-    public Map<String, String> getShellEnvironment();
-
-    public String getRunDir();
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitQueue.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitQueue.java b/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitQueue.java
deleted file mode 100644
index 31f1876..0000000
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitQueue.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.rabbit;
-
-import brooklyn.entity.messaging.Queue;
-import brooklyn.event.feed.ssh.SshFeed;
-import brooklyn.event.feed.ssh.SshPollConfig;
-import brooklyn.event.feed.ssh.SshPollValue;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-
-public class RabbitQueue extends RabbitDestination implements Queue {
-
-    private SshFeed sshFeed;
-
-    public RabbitQueue() {
-    }
-    
-    public String getName() {
-        return getDisplayName();
-    }
-
-    @Override
-    public void create() {
-        setAttribute(QUEUE_NAME, getName());
-        super.create();
-    }
-
-    @Override
-    protected void connectSensors() {
-        String runDir = getParent().getRunDir();
-        String cmd = String.format("%s/sbin/rabbitmqctl list_queues -p /%s  | grep '%s'", runDir, getVirtualHost(), getQueueName());
-        
-        sshFeed = SshFeed.builder()
-                .entity(this)
-                .machine(machine)
-                .poll(new SshPollConfig<Integer>(QUEUE_DEPTH_BYTES)
-                        .env(shellEnvironment)
-                        .command(cmd)
-                        .onFailure(Functions.constant(-1))
-                        .onSuccess(new Function<SshPollValue, Integer>() {
-                                @Override public Integer apply(SshPollValue input) {
-                                    return 0; // TODO parse out queue depth from output
-                                }}))
-                .poll(new SshPollConfig<Integer>(QUEUE_DEPTH_MESSAGES)
-                        .env(shellEnvironment)
-                        .command(cmd)
-                        .onFailure(Functions.constant(-1))
-                        .onSuccess(new Function<SshPollValue, Integer>() {
-                                @Override public Integer apply(SshPollValue input) {
-                                    return 0; // TODO parse out queue depth from output
-                                }}))
-                .build();
-    }
-    
-    @Override
-    protected void disconnectSensors() {
-        if (sshFeed != null) sshFeed.stop();
-        super.disconnectSensors();
-    }
-    
-    /**
-     * Return the AMQP name for the queue.
-     */
-    public String getQueueName() {
-        return getName();
-    }
-}


Mime
View raw message