brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aleds...@apache.org
Subject [04/26] incubator-brooklyn git commit: [BROOKLYN-162] Renaming of the NoSQL packages
Date Thu, 06 Aug 2015 16:32:18 GMT
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerIntegrationTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerIntegrationTest.java
deleted file mode 100644
index 4c7e08c..0000000
--- a/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerIntegrationTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.solr;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-import org.apache.solr.common.SolrDocument;
-import org.testng.annotations.Test;
-
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.proxying.EntitySpec;
-import brooklyn.entity.trait.Startable;
-import brooklyn.test.EntityTestUtils;
-import brooklyn.util.collections.MutableMap;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-
-/**
- * Solr integration tests.
- *
- * Test the operation of the {@link SolrServer} class.
- */
-public class SolrServerIntegrationTest extends AbstractSolrServerTest {
-
-    /**
-     * Test that a node starts and sets SERVICE_UP correctly.
-     */
-    @Test(groups = "Integration")
-    public void canStartupAndShutdown() {
-        solr = app.createAndManageChild(EntitySpec.create(SolrServer.class));
-        app.start(ImmutableList.of(testLocation));
-
-        EntityTestUtils.assertAttributeEqualsEventually(solr, Startable.SERVICE_UP, true);
-        Entities.dumpInfo(app);
-
-        solr.stop();
-
-        EntityTestUtils.assertAttributeEqualsEventually(solr, Startable.SERVICE_UP, false);
-    }
-
-    /**
-     * Test that a core can be created and used with SolrJ client.
-     */
-    @Test(groups = "Integration")
-    public void testConnection() throws Exception {
-        solr = app.createAndManageChild(EntitySpec.create(SolrServer.class)
-                .configure(SolrServer.SOLR_CORE_CONFIG, ImmutableMap.of("example", "classpath://solr/example.tgz")));
-        app.start(ImmutableList.of(testLocation));
-
-        EntityTestUtils.assertAttributeEqualsEventually(solr, Startable.SERVICE_UP, true);
-
-        SolrJSupport client = new SolrJSupport(solr, "example");
-
-        Iterable<SolrDocument> results = client.getDocuments();
-        assertTrue(Iterables.isEmpty(results));
-
-        client.addDocument(MutableMap.<String, Object>of("id", "1", "description", "first"));
-        client.addDocument(MutableMap.<String, Object>of("id", "2", "description", "second"));
-        client.addDocument(MutableMap.<String, Object>of("id", "3", "description", "third"));
-        client.commit();
-
-        results = client.getDocuments();
-        assertEquals(Iterables.size(results), 3);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerLiveTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerLiveTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerLiveTest.java
deleted file mode 100644
index 82fb107..0000000
--- a/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerLiveTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.solr;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-import java.util.Map;
-
-import org.apache.solr.common.SolrDocument;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import brooklyn.entity.proxying.EntitySpec;
-import brooklyn.entity.trait.Startable;
-import brooklyn.test.EntityTestUtils;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.text.Strings;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-
-/**
- * Solr live tests.
- *
- * Test the operation of the {@link SolrServer} class using the jclouds {@code rackspace-cloudservers-uk}
- * and {@code aws-ec2} providers, with different OS images. The tests use the {@link SolrJSupport} class
- * to exercise the node, and will need to have {@code brooklyn.jclouds.provider.identity} and {@code .credential}
- * set, usually in the {@code .brooklyn/brooklyn.properties} file.
- */
-public class SolrServerLiveTest extends AbstractSolrServerTest {
-
-    private static final Logger log = LoggerFactory.getLogger(SolrServerLiveTest.class);
-
-    @DataProvider(name = "virtualMachineData")
-    public Object[][] provideVirtualMachineData() {
-        return new Object[][] { // ImageId, Provider, Region, Description (for logging)
-            new Object[] { "eu-west-1/ami-0307d674", "aws-ec2", "eu-west-1", "Ubuntu Server 14.04 LTS (HVM), SSD Volume Type" },
-            new Object[] { "LON/f9b690bf-88eb-43c2-99cf-391f2558732e", "rackspace-cloudservers-uk", "", "Ubuntu 12.04 LTS (Precise Pangolin)" }, 
-            new Object[] { "LON/a84b1592-6817-42da-a57c-3c13f3cfc1da", "rackspace-cloudservers-uk", "", "CentOS 6.5 (PVHVM)" }, 
-        };
-    }
-
-    @Test(groups = "Live", dataProvider = "virtualMachineData")
-    protected void testOperatingSystemProvider(String imageId, String provider, String region, String description) throws Exception {
-        log.info("Testing Solr on {}{} using {} ({})", new Object[] { provider, Strings.isNonEmpty(region) ? ":" + region : "", description, imageId });
-
-        Map<String, String> properties = MutableMap.of("imageId", imageId);
-        testLocation = app.getManagementContext().getLocationRegistry()
-                .resolve(provider + (Strings.isNonEmpty(region) ? ":" + region : ""), properties);
-        solr = app.createAndManageChild(EntitySpec.create(SolrServer.class)
-                .configure(SolrServer.SOLR_CORE_CONFIG, ImmutableMap.of("example", "classpath://solr/example.tgz")));
-        app.start(ImmutableList.of(testLocation));
-
-        EntityTestUtils.assertAttributeEqualsEventually(solr, Startable.SERVICE_UP, true);
-
-        SolrJSupport client = new SolrJSupport(solr, "example");
-
-        Iterable<SolrDocument> results = client.getDocuments();
-        assertTrue(Iterables.isEmpty(results));
-
-        client.addDocument(MutableMap.<String, Object>of("id", "1", "description", "first"));
-        client.addDocument(MutableMap.<String, Object>of("id", "2", "description", "second"));
-        client.addDocument(MutableMap.<String, Object>of("id", "3", "description", "third"));
-        client.commit();
-
-        results = client.getDocuments();
-        assertEquals(Iterables.size(results), 3);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AbstractCassandraNodeTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AbstractCassandraNodeTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AbstractCassandraNodeTest.java
new file mode 100644
index 0000000..ab158bd
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AbstractCassandraNodeTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode;
+import org.testng.annotations.BeforeMethod;
+
+import brooklyn.entity.BrooklynAppLiveTestSupport;
+import brooklyn.location.Location;
+
+/**
+ * Cassandra test framework for integration and live tests.
+ */
+public class AbstractCassandraNodeTest extends BrooklynAppLiveTestSupport {
+
+    protected Location testLocation;
+    protected CassandraNode cassandra;
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        testLocation = app.newLocalhostProvisioningLocation();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AstyanaxSupport.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AstyanaxSupport.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AstyanaxSupport.java
new file mode 100644
index 0000000..b7587d7
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AstyanaxSupport.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+
+import brooklyn.entity.basic.Attributes;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.text.Identifiers;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.netflix.astyanax.AstyanaxContext;
+import com.netflix.astyanax.Cluster;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
+import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.connectionpool.exceptions.SchemaDisagreementException;
+import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
+import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
+import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.model.ColumnFamily;
+import com.netflix.astyanax.model.ColumnList;
+import com.netflix.astyanax.serializers.StringSerializer;
+import com.netflix.astyanax.thrift.ThriftFamilyFactory;
+
+/**
+ * Cassandra testing using Astyanax API.
+ */
+public class AstyanaxSupport {
+    private static final Logger log = LoggerFactory.getLogger(AstyanaxSupport.class);
+
+    public final String clusterName;
+    public final String hostname;
+    public final int thriftPort;
+    
+    public AstyanaxSupport(CassandraNode node) {
+        this(node.getClusterName(), node.getAttribute(Attributes.HOSTNAME), node.getThriftPort());
+    }
+    
+    public AstyanaxSupport(String clusterName, String hostname, int thriftPort) {
+        this.clusterName = clusterName;
+        this.hostname = hostname;
+        this.thriftPort = thriftPort;
+    }
+    
+    public AstyanaxContext<Keyspace> newAstyanaxContextForKeyspace(String keyspace) {
+        AstyanaxContext<Keyspace> context = new AstyanaxContext.Builder()
+                .forCluster(clusterName)
+                .forKeyspace(keyspace)
+                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
+                        .setDiscoveryType(NodeDiscoveryType.NONE))
+                .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("BrooklynPool")
+                        .setPort(thriftPort)
+                        .setMaxConnsPerHost(1)
+                        .setConnectTimeout(5000) // 10s
+                        .setSeeds(String.format("%s:%d", hostname, thriftPort)))
+                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
+                .buildKeyspace(ThriftFamilyFactory.getInstance());
+
+        context.start();
+        return context;
+    }
+    
+    public AstyanaxContext<Cluster> newAstyanaxContextForCluster() {
+        AstyanaxContext<Cluster> context = new AstyanaxContext.Builder()
+                .forCluster(clusterName)
+                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
+                        .setDiscoveryType(NodeDiscoveryType.NONE))
+                .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("BrooklynPool")
+                        .setPort(thriftPort)
+                        .setMaxConnsPerHost(1)
+                        .setConnectTimeout(5000) // 10s
+                        .setSeeds(String.format("%s:%d", hostname, thriftPort)))
+                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
+                .buildCluster(ThriftFamilyFactory.getInstance());
+
+        context.start();
+        return context;
+    }
+    
+    public static class AstyanaxSample extends AstyanaxSupport {
+        
+        public static class Builder {
+            protected CassandraNode node;
+            protected String clusterName;
+            protected String hostname;
+            protected Integer thriftPort;
+            protected String columnFamilyName = Identifiers.makeRandomId(8);
+            
+            public Builder node(CassandraNode val) {
+                this.node = val;
+                clusterName = node.getClusterName();
+                hostname = node.getAttribute(Attributes.HOSTNAME);
+                thriftPort = node.getThriftPort();
+                return this;
+            }
+            public Builder host(String clusterName, String hostname, int thriftPort) {
+                this.clusterName = clusterName;
+                this.hostname = hostname;
+                this.thriftPort = thriftPort;
+                return this;
+            }
+            public Builder columnFamilyName(String val) {
+                this.columnFamilyName = val;
+                return this;
+            }
+            public AstyanaxSample build() {
+                return new AstyanaxSample(this);
+            }
+        }
+        
+        public static Builder builder() {
+            return new Builder();
+        }
+        
+        public final String columnFamilyName;
+        public final ColumnFamily<String, String> sampleColumnFamily;
+
+        public AstyanaxSample(CassandraNode node) {
+            this(builder().node(node));
+        }
+
+        public AstyanaxSample(String clusterName, String hostname, int thriftPort) {
+            this(builder().host(clusterName, hostname, thriftPort));
+        }
+
+        protected AstyanaxSample(Builder builder) {
+            super(builder.clusterName, builder.hostname, builder.thriftPort);
+            columnFamilyName = checkNotNull(builder.columnFamilyName, "columnFamilyName");
+            sampleColumnFamily = new ColumnFamily<String, String>(
+                    columnFamilyName, // Column Family Name
+                    StringSerializer.get(), // Key Serializer
+                    StringSerializer.get()); // Column Serializer
+        }
+
+        /**
+         * Exercise the {@link CassandraNode} using the Astyanax API.
+         */
+        public void astyanaxTest() throws Exception {
+            String keyspaceName = "BrooklynTests_"+Identifiers.makeRandomId(8);
+            writeData(keyspaceName);
+            readData(keyspaceName);
+        }
+
+        /**
+         * Write to a {@link CassandraNode} using the Astyanax API.
+         * @throws ConnectionException 
+         */
+        public void writeData(String keyspaceName) throws ConnectionException {
+            // Create context
+            AstyanaxContext<Keyspace> context = newAstyanaxContextForKeyspace(keyspaceName);
+            try {
+                Keyspace keyspace = context.getEntity();
+                try {
+                    checkNull(keyspace.describeKeyspace().getColumnFamily(columnFamilyName), "key space for column family "+columnFamilyName);
+                } catch (Exception ek) {
+                    // (Re) Create keyspace if needed (including if family name already existed, 
+                    // e.g. due to a timeout on previous attempt)
+                    log.debug("repairing Cassandra error by re-creating keyspace "+keyspace+": "+ek);
+                    try {
+                        log.debug("dropping Cassandra keyspace "+keyspace);
+                        keyspace.dropKeyspace();
+                    } catch (Exception e) {
+                        /* Ignore */ 
+                        log.debug("Cassandra keyspace "+keyspace+" could not be dropped (probably did not exist): "+e);
+                    }
+                    try {
+                        keyspace.createKeyspace(ImmutableMap.<String, Object>builder()
+                                .put("strategy_options", ImmutableMap.<String, Object>of("replication_factor", "1"))
+                                .put("strategy_class", "SimpleStrategy")
+                                .build());
+                    } catch (SchemaDisagreementException e) {
+                        // discussion (but not terribly helpful) at http://stackoverflow.com/questions/6770894/schemadisagreementexception
+                        // let's just try again after a delay
+                        // (seems to have no effect; trying to fix by starting first node before others)
+                        log.warn("error creating Cassandra keyspace "+keyspace+" (retrying): "+e);
+                        Time.sleep(Duration.FIVE_SECONDS);
+                        keyspace.createKeyspace(ImmutableMap.<String, Object>builder()
+                                .put("strategy_options", ImmutableMap.<String, Object>of("replication_factor", "1"))
+                                .put("strategy_class", "SimpleStrategy")
+                                .build());
+                    }
+                }
+                
+                assertNull(keyspace.describeKeyspace().getColumnFamily("Rabbits"), "key space for arbitrary column family Rabbits");
+                assertNull(keyspace.describeKeyspace().getColumnFamily(columnFamilyName), "key space for column family "+columnFamilyName);
+
+                // Create column family
+                keyspace.createColumnFamily(sampleColumnFamily, null);
+
+                // Insert rows
+                MutationBatch m = keyspace.prepareMutationBatch();
+                m.withRow(sampleColumnFamily, "one")
+                        .putColumn("name", "Alice", null)
+                        .putColumn("company", "Cloudsoft Corp", null);
+                m.withRow(sampleColumnFamily, "two")
+                        .putColumn("name", "Bob", null)
+                        .putColumn("company", "Cloudsoft Corp", null)
+                        .putColumn("pet", "Cat", null);
+
+                OperationResult<Void> insert = m.execute();
+                assertEquals(insert.getHost().getHostName(), hostname);
+                assertTrue(insert.getLatency() > 0L);
+            } finally {
+                context.shutdown();
+            }
+        }
+
+        /**
+         * Read from a {@link CassandraNode} using the Astyanax API.
+         * @throws ConnectionException 
+         */
+        public void readData(String keyspaceName) throws ConnectionException {
+            // Create context
+            AstyanaxContext<Keyspace> context = newAstyanaxContextForKeyspace(keyspaceName);
+            try {
+                Keyspace keyspace = context.getEntity();
+
+                // Query data
+                OperationResult<ColumnList<String>> query = keyspace.prepareQuery(sampleColumnFamily)
+                        .getKey("one")
+                        .execute();
+                assertEquals(query.getHost().getHostName(), hostname);
+                assertTrue(query.getLatency() > 0L);
+
+                ColumnList<String> columns = query.getResult();
+                assertEquals(columns.size(), 2);
+
+                // Lookup columns in response by name
+                String name = columns.getColumnByName("name").getStringValue();
+                assertEquals(name, "Alice");
+
+                // Iterate through the columns
+                for (Column<String> c : columns) {
+                    assertTrue(ImmutableList.of("name", "company").contains(c.getName()));
+                }
+            } finally {
+                context.shutdown();
+            }
+        }
+        
+
+        /**
+         * Returns the keyspace name to which the data has been written. If it fails the first time,
+         * then will increment the keyspace name. This is because the failure could be a response timeout,
+         * where the keyspace really has been created so subsequent attempts with the same name will 
+         * fail (because we assert that the keyspace did not exist).
+         */
+        public String writeData(String keyspacePrefix, int numRetries) throws ConnectionException {
+            int retryCount = 0;
+            while (true) {
+                try {
+                    String keyspaceName = keyspacePrefix + (retryCount > 0 ? "" : "_"+retryCount);
+                    writeData(keyspaceName);
+                    return keyspaceName;
+                } catch (Exception e) {
+                    log.warn("Error writing data - attempt "+(retryCount+1)+" of "+(numRetries+1)+": "+e, e);
+                    if (++retryCount > numRetries)
+                        throw Exceptions.propagate(e);
+                }
+            }
+        }
+
+        /**
+         * Repeatedly tries to read data from the given keyspace name. Asserts that the data is the
+         * same as would be written by calling {@code writeData(keyspaceName)}.
+         */
+        public void readData(String keyspaceName, int numRetries) throws ConnectionException {
+            int retryCount = 0;
+            while (true) {
+                try {
+                    readData(keyspaceName);
+                    return;
+                } catch (Exception e) {
+                    log.warn("Error reading data - attempt "+(retryCount+1)+" of "+(numRetries+1)+": "+e, e);
+                    if (++retryCount > numRetries)
+                        throw Exceptions.propagate(e);
+                }
+            }
+        }
+
+        /**
+         * Like {@link Assert#assertNull(Object, String)}, except throws IllegalStateException instead
+         */
+        private void checkNull(Object obj, String msg) {
+            if (obj != null) {
+                throw new IllegalStateException("Not null: "+msg+"; obj="+obj);
+            }
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        AstyanaxSample support = new AstyanaxSample("ignored", "ec2-79-125-32-2.eu-west-1.compute.amazonaws.com", 9160);
+        AstyanaxContext<Cluster> context = support.newAstyanaxContextForCluster();
+        try {
+            System.out.println(context.getEntity().describeSchemaVersions());
+        } finally {
+            context.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterIntegrationTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterIntegrationTest.java
new file mode 100644
index 0000000..ddd6243
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterIntegrationTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.math.BigInteger;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraDatacenter;
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode;
+import org.apache.brooklyn.entity.nosql.cassandra.TokenGenerators.PosNeg63TokenGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.BrooklynAppLiveTestSupport;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.trait.Startable;
+import brooklyn.location.Location;
+import brooklyn.test.Asserts;
+import brooklyn.test.EntityTestUtils;
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+/**
+ * An integration test of the {@link CassandraDatacenter} entity.
+ *
+ * Tests that a one node cluster can be started on localhost and data can be written/read, using the Astyanax API.
+ * 
+ * NOTE: If these tests fail with "Timeout waiting for SERVICE_UP" and "java.lang.IllegalStateException: Unable to contact any seeds!" 
+ * or "java.lang.RuntimeException: Unable to gossip with any seeds" appears in the log, it may be that the broadcast_address 
+ * (set to InetAddress.getLocalHost().getHostName()) is not resolving to the value specified in listen_address 
+ * (InetAddress.getLocalHost().getHostAddress()). You can work round this issue by ensuring that you machine has only one 
+ * address, e.g. by disabling wireless if you are also using a wired connection
+ */
+public class CassandraDatacenterIntegrationTest extends BrooklynAppLiveTestSupport {
+
+    private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterIntegrationTest.class);
+
+    protected Location testLocation;
+    protected CassandraDatacenter cluster;
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    public void setUp() throws Exception {
+        CassandraNodeIntegrationTest.assertCassandraPortsAvailableEventually();
+        super.setUp();
+        testLocation = app.newLocalhostProvisioningLocation();
+    }
+
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        CassandraNodeIntegrationTest.assertCassandraPortsAvailableEventually();
+    }
+    
+
+    @Test(groups = "Integration")
+    public void testStartAndShutdownClusterSizeOne() throws Exception {
+        EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class)
+                .configure("initialSize", 1)
+                .configure("tokenShift", 42);
+        runStartAndShutdownClusterSizeOne(spec, true);
+    }
+    
+    /**
+     * Cassandra v2 needs Java >= 1.7. If you have java 6 as the defult locally, then you can use
+     * something like {@code .configure("shell.env", MutableMap.of("JAVA_HOME", "/Library/Java/JavaVirtualMachines/jdk1.7.0_51.jdk/Contents/Home"))}
+     */
+    @Test(groups = "Integration")
+    public void testStartAndShutdownClusterSizeOneCassandraVersion2() throws Exception {
+        String version = "2.0.9";
+        
+        EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class)
+                .configure(CassandraNode.SUGGESTED_VERSION, version)
+                .configure("initialSize", 1);
+        runStartAndShutdownClusterSizeOne(spec, false);
+    }
+    
+    /**
+     * Test that a single node cluster starts up and allows access via the Astyanax API.
+     * Only one node because Cassandra can only run one node per VM!
+     */
+    protected void runStartAndShutdownClusterSizeOne(EntitySpec<CassandraDatacenter> datacenterSpec, final boolean assertToken) throws Exception {
+        cluster = app.createAndManageChild(datacenterSpec);
+        assertEquals(cluster.getCurrentSize().intValue(), 0);
+
+        app.start(ImmutableList.of(testLocation));
+        Entities.dumpInfo(app);
+        
+        final CassandraNode node = (CassandraNode) Iterables.get(cluster.getMembers(), 0);
+        String nodeAddr = checkNotNull(node.getAttribute(CassandraNode.HOSTNAME), "hostname") + ":" + checkNotNull(node.getAttribute(CassandraNode.THRIFT_PORT), "thriftPort");
+        
+        EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.GROUP_SIZE, 1);
+        EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CASSANDRA_CLUSTER_NODES, ImmutableList.of(nodeAddr));
+
+        EntityTestUtils.assertAttributeEqualsEventually(node, Startable.SERVICE_UP, true);
+        if (assertToken) {
+            PosNeg63TokenGenerator tg = new PosNeg63TokenGenerator();
+            tg.growingCluster(1);
+            EntityTestUtils.assertAttributeEqualsEventually(node, CassandraNode.TOKEN, tg.newToken().add(BigInteger.valueOf(42)));
+        }
+
+        // may take some time to be consistent (with new thrift_latency checks on the node,
+        // contactability should not be an issue, but consistency still might be)
+        Asserts.succeedsEventually(MutableMap.of("timeout", 120*1000), new Runnable() {
+            public void run() {
+                boolean open = CassandraDatacenterLiveTest.isSocketOpen(node);
+                Boolean consistant = open ? CassandraDatacenterLiveTest.areVersionsConsistent(node) : null;
+                Integer numPeers = node.getAttribute(CassandraNode.PEERS);
+                Integer liveNodeCount = node.getAttribute(CassandraNode.LIVE_NODE_COUNT);
+                String msg = "consistency:  "
+                        + (!open ? "unreachable" : consistant==null ? "error" : consistant)+"; "
+                        + "peer group sizes: "+numPeers + "; live node count: " + liveNodeCount;
+                assertTrue(open, msg);
+                assertEquals(consistant, Boolean.TRUE, msg);
+                if (assertToken) {
+                    assertEquals(numPeers, (Integer)1, msg);
+                } else {
+                    assertTrue(numPeers != null && numPeers >= 1, msg);
+                }
+                assertEquals(liveNodeCount, (Integer)1, msg);
+            }});
+        
+        CassandraDatacenterLiveTest.checkConnectionRepeatedly(2, 5, ImmutableList.of(node));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterLiveTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterLiveTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterLiveTest.java
new file mode 100644
index 0000000..d29bc1a
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterLiveTest.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.math.BigInteger;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraDatacenter;
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode;
+import org.apache.brooklyn.entity.nosql.cassandra.AstyanaxSupport.AstyanaxSample;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.BrooklynAppLiveTestSupport;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.trait.Startable;
+import brooklyn.location.Location;
+import brooklyn.test.Asserts;
+import brooklyn.test.EntityTestUtils;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.text.Identifiers;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.netflix.astyanax.AstyanaxContext;
+import com.netflix.astyanax.Cluster;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+/**
+ * A live test of the {@link CassandraDatacenter} entity.
+ *
+ * Tests that a two node cluster can be started on Amazon EC2 and data written on one {@link CassandraNode}
+ * can be read from another, using the Astyanax API.
+ */
+public class CassandraDatacenterLiveTest extends BrooklynAppLiveTestSupport {
+
+    private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterLiveTest.class);
+    
+    private String provider = 
+            "aws-ec2:eu-west-1";
+//            "rackspace-cloudservers-uk";
+//            "named:hpcloud-compute-at";
+//            "localhost";
+//            "jcloudsByon:(provider=\"aws-ec2\",region=\"us-east-1\",user=\"aled\",hosts=\"i-6f374743,i-35324219,i-1135453d\")";
+
+    protected Location testLocation;
+    protected CassandraDatacenter cluster;
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        testLocation = mgmt.getLocationRegistry().resolve(provider);
+    }
+
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+    }
+    
+    @Test(groups = "Live")
+    public void testDatacenter() throws Exception {
+        EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class)
+                .configure("initialSize", 2)
+                .configure("clusterName", "CassandraClusterLiveTest");
+        runCluster(spec, false);
+    }
+    
+    @Test(groups = "Live")
+    public void testDatacenterWithVnodes() throws Exception {
+        EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class)
+                .configure("initialSize", 2)
+                .configure(CassandraDatacenter.USE_VNODES, true)
+                .configure("clusterName", "CassandraClusterLiveTest");
+        runCluster(spec, true);
+    }
+    
+    /*
+     * TODO on some distros (e.g. CentOS?), it comes pre-installed with java 6. Installing java 7 
+     * didn't seem to be enough. I also had to set JAVA_HOME:
+     *     .configure("shell.env", MutableMap.of("JAVA_HOME", "/etc/alternatives/java_sdk_1.7.0"))
+     * However, that would break other deployments such as on Ubuntu where JAVA_HOME would be different.
+     */
+    @Test(groups = "Live")
+    public void testDatacenterWithVnodesVersion2() throws Exception {
+        EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class)
+                .configure("initialSize", 2)
+                .configure(CassandraNode.SUGGESTED_VERSION, "2.0.9")
+                .configure(CassandraDatacenter.USE_VNODES, true)
+                .configure("clusterName", "CassandraClusterLiveTest");
+        runCluster(spec, true);
+    }
+
+    @Test(groups = {"Live", "Acceptance"}, invocationCount=10)
+    public void testManyTimes() throws Exception {
+        testDatacenter();
+    }
+
+    /**
+     * Test a Cassandra Datacenter:
+     * <ol>
+     *   <li>Create two node datacenter
+     *   <li>Confirm allows access via the Astyanax API through both nodes.
+     *   <li>Confirm can size
+     * </ol>
+     */
+    protected void runCluster(EntitySpec<CassandraDatacenter> datacenterSpec, boolean usesVnodes) throws Exception {
+        cluster = app.createAndManageChild(datacenterSpec);
+        assertEquals(cluster.getCurrentSize().intValue(), 0);
+
+        app.start(ImmutableList.of(testLocation));
+
+        // Check cluster is up and healthy
+        EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.GROUP_SIZE, 2);
+        Entities.dumpInfo(app);
+        List<CassandraNode> members = castToCassandraNodes(cluster.getMembers());
+        assertNodesConsistent(members);
+
+        if (usesVnodes) {
+            assertVnodeTokensConsistent(members);
+        } else {
+            assertSingleTokenConsistent(members);
+        }
+        
+        // Can connect via Astyanax
+        checkConnectionRepeatedly(2, 5, members);
+
+        // Resize
+        cluster.resize(3);
+        assertEquals(cluster.getMembers().size(), 3, "members="+cluster.getMembers());
+        if (usesVnodes) {
+            assertVnodeTokensConsistent(castToCassandraNodes(cluster.getMembers()));
+        } else {
+            assertSingleTokenConsistent(castToCassandraNodes(cluster.getMembers()));
+        }
+        checkConnectionRepeatedly(2, 5, cluster.getMembers());
+    }
+
+    protected static List<CassandraNode> castToCassandraNodes(Collection<? extends Entity> rawnodes) {
+        final List<CassandraNode> nodes = Lists.newArrayList();
+        for (Entity node : rawnodes) {
+            nodes.add((CassandraNode) node);
+        }
+        return nodes;
+    }
+
+    protected static void assertNodesConsistent(final List<CassandraNode> nodes) {
+        final Integer expectedLiveNodeCount = nodes.size();
+        // may take some time to be consistent (with new thrift_latency checks on the node,
+        // contactability should not be an issue, but consistency still might be)
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Runnable() {
+            public void run() {
+                for (Entity n : nodes) {
+                    CassandraNode node = (CassandraNode) n;
+                    EntityTestUtils.assertAttributeEquals(node, Startable.SERVICE_UP, true);
+                    String errmsg = "node="+node+"; hostname="+node.getAttribute(Attributes.HOSTNAME)+"; port="+node.getThriftPort();
+                    assertTrue(isSocketOpen(node), errmsg);
+                    assertTrue(areVersionsConsistent(node), errmsg);
+                    EntityTestUtils.assertAttributeEquals(node, CassandraNode.LIVE_NODE_COUNT, expectedLiveNodeCount);
+                }
+            }});
+    }
+    
+    protected static void assertSingleTokenConsistent(final List<CassandraNode> nodes) {
+        final int numNodes = nodes.size();
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Runnable() {
+            public void run() {
+                Set<BigInteger> alltokens = Sets.newLinkedHashSet();
+                for (Entity node : nodes) {
+                    EntityTestUtils.assertAttributeEquals(node, Startable.SERVICE_UP, true);
+                    EntityTestUtils.assertConfigEquals(node, CassandraNode.NUM_TOKENS_PER_NODE, 1);
+                    EntityTestUtils.assertAttributeEquals(node, CassandraNode.PEERS, numNodes);
+                    BigInteger token = node.getAttribute(CassandraNode.TOKEN);
+                    Set<BigInteger> tokens = node.getAttribute(CassandraNode.TOKENS);
+                    assertNotNull(token);
+                    assertEquals(tokens, ImmutableSet.of(token));
+                    alltokens.addAll(tokens);
+                }
+                assertEquals(alltokens.size(), numNodes);
+            }});
+    }
+
+    protected static void assertVnodeTokensConsistent(final List<CassandraNode> nodes) {
+        final int numNodes = nodes.size();
+        final int tokensPerNode = Iterables.get(nodes, 0).getNumTokensPerNode();
+        
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Runnable() {
+            public void run() {
+                Set<BigInteger> alltokens = Sets.newLinkedHashSet();
+                for (Entity node : nodes) {
+                    EntityTestUtils.assertAttributeEquals(node, Startable.SERVICE_UP, true);
+                    EntityTestUtils.assertAttributeEquals(node, CassandraNode.PEERS, tokensPerNode*numNodes);
+                    EntityTestUtils.assertConfigEquals(node, CassandraNode.NUM_TOKENS_PER_NODE, 256);
+                    BigInteger token = node.getAttribute(CassandraNode.TOKEN);
+                    Set<BigInteger> tokens = node.getAttribute(CassandraNode.TOKENS);
+                    assertNotNull(token);
+                    assertEquals(tokens.size(), tokensPerNode, "tokens="+tokens);
+                    alltokens.addAll(tokens);
+                }
+                assertEquals(alltokens.size(), tokensPerNode*numNodes);
+            }});
+    }
+
+    protected static void checkConnectionRepeatedly(int totalAttemptsAllowed, int numRetriesPerAttempt, Iterable<? extends Entity> nodes) throws Exception {
+        int attemptNum = 0;
+        while (true) {
+            try {
+                checkConnection(numRetriesPerAttempt, nodes);
+                return;
+            } catch (Exception e) {
+                attemptNum++;
+                if (attemptNum >= totalAttemptsAllowed) {
+                    log.warn("Cassandra not usable, "+attemptNum+" attempts; failing: "+e, e);
+                    throw e;                
+                }
+                log.warn("Cassandra not usable (attempt "+attemptNum+" of "+totalAttemptsAllowed+"), trying again after delay: "+e, e);
+                Time.sleep(Duration.TEN_SECONDS);
+            }
+        }
+    }
+
+    protected static void checkConnection(int numRetries, Iterable<? extends Entity> nodes) throws ConnectionException {
+        CassandraNode first = (CassandraNode) Iterables.get(nodes, 0);
+        
+        // have been seeing intermittent SchemaDisagreementException errors on AWS, probably due to Astyanax / how we are using it
+        // (confirmed that clocks are in sync)
+        String uniqueName = Identifiers.makeRandomId(8);
+        AstyanaxSample astyanaxFirst = AstyanaxSample.builder().node(first).columnFamilyName(uniqueName).build();
+        Map<String, List<String>> versions;
+        AstyanaxContext<Cluster> context = astyanaxFirst.newAstyanaxContextForCluster();
+        try {
+            versions = context.getEntity().describeSchemaVersions();
+        } finally {
+            context.shutdown();
+        }
+            
+        log.info("Cassandra schema versions are: "+versions);
+        if (versions.size() > 1) {
+            Assert.fail("Inconsistent versions on Cassandra start: "+versions);
+        }
+        String keyspacePrefix = "BrooklynTests_"+Identifiers.makeRandomId(8);
+
+        String keyspaceName = astyanaxFirst.writeData(keyspacePrefix, numRetries);
+
+        for (Entity node : nodes) {
+            AstyanaxSample astyanaxSecond = AstyanaxSample.builder().node((CassandraNode)node).columnFamilyName(uniqueName).build();
+            astyanaxSecond.readData(keyspaceName, numRetries);
+        }
+    }
+
+    protected static Boolean areVersionsConsistent(CassandraNode node) {
+        AstyanaxContext<Cluster> context = null;
+        try {
+            context = new AstyanaxSample(node).newAstyanaxContextForCluster();
+            Map<String, List<String>> v = context.getEntity().describeSchemaVersions();
+            return v.size() == 1;
+        } catch (Exception e) {
+            return null;
+        } finally {
+            if (context != null) context.shutdown();
+        }
+    }
+
+    protected static boolean isSocketOpen(CassandraNode node) {
+        try {
+            Socket s = new Socket(node.getAttribute(Attributes.HOSTNAME), node.getThriftPort());
+            s.close();
+            return true;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterRebindIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterRebindIntegrationTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterRebindIntegrationTest.java
new file mode 100644
index 0000000..4c2a248
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterRebindIntegrationTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import static org.testng.Assert.assertNotNull;
+
+import java.math.BigInteger;
+import java.util.Set;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraDatacenter;
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.proxy.nginx.NginxController;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.rebind.RebindOptions;
+import brooklyn.entity.rebind.RebindTestFixtureWithApp;
+import brooklyn.entity.trait.Startable;
+import brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+import brooklyn.test.EntityTestUtils;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+/**
+ * Test the operation of the {@link NginxController} class.
+ */
+public class CassandraDatacenterRebindIntegrationTest extends RebindTestFixtureWithApp {
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraDatacenterRebindIntegrationTest.class);
+
+    private LocalhostMachineProvisioningLocation localhostProvisioningLocation;
+    
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        CassandraNodeIntegrationTest.assertCassandraPortsAvailableEventually();
+        super.setUp();
+        localhostProvisioningLocation = origApp.newLocalhostProvisioningLocation();
+    }
+
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        CassandraNodeIntegrationTest.assertCassandraPortsAvailableEventually();
+    }
+    
+    /**
+     * Test that Brooklyn can rebind to a single node datacenter.
+     */
+    @Test(groups = "Integration")
+    public void testRebindDatacenterOfSizeOne() throws Exception {
+        CassandraDatacenter origDatacenter = origApp.createAndManageChild(EntitySpec.create(CassandraDatacenter.class)
+                .configure("initialSize", 1));
+
+        origApp.start(ImmutableList.of(localhostProvisioningLocation));
+        CassandraNode origNode = (CassandraNode) Iterables.get(origDatacenter.getMembers(), 0);
+
+        EntityTestUtils.assertAttributeEqualsEventually(origDatacenter, CassandraDatacenter.GROUP_SIZE, 1);
+        CassandraDatacenterLiveTest.assertNodesConsistent(ImmutableList.of(origNode));
+        CassandraDatacenterLiveTest.assertSingleTokenConsistent(ImmutableList.of(origNode));
+        CassandraDatacenterLiveTest.checkConnectionRepeatedly(2, 5, ImmutableList.of(origNode));
+        BigInteger origToken = origNode.getAttribute(CassandraNode.TOKEN);
+        Set<BigInteger> origTokens = origNode.getAttribute(CassandraNode.TOKENS);
+        assertNotNull(origToken);
+        
+        newApp = rebind(RebindOptions.create().terminateOrigManagementContext(true));
+        final CassandraDatacenter newDatacenter = (CassandraDatacenter) Iterables.find(newApp.getChildren(), Predicates.instanceOf(CassandraDatacenter.class));
+        final CassandraNode newNode = (CassandraNode) Iterables.find(newDatacenter.getMembers(), Predicates.instanceOf(CassandraNode.class));
+        
+        EntityTestUtils.assertAttributeEqualsEventually(newDatacenter, CassandraDatacenter.GROUP_SIZE, 1);
+        EntityTestUtils.assertAttributeEqualsEventually(newNode, Startable.SERVICE_UP, true);
+        EntityTestUtils.assertAttributeEqualsEventually(newNode, CassandraNode.TOKEN, origToken);
+        EntityTestUtils.assertAttributeEqualsEventually(newNode, CassandraNode.TOKENS, origTokens);
+        CassandraDatacenterLiveTest.assertNodesConsistent(ImmutableList.of(newNode));
+        CassandraDatacenterLiveTest.assertSingleTokenConsistent(ImmutableList.of(newNode));
+        CassandraDatacenterLiveTest.checkConnectionRepeatedly(2, 5, ImmutableList.of(newNode));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterTest.java
new file mode 100644
index 0000000..3a1d202
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import static org.testng.Assert.assertEquals;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraDatacenter;
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.BrooklynAppUnitTestSupport;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.EmptySoftwareProcess;
+import brooklyn.entity.basic.EmptySoftwareProcessSshDriver;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.location.LocationSpec;
+import brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+import brooklyn.test.EntityTestUtils;
+import brooklyn.util.ResourceUtils;
+import brooklyn.util.javalang.JavaClassNames;
+import brooklyn.util.text.TemplateProcessor;
+import brooklyn.util.time.Duration;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+public class CassandraDatacenterTest extends BrooklynAppUnitTestSupport {
+
+    private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterTest.class);
+    
+    private LocalhostMachineProvisioningLocation loc;
+    private CassandraDatacenter cluster;
+    
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        loc = mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class));
+    }
+    
+    @Test
+    public void testPopulatesInitialSeeds() throws Exception {
+        cluster = app.createAndManageChild(EntitySpec.create(CassandraDatacenter.class)
+                .configure(CassandraDatacenter.INITIAL_SIZE, 2)
+                .configure(CassandraDatacenter.TOKEN_SHIFT, BigInteger.ZERO)
+                .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO)
+                .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class)));
+
+        app.start(ImmutableList.of(loc));
+        EmptySoftwareProcess e1 = (EmptySoftwareProcess) Iterables.get(cluster.getMembers(), 0);
+        EmptySoftwareProcess e2 = (EmptySoftwareProcess) Iterables.get(cluster.getMembers(), 1);
+        
+        EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e1, e2));
+    }
+    
+    @Test(groups="Integration") // because takes approx 2 seconds
+    public void testUpdatesSeedsOnFailuresAndAdditions() throws Exception {
+        doTestUpdatesSeedsOnFailuresAndAdditions(true, false);
+    }
+    
+    protected void doTestUpdatesSeedsOnFailuresAndAdditions(boolean fast, boolean checkSeedsConstantOnRejoining) throws Exception {
+        cluster = app.createAndManageChild(EntitySpec.create(CassandraDatacenter.class)
+                .configure(CassandraDatacenter.INITIAL_SIZE, 2)
+                .configure(CassandraDatacenter.TOKEN_SHIFT, BigInteger.ZERO)
+                .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO)
+                .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class)));
+
+        app.start(ImmutableList.of(loc));
+        EmptySoftwareProcess e1 = (EmptySoftwareProcess) Iterables.get(cluster.getMembers(), 0);
+        EmptySoftwareProcess e2 = (EmptySoftwareProcess) Iterables.get(cluster.getMembers(), 1);
+        EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e1, e2));
+        log.debug("Test "+JavaClassNames.niceClassAndMethod()+", cluster "+cluster+" has "+cluster.getMembers()+"; e1="+e1+" e2="+e2);
+        
+        // calling the driver stop for this entity will cause SERVICE_UP to become false, and stay false
+        // (and that's all it does, incidentally); if we just set the attribute it will become true on serviceUp sensor feed
+        ((EmptySoftwareProcess)e1).getDriver().stop();
+        // not necessary, but speeds things up:
+        if (fast)
+            ((EntityInternal)e1).setAttribute(Attributes.SERVICE_UP, false);
+        
+        EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e2));
+
+        cluster.resize(3);
+        EmptySoftwareProcess e3 = (EmptySoftwareProcess) Iterables.getOnlyElement(Sets.difference(ImmutableSet.copyOf(cluster.getMembers()), ImmutableSet.of(e1,e2)));
+        log.debug("Test "+JavaClassNames.niceClassAndMethod()+", cluster "+cluster+" has "+cluster.getMembers()+"; e3="+e3);
+        try {
+            EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e2, e3));
+        } finally {
+            log.debug("Test "+JavaClassNames.niceClassAndMethod()+", cluster "+cluster+" has "+cluster.getMembers()+"; seeds "+cluster.getAttribute(CassandraDatacenter.CURRENT_SEEDS));
+        }
+        
+        if (!checkSeedsConstantOnRejoining) {
+            // cluster should not revert to e1+e2, simply because e1 has come back; but e1 should rejoin the group
+            // (not that important, and waits for 1s, so only done as part of integration)
+            ((EmptySoftwareProcessSshDriver)(((EmptySoftwareProcess)e1).getDriver())).launch();
+            if (fast)
+                ((EntityInternal)e1).setAttribute(Attributes.SERVICE_UP, true);
+            EntityTestUtils.assertAttributeEqualsEventually(e1, CassandraNode.SERVICE_UP, true);
+            EntityTestUtils.assertAttributeEqualsContinually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e2, e3));
+        }
+    }
+    
+    @Test
+    public void testPopulatesInitialTokens() throws Exception {
+        cluster = app.createAndManageChild(EntitySpec.create(CassandraDatacenter.class)
+                .configure(CassandraDatacenter.INITIAL_SIZE, 2)
+                .configure(CassandraDatacenter.TOKEN_SHIFT, BigInteger.ZERO)
+                .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO)
+                .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class)));
+
+        app.start(ImmutableList.of(loc));
+
+        Set<BigInteger> tokens = Sets.newLinkedHashSet();
+        Set<BigInteger> tokens2 = Sets.newLinkedHashSet();
+        for (Entity member : cluster.getMembers()) {
+            BigInteger memberToken = member.getConfig(CassandraNode.TOKEN);
+            Set<BigInteger > memberTokens = member.getConfig(CassandraNode.TOKENS);
+            if (memberToken != null) tokens.add(memberToken);
+            if (memberTokens != null) tokens2.addAll(memberTokens);
+        }
+        assertEquals(tokens, ImmutableSet.of(new BigInteger("-9223372036854775808"), BigInteger.ZERO));
+        assertEquals(tokens2, ImmutableSet.of());
+    }
+    
+    @Test
+    public void testDoesNotPopulateInitialTokens() throws Exception {
+        cluster = app.createAndManageChild(EntitySpec.create(CassandraDatacenter.class)
+                .configure(CassandraDatacenter.INITIAL_SIZE, 2)
+                .configure(CassandraDatacenter.USE_VNODES, true)
+                .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO)
+                .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class)));
+
+        app.start(ImmutableList.of(loc));
+
+        Set<BigInteger> tokens = Sets.newLinkedHashSet();
+        Set<BigInteger> tokens2 = Sets.newLinkedHashSet();
+        for (Entity member : cluster.getMembers()) {
+            BigInteger memberToken = member.getConfig(CassandraNode.TOKEN);
+            Set<BigInteger > memberTokens = member.getConfig(CassandraNode.TOKENS);
+            if (memberToken != null) tokens.add(memberToken);
+            if (memberTokens != null) tokens2.addAll(memberTokens);
+        }
+        assertEquals(tokens, ImmutableSet.of());
+        assertEquals(tokens2, ImmutableSet.of());
+    }
+    
+    public static class MockInputForTemplate {
+        public BigInteger getToken() { return new BigInteger("-9223372036854775808"); }
+        public String getTokensAsString() { return "" + getToken(); }
+        public int getNumTokensPerNode() { return 1; }
+        public String getSeeds() { return ""; }
+        public int getGossipPort() { return 1234; }
+        public int getSslGossipPort() { return 1234; }
+        public int getThriftPort() { return 1234; }
+        public int getNativeTransportPort() { return 1234; }
+        public String getClusterName() { return "Mock"; }
+        public String getEndpointSnitchName() { return ""; }
+        public String getListenAddress() { return "0"; }
+        public String getBroadcastAddress() { return "0"; }
+        public String getRpcAddress() { return "0"; }
+        public String getRunDir() { return "/tmp/mock"; }
+    }
+    
+    @Test
+    public void testBigIntegerFormattedCorrectly() {
+        Map<String, Object> substitutions = ImmutableMap.<String, Object>builder()
+                .put("entity", new MockInputForTemplate())
+                .put("driver", new MockInputForTemplate())
+                .build();
+
+        String templatedUrl = CassandraNode.CASSANDRA_CONFIG_TEMPLATE_URL.getDefaultValue();
+        String url = TemplateProcessor.processTemplateContents(templatedUrl, ImmutableMap.of("entity", ImmutableMap.of("majorMinorVersion", "1.2")));
+        String templateContents = new ResourceUtils(this).getResourceAsString(url);
+        String processedTemplate = TemplateProcessor.processTemplateContents(templateContents, substitutions);
+        Assert.assertEquals(processedTemplate.indexOf("775,808"), -1);
+        Assert.assertTrue(processedTemplate.indexOf("-9223372036854775808") > 0);
+    }
+    
+    @Test(groups="Integration") // because takes approx 30 seconds
+    public void testUpdatesSeedsFastishManyTimes() throws Exception {
+        final int COUNT = 20;
+        for (int i=0; i<COUNT; i++) {
+            log.info("Test "+JavaClassNames.niceClassAndMethod()+", iteration "+(i+1)+" of "+COUNT);
+            try {
+                doTestUpdatesSeedsOnFailuresAndAdditions(true, true);
+                tearDown();
+                setUp();
+            } catch (Exception e) {
+                log.warn("Error in "+JavaClassNames.niceClassAndMethod()+", iteration "+(i+1)+" of "+COUNT, e);
+                throw e;
+            }
+        }
+    }
+    
+    @Test(groups="Integration") // because takes approx 5 seconds
+    public void testUpdateSeedsSlowAndRejoining() throws Exception {
+        final int COUNT = 1;
+        for (int i=0; i<COUNT; i++) {
+            log.info("Test "+JavaClassNames.niceClassAndMethod()+", iteration "+(i+1)+" of "+COUNT);
+            doTestUpdatesSeedsOnFailuresAndAdditions(false, true);
+            tearDown();
+            setUp();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricTest.java
new file mode 100644
index 0000000..cbf55ed
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraDatacenter;
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraFabric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.BrooklynAppUnitTestSupport;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.AbstractEntity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.EmptySoftwareProcess;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.entity.trait.Startable;
+import brooklyn.location.Location;
+import brooklyn.location.LocationSpec;
+import brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+import brooklyn.test.EntityTestUtils;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+public class CassandraFabricTest extends BrooklynAppUnitTestSupport {
+
+    private static final Logger log = LoggerFactory.getLogger(CassandraFabricTest.class);
+    
+    private LocalhostMachineProvisioningLocation loc1;
+    private LocalhostMachineProvisioningLocation loc2;
+    private CassandraFabric fabric;
+    
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        loc1 = mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class));
+        loc2 = mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class));
+    }
+    
+    @Test
+    public void testPopulatesInitialSeeds() throws Exception {
+        fabric = app.createAndManageChild(EntitySpec.create(CassandraFabric.class)
+                .configure(CassandraFabric.INITIAL_QUORUM_SIZE, 2)
+                .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO)
+                .configure(CassandraFabric.MEMBER_SPEC, EntitySpec.create(CassandraDatacenter.class)
+                        .configure(CassandraDatacenter.INITIAL_SIZE, 2)
+                        .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class))));
+
+        app.start(ImmutableList.of(loc1, loc2));
+        CassandraDatacenter d1 = (CassandraDatacenter) Iterables.get(fabric.getMembers(), 0);
+        CassandraDatacenter d2 = (CassandraDatacenter) Iterables.get(fabric.getMembers(), 1);
+
+        final EmptySoftwareProcess d1a = (EmptySoftwareProcess) Iterables.get(d1.getMembers(), 0);
+        final EmptySoftwareProcess d1b = (EmptySoftwareProcess) Iterables.get(d1.getMembers(), 1);
+
+        final EmptySoftwareProcess d2a = (EmptySoftwareProcess) Iterables.get(d2.getMembers(), 0);
+        final EmptySoftwareProcess d2b = (EmptySoftwareProcess) Iterables.get(d2.getMembers(), 1);
+
+        Predicate<Set<Entity>> predicate = new Predicate<Set<Entity>>() {
+            @Override public boolean apply(Set<Entity> input) {
+                return input != null && input.size() >= 2 &&
+                        Sets.intersection(input, ImmutableSet.of(d1a, d1b)).size() == 1 &&
+                        Sets.intersection(input, ImmutableSet.of(d2a, d2b)).size() == 1;
+            }
+        };
+        EntityTestUtils.assertAttributeEventually(fabric, CassandraFabric.CURRENT_SEEDS, predicate);
+        EntityTestUtils.assertAttributeEventually(d1, CassandraDatacenter.CURRENT_SEEDS, predicate);
+        EntityTestUtils.assertAttributeEventually(d2, CassandraDatacenter.CURRENT_SEEDS, predicate);
+        
+        Set<Entity> seeds = fabric.getAttribute(CassandraFabric.CURRENT_SEEDS);
+        assertEquals(d1.getAttribute(CassandraDatacenter.CURRENT_SEEDS), seeds);
+        assertEquals(d2.getAttribute(CassandraDatacenter.CURRENT_SEEDS), seeds);
+        log.info("Seeds="+seeds);
+    }
+
+    @Test
+    public void testPopulatesInitialSeedsWhenNodesOfOneClusterComeUpBeforeTheOtherCluster() throws Exception {
+        fabric = app.createAndManageChild(EntitySpec.create(CassandraFabric.class)
+                .configure(CassandraFabric.INITIAL_QUORUM_SIZE, 2)
+                .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO)
+                .configure(CassandraFabric.MEMBER_SPEC, EntitySpec.create(CassandraDatacenter.class)
+                        .configure(CassandraDatacenter.INITIAL_SIZE, 2)
+                        .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(DummyCassandraNode.class))));
+
+        Thread t = new Thread() {
+            public void run() {
+                app.start(ImmutableList.of(loc1, loc2));
+            }
+        };
+        t.start();
+        try {
+            EntityTestUtils.assertGroupSizeEqualsEventually(fabric, 2);
+            CassandraDatacenter d1 = (CassandraDatacenter) Iterables.get(fabric.getMembers(), 0);
+            CassandraDatacenter d2 = (CassandraDatacenter) Iterables.get(fabric.getMembers(), 1);
+    
+            EntityTestUtils.assertGroupSizeEqualsEventually(d1, 2);
+            final DummyCassandraNode d1a = (DummyCassandraNode) Iterables.get(d1.getMembers(), 0);
+            final DummyCassandraNode d1b = (DummyCassandraNode) Iterables.get(d1.getMembers(), 1);
+    
+            EntityTestUtils.assertGroupSizeEqualsEventually(d2, 2);
+            final DummyCassandraNode d2a = (DummyCassandraNode) Iterables.get(d2.getMembers(), 0);
+            final DummyCassandraNode d2b = (DummyCassandraNode) Iterables.get(d2.getMembers(), 1);
+
+            d1a.setAttribute(Attributes.HOSTNAME, "d1a");
+            d1b.setAttribute(Attributes.HOSTNAME, "d1b");
+            
+            Thread.sleep(1000);
+            d2a.setAttribute(Attributes.HOSTNAME, "d2a");
+            d2b.setAttribute(Attributes.HOSTNAME, "d2b");
+            
+            Predicate<Set<Entity>> predicate = new Predicate<Set<Entity>>() {
+                @Override public boolean apply(Set<Entity> input) {
+                    return input != null && input.size() >= 2 &&
+                            Sets.intersection(input, ImmutableSet.of(d1a, d1b)).size() == 1 &&
+                            Sets.intersection(input, ImmutableSet.of(d2a, d2b)).size() == 1;
+                }
+            };
+            EntityTestUtils.assertAttributeEventually(fabric, CassandraFabric.CURRENT_SEEDS, predicate);
+            EntityTestUtils.assertAttributeEventually(d1, CassandraDatacenter.CURRENT_SEEDS, predicate);
+            EntityTestUtils.assertAttributeEventually(d2, CassandraDatacenter.CURRENT_SEEDS, predicate);
+            
+            Set<Entity> seeds = fabric.getAttribute(CassandraFabric.CURRENT_SEEDS);
+            assertEquals(d1.getAttribute(CassandraDatacenter.CURRENT_SEEDS), seeds);
+            assertEquals(d2.getAttribute(CassandraDatacenter.CURRENT_SEEDS), seeds);
+            log.info("Seeds="+seeds);
+        } finally {
+            log.info("Failed seeds; fabric="+fabric.getAttribute(CassandraFabric.CURRENT_SEEDS));
+            t.interrupt();
+        }
+    }
+    
+    
+    @ImplementedBy(DummyCassandraNodeImpl.class)
+    public interface DummyCassandraNode extends Entity, Startable, EntityLocal, EntityInternal {
+    }
+    
+    public static class DummyCassandraNodeImpl extends AbstractEntity implements DummyCassandraNode {
+
+        @Override
+        public void start(Collection<? extends Location> locations) {
+            ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING);
+        }
+
+        @Override
+        public void stop() {
+            ServiceStateLogic.setExpectedState(this, Lifecycle.STOPPING);
+        }
+
+        @Override
+        public void restart() {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java
new file mode 100644
index 0000000..495843f
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode;
+import org.apache.brooklyn.entity.nosql.cassandra.AstyanaxSupport.AstyanaxSample;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.AbstractEc2LiveTest;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.location.Location;
+import brooklyn.test.EntityTestUtils;
+
+import com.google.common.collect.ImmutableList;
+
+public class CassandraNodeEc2LiveTest extends AbstractEc2LiveTest {
+
+    private static final Logger log = LoggerFactory.getLogger(CassandraNodeEc2LiveTest.class);
+
+    @Override
+    protected void doTest(Location loc) throws Exception {
+        log.info("Testing Cassandra on {}", loc);
+
+        CassandraNode cassandra = app.createAndManageChild(EntitySpec.create(CassandraNode.class)
+                .configure("thriftPort", "9876+")
+                .configure("clusterName", "TestCluster"));
+        app.start(ImmutableList.of(loc));
+
+        EntityTestUtils.assertAttributeEqualsEventually(cassandra, CassandraNode.SERVICE_UP, true);
+
+        AstyanaxSample astyanax = new AstyanaxSample(cassandra);
+        astyanax.astyanaxTest();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeIntegrationTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeIntegrationTest.java
new file mode 100644
index 0000000..b5a657f
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeIntegrationTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode;
+import org.apache.brooklyn.entity.nosql.cassandra.AstyanaxSupport.AstyanaxSample;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.test.Asserts;
+import brooklyn.test.EntityTestUtils;
+import brooklyn.test.NetworkingTestUtils;
+import brooklyn.util.math.MathPredicates;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+
+/**
+ * Cassandra integration tests.
+ *
+ * Test the operation of the {@link CassandraNode} class.
+ */
+public class CassandraNodeIntegrationTest extends AbstractCassandraNodeTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraNodeIntegrationTest.class);
+
+    public static void assertCassandraPortsAvailableEventually() {
+        Map<String, Integer> ports = getCassandraDefaultPorts();
+        NetworkingTestUtils.assertPortsAvailableEventually(ports);
+        LOG.info("Confirmed Cassandra ports are available: "+ports);
+    }
+    
+    public static Map<String, Integer> getCassandraDefaultPorts() {
+        List<PortAttributeSensorAndConfigKey> ports = ImmutableList.of(
+                CassandraNode.GOSSIP_PORT, 
+                CassandraNode.SSL_GOSSIP_PORT, 
+                CassandraNode.THRIFT_PORT, 
+                CassandraNode.NATIVE_TRANSPORT_PORT, 
+                CassandraNode.RMI_REGISTRY_PORT);
+        Map<String, Integer> result = Maps.newLinkedHashMap();
+        for (PortAttributeSensorAndConfigKey key : ports) {
+            result.put(key.getName(), key.getConfigKey().getDefaultValue().iterator().next());
+        }
+        return result;
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    public void setUp() throws Exception {
+        assertCassandraPortsAvailableEventually();
+        super.setUp();
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        assertCassandraPortsAvailableEventually();
+    }
+    
+    /**
+     * Test that a node starts and sets SERVICE_UP correctly.
+     */
+    @Test(groups = "Integration")
+    public void canStartupAndShutdown() {
+        cassandra = app.createAndManageChild(EntitySpec.create(CassandraNode.class)
+                .configure("jmxPort", "11099+")
+                .configure("rmiRegistryPort", "19001+"));
+        app.start(ImmutableList.of(testLocation));
+
+        EntityTestUtils.assertAttributeEqualsEventually(cassandra, Startable.SERVICE_UP, true);
+        Entities.dumpInfo(app);
+
+        cassandra.stop();
+
+        EntityTestUtils.assertAttributeEqualsEventually(cassandra, Startable.SERVICE_UP, false);
+    }
+
+    /**
+     * Test that a keyspace and column family can be created and used with Astyanax client.
+     */
+    @Test(groups = "Integration")
+    public void testConnection() throws Exception {
+        cassandra = app.createAndManageChild(EntitySpec.create(CassandraNode.class)
+                .configure("jmxPort", "11099+")
+                .configure("rmiRegistryPort", "19001+")
+                .configure("thriftPort", "9876+"));
+        app.start(ImmutableList.of(testLocation));
+
+        EntityTestUtils.assertAttributeEqualsEventually(cassandra, Startable.SERVICE_UP, true);
+
+        AstyanaxSample astyanax = new AstyanaxSample(cassandra);
+        astyanax.astyanaxTest();
+    }
+    
+    /**
+     * Cassandra v2 needs Java >= 1.7. If you have java 6 as the defult locally, then you can use
+     * something like {@code .configure("shell.env", MutableMap.of("JAVA_HOME", "/Library/Java/JavaVirtualMachines/jdk1.7.0_51.jdk/Contents/Home"))}
+     */
+    @Test(groups = "Integration")
+    public void testCassandraVersion2() throws Exception {
+        // TODO In v2.0.10, the bin/cassandra script changed to add an additional check for JMX connectivity.
+        // This causes cassandera script to hang for us (presumably due to the CLASSPATH/JVM_OPTS we're passing
+        // in, regarding JMX agent).
+        // See:
+        //  - https://issues.apache.org/jira/browse/CASSANDRA-7254
+        //  - https://github.com/apache/cassandra/blame/trunk/bin/cassandra#L211-216
+        
+        String version = "2.0.9";
+        String majorMinorVersion = "2.0";
+        
+        cassandra = app.createAndManageChild(EntitySpec.create(CassandraNode.class)
+                .configure(CassandraNode.SUGGESTED_VERSION, version)
+                .configure(CassandraNode.NUM_TOKENS_PER_NODE, 256)
+                .configure("jmxPort", "11099+")
+                .configure("rmiRegistryPort", "19001+"));
+        app.start(ImmutableList.of(testLocation));
+
+        EntityTestUtils.assertAttributeEqualsEventually(cassandra, Startable.SERVICE_UP, true);
+        Entities.dumpInfo(app);
+
+        AstyanaxSample astyanax = new AstyanaxSample(cassandra);
+        astyanax.astyanaxTest();
+
+        assertEquals(cassandra.getMajorMinorVersion(), majorMinorVersion);
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertNotNull(cassandra.getAttribute(CassandraNode.TOKEN));
+                assertNotNull(cassandra.getAttribute(CassandraNode.TOKENS));
+                assertEquals(cassandra.getAttribute(CassandraNode.TOKENS).size(), 256, "tokens="+cassandra.getAttribute(CassandraNode.TOKENS));
+                
+                assertEquals(cassandra.getAttribute(CassandraNode.PEERS), (Integer)256);
+                assertEquals(cassandra.getAttribute(CassandraNode.LIVE_NODE_COUNT), (Integer)1);
+        
+                assertTrue(cassandra.getAttribute(CassandraNode.SERVICE_UP_JMX));
+                assertNotNull(cassandra.getAttribute(CassandraNode.THRIFT_PORT_LATENCY));
+        
+                assertNotNull(cassandra.getAttribute(CassandraNode.READ_PENDING));
+                assertNotNull(cassandra.getAttribute(CassandraNode.READ_ACTIVE));
+                EntityTestUtils.assertAttribute(cassandra, CassandraNode.READ_COMPLETED, MathPredicates.greaterThanOrEqual(1));
+                assertNotNull(cassandra.getAttribute(CassandraNode.WRITE_PENDING));
+                assertNotNull(cassandra.getAttribute(CassandraNode.WRITE_ACTIVE));
+                EntityTestUtils.assertAttribute(cassandra, CassandraNode.WRITE_COMPLETED, MathPredicates.greaterThanOrEqual(1));
+                
+                assertNotNull(cassandra.getAttribute(CassandraNode.READS_PER_SECOND_LAST));
+                assertNotNull(cassandra.getAttribute(CassandraNode.WRITES_PER_SECOND_LAST));
+        
+                assertNotNull(cassandra.getAttribute(CassandraNode.THRIFT_PORT_LATENCY_IN_WINDOW));
+                assertNotNull(cassandra.getAttribute(CassandraNode.READS_PER_SECOND_IN_WINDOW));
+                assertNotNull(cassandra.getAttribute(CassandraNode.WRITES_PER_SECOND_IN_WINDOW));
+                
+                // an example MXBean
+                EntityTestUtils.assertAttribute(cassandra, CassandraNode.MAX_HEAP_MEMORY, MathPredicates.greaterThanOrEqual(1));
+            }});
+
+        cassandra.stop();
+
+        EntityTestUtils.assertAttributeEqualsEventually(cassandra, Startable.SERVICE_UP, false);
+    }
+}



Mime
View raw message