brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [04/17] git commit: Fixed bucket creation
Date Mon, 01 Sep 2014 16:14:42 GMT
Fixed bucket creation


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/7d0467d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/7d0467d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/7d0467d8

Branch: refs/heads/master
Commit: 7d0467d887629225b919d9fce93d5abf3c9a0b20
Parents: 4f70aa1
Author: Martin Harris <github@nakomis.com>
Authored: Wed Jun 25 16:44:11 2014 +0100
Committer: Martin Harris <github@nakomis.com>
Committed: Mon Aug 18 15:45:43 2014 +0100

----------------------------------------------------------------------
 .../nosql/couchbase/CouchbaseClusterImpl.java   |  47 +++++++-
 .../nosql/couchbase/CouchbaseNodeSshDriver.java | 116 +++++++++++++++++--
 .../couchbase/CouchbaseSyncGatewayImpl.java     |   5 +
 .../CouchbaseSyncGatewaySshDriver.java          |  34 ++++--
 4 files changed, 175 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7d0467d8/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
index b7c2026..f424eaf 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
@@ -48,23 +48,27 @@ import brooklyn.event.basic.DependentConfiguration;
 import brooklyn.event.feed.http.HttpFeed;
 import brooklyn.event.feed.http.HttpPollConfig;
 import brooklyn.event.feed.http.HttpValueFunctions;
+import brooklyn.event.feed.http.JsonFunctions;
 import brooklyn.location.Location;
 import brooklyn.policy.PolicySpec;
 import brooklyn.util.collections.MutableSet;
+import brooklyn.util.guava.Functionals;
 import brooklyn.util.task.DynamicTasks;
 import brooklyn.util.task.TaskBuilder;
 import brooklyn.util.task.Tasks;
 import brooklyn.util.text.ByteSizeStrings;
+import brooklyn.util.text.Strings;
 import brooklyn.util.time.Time;
 
 import com.google.common.base.Function;
-import com.google.common.base.Functions;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
 
 public class CouchbaseClusterImpl extends DynamicClusterImpl implements CouchbaseCluster {
     private static final Logger log = LoggerFactory.getLogger(CouchbaseClusterImpl.class);
@@ -141,6 +145,8 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas
 
         connectSensors();
         connectEnrichers();
+        
+        setAttribute(BUCKET_CREATION_IN_PROGRESS, false);
 
         //start timeout before adding the servers
         Time.sleep(getConfig(SERVICE_UP_TIME_OUT));
@@ -167,10 +173,12 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas
                 } finally {
                     Tasks.resetBlockingDetails();
                 }
-                Entities.invokeEffector(this, getPrimaryNode(), CouchbaseNode.REBALANCE);
+                
+                ((CouchbaseNode)getPrimaryNode()).rebalance();
                 
                 if (Optional.fromNullable(CREATE_BUCKETS).isPresent()) {
                     createBuckets();
+                    DependentConfiguration.waitInTaskForAttributeReady(this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
                 }
 
                 setAttribute(IS_CLUSTER_INITIALIZED, true);
@@ -187,6 +195,9 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas
 
     @Override
     public void stop() {
+        if (resetBucketCreation[0] != null) {
+            resetBucketCreation[0].stop();
+        }
         super.stop();
     }
 
@@ -205,7 +216,7 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas
                 .displayName("Controller targets tracker")
                 .configure("group", this));
     }
-
+    
     public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
         @Override protected void onEntityChange(Entity member) {
             ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member);
@@ -377,17 +388,41 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas
                             CouchbaseClusterImpl.this.resetBucketCreation[0].stop();
                         }
                         setAttribute(CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, true);
-
+                        
                         CouchbaseClusterImpl.this.resetBucketCreation[0] = HttpFeed.builder()
                                 .entity(CouchbaseClusterImpl.this)
                                 .period(500, TimeUnit.MILLISECONDS)
                                 .baseUri(String.format("%s/pools/default/buckets/%s", primaryNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_URL), bucketName))
                                 .credentials(primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD))
                                 .poll(new HttpPollConfig<Boolean>(BUCKET_CREATION_IN_PROGRESS)
-                                        .onSuccess(HttpValueFunctions.responseCodeEquals(404))
-                                        .onFailureOrException(Functions.constant(false)))
+                                        .onSuccess(Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walkN("nodes"), new Function<JsonElement, Boolean>() {
+                                            @Override public Boolean apply(JsonElement input) {
+                                                // Wait until bucket has been created on all nodes and the couchApiBase element has been published (indicating that the bucket is useable)
+                                                JsonArray servers = input.getAsJsonArray();
+                                                if (servers.size() != CouchbaseClusterImpl.this.getMembers().size()) {
+                                                    return true;
+                                                }
+                                                for (JsonElement server : servers) {
+                                                    Object api = server.getAsJsonObject().get("couchApiBase");
+                                                    if (api == null || Strings.isEmpty(String.valueOf(api))) {
+                                                        return true;
+                                                    }
+                                                }
+                                                return false;
+                                            }
+                                        }))
+                                        .onFailureOrException(new Function<Object, Boolean>() {
+                                            @Override
+                                            public Boolean apply(Object input) {
+                                                if (((brooklyn.util.http.HttpToolResponse) input).getResponseCode() == 404) {
+                                                    return true;
+                                                }
+                                                    throw new IllegalStateException("Unexpected response when creating bucket:" + input);
+                                            }
+                                        }))
                                 .build();
 
+                        // TODO: Bail out if bucket creation fails, to allow next bucket to proceed
                         Entities.invokeEffectorWithArgs(CouchbaseClusterImpl.this, primaryNode, CouchbaseNode.BUCKET_CREATE, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica);
                         DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
                         if (CouchbaseClusterImpl.this.resetBucketCreation[0] != null) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7d0467d8/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
index e02ddc9..6f81b73 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
@@ -24,19 +24,38 @@ import static brooklyn.util.ssh.BashCommands.chainGroup;
 import static brooklyn.util.ssh.BashCommands.sudo;
 import static java.lang.String.format;
 
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.http.auth.Credentials;
+import org.apache.http.auth.UsernamePasswordCredentials;
 
 import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.drivers.downloads.DownloadResolver;
+import brooklyn.event.feed.http.HttpValueFunctions;
+import brooklyn.event.feed.http.JsonFunctions;
 import brooklyn.location.OsDetails;
 import brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.guava.Functionals;
+import brooklyn.util.http.HttpTool;
+import brooklyn.util.http.HttpToolResponse;
+import brooklyn.util.repeat.Repeater;
 import brooklyn.util.ssh.BashCommands;
 import brooklyn.util.task.Tasks;
 import brooklyn.util.time.Duration;
 import brooklyn.util.time.Time;
 
+import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
 
 public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver implements CouchbaseNodeDriver {
 
@@ -193,8 +212,83 @@ public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver imp
                 .failOnNonZeroResultCode()
                 .execute();
         entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "Rebalance Started");
+        // wait until the re-balance is complete
+        Repeater.create()
+            .every(Duration.millis(500))
+            .limitTimeTo(Duration.THIRTY_SECONDS)
+            .until(new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    for (String nodeHostName : CouchbaseNodeSshDriver.this.getNodeHostNames()) {
+                        if (isNodeRebalancing(nodeHostName)) {
+                            return true;
+                        }
+                    }
+                    return false;
+                }
+            })
+            .run();
+        Repeater.create()
+            .every(Duration.FIVE_SECONDS)
+            .limitTimeTo(Duration.FIVE_MINUTES)
+            .until(new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    for (String nodeHostName : CouchbaseNodeSshDriver.this.getNodeHostNames()) {
+                        if (isNodeRebalancing(nodeHostName)) {
+                            return false;
+                        }
+                    }
+                    return true;
+                }
+            })
+            .run();
+        log.info("rebalanced cluster via primary node {}", getEntity());
     }
 
+    private Iterable<String> getNodeHostNames() throws URISyntaxException {
+        Function<JsonElement, Iterable<String>> getNodesAsList = new Function<JsonElement, Iterable<String>>() {
+            @Override public Iterable<String> apply(JsonElement input) {
+                if (input == null) {
+                    return Collections.emptyList();
+                }
+                Collection<String> names = Lists.newArrayList();
+                JsonArray nodes = input.getAsJsonArray();
+                for (JsonElement element : nodes) {
+                    // NOTE: the 'hostname' element also includes the port
+                    names.add(element.getAsJsonObject().get("hostname").toString().replace("\"", ""));
+                }
+                return names;
+            }
+        };
+        HttpToolResponse nodesResponse = getAPIResponse(String.format("http://%s:%s/pools/nodes", getHostname(), getWebPort()));
+        return Functionals.chain(
+            HttpValueFunctions.jsonContents(),
+            JsonFunctions.walkN("nodes"),
+            getNodesAsList
+        ).apply(nodesResponse);
+    }
+    
+    private boolean isNodeRebalancing(String nodeHostName) throws URISyntaxException {
+        HttpToolResponse response = getAPIResponse("http://" + nodeHostName + "/pools/nodes/rebalanceProgress");
+        if (response.getResponseCode() != 200) {
+            throw new IllegalStateException("failed to rebalance cluster: " + response);
+        }
+        return !HttpValueFunctions.jsonContents("status", String.class).apply(response).equals("none");
+    }
+    
+    private HttpToolResponse getAPIResponse(String path) throws URISyntaxException {
+        URI uri = new URI(path);
+        Credentials credentials = new UsernamePasswordCredentials(getUsername(), getPassword());
+        return HttpTool.httpGet(HttpTool.httpClientBuilder()
+                // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials
+                .uri(uri)
+                .credentials(credentials)
+                .build(), 
+            uri, 
+            ImmutableMap.<String, String>of());
+    }
+    
     @Override
     public void serverAdd(String serverToAdd, String username, String password) {
         newScript("serverAdd").body.append(couchbaseCli("server-add")
@@ -219,15 +313,15 @@ public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver imp
     }
 
     @Override
-        public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) {
-            newScript("bucketCreate").body.append(couchbaseCli("bucket-create")
-                + getCouchbaseHostnameAndCredentials() +
-                " --bucket=" + bucketName +
-                " --bucket-type=" + bucketType +
-                " --bucket-port=" + bucketPort +
-                " --bucket-ramsize=" + bucketRamSize +
-                " --bucket-replica=" + bucketReplica)
-                .failOnNonZeroResultCode()
-                .execute();
-        }
+    public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) {
+        newScript("bucketCreate").body.append(couchbaseCli("bucket-create")
+            + getCouchbaseHostnameAndCredentials() +
+            " --bucket=" + bucketName +
+            " --bucket-type=" + bucketType +
+            " --bucket-port=" + bucketPort +
+            " --bucket-ramsize=" + bucketRamSize +
+            " --bucket-replica=" + bucketReplica)
+            .failOnNonZeroResultCode()
+            .execute();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7d0467d8/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java
index 09ef569..0ead110 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java
@@ -1,6 +1,7 @@
 package brooklyn.entity.nosql.couchbase;
 
 
+import brooklyn.config.render.RendererHints;
 import brooklyn.entity.basic.SoftwareProcessImpl;
 import brooklyn.event.feed.http.HttpFeed;
 import brooklyn.event.feed.http.HttpPollConfig;
@@ -58,4 +59,8 @@ public class CouchbaseSyncGatewayImpl extends SoftwareProcessImpl implements Cou
             httpFeed.stop();
         }
     }
+    
+    static {
+        RendererHints.register(MANAGEMENT_URL, new RendererHints.NamedActionWithUrl("Open"));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7d0467d8/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java
index c7c9129..7339c40 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java
@@ -12,18 +12,21 @@ import javax.annotation.Nullable;
 
 import brooklyn.entity.Entity;
 import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver;
-import brooklyn.entity.basic.Attributes;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.EntityLocal;
 import brooklyn.entity.drivers.downloads.DownloadResolver;
+import brooklyn.event.basic.DependentConfiguration;
 import brooklyn.location.OsDetails;
 import brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.collections.MutableMap;
 import brooklyn.util.ssh.BashCommands;
 import brooklyn.util.time.Duration;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
 public class CouchbaseSyncGatewaySshDriver extends AbstractSoftwareProcessSshDriver implements CouchbaseSyncGatewayDriver {
@@ -32,11 +35,6 @@ public class CouchbaseSyncGatewaySshDriver extends AbstractSoftwareProcessSshDri
     }
 
     @Override
-    public boolean isRunning() {
-        return Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP));
-    }
-
-    @Override
     public void stop() {
 
     }
@@ -67,8 +65,13 @@ public class CouchbaseSyncGatewaySshDriver extends AbstractSoftwareProcessSshDri
     public void launch() {
         Entity cbNode = entity.getConfig(CouchbaseSyncGateway.COUCHBASE_SERVER);
         Entities.waitForServiceUp(cbNode, Duration.ONE_HOUR);
-
-
+        DependentConfiguration.waitInTaskForAttributeReady(cbNode, CouchbaseCluster.IS_CLUSTER_INITIALIZED, Predicates.equalTo(true));
+        try {
+            // Even once the bucket has published its API URL, it can still take a couple of seconds for it to become available
+            Thread.sleep(10 * 1000);
+        } catch (InterruptedException e) {
+            // no-op
+        }
         if (cbNode instanceof CouchbaseCluster) {
             Optional<Entity> cbClusterNode = Iterables.tryFind(cbNode.getAttribute(CouchbaseCluster.GROUP_MEMBERS), new Predicate<Entity>() {
 
@@ -106,10 +109,21 @@ public class CouchbaseSyncGatewaySshDriver extends AbstractSoftwareProcessSshDri
         String options = format("-url %s -bucket %s -adminInterface 0.0.0.0:%s -interface 0.0.0.0:%s -pool %s %s %s",
                 serverWebAdminUrl, bucketName, adminRestApiPort, syncRestApiPort, pool, pretty, verbose);
 
-        newScript(LAUNCHING)
-                .body.append(format("/opt/couchbase-sync-gateway/bin/sync_gateway %s &", options))
+        newScript(ImmutableMap.of("usePidFile", true), LAUNCHING)
+                .body.append(format("/opt/couchbase-sync-gateway/bin/sync_gateway %s ", options) + "> out.log 2> err.log < /dev/null &")
+                .failOnNonZeroResultCode()
                 .execute();
     }
+    
+    @Override
+    public boolean isRunning() {
+        return newScript(MutableMap.of("usePidFile", true), CHECK_RUNNING).execute() == 0;
+    }
+    
+    @Override
+    public void kill() {
+        newScript(MutableMap.of("usePidFile", true), KILLING).execute();
+    }
 
     private List<String> installLinux(List<String> urls, String saveAs) {
 


Mime
View raw message