brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject [4/6] brooklyn-library git commit: ZooKeeperNode ID is config rather than a sensor
Date Wed, 16 Nov 2016 17:20:28 GMT
ZooKeeperNode ID is config rather than a sensor

The node ID defaults to 1. ZooKeeperEnsemble assigns each member the next value
from an incrementing supplier, so every member gets its own ID. This fixes
communication between nodes.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/ae000a50
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/ae000a50
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/ae000a50

Branch: refs/heads/master
Commit: ae000a503823c89eb9e49908e160b64047c90026
Parents: 9ce90b8
Author: Sam Corbett <sam.corbett@cloudsoftcorp.com>
Authored: Tue Nov 15 10:40:38 2016 +0000
Committer: Sam Corbett <sam.corbett@cloudsoftcorp.com>
Committed: Wed Nov 16 14:52:19 2016 +0000

----------------------------------------------------------------------
 .../entity/zookeeper/AbstractZooKeeperImpl.java |   4 +-
 .../entity/zookeeper/ZooKeeperEnsemble.java     |  18 ++-
 .../entity/zookeeper/ZooKeeperEnsembleImpl.java |  73 ++++++------
 .../entity/zookeeper/ZooKeeperNode.java         |  10 +-
 .../entity/zookeeper/ZooKeeperNodeImpl.java     |  17 ++-
 .../entity/zookeeper/ZooKeeperSshDriver.java    |  13 +-
 .../zookeeper/ZooKeeperEnsembleLiveTest.java    | 118 ++++++-------------
 .../zookeeper/ZooKeeperTestSupport.java         |   9 +-
 8 files changed, 127 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ae000a50/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
index 60175c9..b3ced27 100644
--- a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
@@ -94,9 +94,9 @@ public abstract class AbstractZooKeeperImpl extends SoftwareProcessImpl
implemen
 
     @Override
     public void disconnectSensors() {
-        super.disconnectSensors();
-        disconnectServiceUpIsRunning();
         if (jmxFeed != null) jmxFeed.stop();
+        disconnectServiceUpIsRunning();
+        super.disconnectSensors();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ae000a50/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
index a5ba570..2ed6206 100644
--- a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
@@ -29,7 +29,10 @@ import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.entity.group.DynamicCluster;
 import org.apache.brooklyn.util.core.flags.SetFromFlag;
+import org.apache.brooklyn.util.guava.Suppliers;
 
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
 import com.google.common.reflect.TypeToken;
 
 @Catalog(name="ZooKeeper ensemble", description="A cluster of ZooKeeper servers. "
@@ -38,15 +41,24 @@ import com.google.common.reflect.TypeToken;
 public interface ZooKeeperEnsemble extends DynamicCluster {
 
     @SetFromFlag("clusterName")
-    BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String
-            .class, "zookeeper.cluster.name", "Name of the Zookeeper cluster", "BrooklynZookeeperCluster");
+    BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String.class,
+            "zookeeper.cluster.name", "Name of the Zookeeper cluster", "BrooklynZookeeperCluster");
 
     @SetFromFlag("initialSize")
-    public static final ConfigKey<Integer> INITIAL_SIZE = ConfigKeys.newConfigKeyWithDefault(DynamicCluster.INITIAL_SIZE,
3);
+    ConfigKey<Integer> INITIAL_SIZE = ConfigKeys.newConfigKeyWithDefault(DynamicCluster.INITIAL_SIZE,
3);
+
+    ConfigKey<Supplier<Integer>> NODE_ID_SUPPLIER = ConfigKeys.builder(new TypeToken<Supplier<Integer>>()
{})
+            .name("zookeeper.nodeId.supplier")
+            .description("Supplies values for members id in zoo.cfg")
+            .defaultValue(Suppliers.incrementing())
+            .constraint(Predicates.notNull())
+            .build();
 
     @SuppressWarnings("serial")
     AttributeSensor<List<String>> ZOOKEEPER_SERVERS = Sensors.newSensor(new TypeToken<List<String>>()
{ },
             "zookeeper.servers", "Hostnames to connect to cluster with");
 
+    /** @deprecated since 0.10.0 use <code>sensors().get(ZooKeeperEnsemble.CLUSTER_NAME)</code> instead */
+    @Deprecated
     String getClusterName();
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ae000a50/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
index c2c3e3f..06ea472 100644
--- a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
@@ -20,36 +20,33 @@ package org.apache.brooklyn.entity.zookeeper;
 
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.api.policy.PolicySpec;
 import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.entity.group.AbstractMembershipTrackingPolicy;
 import org.apache.brooklyn.entity.group.DynamicClusterImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.util.guava.Suppliers;
+
+import com.google.common.base.Supplier;
 
 import com.google.common.collect.Lists;
 
 public class ZooKeeperEnsembleImpl extends DynamicClusterImpl implements ZooKeeperEnsemble
{
 
-    private static final Logger log = LoggerFactory.getLogger(ZooKeeperEnsembleImpl.class);
-    private static final AtomicInteger myId = new AtomicInteger();
-    
-    private MemberTrackingPolicy policy;
-
     public ZooKeeperEnsembleImpl() {}
 
     /**
      * Sets the default {@link #MEMBER_SPEC} to describe the ZooKeeper nodes.
+     * Overwrites any value configured for {@link ZooKeeperNode#MY_ID} to use
+     * the value given by {@link ZooKeeperEnsemble#NODE_ID_SUPPLIER}.
      */
     @Override
     protected EntitySpec<?> getMemberSpec() {
-        return getConfig(MEMBER_SPEC, EntitySpec.create(ZooKeeperNode.class));
+        EntitySpec<?> spec = getConfig(MEMBER_SPEC, EntitySpec.create(ZooKeeperNode.class));
+        spec.configure(ZooKeeperNode.MY_ID, config().get(ZooKeeperEnsemble.NODE_ID_SUPPLIER).get());
+        return spec;
     }
 
     @Override
@@ -58,33 +55,6 @@ public class ZooKeeperEnsembleImpl extends DynamicClusterImpl implements
ZooKeep
     }
 
     @Override
-    public void init() {
-        log.info("Initializing the ZooKeeper Ensemble");
-        super.init();
-
-        policy = policies().add(PolicySpec.create(MemberTrackingPolicy.class)
-                .displayName("Members tracker")
-                .configure("group", this));
-    }
-
-    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
-        @Override
-        protected void onEntityChange(Entity member) {
-        }
-
-        @Override
-        protected void onEntityAdded(Entity member) {
-            if (member.getAttribute(ZooKeeperNode.MY_ID) == null) {
-                ((EntityInternal) member).sensors().set(ZooKeeperNode.MY_ID, myId.incrementAndGet());
-            }
-        }
-
-        @Override
-        protected void onEntityRemoved(Entity member) {
-        }
-    };
-
-    @Override
     protected void initEnrichers() {
         super.initEnrichers();
         
@@ -101,4 +71,31 @@ public class ZooKeeperEnsembleImpl extends DynamicClusterImpl implements
ZooKeep
         sensors().set(ZOOKEEPER_SERVERS, zookeeperServers);
     }
 
+    /**
+     * @deprecated since 0.10.0 class is unused but kept for persistence backwards compatibility
+     */
+    @Deprecated
+    private static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
+        private final Object[] mutex = new Object[0];
+
+        @Override
+        protected void onEntityAdded(Entity member) {
+            if (member.config().get(ZooKeeperNode.MY_ID) == null) {
+                Supplier<Integer> id;
+                synchronized (mutex) {
+                    // Entities may not have been created with NODE_ID_SUPPLIER, so create it if
+                    // it's not there. We can't provide any good guarantees about what number to
+                    // start with, but then again the previous version of the entity gave no
+                    // guarantee either.
+                    id = entity.config().get(ZooKeeperEnsemble.NODE_ID_SUPPLIER);
+                    if (id == null) {
+                        id = Suppliers.incrementing();
+                        entity.config().set(ZooKeeperEnsemble.NODE_ID_SUPPLIER, id);
+                    }
+                }
+                member.config().set(ZooKeeperNode.MY_ID, id.get());
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ae000a50/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
index e0644df..13a9c7e 100644
--- a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
@@ -24,6 +24,7 @@ import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.sensor.AttributeSensorAndConfigKey;
+import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey;
 import org.apache.brooklyn.core.sensor.PortAttributeSensorAndConfigKey;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.entity.software.base.SoftwareProcess;
@@ -64,12 +65,19 @@ public interface ZooKeeperNode extends SoftwareProcess {
             "zookeeper.configTemplate", "Zookeeper configuration template (in freemarker
format)",
             "classpath://org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg");
 
+    @SetFromFlag("zookeeperId")
+    BasicAttributeSensorAndConfigKey<Integer> MY_ID = new BasicAttributeSensorAndConfigKey<>(Integer.class,
+            "zookeeper.myid", "ZooKeeper node's myId", 1);
+
     AttributeSensor<Long> OUTSTANDING_REQUESTS = Sensors.newLongSensor("zookeeper.outstandingRequests",
"Outstanding request count");
     AttributeSensor<Long> PACKETS_RECEIVED = Sensors.newLongSensor("zookeeper.packets.received",
"Total packets received");
     AttributeSensor<Long> PACKETS_SENT = Sensors.newLongSensor("zookeeper.packets.sent",
"Total packets sent");
-    AttributeSensor<Integer> MY_ID = Sensors.newIntegerSensor("zookeeper.myid", "ZooKeeper
node's myId");
 
+    /** @deprecated since 0.10.0 use <code>sensors().get(ZooKeeperNode.ZOOKEEPER_PORT)</code> instead */
+    @Deprecated
     Integer getZookeeperPort();
 
+    /** @deprecated since 0.10.0 use <code>sensors().get(ZooKeeperNode.HOSTNAME)</code> instead */
+    @Deprecated
     String getHostname();
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ae000a50/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
index f0eee04..cbf055e 100644
--- a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
@@ -26,13 +26,20 @@ public class ZooKeeperNodeImpl extends AbstractZooKeeperImpl implements
ZooKeepe
     public ZooKeeperNodeImpl() {}
 
     @Override
-    public Class<?> getDriverInterface() {
-        return ZooKeeperDriver.class;
+    public void init() {
+        super.init();
+        // MY_ID was changed from a sensor to config. Publish it as a sensor to maintain
+        // compatibility with any blueprints that reference it.
+        Integer myId = config().get(MY_ID);
+        if (myId == null) {
+            throw new NullPointerException("Require value for " + MY_ID.getName());
+        }
+        sensors().set(MY_ID, myId);
     }
 
     @Override
-    public void init() {
-        super.init();
-        sensors().set(ZooKeeperNode.MY_ID, 1);
+    public Class<?> getDriverInterface() {
+        return ZooKeeperDriver.class;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ae000a50/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
index e4ed338..c7b1dc9 100644
--- a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
@@ -54,8 +54,8 @@ public class ZooKeeperSshDriver extends JavaSoftwareProcessSshDriver implements
         return entity.getConfig(ZooKeeperNode.ZOOKEEPER_CONFIG_TEMPLATE);
     }
 
-    protected int getMyId() {
-        return entity.getAttribute(ZooKeeperNode.MY_ID);
+    protected Integer getMyId() {
+        return entity.config().get(ZooKeeperNode.MY_ID);
     }
 
     // FIXME All for one, and one for all! If any node fails then we're stuck waiting for its hostname/port forever.
@@ -64,16 +64,19 @@ public class ZooKeeperSshDriver extends JavaSoftwareProcessSshDriver implements
     public List<ZooKeeperServerConfig> getZookeeperServers() throws ExecutionException,
InterruptedException {
         List<ZooKeeperServerConfig> result = Lists.newArrayList();
 
-        if (entity.getParent().getClass().isAssignableFrom(ZooKeeperEnsemble.class)) {
+        if (entity.getParent() instanceof ZooKeeperEnsemble) {
             ZooKeeperEnsemble ensemble = (ZooKeeperEnsemble) entity.getParent();
 
             for (Entity member : ensemble.getMembers()) {
-                Integer myid = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.MY_ID).get();
+                Integer memberId = member.config().get(ZooKeeperNode.MY_ID);
+                if (memberId == null) {
+                    throw new IllegalStateException(member + " has null value for " + ZooKeeperNode.MY_ID);
+                }
                 String hostname = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.HOSTNAME).get();
                 Integer port = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_PORT).get();
                 Integer leaderPort = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_LEADER_PORT).get();
                 Integer electionPort = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_ELECTION_PORT).get();
-                result.add(new ZooKeeperServerConfig(myid, hostname, port, leaderPort, electionPort));
+                result.add(new ZooKeeperServerConfig(memberId, hostname, port, leaderPort,
electionPort));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ae000a50/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java
b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java
index 1015c76..16d7047 100644
--- a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java
@@ -20,17 +20,13 @@ package org.apache.brooklyn.entity.messaging.zookeeper;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
 
-import java.net.Socket;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityAsserts;
 import org.apache.brooklyn.core.entity.trait.Startable;
@@ -51,12 +47,10 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Predicates;
 import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.net.HostAndPort;
-import com.google.common.util.concurrent.Uninterruptibles;
 
 /**
  * A live test of the {@link org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble} entity.
@@ -70,10 +64,9 @@ public class ZooKeeperEnsembleLiveTest extends BrooklynAppLiveTestSupport
{
     private static final String DEFAULT_LOCATION = "jclouds:aws-ec2:eu-west-1";
 
     private Location testLocation;
-    private ZooKeeperEnsemble cluster;
     private String locationSpec;
 
-    @BeforeClass(groups = "Live")
+    @BeforeClass(alwaysRun = true)
     @Parameters({"locationSpec"})
     public void setLocationSpec(@Optional String locationSpec) {
         this.locationSpec = !Strings.isBlank(locationSpec)
@@ -96,60 +89,46 @@ public class ZooKeeperEnsembleLiveTest extends BrooklynAppLiveTestSupport
{
     public void testStartUpConnectAndResize() throws Exception {
         final String zkDataPath = "/ensembletest";
         final int initialSize = 3;
-        try {
-            cluster = app.createAndManageChild(EntitySpec.create(ZooKeeperEnsemble.class)
-                    .configure(DynamicCluster.INITIAL_SIZE, initialSize)
-                    .configure(ZooKeeperEnsemble.CLUSTER_NAME, "ZooKeeperEnsembleLiveTest"));
-
-            app.start(ImmutableList.of(testLocation));
-
-            Entities.dumpInfo(app);
-            EntityAsserts.assertAttributeEqualsEventually(cluster, ZooKeeperEnsemble.GROUP_SIZE,
3);
-            EntityAsserts.assertAttributeEqualsEventually(cluster, Startable.SERVICE_UP,
true);
-            Set<Integer> nodeIds = Sets.newHashSet();
-            for (Entity zkNode : cluster.getMembers()) {
-                assertSocketOpen(zkNode);
-                nodeIds.add(zkNode.sensors().get(ZooKeeperNode.MY_ID));
-            }
-            assertEquals(nodeIds.size(), initialSize, "expected " + initialSize + " node
ids, found " + Iterables.toString(nodeIds));
-
-            // Write data to one and read from the others.
-            List<String> servers = cluster.sensors().get(ZooKeeperEnsemble.ZOOKEEPER_SERVERS);
-            assertNotNull(servers, "value for sensor should not be null: " + ZooKeeperEnsemble.ZOOKEEPER_SERVERS);
-            assertEquals(servers.size(), initialSize, "expected " + initialSize + " entries
in " + servers);
-
-            // Write to one
-            String firstServer = servers.get(0);
-            HostAndPort conn = HostAndPort.fromString(firstServer);
-            log.info("Writing data to {}", conn);
-            try (ZooKeeperTestSupport zkts = new ZooKeeperTestSupport(conn)) {
-                zkts.create(zkDataPath, "data".getBytes());
-                assertEquals(new String(zkts.get(zkDataPath)), "data");
-            }
-
-            // And read from the others.
-            for (int i = 1; i < servers.size(); i++) {
-                conn = HostAndPort.fromString(servers.get(i));
-                log.info("Asserting that data can be read from {}", conn);
-                assertPathDataEventually(conn, zkDataPath, "data");
-            }
-
-            cluster.resize(1);
-            EntityAsserts.assertAttributeEqualsEventually(cluster, ZooKeeperEnsemble.GROUP_SIZE,
1);
-            EntityAsserts.assertAttributeEqualsContinually(cluster, Startable.SERVICE_UP,
true);
-
-            // TODO: assert that data can still be read.
-            for (Entity zkNode : cluster.getMembers()) {
-                assertSocketOpen(zkNode);
-            }
-        } catch (Throwable e) {
-            throw Throwables.propagate(e);
+        ZooKeeperEnsemble ensemble = app.createAndManageChild(EntitySpec.create(ZooKeeperEnsemble.class)
+                .configure(DynamicCluster.INITIAL_SIZE, initialSize)
+                .configure(ZooKeeperEnsemble.CLUSTER_NAME, "ZooKeeperEnsembleLiveTest"));
+
+        app.start(ImmutableList.of(testLocation));
+        Entities.dumpInfo(app);
+
+        EntityAsserts.assertAttributeEqualsEventually(ensemble, ZooKeeperEnsemble.GROUP_SIZE,
3);
+        EntityAsserts.assertAttributeEqualsEventually(ensemble, Startable.SERVICE_UP, true);
+        Set<Integer> nodeIds = Sets.newHashSet();
+        for (Entity zkNode : ensemble.getMembers()) {
+            nodeIds.add(zkNode.config().get(ZooKeeperNode.MY_ID));
+        }
+        assertEquals(nodeIds.size(), initialSize, "expected " + initialSize + " node ids,
found " + Iterables.toString(nodeIds));
+
+        // Write data to one and read from the others.
+        List<String> servers = ensemble.sensors().get(ZooKeeperEnsemble.ZOOKEEPER_SERVERS);
+        assertNotNull(servers, "value for sensor should not be null: " + ZooKeeperEnsemble.ZOOKEEPER_SERVERS);
+        assertEquals(servers.size(), initialSize, "expected " + initialSize + " entries in
" + servers);
+
+        // Write to one
+        String firstServer = servers.get(0);
+        HostAndPort conn = HostAndPort.fromString(firstServer);
+        log.info("Writing data to {}", conn);
+        try (ZooKeeperTestSupport zkts = new ZooKeeperTestSupport(conn)) {
+            zkts.create(zkDataPath, "data".getBytes());
+            assertEquals(new String(zkts.get(zkDataPath)), "data");
+        }
+
+        // And read from the others.
+        for (int i = 1; i < servers.size(); i++) {
+            conn = HostAndPort.fromString(servers.get(i));
+            log.info("Asserting that data can be read from {}", conn);
+            assertPathDataEventually(conn, zkDataPath, "data");
         }
     }
 
     protected void assertPathDataEventually(HostAndPort hostAndPort, final String path, String
expected) throws Exception {
         try (ZooKeeperTestSupport zkts = new ZooKeeperTestSupport(hostAndPort)) {
-            Asserts.eventually(new Supplier<String>() {
+            final Supplier<String> dataSupplier = new Supplier<String>() {
                 @Override
                 public String get() {
                     try {
@@ -158,30 +137,9 @@ public class ZooKeeperEnsembleLiveTest extends BrooklynAppLiveTestSupport
{
                         throw Exceptions.propagate(e);
                     }
                 }
-            }, Predicates.equalTo(expected));
+            };
+            Asserts.eventually(dataSupplier, Predicates.equalTo(expected));
         }
-
     }
 
-    protected void assertSocketOpen(Entity node) {
-        assertTrue(isSocketOpen(node));
-    }
-
-    protected static boolean isSocketOpen(Entity node) {
-        int attempt = 0, maxAttempts = 20;
-        while(attempt < maxAttempts) {
-            try {
-                final String host = node.sensors().get(Attributes.HOSTNAME);
-                final int port = node.sensors().get(ZooKeeperNode.ZOOKEEPER_PORT);
-                Socket s = new Socket(host, port);
-                s.close();
-                return true;
-            } catch (Exception e) {
-                attempt++;
-            }
-            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-        }
-        return false;
-    }
-    
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ae000a50/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperTestSupport.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperTestSupport.java
b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperTestSupport.java
index 042e9bf..f1987d2 100644
--- a/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperTestSupport.java
+++ b/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperTestSupport.java
@@ -22,6 +22,7 @@ package org.apache.brooklyn.entity.messaging.zookeeper;
 import java.io.Closeable;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.brooklyn.location.paas.PaasLocation;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.WatchedEvent;
@@ -29,6 +30,8 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.net.HostAndPort;
 
@@ -37,16 +40,20 @@ import com.google.common.net.HostAndPort;
  */
 public class ZooKeeperTestSupport implements Closeable {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperTestSupport.class);
     private final ZooKeeper zk;
     private final CountDownLatch connSignal = new CountDownLatch(1);
 
-    public ZooKeeperTestSupport(HostAndPort hostAndPort) throws Exception {
+    public ZooKeeperTestSupport(final HostAndPort hostAndPort) throws Exception {
         final int sessionTimeout = 3000;
         zk = new ZooKeeper(hostAndPort.toString(), sessionTimeout, new Watcher() {
             @Override
             public void process(WatchedEvent event) {
                 if (event.getState() == Event.KeeperState.SyncConnected) {
+                    LOG.debug("Connected to ZooKeeper at {}", hostAndPort);
                     connSignal.countDown();
+                } else {
+                    LOG.info("WatchedEvent at {}: {}", hostAndPort, event.getState());
                 }
             }
         });


Mime
View raw message