brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rich...@apache.org
Subject [01/10] incubator-brooklyn git commit: MySqlCluster - provide slave status as an attribute
Date Fri, 07 Aug 2015 17:53:34 GMT
Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master cc4ffb9d7 -> 83acab217


MySqlCluster - provide slave status as an attribute

Also tie it to service.notUp.indicators


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/51deebac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/51deebac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/51deebac

Branch: refs/heads/master
Commit: 51deebac6323404cc8148bb858da0f3fb0f2f872
Parents: bae9628
Author: Svetoslav Neykov <svetoslav.neykov@cloudsoftcorp.com>
Authored: Thu Jul 30 21:56:49 2015 +0300
Committer: Svetoslav Neykov <svetoslav.neykov@cloudsoftcorp.com>
Committed: Wed Aug 5 15:23:06 2015 +0300

----------------------------------------------------------------------
 .../entity/database/mysql/MySqlCluster.java     |  4 +
 .../entity/database/mysql/MySqlClusterImpl.java | 92 +++++++++++++++-----
 .../entity/database/mysql/MySqlRowParser.java   | 39 +++++++++
 3 files changed, 113 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/51deebac/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java
index 8b19ef7..322ada9 100644
--- a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java
+++ b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java
@@ -39,6 +39,10 @@ public interface MySqlCluster extends DynamicCluster, HasDatastoreUrl {
         AttributeSensor<String> MASTER_LOG_FILE = Sensors.newStringSensor("mysql.master.log_file",
"The binary log file master is writing to");
         AttributeSensor<Integer> MASTER_LOG_POSITION = Sensors.newIntegerSensor("mysql.master.log_position",
"The position in the log file to start replication");
     }
+    /**
+     * Sensors describing the replication state of a slave.
+     */
+    interface MySqlSlave {
+        /** Boolean sensor published under the name {@code mysql.slave.healthy}. */
+        AttributeSensor<Boolean> SLAVE_HEALTHY = Sensors.newBooleanSensor(
+                "mysql.slave.healthy",
+                "Indicates that the replication state of the slave is healthy");
+
+        /** Integer sensor published under the name {@code mysql.slave.seconds_behind_master}. */
+        AttributeSensor<Integer> SLAVE_SECONDS_BEHIND_MASTER = Sensors.newIntegerSensor(
+                "mysql.slave.seconds_behind_master",
+                "How many seconds behind master is the replication state on the slave");
+    }
 
     ConfigKey<String> SLAVE_USERNAME = ConfigKeys.newStringConfigKey(
             "mysql.slave.username", "The user name slaves will use to connect to the master",
"slave");

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/51deebac/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java
index 3eaa335..190a0d7 100644
--- a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java
+++ b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java
@@ -19,16 +19,15 @@
 package brooklyn.entity.database.mysql;
 
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Pattern;
 
+import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
-import com.google.common.base.Splitter;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableMap;
@@ -38,6 +37,7 @@ import com.google.common.reflect.TypeToken;
 import brooklyn.config.ConfigKey;
 import brooklyn.enricher.Enrichers;
 import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Attributes;
 import brooklyn.entity.basic.EntityInternal;
 import brooklyn.entity.basic.EntityLocal;
 import brooklyn.entity.basic.EntityPredicates;
@@ -49,12 +49,17 @@ import brooklyn.event.SensorEvent;
 import brooklyn.event.SensorEventListener;
 import brooklyn.event.basic.DependentConfiguration;
 import brooklyn.event.basic.Sensors;
+import brooklyn.event.feed.function.FunctionFeed;
+import brooklyn.event.feed.function.FunctionPollConfig;
 import brooklyn.location.Location;
 import brooklyn.util.collections.CollectionFunctionals;
+import brooklyn.util.guava.Functionals;
 import brooklyn.util.guava.IfFunctions;
 import brooklyn.util.task.DynamicTasks;
 import brooklyn.util.task.TaskBuilder;
 import brooklyn.util.text.Identifiers;
+import brooklyn.util.text.StringPredicates;
+import brooklyn.util.time.Duration;
 
 // https://dev.mysql.com/doc/refman/5.7/en/replication-howto.html
 
@@ -68,7 +73,6 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
 
     private static final String MASTER_CONFIG_URL = "classpath:///brooklyn/entity/database/mysql/mysql_master.conf";
     private static final String SLAVE_CONFIG_URL = "classpath:///brooklyn/entity/database/mysql/mysql_slave.conf";
-    private static final String NOT_UP_REPLICATION = "replication_not_configured";
     private static final int MASTER_SERVER_ID = 1;
     private static final Predicate<Entity> IS_MASTER = EntityPredicates.configEqualTo(MySqlNode.MYSQL_SERVER_ID,
MASTER_SERVER_ID);
 
@@ -214,13 +218,64 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
     @Override
     protected Entity createNode(Location loc, Map<?, ?> flags) {
         Entity node = super.createNode(loc, flags);
-        Integer serverId = node.getConfig(MySqlNode.MYSQL_SERVER_ID);
-        if (serverId > 0) {
-            ServiceNotUpLogic.updateNotUpIndicator((EntityLocal)node, NOT_UP_REPLICATION,
"Replication not started");
+        if (!IS_MASTER.apply(node)) {
+            ServiceNotUpLogic.updateNotUpIndicator((EntityLocal)node, MySqlSlave.SLAVE_HEALTHY,
"Replication not started");
+
+            FunctionFeed.builder()
+                .entity((EntityLocal)node)
+                .period(Duration.FIVE_SECONDS)
+                .poll(FunctionPollConfig.forSensor(MySqlSlave.SLAVE_HEALTHY)
+                        .callable(new SlaveStateCallable(node))
+                        .checkSuccess(StringPredicates.isNonBlank())
+                        .onSuccess(new SlaveStateParser(node))
+                        .setOnFailure(false)
+                        .description("Polls SHOW SLAVE STATUS"))
+                .build();
+
+            node.addEnricher(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
+                    .from(MySqlSlave.SLAVE_HEALTHY)
+                    .computing(Functionals.ifNotEquals(true).value("Slave replication status
is not healthy") )
+                    .build());
         }
         return node;
     }
 
+    public class SlaveStateCallable implements Callable<String> {
+        private Entity slave;
+        public SlaveStateCallable(Entity slave) {
+            this.slave = slave;
+        }
+
+        @Override
+        public String call() throws Exception {
+            if (Boolean.TRUE.equals(slave.getAttribute(MySqlNode.SERVICE_PROCESS_IS_RUNNING)))
{
+                return slave.invoke(MySqlNode.EXECUTE_SCRIPT, ImmutableMap.of("commands",
"SHOW SLAVE STATUS \\G")).asTask().getUnchecked();
+            } else {
+                return null;
+            }
+        }
+
+    }
+
+    public class SlaveStateParser implements Function<String, Boolean> {
+        private Entity slave;
+
+        public SlaveStateParser(Entity slave) {
+            this.slave = slave;
+        }
+
+        @Override
+        public Boolean apply(String result) {
+            Map<String, String> status = MySqlRowParser.parseSingle(result);
+            String secondsBehindMaster = status.get("Seconds_Behind_Master");
+            if (secondsBehindMaster != null && !"NULL".equals(secondsBehindMaster))
{
+                ((EntityLocal)slave).setAttribute(MySqlSlave.SLAVE_SECONDS_BEHIND_MASTER,
new Integer(secondsBehindMaster));
+            }
+            return "Yes".equals(status.get("Slave_IO_Running")) && "Yes".equals(status.get("Slave_SQL_Running"));
+        }
+
+    }
+
     private static class NextServerIdSupplier implements Supplier<Integer> {
         private AtomicInteger nextId = new AtomicInteger(MASTER_SERVER_ID+1);
 
@@ -252,25 +307,18 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
             } else if (serverId > MASTER_SERVER_ID) {
                 initSlave(node);
             }
-            ServiceNotUpLogic.clearNotUpIndicator((EntityLocal)node, NOT_UP_REPLICATION);

         }
 
         private void initMaster(MySqlNode master) {
             String binLogInfo = executeScriptOnNode(master, "FLUSH TABLES WITH READ LOCK;SHOW
MASTER STATUS \\G UNLOCK TABLES;");
-            Iterator<String> splitIter = Splitter.on(Pattern.compile("\\n|:"))
-                    .omitEmptyStrings()
-                    .trimResults()
-                    .split(binLogInfo)
-                    .iterator();
-            while (splitIter.hasNext()) {
-                String part = splitIter.next();
-                if (part.equals("File")) {
-                    String file = splitIter.next();
-                    ((EntityInternal)master).setAttribute(MySqlMaster.MASTER_LOG_FILE, file);
-                } else if (part.equals("Position")) {
-                    Integer position = new Integer(splitIter.next());
-                    ((EntityInternal)master).setAttribute(MySqlMaster.MASTER_LOG_POSITION,
position);
-                }
+            Map<String, String> status = MySqlRowParser.parseSingle(binLogInfo);
+            String file = status.get("File");
+            if (file != null) {
+                ((EntityInternal)master).setAttribute(MySqlMaster.MASTER_LOG_FILE, file);
+            }
+            String position = status.get("Position");
+            if (position != null) {
+                ((EntityInternal)master).setAttribute(MySqlMaster.MASTER_LOG_POSITION, new
Integer(position));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/51deebac/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlRowParser.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlRowParser.java b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlRowParser.java
new file mode 100644
index 0000000..cdd5149
--- /dev/null
+++ b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlRowParser.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.database.mysql;
+
+import java.util.Map;
+
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.text.Strings;
+
+/**
+ * Parses the vertical ({@code \G}) output of a mysql query into column/value pairs.
+ */
+public class MySqlRowParser {
+    /**
+     * Parses a single result row printed as one {@code name: value} pair per line.
+     * Lines starting with {@code *} (row delimiters) and lines that contain no colon
+     * (blank lines, other text) are skipped.
+     *
+     * @param row raw query output; {@code null} is treated as empty output
+     * @return map from trimmed column name to trimmed value, where an empty value is
+     *         stored as {@code null}; empty when there is nothing to parse
+     */
+    public static Map<String, String> parseSingle(String row) {
+        Map<String, String> values = MutableMap.of();
+        if (row == null) {
+            return values;
+        }
+        for (String line : row.split("\\n")) {
+            if (line.startsWith("*")) continue; // row delimiter
+            // Limit of 2 keeps any further colons (e.g. in timestamps) inside the value.
+            String[] arr = line.split(":", 2);
+            if (arr.length < 2) continue; // no "name: value" shape; arr[1] would be out of bounds
+            String key = arr[0].trim();
+            String value = Strings.emptyToNull(arr[1].trim());
+            values.put(key, value);
+        }
+        return values;
+    }
+}


Mime
View raw message