brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aleds...@apache.org
Subject [13/26] incubator-brooklyn git commit: [BROOKLYN-162] Renaming of the NoSQL packages
Date Thu, 06 Aug 2015 16:32:27 GMT
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBClientSupport.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBClientSupport.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBClientSupport.java
new file mode 100644
index 0000000..d9997aa
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBClientSupport.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb;
+
+import java.net.UnknownHostException;
+
+import org.bson.BSONObject;
+import org.bson.BasicBSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.net.HostAndPort;
+import com.mongodb.BasicDBObject;
+import com.mongodb.CommandResult;
+import com.mongodb.DB;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoException;
+import com.mongodb.ServerAddress;
+
+import brooklyn.location.access.BrooklynAccessUtils;
+import brooklyn.util.BrooklynNetworkUtils;
+
+/**
+ * Manages connections to standalone MongoDB servers.
+ *
+ * @see <a href="http://docs.mongodb.org/manual/reference/command/">MongoDB database command documentation</a>
+ */
+public class MongoDBClientSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDBClientSupport.class);
+
+    private ServerAddress address;
+    
+    private MongoClient client() {
+        return new MongoClient(address, connectionOptions);
+    }
+
+    // Set client to automatically reconnect to servers.
+    private static final MongoClientOptions connectionOptions = MongoClientOptions.builder()
+            .autoConnectRetry(true)
+            .socketKeepAlive(true)
+            .build();
+
+    private static final BasicBSONObject EMPTY_RESPONSE = new BasicBSONObject();
+
+    public MongoDBClientSupport(ServerAddress standalone) {
+        // We could also use a MongoClient to access an entire replica set. See MongoClient(List<ServerAddress>).
+        address = standalone;
+    }
+
+    /**
+     * Creates a {@link MongoDBClientSupport} instance in standalone mode.
+     */
+    public static MongoDBClientSupport forServer(AbstractMongoDBServer standalone) throws UnknownHostException {
+        HostAndPort hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(standalone, standalone.getAttribute(MongoDBServer.PORT));
+        ServerAddress address = new ServerAddress(hostAndPort.getHostText(), hostAndPort.getPort());
+        return new MongoDBClientSupport(address);
+    }
+
+    private ServerAddress getServerAddress() {
+        MongoClient client = client();
+        try {
+            return client.getServerAddressList().get(0);
+        } finally {
+            client.close();
+        }
+    }
+
+    private HostAndPort getServerHostAndPort() {
+        ServerAddress address = getServerAddress();
+        return HostAndPort.fromParts(address.getHost(), address.getPort());
+    }
+
+    public Optional<CommandResult> runDBCommand(String database, String command) {
+        return runDBCommand(database, new BasicDBObject(command, Boolean.TRUE));
+    }
+
+    private Optional<CommandResult> runDBCommand(String database, DBObject command) {
+        MongoClient client = client();
+        try {
+            DB db = client.getDB(database);
+            CommandResult status;
+            try {
+                status = db.command(command);
+            } catch (MongoException e) {
+                LOG.warn("Command " + command + " on " + getServerAddress() + " failed", e);
+                return Optional.absent();
+            }
+            if (!status.ok()) {
+                LOG.debug("Unexpected result of {} on {}: {}",
+                        new Object[] { command, getServerAddress(), status.getErrorMessage() });
+            }
+            return Optional.of(status);
+        } finally {
+            client.close();
+        }
+    }
+    
+    public long getShardCount() {
+        MongoClient client = client();
+        try {
+            return client.getDB("config").getCollection("shards").getCount();
+        } finally {
+            client.close();
+        }
+    }
+
+    public BasicBSONObject getServerStatus() {
+        Optional<CommandResult> result = runDBCommand("admin", "serverStatus");
+        if (result.isPresent() && result.get().ok()) {
+            return result.get();
+        } else {
+            return EMPTY_RESPONSE;
+        }
+    }
+    
+    public boolean ping() {
+        DBObject ping = new BasicDBObject("ping", "1");
+        try {
+            runDBCommand("admin", ping);
+        } catch (MongoException e) {
+            return false;
+        }
+        return true;
+    }
+
+    public boolean initializeReplicaSet(String replicaSetName, Integer id) {
+        HostAndPort primary = getServerHostAndPort();
+        BasicBSONObject config = ReplicaSetConfig.builder(replicaSetName)
+                .member(primary, id)
+                .build();
+
+        BasicDBObject dbObject = new BasicDBObject("replSetInitiate", config);
+        LOG.debug("Initiating replica set with: " + dbObject);
+
+        Optional<CommandResult> result = runDBCommand("admin", dbObject);
+        if (result.isPresent() && result.get().ok() && LOG.isDebugEnabled()) {
+            LOG.debug("Completed initiating MongoDB replica set {} on entity {}", replicaSetName, this);
+        }
+        return result.isPresent() && result.get().ok();
+    }
+
+    /**
+     * Java equivalent of calling rs.conf() in the console.
+     */
+    private BSONObject getReplicaSetConfig() {
+        MongoClient client = client();
+        try {
+            return client.getDB("local").getCollection("system.replset").findOne();
+        } catch (MongoException e) {
+            LOG.error("Failed to get replica set config on "+client, e);
+            return null;
+        } finally {
+            client.close();
+        }
+    }
+
+    /**
+     * Runs <code>replSetGetStatus</code> on the admin database.
+     *
+     * @return The result of <code>replSetGetStatus</code>, or
+     *         an empty {@link BasicBSONObject} if the command threw an exception (e.g. if
+     *         the connection was reset) or if the resultant {@link CommandResult#ok} was false.
+     *
+     * @see <a href="http://docs.mongodb.org/manual/reference/replica-status/">Replica set status reference</a>
+     * @see <a href="http://docs.mongodb.org/manual/reference/command/replSetGetStatus/">replSetGetStatus documentation</a>
+     */
+    public BasicBSONObject getReplicaSetStatus() {
+        Optional<CommandResult> result = runDBCommand("admin", "replSetGetStatus");
+        if (result.isPresent() && result.get().ok()) {
+            return result.get();
+        } else {
+            return EMPTY_RESPONSE;
+        }
+    }
+
+    /**
+     * Reconfigures the replica set that this client is the primary member of to include a new member.
+     * <p/>
+     * Note that this can cause long downtime (typically 10-20s, even up to a minute).
+     *
+     * @param secondary New member of the set.
+     * @param id The id for the new set member. Must be unique within the set.
+     * @return True if successful
+     */
+    public boolean addMemberToReplicaSet(MongoDBServer secondary, Integer id) {
+        // We need to:
+        // - get the existing configuration
+        // - update its version
+        // - add the new member to its list of members
+        // - run replSetReconfig with the new configuration.
+        BSONObject existingConfig = getReplicaSetConfig();
+        if (existingConfig == null) {
+            LOG.warn("Couldn't load existing config for replica set from {}. Server {} not added.",
+                    getServerAddress(), secondary);
+            return false;
+        }
+
+        BasicBSONObject newConfig = ReplicaSetConfig.fromExistingConfig(existingConfig)
+                .primary(getServerHostAndPort())
+                .member(secondary, id)
+                .build();
+        return reconfigureReplicaSet(newConfig);
+    }
+
+    /**
+     * Reconfigures the replica set that this client is the primary member of to
+     * remove the given server.
+     * @param server The server to remove
+     * @return True if successful
+     */
+    public boolean removeMemberFromReplicaSet(MongoDBServer server) {
+        BSONObject existingConfig = getReplicaSetConfig();
+        if (existingConfig == null) {
+            LOG.warn("Couldn't load existing config for replica set from {}. Server {} not removed.",
+                    getServerAddress(), server);
+            return false;
+        }
+        BasicBSONObject newConfig = ReplicaSetConfig.fromExistingConfig(existingConfig)
+                .primary(getServerHostAndPort())
+                .remove(server)
+                .build();
+        return reconfigureReplicaSet(newConfig);
+    }
+
+    /**
+     * Runs replSetReconfig with the given BasicBSONObject. Returns true if the result's
+     * status is ok.
+     */
+    private boolean reconfigureReplicaSet(BasicBSONObject newConfig) {
+        BasicDBObject command = new BasicDBObject("replSetReconfig", newConfig);
+        LOG.debug("Reconfiguring replica set to: " + command);
+        Optional<CommandResult> result = runDBCommand("admin", command);
+        return result.isPresent() && result.get().ok();
+    }
+
+    public boolean addShardToRouter(String hostAndPort) {
+        LOG.debug("Adding shard " + hostAndPort);
+        BasicDBObject command = new BasicDBObject("addShard", hostAndPort);
+        Optional<CommandResult> result = runDBCommand("admin", command);
+        return result.isPresent() && result.get().ok();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBDriver.java
new file mode 100644
index 0000000..b7d93f0
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBDriver.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb;
+
+import brooklyn.entity.basic.SoftwareProcessDriver;
+
+public interface MongoDBDriver extends SoftwareProcessDriver {
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java
new file mode 100644
index 0000000..b7d91db
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb;
+
+import java.util.Collection;
+import java.util.List;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.group.Cluster;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.Sensors;
+import brooklyn.util.flags.SetFromFlag;
+
+import com.google.common.reflect.TypeToken;
+
+/**
+ * A replica set of {@link MongoDBServer}s, based on {@link DynamicCluster} which can be resized by a policy
+ * if required.
+ *
+ * <p/><b>Note</b>
+ * An issue with <code>mongod</code> on Mac OS X can cause unpredictable failure of servers at start-up.
+ * See <a href="https://groups.google.com/forum/#!topic/mongodb-user/QRQYdIXOR2U">this mailing list post</a>
+ * for more information.
+ *
+ * <p/>This replica set implementation has been tested on OS X 10.6 and Ubuntu 12.04.
+ *
+ * @see <a href="http://docs.mongodb.org/manual/replication/">http://docs.mongodb.org/manual/replication/</a>
+ */
+@ImplementedBy(MongoDBReplicaSetImpl.class)
+public interface MongoDBReplicaSet extends DynamicCluster {
+
+    @SetFromFlag("replicaSetName")
+    ConfigKey<String> REPLICA_SET_NAME = ConfigKeys.newStringConfigKey(
+            "mongodb.replicaSet.name", "Name of the MongoDB replica set", "BrooklynCluster");
+
+    ConfigKey<Integer> INITIAL_SIZE = ConfigKeys.newConfigKeyWithDefault(Cluster.INITIAL_SIZE, 3);
+
+    AttributeSensor<MongoDBServer> PRIMARY_ENTITY = Sensors.newSensor(
+            MongoDBServer.class, "mongodb.replicaSet.primary.entity", "The entity acting as primary");
+
+    @SuppressWarnings("serial")
+    AttributeSensor<List<String>> REPLICA_SET_ENDPOINTS = Sensors.newSensor(new TypeToken<List<String>>() {}, 
+        "mongodb.replicaSet.endpoints", "Endpoints active for this replica set");
+
+    /**
+     * The name of the replica set.
+     */
+    String getName();
+
+    /**
+     * @return The primary MongoDB server in the replica set.
+     */
+    MongoDBServer getPrimary();
+
+    /**
+     * @return The secondary servers in the replica set.
+     */
+    Collection<MongoDBServer> getSecondaries();
+
+    /**
+     * @return All servers in the replica set.
+     */
+    Collection<MongoDBServer> getReplicas();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
new file mode 100644
index 0000000..e5ce093
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.enricher.Enrichers;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
+import brooklyn.entity.group.DynamicClusterImpl;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.location.Location;
+import brooklyn.policy.PolicySpec;
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableSet;
+import brooklyn.util.text.Strings;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+/**
+ * Implementation of {@link MongoDBReplicaSet}.
+ *
+ * Replica sets have a <i>minimum</i> of three members.
+ *
+ * Removal strategy is always {@link #NON_PRIMARY_REMOVAL_STRATEGY}.
+ */
+public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDBReplicaSet {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDBReplicaSetImpl.class);
+
+    // Provides IDs for replica set members. The first member will have ID 0.
+    private final AtomicInteger nextMemberId = new AtomicInteger(0);
+
+    private MemberTrackingPolicy policy;
+    private final AtomicBoolean mustInitialise = new AtomicBoolean(true);
+
+    @SuppressWarnings("unchecked")
+    protected static final List<AttributeSensor<Long>> SENSORS_TO_SUM = Arrays.asList(
+        MongoDBServer.OPCOUNTERS_INSERTS,
+        MongoDBServer.OPCOUNTERS_QUERIES,
+        MongoDBServer.OPCOUNTERS_UPDATES,
+        MongoDBServer.OPCOUNTERS_DELETES,
+        MongoDBServer.OPCOUNTERS_GETMORE,
+        MongoDBServer.OPCOUNTERS_COMMAND,
+        MongoDBServer.NETWORK_BYTES_IN,
+        MongoDBServer.NETWORK_BYTES_OUT,
+        MongoDBServer.NETWORK_NUM_REQUESTS);
+    
+    public MongoDBReplicaSetImpl() {
+    }
+
+    /**
+     * Manages member addition and removal.
+     *
+     * It's important that this is a single thread: the concurrent addition and removal
+     * of members from the set would almost certainly have unintended side effects,
+     * like reconfigurations using outdated ReplicaSetConfig instances.
+     */
+    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
+    /** true iff input is a non-null MongoDBServer with attribute REPLICA_SET_MEMBER_STATUS PRIMARY. */
+    static final Predicate<Entity> IS_PRIMARY = new Predicate<Entity>() {
+        // getPrimary relies on instanceof check
+        @Override public boolean apply(@Nullable Entity input) {
+            return input != null
+                    && input instanceof MongoDBServer
+                    && ReplicaSetMemberStatus.PRIMARY.equals(input.getAttribute(MongoDBServer.REPLICA_SET_MEMBER_STATUS));
+        }
+    };
+
+    /** true iff. input is a non-null MongoDBServer with attribute REPLICA_SET_MEMBER_STATUS SECONDARY. */
+    static final Predicate<Entity> IS_SECONDARY = new Predicate<Entity>() {
+        @Override public boolean apply(@Nullable Entity input) {
+            // getSecondaries relies on instanceof check
+            return input != null
+                    && input instanceof MongoDBServer
+                    && ReplicaSetMemberStatus.SECONDARY.equals(input.getAttribute(MongoDBServer.REPLICA_SET_MEMBER_STATUS));
+        }
+    };
+
+    /**
+     * {@link Function} for use as the cluster's removal strategy. Chooses any entity with
+     * {@link MongoDBServer#IS_PRIMARY_FOR_REPLICA_SET} true last of all.
+     */
+    private static final Function<Collection<Entity>, Entity> NON_PRIMARY_REMOVAL_STRATEGY = new Function<Collection<Entity>, Entity>() {
+        @Override
+        public Entity apply(@Nullable Collection<Entity> entities) {
+            checkArgument(entities != null && entities.size() > 0, "Expect list of MongoDBServers to have at least one entry");
+            return Iterables.tryFind(entities, Predicates.not(IS_PRIMARY)).or(Iterables.get(entities, 0));
+        }
+    };
+
+    /** @return {@link #NON_PRIMARY_REMOVAL_STRATEGY} */
+    @Override
+    public Function<Collection<Entity>, Entity> getRemovalStrategy() {
+        return NON_PRIMARY_REMOVAL_STRATEGY;
+    }
+
+    @Override
+    protected EntitySpec<?> getMemberSpec() {
+        return getConfig(MEMBER_SPEC, EntitySpec.create(MongoDBServer.class));
+    }
+
+    /**
+     * Sets {@link MongoDBServer#REPLICA_SET}.
+     */
+    @Override
+    protected Map<?,?> getCustomChildFlags() {
+        return ImmutableMap.builder()
+                .putAll(super.getCustomChildFlags())
+                .put(MongoDBServer.REPLICA_SET, getProxy())
+                .build();
+    }
+
+    @Override
+    public String getName() {
+        // FIXME: Names must be unique if the replica sets are used in a sharded cluster
+        return getConfig(REPLICA_SET_NAME) + this.getId();
+    }
+
+    @Override
+    public MongoDBServer getPrimary() {
+        return Iterables.tryFind(getReplicas(), IS_PRIMARY).orNull();
+    }
+
+    @Override
+    public Collection<MongoDBServer> getSecondaries() {
+        return FluentIterable.from(getReplicas())
+                .filter(IS_SECONDARY)
+                .toList();
+    }
+
+    @Override
+    public Collection<MongoDBServer> getReplicas() {
+        return FluentIterable.from(getMembers())
+                .transform(new Function<Entity, MongoDBServer>() {
+                    @Override public MongoDBServer apply(Entity input) {
+                        return MongoDBServer.class.cast(input);
+                    }
+                })
+                .toList();
+    }
+
+    /**
+     * Initialises the replica set with the given server as primary if {@link #mustInitialise} is true,
+     * otherwise schedules the addition of a new secondary.
+     */
+    private void serverAdded(MongoDBServer server) {
+        LOG.debug("Server added: {}. SERVICE_UP: {}", server, server.getAttribute(MongoDBServer.SERVICE_UP));
+
+        // Set the primary if the replica set hasn't been initialised.
+        if (mustInitialise.compareAndSet(true, false)) {
+            if (LOG.isInfoEnabled())
+                LOG.info("First server up in {} is: {}", getName(), server);
+            boolean replicaSetInitialised = server.initializeReplicaSet(getName(), nextMemberId.getAndIncrement());
+            if (replicaSetInitialised) {
+                setAttribute(PRIMARY_ENTITY, server);
+                setAttribute(Startable.SERVICE_UP, true);
+            } else {
+                ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE);
+            }
+        } else {
+            if (LOG.isDebugEnabled())
+                LOG.debug("Scheduling addition of member to {}: {}", getName(), server);
+            addSecondaryWhenPrimaryIsNonNull(server);
+        }
+    }
+
+    /**
+     * Adds a server as a secondary in the replica set.
+     * <p/>
+     * If {@link #getPrimary} returns non-null submit the secondary to the primary's
+     * {@link MongoDBClientSupport}. Otherwise, reschedule the task to run again in three
+     * seconds time (in the hope that next time the primary will be available).
+     */
+    private void addSecondaryWhenPrimaryIsNonNull(final MongoDBServer secondary) {
+        // TODO Don't use executor, use ExecutionManager
+        executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                // SERVICE_UP is not guaranteed when additional members are added to the set.
+                Boolean isAvailable = secondary.getAttribute(MongoDBServer.SERVICE_UP);
+                MongoDBServer primary = getPrimary();
+                boolean reschedule;
+                if (Boolean.TRUE.equals(isAvailable) && primary != null) {
+                    boolean added = primary.addMemberToReplicaSet(secondary, nextMemberId.incrementAndGet());
+                    if (added) {
+                        LOG.info("{} added to replica set {}", secondary, getName());
+                        reschedule = false;
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("{} could not be added to replica set via {}; rescheduling", secondary, getName());
+                        }
+                        reschedule = true;
+                    }
+                } else {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Rescheduling addition of member {} to replica set {}: service_up={}, primary={}",
+                            new Object[] {secondary, getName(), isAvailable, primary});
+                    }
+                    reschedule = true;
+                }
+                
+                if (reschedule) {
+                    // TODO Could limit number of retries
+                    executor.schedule(this, 3, TimeUnit.SECONDS);
+                }
+            }
+        });
+    }
+
+    /**
+     * Removes a server from the replica set.
+     * <p/>
+     * Submits a task that waits for the member to be down and for the replica set to have a primary
+     * member, then reconfigures the set to remove the member, to {@link #executor}. If either of the
+     * two conditions are not met then the task reschedules itself.
+     *
+     * @param member The server to be removed from the replica set.
+     */
+    private void serverRemoved(final MongoDBServer member) {
+        if (LOG.isDebugEnabled())
+            LOG.debug("Scheduling removal of member from {}: {}", getName(), member);
+        // FIXME is there a chance of race here?
+        if (member.equals(getAttribute(PRIMARY_ENTITY)))
+            setAttribute(PRIMARY_ENTITY, null);
+        executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                // Wait until the server has been stopped before reconfiguring the set. Quoth the MongoDB doc:
+                // for best results always shut down the mongod instance before removing it from a replica set.
+                Boolean isAvailable = member.getAttribute(MongoDBServer.SERVICE_UP);
+                // Wait for the replica set to elect a new primary if the set is reconfiguring itself.
+                MongoDBServer primary = getPrimary();
+                boolean reschedule;
+                
+                if (primary != null && !isAvailable) {
+                    boolean removed = primary.removeMemberFromReplicaSet(member);
+                    if (removed) {
+                        LOG.info("Removed {} from replica set {}", member, getName());
+                        reschedule = false;
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("{} could not be removed from replica set via {}; rescheduling", member, getName());
+                        }
+                        reschedule = true;
+                    }
+
+                } else {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Rescheduling removal of member {} from replica set {}: service_up={}, primary={}",
+                            new Object[]{member, getName(), isAvailable, primary});
+                    }
+                    reschedule = true;
+                }
+                
+                if (reschedule) {
+                    // TODO Could limit number of retries
+                    executor.schedule(this, 3, TimeUnit.SECONDS);
+                }
+            }
+        });
+    }
+
+    @Override
+    public void start(Collection<? extends Location> locations) {
+        // Promises that all the cluster's members have SERVICE_UP true on returning.
+        super.start(locations);
+        policy = addPolicy(PolicySpec.create(MemberTrackingPolicy.class)
+                .displayName(getName() + " membership tracker")
+                .configure("group", this));
+
+        for (AttributeSensor<Long> sensor: SENSORS_TO_SUM)
+            addEnricher(Enrichers.builder()
+                    .aggregating(sensor)
+                    .publishing(sensor)
+                    .fromMembers()
+                    .computingSum()
+                    .valueToReportIfNoSensors(null)
+                    .defaultValueForUnreportedSensors(null)
+                    .build());
+        
+        // FIXME would it be simpler to have a *subscription* on four or five sensors on allMembers, including SERVICE_UP
+        // (which we currently don't check), rather than an enricher, and call to an "update" method?
+        addEnricher(Enrichers.builder()
+                .aggregating(MongoDBServer.REPLICA_SET_PRIMARY_ENDPOINT)
+                .publishing(MongoDBServer.REPLICA_SET_PRIMARY_ENDPOINT)
+                .fromMembers()
+                .valueToReportIfNoSensors(null)
+                .computing(new Function<Collection<String>, String>() {
+                        @Override
+                        public String apply(Collection<String> input) {
+                            if (input==null || input.isEmpty()) return null;
+                            Set<String> distinct = MutableSet.of();
+                            for (String endpoint: input)
+                                if (!Strings.isBlank(endpoint))
+                                    distinct.add(endpoint);
+                            if (distinct.size()>1)
+                                LOG.warn("Mongo replica set "+MongoDBReplicaSetImpl.this+" detetcted multiple masters (transitioning?): "+distinct);
+                            return input.iterator().next();
+                        }})
+                .build());
+
+        addEnricher(Enrichers.builder()
+                .aggregating(MongoDBServer.MONGO_SERVER_ENDPOINT)
+                .publishing(REPLICA_SET_ENDPOINTS)
+                .fromMembers()
+                .valueToReportIfNoSensors(null)
+                .computing(new Function<Collection<String>, List<String>>() {
+                        @Override
+                        public List<String> apply(Collection<String> input) {
+                            Set<String> endpoints = new TreeSet<String>();
+                            for (String endpoint: input) {
+                                if (!Strings.isBlank(endpoint)) {
+                                    endpoints.add(endpoint);
+                                }
+                            }
+                            return MutableList.copyOf(endpoints);
+                        }})
+                .build());
+
+        subscribeToMembers(this, MongoDBServer.IS_PRIMARY_FOR_REPLICA_SET, new SensorEventListener<Boolean>() {
+            @Override public void onEvent(SensorEvent<Boolean> event) {
+                if (Boolean.TRUE == event.getValue())
+                    setAttribute(PRIMARY_ENTITY, (MongoDBServer)event.getSource());
+            }
+        });
+
+    }
+
+    @Override
+    public void stop() {
+        // Do we want to remove the members from the replica set?
+        //  - if the set is being stopped forever it's irrelevant
+        //  - if the set might be restarted I think it just inconveniences us
+        // Terminate the executor immediately.
+        // TODO Note that after this the executor will not run if the set is restarted.
+        executor.shutdownNow();
+        super.stop();
+        setAttribute(Startable.SERVICE_UP, false);
+    }
+
+    @Override
+    public void onManagementStopped() {
+        super.onManagementStopped();
+        executor.shutdownNow();
+    }
+    
+    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
+        @Override protected void onEntityChange(Entity member) {
+            // Ignored
+        }
+        @Override protected void onEntityAdded(Entity member) {
+            ((MongoDBReplicaSetImpl)entity).serverAdded((MongoDBServer) member);
+        }
+        @Override protected void onEntityRemoved(Entity member) {
+            ((MongoDBReplicaSetImpl)entity).serverRemoved((MongoDBServer) member);
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServer.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServer.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServer.java
new file mode 100644
index 0000000..5300684
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServer.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb;
+
+import org.bson.BasicBSONObject;
+
+import org.apache.brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.AttributeSensor.SensorPersistenceMode;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+import brooklyn.util.flags.SetFromFlag;
+
+@Catalog(name="MongoDB Server",
+    description="MongoDB (from \"humongous\") is a scalable, high-performance, open source NoSQL database",
+    iconUrl="classpath:///mongodb-logo.png")
+@ImplementedBy(MongoDBServerImpl.class)
+public interface MongoDBServer extends AbstractMongoDBServer {
+
+    @SetFromFlag("mongodbConfTemplateUrl")
+    ConfigKey<String> MONGODB_CONF_TEMPLATE_URL = ConfigKeys.newConfigKeyWithDefault(
+            AbstractMongoDBServer.MONGODB_CONF_TEMPLATE_URL,
+            "classpath://org/apache/brooklyn/entity/nosql/mongodb/default-mongod.conf");
+
+    // See http://docs.mongodb.org/ecosystem/tools/http-interfaces/#http-console
+    // This is *always* 1000 more than port. We disable if it is not available.
+    PortAttributeSensorAndConfigKey HTTP_PORT =
+        new PortAttributeSensorAndConfigKey("mongodb.server.httpPort", "HTTP port for the server (estimated)", "28017+");
+
+    @SetFromFlag("enableRestInterface")
+    ConfigKey<Boolean> ENABLE_REST_INTERFACE = ConfigKeys.newBooleanConfigKey(
+            "mongodb.config.enable_rest", "Adds --rest to server startup flags when true", Boolean.FALSE);
+
+    AttributeSensor<String> HTTP_INTERFACE_URL = Sensors.newStringSensor(
+            "mongodb.server.http_interface", "URL of the server's HTTP console");
+
+    AttributeSensor<BasicBSONObject> STATUS_BSON = Sensors.builder(BasicBSONObject.class, "mongodb.server.status.bson")
+            .description("Server status (BSON/JSON map ojbect)")
+            .persistence(SensorPersistenceMode.NONE)
+            .build();
+    
+    AttributeSensor<Double> UPTIME_SECONDS = Sensors.newDoubleSensor(
+            "mongodb.server.uptime", "Server uptime in seconds");
+
+    AttributeSensor<Long> OPCOUNTERS_INSERTS = Sensors.newLongSensor(
+            "mongodb.server.opcounters.insert", "Server inserts");
+
+    AttributeSensor<Long> OPCOUNTERS_QUERIES = Sensors.newLongSensor(
+            "mongodb.server.opcounters.query", "Server queries");
+
+    AttributeSensor<Long> OPCOUNTERS_UPDATES = Sensors.newLongSensor(
+            "mongodb.server.opcounters.update", "Server updates");
+
+    AttributeSensor<Long> OPCOUNTERS_DELETES = Sensors.newLongSensor(
+            "mongodb.server.opcounters.delete", "Server deletes");
+
+    AttributeSensor<Long> OPCOUNTERS_GETMORE = Sensors.newLongSensor(
+            "mongodb.server.opcounters.getmore", "Server getmores");
+
+    AttributeSensor<Long> OPCOUNTERS_COMMAND = Sensors.newLongSensor(
+            "mongodb.server.opcounters.command", "Server commands");
+
+    AttributeSensor<Long> NETWORK_BYTES_IN = Sensors.newLongSensor(
+            "mongodb.server.network.bytesIn", "Server incoming network traffic (in bytes)");
+
+    AttributeSensor<Long> NETWORK_BYTES_OUT = Sensors.newLongSensor(
+            "mongodb.server.network.bytesOut", "Server outgoing network traffic (in bytes)");
+
+    AttributeSensor<Long> NETWORK_NUM_REQUESTS = Sensors.newLongSensor(
+            "mongodb.server.network.numRequests", "Server network requests");
+
+    /** A single server's replica set configuration **/
+    ConfigKey<MongoDBReplicaSet> REPLICA_SET = new BasicConfigKey<MongoDBReplicaSet>(MongoDBReplicaSet.class,
+            "mongodb.replicaset", "The replica set to which the server belongs. " +
+            "Users should not set this directly when creating a new replica set.");
+
+    AttributeSensor<ReplicaSetMemberStatus> REPLICA_SET_MEMBER_STATUS = Sensors.newSensor(
+            ReplicaSetMemberStatus.class, "mongodb.server.replicaSet.memberStatus", "The status of this server in the replica set");
+
+    AttributeSensor<Boolean> IS_PRIMARY_FOR_REPLICA_SET = Sensors.newBooleanSensor(
+            "mongodb.server.replicaSet.isPrimary", "True if this server is the write master for the replica set");
+
+    AttributeSensor<Boolean> IS_SECONDARY_FOR_REPLICA_SET = Sensors.newBooleanSensor(
+            "mongodb.server.replicaSet.isSecondary", "True if this server is a secondary server in the replica set");
+
+    AttributeSensor<String> REPLICA_SET_PRIMARY_ENDPOINT = Sensors.newStringSensor(
+            "mongodb.server.replicaSet.primary.endpoint", "The host:port of the server which is acting as primary (master) for the replica set");
+
+    AttributeSensor<String> MONGO_SERVER_ENDPOINT = Sensors.newStringSensor(
+            "mongodb.server.endpoint", "The host:port where this server is listening");
+
+    /**
+     * @return The replica set the server belongs to, or null if the server is a standalone instance.
+     */
+    MongoDBReplicaSet getReplicaSet();
+
+    /**
+     * @return True if the server is a child of {@link MongoDBReplicaSet}.
+     */
+    boolean isReplicaSetMember();
+
+    /**
+     * Initialises a replica set at the server the method is invoked on.
+     * @param replicaSetName The name for the replica set.
+     * @param id The id to be given to this server in the replica set configuration.
+     * @return True if initialisation is successful.
+     */
+    boolean initializeReplicaSet(String replicaSetName, Integer id);
+
+    /**
+     * Reconfigures the replica set that the server the method is invoked on is the primary member of
+     * to include a new member.
+     * <p/>
+     * Note that this can cause long downtime (typically 10-20s, even up to a minute).
+     *
+     * @param secondary New member of the set.
+     * @param id The id for the new set member. Must be unique within the set; its validity is not checked.
+     * @return True if addition is successful. False if the server this is called on is not the primary
+     *         member of the replica set.
+     */
+    boolean addMemberToReplicaSet(MongoDBServer secondary, Integer id);
+
+    /**
+     * Reconfigures the replica set that the server the method is invoked on is the primary member of
+     * to remove the given server.
+     * @param server The server to remove.
+     * @return True if removal is successful. False if the server this is called on is not the primary
+     *         member of the replica set.
+     */
+    boolean removeMemberFromReplicaSet(MongoDBServer server);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java
new file mode 100644
index 0000000..346b1ee
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.bson.BasicBSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.render.RendererHints;
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.event.feed.function.FunctionFeed;
+import brooklyn.event.feed.function.FunctionPollConfig;
+import brooklyn.location.access.BrooklynAccessUtils;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Objects;
+import com.google.common.net.HostAndPort;
+
+public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBServer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDBServerImpl.class);
+
+    static {
+        RendererHints.register(HTTP_INTERFACE_URL, RendererHints.namedActionWithUrl());
+    }
+
+    private FunctionFeed serviceStats;
+    private FunctionFeed replicaSetStats;
+    private MongoDBClientSupport client;
+
+    public MongoDBServerImpl() {
+    }
+
+    @Override
+    public Class<?> getDriverInterface() {
+        return MongoDBDriver.class;
+    }
+
+    @Override
+    protected void connectSensors() {
+        super.connectSensors();
+        connectServiceUpIsRunning();
+
+        int port = getAttribute(MongoDBServer.PORT);
+        HostAndPort accessibleAddress = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, port);
+        setAttribute(MONGO_SERVER_ENDPOINT, String.format("http://%s:%d",
+                accessibleAddress.getHostText(), accessibleAddress.getPort()));
+
+        int httpConsolePort = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getAttribute(HTTP_PORT)).getPort();
+        setAttribute(HTTP_INTERFACE_URL, String.format("http://%s:%d",
+                accessibleAddress.getHostText(), httpConsolePort));
+
+        try {
+            client = MongoDBClientSupport.forServer(this);
+        } catch (UnknownHostException e) {
+            LOG.warn("Unable to create client connection to {}, not connecting sensors: {} ", this, e.getMessage());
+            return;
+        }
+
+        serviceStats = FunctionFeed.builder()
+                .entity(this)
+                .poll(new FunctionPollConfig<Object, BasicBSONObject>(STATUS_BSON)
+                        .period(2, TimeUnit.SECONDS)
+                        .callable(new Callable<BasicBSONObject>() {
+                            @Override
+                            public BasicBSONObject call() throws Exception {
+                                return MongoDBServerImpl.this.getAttribute(SERVICE_UP)
+                                    ? client.getServerStatus()
+                                    : null;
+                            }
+                        })
+                        .onException(Functions.<BasicBSONObject>constant(null)))
+                .build();
+
+        if (isReplicaSetMember()) {
+            replicaSetStats = FunctionFeed.builder()
+                    .entity(this)
+                    .poll(new FunctionPollConfig<Object, ReplicaSetMemberStatus>(REPLICA_SET_MEMBER_STATUS)
+                            .period(2, TimeUnit.SECONDS)
+                            .callable(new Callable<ReplicaSetMemberStatus>() {
+                                /**
+                                 * Calls {@link MongoDBClientSupport#getReplicaSetStatus} and
+                                 * extracts <code>myState</code> from the response.
+                                 * @return
+                                 *      The appropriate {@link org.apache.brooklyn.entity.nosql.mongodb.ReplicaSetMemberStatus}
+                                 *      if <code>myState</code> was non-null, {@link ReplicaSetMemberStatus#UNKNOWN} otherwise.
+                                 */
+                                @Override
+                                public ReplicaSetMemberStatus call() {
+                                    BasicBSONObject serverStatus = client.getReplicaSetStatus();
+                                    int state = serverStatus.getInt("myState", -1);
+                                    return ReplicaSetMemberStatus.fromCode(state);
+                                }
+                            })
+                            .onException(Functions.constant(ReplicaSetMemberStatus.UNKNOWN)))
+                    .build();
+        } else {
+            setAttribute(IS_PRIMARY_FOR_REPLICA_SET, false);
+            setAttribute(IS_SECONDARY_FOR_REPLICA_SET, false);
+        }
+
+        // Take interesting details from STATUS.
+        subscribe(this, STATUS_BSON, new SensorEventListener<BasicBSONObject>() {
+                @Override public void onEvent(SensorEvent<BasicBSONObject> event) {
+                    BasicBSONObject map = event.getValue();
+                    if (map != null && !map.isEmpty()) {
+                        setAttribute(UPTIME_SECONDS, map.getDouble("uptime", 0));
+
+                        // Operations
+                        BasicBSONObject opcounters = (BasicBSONObject) map.get("opcounters");
+                        setAttribute(OPCOUNTERS_INSERTS, opcounters.getLong("insert", 0));
+                        setAttribute(OPCOUNTERS_QUERIES, opcounters.getLong("query", 0));
+                        setAttribute(OPCOUNTERS_UPDATES, opcounters.getLong("update", 0));
+                        setAttribute(OPCOUNTERS_DELETES, opcounters.getLong("delete", 0));
+                        setAttribute(OPCOUNTERS_GETMORE, opcounters.getLong("getmore", 0));
+                        setAttribute(OPCOUNTERS_COMMAND, opcounters.getLong("command", 0));
+
+                        // Network stats
+                        BasicBSONObject network = (BasicBSONObject) map.get("network");
+                        setAttribute(NETWORK_BYTES_IN, network.getLong("bytesIn", 0));
+                        setAttribute(NETWORK_BYTES_OUT, network.getLong("bytesOut", 0));
+                        setAttribute(NETWORK_NUM_REQUESTS, network.getLong("numRequests", 0));
+
+                        // Replica set stats
+                        BasicBSONObject repl = (BasicBSONObject) map.get("repl");
+                        if (isReplicaSetMember() && repl != null) {
+                            setAttribute(IS_PRIMARY_FOR_REPLICA_SET, repl.getBoolean("ismaster"));
+                            setAttribute(IS_SECONDARY_FOR_REPLICA_SET, repl.getBoolean("secondary"));
+                            setAttribute(REPLICA_SET_PRIMARY_ENDPOINT, repl.getString("primary"));
+                        }
+                    }
+                }
+        });
+    }
+
+    @Override
+    protected void disconnectSensors() {
+        super.disconnectSensors();
+        disconnectServiceUpIsRunning();
+        if (serviceStats != null) serviceStats.stop();
+        if (replicaSetStats != null) replicaSetStats.stop();
+    }
+
+    @Override
+    public MongoDBReplicaSet getReplicaSet() {
+        return getConfig(MongoDBServer.REPLICA_SET);
+    }
+
+    @Override
+    public boolean isReplicaSetMember() {
+        return getReplicaSet() != null;
+    }
+
+    @Override
+    public boolean initializeReplicaSet(String replicaSetName, Integer id) {
+        return client.initializeReplicaSet(replicaSetName, id);
+    }
+
+    @Override
+    public boolean addMemberToReplicaSet(MongoDBServer secondary, Integer id) {
+        // TODO The attributes IS_PRIMARY_FOR_REPLICA_SET and REPLICA_SET_MEMBER_STATUS can be out-of-sync.
+        // The former is obtained by an enricher that listens to STATUS_BSON (set by client.getServerStatus()).
+        // The latter is set by a different feed doing client.getReplicaSetStatus().getInt("myState").
+        // The ReplicaSet uses REPLICA_SET_MEMBER_STATUS to determine which node to call.
+        // 
+        // Relying on caller to respect the `false` result, to retry.
+        if (!getAttribute(IS_PRIMARY_FOR_REPLICA_SET)) {
+            LOG.warn("Attempted to add {} to replica set at server that is not primary: {}", secondary, this);
+            return false;
+        }
+        return client.addMemberToReplicaSet(secondary, id);
+    }
+
+    @Override
+    public boolean removeMemberFromReplicaSet(MongoDBServer server) {
+        if (!getAttribute(IS_PRIMARY_FOR_REPLICA_SET)) {
+            LOG.warn("Attempted to remove {} from replica set at server that is not primary: {}", server, this);
+            return false;
+        }
+        return client.removeMemberFromReplicaSet(server);
+    }
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this)
+                .add("id", getId())
+                .add("hostname", getAttribute(HOSTNAME))
+                .add("port", getAttribute(PORT))
+                .toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBSshDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBSshDriver.java
new file mode 100644
index 0000000..819014d
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBSshDriver.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb;
+
+import static com.google.common.base.Preconditions.checkState;
+import brooklyn.location.basic.SshMachineLocation;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+
+public class MongoDBSshDriver extends AbstractMongoDBSshDriver implements MongoDBDriver {
+
+    public MongoDBSshDriver(MongoDBServerImpl entity, SshMachineLocation machine) {
+        super(entity, machine);
+    }
+
+    @Override
+    public MongoDBServerImpl getEntity() {
+        return MongoDBServerImpl.class.cast(super.getEntity());
+    }
+
+    @Override
+    public void launch() {
+        MongoDBServer server = getEntity();
+
+        ImmutableList.Builder<String> argsBuilder = getArgsBuilderWithDefaults(server)
+            .add("--dbpath", getDataDirectory());
+
+        if (server.isReplicaSetMember()) {
+            String replicaSetName = server.getReplicaSet().getName();
+            checkState(!Strings.isNullOrEmpty(replicaSetName), "Replica set name must not be null or empty");
+            argsBuilder.add("--replSet", replicaSetName);
+        }
+
+        if (Boolean.TRUE.equals(server.getConfig(MongoDBServer.ENABLE_REST_INTERFACE)))
+            argsBuilder.add("--rest");
+
+        launch(argsBuilder);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/ReplicaSetConfig.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/ReplicaSetConfig.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/ReplicaSetConfig.java
new file mode 100644
index 0000000..a4ecebb
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/ReplicaSetConfig.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Iterator;
+
+import org.bson.BSONObject;
+import org.bson.BasicBSONObject;
+import org.bson.types.BasicBSONList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.net.HostAndPort;
+
+import brooklyn.location.access.BrooklynAccessUtils;
+
+/**
+ * Simplifies the creation of configuration objects for Mongo DB replica sets.
+ * <p/>
+ * A configuration object is structured like this:
+ * <pre>
+ * {
+ *    "_id" : "replica-set-name",
+ *     "version" : 3,
+ *    "members" : [
+ *        { "_id" : 0, "host" : "Sams.local:27017" },
+ *        { "_id" : 1, "host" : "Sams.local:27018" },
+ *        { "_id" : 2, "host" : "Sams.local:27019" }
+ *    ]
+ * }
+ * </pre>
+ * To add or remove servers to a replica set you must redefine this configuration
+ * (run <code>replSetReconfig</code> on the primary) with the new <code>members</code>
+ * list and the <code>version</code> updated.
+ */
+public class ReplicaSetConfig {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicaSetConfig.class);
+    static final int MAXIMUM_REPLICA_SET_SIZE = 12;
+    static final int MAXIMUM_VOTING_MEMBERS = 7;
+
+    private Optional<HostAndPort> primary = Optional.absent();
+
+    private String name;
+    private Integer version;
+    BasicBSONList members;
+
+    public ReplicaSetConfig(String name) {
+        this(name, new BasicBSONList());
+    }
+
+    public ReplicaSetConfig(String name, BasicBSONList existingMembers) {
+        this.name = name;
+        this.members = existingMembers;
+        this.version = 1;
+    }
+
+    /**
+     * Creates a configuration with the given name.
+     */
+    public static ReplicaSetConfig builder(String name) {
+        return new ReplicaSetConfig(name);
+    }
+
+    /**
+     * Creates a configuration from an existing configuration.
+     * <p/>
+     * Automatically increments the replica set's version number.
+     */
+    public static ReplicaSetConfig fromExistingConfig(BSONObject config) {
+        checkNotNull(config);
+        checkArgument(config.containsField("_id"), "_id missing from replica set config");
+        checkArgument(config.containsField("version"), "version missing from replica set config");
+        checkArgument(config.containsField("members"), "members missing from replica set config");
+
+        String name = (String) config.get("_id");
+        Integer version = (Integer) config.get("version");
+        BasicBSONList members = (BasicBSONList) config.get("members");
+
+        return new ReplicaSetConfig(name, members).version(++version);
+    }
+
+    /**
+     * Sets the version of the configuration. The version number must increase as the replica set changes.
+     */
+    public ReplicaSetConfig version(Integer version) {
+        this.version = version;
+        return this;
+    }
+
+    /**
+     * Notes the primary member of the replica. Primary members will always be voting members.
+     */
+    public ReplicaSetConfig primary(HostAndPort primary) {
+        this.primary = Optional.of(primary);
+        return this;
+    }
+
+    /**
+     * Adds a new member to the replica set config using {@link MongoDBServer#HOSTNAME} and {@link MongoDBServer#PORT}
+     * for hostname and port. Doesn't attempt to check that the id is free.
+     */
+    public ReplicaSetConfig member(MongoDBServer server, Integer id) {
+        // TODO: Switch to SUBNET_HOSTNAME and there should be no need for a Brooklyn accessible
+        // address. It will require modification to MongoDBClientSupport, though, since it sets
+        // the primary to the host/port accessible from Brooklyn.
+        HostAndPort hap = BrooklynAccessUtils.getBrooklynAccessibleAddress(server, server.getAttribute(MongoDBServer.PORT));
+        return member(hap.getHostText(), hap.getPort(), id);
+    }
+
+    /**
+     * Adds a new member to the replica set config using the given {@link HostAndPort} for hostname and port.
+     * Doesn't attempt to check that the id is free.
+     */
+    public ReplicaSetConfig member(HostAndPort address, Integer id) {
+        return member(address.getHostText(), address.getPort(), id);
+    }
+
+    /**
+     * Adds a new member to the replica set config with the given hostname, port and id. Doesn't attempt to check
+     * that the id is free.
+     */
+    public ReplicaSetConfig member(String hostname, Integer port, Integer id) {
+        if (members.size() == MAXIMUM_REPLICA_SET_SIZE) {
+            throw new IllegalStateException(String.format(
+                    "Replica set {} exceeds maximum size of {} with addition of member at {}:{}",
+                    new Object[]{name, MAXIMUM_REPLICA_SET_SIZE, hostname, port}));
+        }
+        BasicBSONObject member = new BasicBSONObject();
+        member.put("_id", id);
+        member.put("host", String.format("%s:%s", hostname, port));
+        members.add(member);
+        return this;
+    }
+
+    /** Removes the first entity using {@link MongoDBServer#HOSTNAME} and {@link MongoDBServer#PORT}. */
+    public ReplicaSetConfig remove(MongoDBServer server) {
+        HostAndPort hap = BrooklynAccessUtils.getBrooklynAccessibleAddress(server, server.getAttribute(MongoDBServer.PORT));
+        return remove(hap.getHostText(), hap.getPort());
+    }
+
+    /** Removes the first entity with host and port matching the given address. */
+    public ReplicaSetConfig remove(HostAndPort address) {
+        return remove(address.getHostText(), address.getPort());
+    }
+
+    /**
+     * Removes the first entity with the given hostname and port from the list of members
+     */
+    public ReplicaSetConfig remove(String hostname, Integer port) {
+        String host = String.format("%s:%s", hostname, port);
+        Iterator<Object> it = this.members.iterator();
+        while (it.hasNext()) {
+            Object next = it.next();
+            if (next instanceof BasicBSONObject) {
+                BasicBSONObject basicBSONObject = (BasicBSONObject) next;
+                if (host.equals(basicBSONObject.getString("host"))) {
+                    it.remove();
+                    break;
+                }
+            }
+        }
+        return this;
+    }
+
+    /**
+     * @return A {@link BasicBSONObject} representing the configuration that is suitable for a MongoDB server.
+     */
+    public BasicBSONObject build() {
+        setVotingMembers();
+        BasicBSONObject config = new BasicBSONObject();
+        config.put("_id", name);
+        config.put("version", version);
+        config.put("members", members);
+        return config;
+    }
+
+    /**
+     * Selects 1, 3, 5 or 7 members to have a vote. The primary member (as set by
+     * {@link #primary(com.google.common.net.HostAndPort)}) is guaranteed a vote if
+     * it is in {@link #members}.
+     * <p/>
+     *
+     * Reconfiguring a server to be voters when they previously did not have votes generally triggers
+     * a primary election. This confuses the MongoDB Java driver, which logs an error like:
+     * <pre>
+     * WARN  emptying DBPortPool to sams.home/192.168.1.64:27019 b/c of error
+     * java.io.EOFException: null
+     *    at org.bson.io.Bits.readFully(Bits.java:48) ~[mongo-java-driver-2.11.3.jar:na]
+     * WARN  Command { "replSetReconfig" : ... } on sams.home/192.168.1.64:27019 failed
+     * com.mongodb.MongoException$Network: Read operation to server sams.home/192.168.1.64:27019 failed on database admin
+     *    at com.mongodb.DBTCPConnector.innerCall(DBTCPConnector.java:253) ~[mongo-java-driver-2.11.3.jar:na]
+     * Caused by: java.io.EOFException: null
+     *    at org.bson.io.Bits.readFully(Bits.java:48) ~[mongo-java-driver-2.11.3.jar:na]
+     * </pre>
+     *
+     * The MongoDB documentation on <a href=http://docs.mongodb.org/manual/tutorial/configure-a-non-voting-replica-set-member/">
+     * non-voting members</a> says:
+     * <blockquote>
+     *     Initializes a new replica set configuration. Disconnects the shell briefly and forces a
+     *     reconnection as the replica set renegotiates which member will be primary. As a result,
+     *     the shell will display an error even if this command succeeds.
+     * </blockquote>
+     *
+     * So the problem is more that the MongoDB Java driver does not understand why the server
+     * may have disconnected and is to eager to report a problem.
+     */
+    private void setVotingMembers() {
+        if (LOG.isDebugEnabled())
+            LOG.debug("Setting voting and non-voting members of replica set: {}", name);
+        boolean seenPrimary = false;
+        String expectedPrimary = primary.isPresent()
+                ? primary.get().getHostText() + ":" + primary.get().getPort()
+                : "";
+
+        // Ensure an odd number of voters
+        int setSize = this.members.size();
+        int nonPrimaryVotingMembers = Math.min(setSize % 2 == 0 ? setSize - 1 : setSize, MAXIMUM_VOTING_MEMBERS);
+        if (primary.isPresent()) {
+            if (LOG.isTraceEnabled())
+                LOG.trace("Reserving vote for primary: " + expectedPrimary);
+            nonPrimaryVotingMembers -= 1;
+        }
+
+        for (Object member : this.members) {
+            if (member instanceof BasicBSONObject) {
+                BasicBSONObject bsonObject = BasicBSONObject.class.cast(member);
+                String host = bsonObject.getString("host");
+
+                // is this member noted as the primary?
+                if (this.primary.isPresent() && expectedPrimary.equals(host)) {
+                    bsonObject.put("votes", 1);
+                    seenPrimary = true;
+                    if (LOG.isDebugEnabled())
+                        LOG.debug("Voting member (primary) of set {}: {}", name, host);
+                } else if (nonPrimaryVotingMembers-- > 0) {
+                    bsonObject.put("votes", 1);
+                    if (LOG.isDebugEnabled())
+                        LOG.debug("Voting member of set {}: {}", name, host);
+                } else {
+                    bsonObject.put("votes", 0);
+                    if (LOG.isDebugEnabled())
+                        LOG.debug("Non-voting member of set {}: {}", name, host);
+                }
+            } else {
+                LOG.error("Unexpected entry in replica set members list: " + member);
+            }
+        }
+
+        if (primary.isPresent() && !seenPrimary) {
+            LOG.warn("Cannot give replica set primary a vote in reconfigured set: " +
+                    "primary was indicated as {} but no member with that host and port was seen in the set. " +
+                    "The replica set now has an even number of voters.",
+                    this.primary);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/ReplicaSetMemberStatus.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/ReplicaSetMemberStatus.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/ReplicaSetMemberStatus.java
new file mode 100644
index 0000000..16df3a7
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/ReplicaSetMemberStatus.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb;
+
+/**
+ * @see <a href="http://docs.mongodb.org/manual/reference/replica-status/">Replica set status reference</a>
+ */
+public enum ReplicaSetMemberStatus {
+
+    STARTUP("Start up, phase 1 (parsing configuration)"),
+    PRIMARY("Primary"),
+    SECONDARY("Secondary"),
+    RECOVERING("Member is recovering (initial sync, post-rollback, stale members)"),
+    FATAL("Member has encountered an unrecoverable error"),
+    STARTUP2("Start up, phase 2 (forking threads)"),
+    UNKNOWN("Unknown (the set has never connected to the member)"),
+    ARBITER("Member is an arbiter"),
+    DOWN("Member is not accessible to the set"),
+    ROLLBACK("Member is rolling back data. See rollback"),
+    SHUNNED("Member has been removed from replica set");
+
+    private final String description;
+
+    ReplicaSetMemberStatus(String description) {
+        this.description = description;
+    }
+
+    public static ReplicaSetMemberStatus fromCode(int code) {
+        switch (code) {
+            case 0: return STARTUP;
+            case 1: return PRIMARY;
+            case 2: return SECONDARY;
+            case 3: return RECOVERING;
+            case 4: return FATAL;
+            case 5: return STARTUP2;
+            case 6: return UNKNOWN;
+            case 7: return ARBITER;
+            case 8: return DOWN;
+            case 9: return ROLLBACK;
+            case 10: return SHUNNED;
+            default: return UNKNOWN;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return name() + ": " + description;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/CoLocatedMongoDBRouter.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/CoLocatedMongoDBRouter.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/CoLocatedMongoDBRouter.java
new file mode 100644
index 0000000..48c9c63
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/CoLocatedMongoDBRouter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import java.util.List;
+import java.util.Map;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.SameServerEntity;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.Sensors;
+import brooklyn.util.flags.SetFromFlag;
+
+import com.google.common.reflect.TypeToken;
+
+@ImplementedBy(CoLocatedMongoDBRouterImpl.class)
+public interface CoLocatedMongoDBRouter extends SameServerEntity {
+    @SuppressWarnings("serial")
+    @SetFromFlag("siblingSpecs")
+    ConfigKey<Iterable<EntitySpec<?>>> SIBLING_SPECS = ConfigKeys.newConfigKey(new TypeToken<Iterable<EntitySpec<?>>>(){}, 
+            "mongodb.colocatedrouter.sibling.specs", "Collection of (configured) specs for entities to be co-located with the router");
+    
+    @SetFromFlag("shardedDeployment")
+    ConfigKey<MongoDBShardedDeployment> SHARDED_DEPLOYMENT = ConfigKeys.newConfigKey(MongoDBShardedDeployment.class, 
+            "mongodb.colocatedrouter.shardeddeployment", "Sharded deployment to which the router should report");
+
+    /** Deprecated since 0.7.0 use {@link #PROPAGATING_SENSORS} instead. */
+    @Deprecated
+    @SuppressWarnings("serial")
+    @SetFromFlag("propogatingSensors")
+    ConfigKey<List<Map<String, ?>>> PROPOGATING_SENSORS = ConfigKeys.newConfigKey(new TypeToken<List<Map<String, ?>>>(){}, 
+            "mongodb.colocatedrouter.propogating.sensors", "List of sensors to be propogated from child members");
+
+    @SetFromFlag("propagatingSensors")
+    ConfigKey<List<Map<String, ?>>> PROPAGATING_SENSORS = ConfigKeys.newConfigKey(new TypeToken<List<Map<String, ?>>>(){},
+            "mongodb.colocatedrouter.propagating.sensors", "List of sensors to be propogated from child members");
+
+    public static AttributeSensor<MongoDBRouter> ROUTER = Sensors.newSensor(MongoDBRouter.class,
+            "mongodb.colocatedrouter.router", "Router");
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/CoLocatedMongoDBRouterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/CoLocatedMongoDBRouterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/CoLocatedMongoDBRouterImpl.java
new file mode 100644
index 0000000..35252ae
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/CoLocatedMongoDBRouterImpl.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import java.util.Collection;
+
+import brooklyn.enricher.Enrichers;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.SameServerEntityImpl;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.basic.DependentConfiguration;
+import brooklyn.location.Location;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+
+public class CoLocatedMongoDBRouterImpl extends SameServerEntityImpl implements CoLocatedMongoDBRouter {
+    @Override
+    public void init() {
+        super.init();
+        
+        for (EntitySpec<?> siblingSpec : getConfig(CoLocatedMongoDBRouter.SIBLING_SPECS)) {
+            addChild(siblingSpec);
+        }
+    }
+
+    @Override
+    protected void doStart(Collection<? extends Location> locations) {
+        // TODO Changed to create the router child after init as a workaround.
+        // When we use `mongo-sharded.yaml`, and we call 
+        // `getConfig(CoLocatedMongoDBRouter.SHARDED_DEPLOYMENT)`,
+        // the value is `$brooklyn:component("shardeddeployment")`.
+        // To look up the component, it tries to do `entity().getApplication()` to
+        // search the entities for one with the correct id. However if being done
+        // during `init()`, then this (which is returned by `entity()`) has not had its parent
+        // set, so `entity().getApplication()` returns null.
+        //
+        // We should move this code back to `init()` once we have a solution for that.
+        // We can also remove the call to Entities.manage() once this is in init() again.
+        
+        MongoDBRouter router = addChild(EntitySpec.create(MongoDBRouter.class)
+                .configure(MongoDBRouter.CONFIG_SERVERS,
+                        DependentConfiguration.attributeWhenReady(
+                                getConfig(CoLocatedMongoDBRouter.SHARDED_DEPLOYMENT), 
+                                MongoDBConfigServerCluster.CONFIG_SERVER_ADDRESSES)));
+        Entities.manage(router);
+        setAttribute(ROUTER, (MongoDBRouter) Iterables.tryFind(getChildren(), Predicates.instanceOf(MongoDBRouter.class)).get());
+        addEnricher(Enrichers.builder().propagating(MongoDBRouter.PORT).from(router).build());
+        
+        super.doStart(locations);
+        setAttribute(Startable.SERVICE_UP, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServer.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServer.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServer.java
new file mode 100644
index 0000000..acecbc4
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import org.apache.brooklyn.entity.nosql.mongodb.AbstractMongoDBServer;
+
+import brooklyn.entity.proxying.ImplementedBy;
+
+@ImplementedBy(MongoDBConfigServerImpl.class)
+public interface MongoDBConfigServer extends AbstractMongoDBServer {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerCluster.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerCluster.java
new file mode 100644
index 0000000..79f78ac
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerCluster.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.Sensors;
+
+import com.google.common.reflect.TypeToken;
+
+@ImplementedBy(MongoDBConfigServerClusterImpl.class)
+public interface MongoDBConfigServerCluster extends DynamicCluster {
+
+    @SuppressWarnings("serial")
+    AttributeSensor<Iterable<String>> CONFIG_SERVER_ADDRESSES = Sensors.newSensor(new TypeToken<Iterable<String>>() {}, 
+            "mongodb.config.server.addresses", "List of config server hostnames and ports");
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerClusterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerClusterImpl.java
new file mode 100644
index 0000000..34651bb
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerClusterImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import java.util.Collection;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.group.DynamicClusterImpl;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.location.Location;
+import brooklyn.location.access.BrooklynAccessUtils;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.net.HostAndPort;
+
+public class MongoDBConfigServerClusterImpl extends DynamicClusterImpl implements MongoDBConfigServerCluster {
+    
+    @Override
+    protected EntitySpec<?> getMemberSpec() {
+        if (super.getMemberSpec() != null)
+            return super.getMemberSpec();
+        return EntitySpec.create(MongoDBConfigServer.class);
+    }
+    
+    @Override
+    public void start(Collection<? extends Location> locs) {
+        super.start(locs);
+        
+        // TODO this should be an enricher
+        Iterable<String> memberHostNamesAndPorts = Iterables.transform(getMembers(), new Function<Entity, String>() {
+            @Override
+            public String apply(Entity entity) {
+                return entity.getAttribute(MongoDBConfigServer.SUBNET_HOSTNAME) + ":" + entity.getAttribute(MongoDBConfigServer.PORT);
+            }
+        });
+        setAttribute(MongoDBConfigServerCluster.CONFIG_SERVER_ADDRESSES, ImmutableList.copyOf(memberHostNamesAndPorts));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerDriver.java
new file mode 100644
index 0000000..7963b22
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerDriver.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import brooklyn.entity.basic.SoftwareProcessDriver;
+
+public interface MongoDBConfigServerDriver extends SoftwareProcessDriver {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerImpl.java
new file mode 100644
index 0000000..b8ce2b8
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerImpl.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import brooklyn.entity.basic.SoftwareProcessImpl;
+
+public class MongoDBConfigServerImpl extends SoftwareProcessImpl implements MongoDBConfigServer {
+
+    @Override
+    public Class<?> getDriverInterface() {
+        return MongoDBConfigServerDriver.class;
+    }
+    
+    @Override
+    protected void connectSensors() {
+        super.connectSensors();
+        connectServiceUpIsRunning();
+    }
+
+}



Mime
View raw message