metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nickal...@apache.org
Subject metron git commit: METRON-1249 Improve Metron MPack Service Checks (nickwallen) closes apache/metron#799
Date Mon, 16 Oct 2017 21:53:54 GMT
Repository: metron
Updated Branches:
  refs/heads/master 4c908b95b -> fef8833c1


METRON-1249 Improve Metron MPack Service Checks (nickwallen) closes apache/metron#799


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/fef8833c
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/fef8833c
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/fef8833c

Branch: refs/heads/master
Commit: fef8833c153fabef597084f4aace8303d9f7116e
Parents: 4c908b9
Author: nickwallen <nick@nickallen.org>
Authored: Mon Oct 16 17:53:20 2017 -0400
Committer: nickallen <nickallen@apache.org>
Committed: Mon Oct 16 17:53:20 2017 -0400

----------------------------------------------------------------------
 .../package/scripts/enrichment_commands.py      |  62 +++-
 .../package/scripts/indexing_commands.py        |  41 ++-
 .../package/scripts/management_ui_commands.py   |  26 ++
 .../package/scripts/management_ui_master.py     |  12 +-
 .../CURRENT/package/scripts/metron_service.py   | 281 ++++++++++++++++---
 .../CURRENT/package/scripts/parser_commands.py  |  66 +++--
 .../package/scripts/profiler_commands.py        |  44 ++-
 .../CURRENT/package/scripts/rest_commands.py    |  29 +-
 .../CURRENT/package/scripts/service_check.py    |  49 +++-
 9 files changed, 520 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
index 794b6a5..90a690e 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
@@ -18,6 +18,7 @@ limitations under the License.
 import os
 import time
 from datetime import datetime
+from resource_management.core.exceptions import Fail
 from resource_management.core.logger import Logger
 from resource_management.core.resources.system import Execute, File
 
@@ -47,6 +48,12 @@ class EnrichmentCommands:
         self.__hbase_acl_configured = os.path.isfile(self.__params.enrichment_hbase_acl_configured_flag_file)
         self.__geo_configured = os.path.isfile(self.__params.enrichment_geo_configured_flag_file)
 
+    def __get_topics(self):
+        return [self.__enrichment_topic, self.__params.enrichment_error_topic]
+
+    def __get_kafka_acl_groups(self):
+        return [self.__enrichment_topic]
+
     def is_kafka_configured(self):
         return self.__kafka_configured
 
@@ -105,15 +112,15 @@ class EnrichmentCommands:
     def init_kafka_topics(self):
         Logger.info('Creating Kafka topics for enrichment')
         # All errors go to indexing topics, so create it here if it's not already
-        metron_service.init_kafka_topics(self.__params, [self.__enrichment_topic, self.__params.enrichment_error_topic])
+        metron_service.init_kafka_topics(self.__params, self.__get_topics())
         self.set_kafka_configured()
 
     def init_kafka_acls(self):
         Logger.info('Creating Kafka ACls for enrichment')
+        metron_service.init_kafka_acls(self.__params, self.__get_topics())
+
         # Enrichment topic names matches group
-        metron_service.init_kafka_acls(self.__params,
-                                       [self.__enrichment_topic, self.__params.enrichment_error_topic],
-                                       [self.__enrichment_topic])
+        metron_service.init_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())
 
         self.set_kafka_acl_configured()
 
@@ -182,6 +189,7 @@ class EnrichmentCommands:
                   self.__params.hbase_keytab_path,
                   self.__params.hbase_principal_name,
                   execute_user=self.__params.hbase_user)
+
         cmd = "echo \"create '{0}','{1}'\" | hbase shell -n"
         add_enrichment_cmd = cmd.format(self.__params.enrichment_hbase_table, self.__params.enrichment_hbase_cf)
         Execute(add_enrichment_cmd,
@@ -211,6 +219,7 @@ class EnrichmentCommands:
                   self.__params.hbase_keytab_path,
                   self.__params.hbase_principal_name,
                   execute_user=self.__params.hbase_user)
+
         cmd = "echo \"grant '{0}', 'RW', '{1}'\" | hbase shell -n"
         add_enrichment_acl_cmd = cmd.format(self.__params.metron_user, self.__params.enrichment_hbase_table)
         Execute(add_enrichment_acl_cmd,
@@ -232,3 +241,48 @@ class EnrichmentCommands:
 
         Logger.info("Done setting HBase ACLs")
         self.set_hbase_acl_configured()
+
+    def service_check(self, env):
+        """
+        Performs a service check for Enrichment.
+        :param env: Environment
+        """
+        Logger.info("Checking for Geo database")
+        metron_service.check_hdfs_file_exists(self.__params, self.__params.geoip_hdfs_dir + "/GeoLite2-City.mmdb.gz")
+
+        Logger.info('Checking Kafka topics for Enrichment')
+        metron_service.check_kafka_topics(self.__params, self.__get_topics())
+
+        Logger.info("Checking HBase for Enrichment")
+        metron_service.check_hbase_table(
+          self.__params,
+          self.__params.enrichment_hbase_table)
+        metron_service.check_hbase_column_family(
+          self.__params,
+          self.__params.enrichment_hbase_table,
+          self.__params.enrichment_hbase_cf)
+
+        Logger.info("Checking HBase for Threat Intel")
+        metron_service.check_hbase_table(
+          self.__params,
+          self.__params.threatintel_hbase_table)
+        metron_service.check_hbase_column_family(
+          self.__params,
+          self.__params.threatintel_hbase_table,
+          self.__params.threatintel_hbase_cf)
+
+        if self.__params.security_enabled:
+
+          Logger.info('Checking Kafka ACLs for Enrichment')
+          metron_service.check_kafka_acls(self.__params, self.__get_topics())
+          metron_service.check_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())
+
+          Logger.info("Checking HBase ACLs for Enrichment")
+          metron_service.check_hbase_acls(self.__params, self.__params.enrichment_hbase_table)
+          metron_service.check_hbase_acls(self.__params, self.__params.threatintel_hbase_table)
+
+        Logger.info("Checking for Enrichment topology")
+        if not self.is_topology_active(env):
+            raise Fail("Enrichment topology not running")
+
+        Logger.info("Enrichment service check completed successfully")

http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
index 50457d0..17374eb 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
@@ -19,6 +19,7 @@ import os
 import time
 
 from datetime import datetime
+from resource_management.core.exceptions import Fail
 from resource_management.core.logger import Logger
 from resource_management.core.resources.system import Execute, File
 
@@ -49,6 +50,13 @@ class IndexingCommands:
         self.__hbase_acl_configured = os.path.isfile(self.__params.indexing_hbase_acl_configured_flag_file)
         self.__hdfs_perm_configured = os.path.isfile(self.__params.indexing_hdfs_perm_configured_flag_file)
 
+    def __get_topics(self):
+        return [self.__indexing_topic]
+
+    def __get_kafka_acl_groups(self):
+        # Indexed topic names matches the group
+        return [self.__indexing_topic]
+
     def is_configured(self):
         return self.__configured
 
@@ -121,12 +129,12 @@ class IndexingCommands:
 
     def init_kafka_topics(self):
         Logger.info('Creating Kafka topics for indexing')
-        metron_service.init_kafka_topics(self.__params, [self.__indexing_topic])
+        metron_service.init_kafka_topics(self.__params, self.__get_topics())
 
     def init_kafka_acls(self):
         Logger.info('Creating Kafka ACLs for indexing')
-        # Indexed topic names matches the group
-        metron_service.init_kafka_acls(self.__params, [self.__indexing_topic], [self.__indexing_topic])
+        metron_service.init_kafka_acls(self.__params, self.__get_topics())
+        metron_service.init_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())
 
     def init_hdfs_dir(self):
         Logger.info('Setting up HDFS indexing directory')
@@ -213,3 +221,30 @@ class IndexingCommands:
             is_running = topologies[self.__indexing_topology] in ['ACTIVE', 'REBALANCING']
         active &= is_running
         return active
+
+    def service_check(self, env):
+        """
+        Performs a service check for Indexing.
+        :param env: Environment
+        """
+        Logger.info('Checking Kafka topics for Indexing')
+        metron_service.check_kafka_topics(self.__params, self.__get_topics())
+
+        Logger.info("Checking HBase for Indexing")
+        metron_service.check_hbase_table(self.__params, self.__params.update_hbase_table)
+        metron_service.check_hbase_column_family(self.__params, self.__params.update_hbase_table, self.__params.update_hbase_cf)
+
+        if self.__params.security_enabled:
+
+            Logger.info('Checking Kafka ACLs for Indexing')
+            metron_service.check_kafka_acls(self.__params, self.__get_topics())
+            metron_service.check_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())
+
+            Logger.info("Checking HBase ACLs for Indexing")
+            metron_service.check_hbase_acls(self.__params, self.__params.update_hbase_table)
+
+        Logger.info("Checking for Indexing topology")
+        if not self.is_topology_active(env):
+            raise Fail("Indexing topology not running")
+
+        Logger.info("Indexing service check completed successfully")

http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_commands.py
index de67f64..7427046 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_commands.py
@@ -20,6 +20,9 @@ limitations under the License.
 
 from resource_management.core.logger import Logger
 from resource_management.core.resources.system import Execute, File
+from resource_management.core.exceptions import ComponentIsNotRunning
+from resource_management.core.exceptions import ExecutionFailed
+from resource_management.libraries.functions.get_user_call_output import get_user_call_output
 
 # Wrap major operations and functionality in this class
 class ManagementUICommands:
@@ -44,3 +47,26 @@ class ManagementUICommands:
         Logger.info('Restarting the Management UI')
         Execute('service metron-management-ui restart')
         Logger.info('Done restarting the Management UI')
+
+    def check_status(self, env):
+        Logger.info('Status check the Management UI')
+        cmd = "curl --max-time 3 {0}:{1}"
+        try:
+          Execute(
+            cmd.format(self.__params.hostname, self.__params.metron_management_ui_port),
+            tries=3,
+            try_sleep=5,
+            logoutput=False,
+            user=self.__params.metron_user)
+        except:
+          raise ComponentIsNotRunning()
+
+    def service_check(self, env):
+        """
+        Performs a service check for the Management UI
+        :param env: Environment
+        """
+        from params import status_params
+        env.set_params(status_params)
+        self.check_status(env)
+        Logger.info("Management UI service check completed successfully")

http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_master.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_master.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_master.py
index 54e91aa..15bcd94 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_master.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_master.py
@@ -17,16 +17,12 @@ limitations under the License.
 
 """
 
-from resource_management.core.exceptions import ComponentIsNotRunning
-from resource_management.core.exceptions import ExecutionFailed
 from resource_management.core.resources.system import Directory
 from resource_management.core.resources.system import File
 from resource_management.core.source import Template
 from resource_management.libraries.functions.format import format
-from resource_management.libraries.functions.get_user_call_output import get_user_call_output
 from resource_management.libraries.script import Script
 from resource_management.core.resources.system import Execute
-
 from resource_management.core.logger import Logger
 
 from management_ui_commands import ManagementUICommands
@@ -40,7 +36,6 @@ class ManagementUIMaster(Script):
         self.install_packages(env)
 
     def configure(self, env, upgrade_type=None, config_dir=None):
-        print 'configure managment_ui'
         from params import params
         env.set_params(params)
         File(format("/etc/default/metron"),
@@ -77,11 +72,8 @@ class ManagementUIMaster(Script):
     def status(self, env):
         from params import status_params
         env.set_params(status_params)
-        cmd = format('curl --max-time 3 {hostname}:{metron_management_ui_port}')
-        try:
-            get_user_call_output(cmd, user=status_params.metron_user)
-        except ExecutionFailed:
-            raise ComponentIsNotRunning()
+        commands = ManagementUICommands(status_params)
+        commands.check_status(env)
 
     def restart(self, env):
         from params import params

http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
index 84dc805..2ae0b08 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
@@ -20,12 +20,12 @@ import subprocess
 
 from datetime import datetime
 from resource_management.core.logger import Logger
+from resource_management.core.exceptions import Fail
 from resource_management.core.resources.system import Directory, File
 from resource_management.core.resources.system import Execute
 from resource_management.core.source import InlineTemplate
 from resource_management.libraries.functions import format as ambari_format
-from resource_management.libraries.functions.get_user_call_output import \
-  get_user_call_output
+from resource_management.libraries.functions.get_user_call_output import get_user_call_output
 from metron_security import kinit
 
 
@@ -134,7 +134,6 @@ def refresh_configs(params):
 
 def get_running_topologies(params):
   Logger.info('Getting Running Storm Topologies from Storm REST Server')
-
   Logger.info('Security enabled? ' + str(params.security_enabled))
 
   # Want to sudo to the metron user and kinit as them so we aren't polluting root with Metron's Kerberos tickets.
@@ -201,36 +200,246 @@ def init_kafka_topics(params, topics):
             user=params.kafka_user, tries=3, try_sleep=5, logoutput=True)
   Logger.info("Done creating Kafka topics")
 
-def init_kafka_acls(params, topics, groups):
-  Logger.info('Creating Kafka ACLs')
-
-  acl_template = """{0}/kafka-acls.sh \
-                                  --authorizer kafka.security.auth.SimpleAclAuthorizer \
-                                  --authorizer-properties zookeeper.connect={1} \
-                                  --add \
-                                  --allow-principal User:{2} \
-                                  --topic {3}"""
-
-  for topic in topics:
-    Logger.info("Creating ACL for topic '{0}'".format(topic))
-    Execute(acl_template.format(params.kafka_bin_dir,
-                                params.zookeeper_quorum,
-                                params.metron_user,
-                                topic),
-            user=params.kafka_user, tries=3, try_sleep=5, logoutput=True)
-
-  acl_template = """{0}/kafka-acls.sh \
-                                  --authorizer kafka.security.auth.SimpleAclAuthorizer \
-                                  --authorizer-properties zookeeper.connect={1} \
-                                  --add \
-                                  --allow-principal User:{2} \
-                                  --group {3}"""
-
-  for group in groups:
-    Logger.info("Creating ACL for group '{0}'".format(group))
-    Execute(acl_template.format(params.kafka_bin_dir,
-                                params.zookeeper_quorum,
-                                params.metron_user,
-                                group),
-            user=params.kafka_user, tries=3, try_sleep=5, logoutput=True)
-  Logger.info("Done creating Kafka ACLs")
+def init_kafka_acls(params, topics):
+    Logger.info('Creating Kafka topic ACLs')
+    acl_template = """{0}/kafka-acls.sh \
+    --authorizer kafka.security.auth.SimpleAclAuthorizer \
+    --authorizer-properties zookeeper.connect={1} \
+    --add \
+    --allow-principal User:{2} \
+    --topic {3}"""
+
+    for topic in topics:
+        Logger.info("Creating ACL for topic '{0}'".format(topic))
+        Execute(acl_template.format(params.kafka_bin_dir,
+                                    params.zookeeper_quorum,
+                                    params.metron_user,
+                                    topic),
+                user=params.kafka_user, tries=3, try_sleep=5, logoutput=True)
+
+def init_kafka_acl_groups(params, groups):
+    Logger.info('Creating Kafka group ACLs')
+    acl_template = """{0}/kafka-acls.sh \
+    --authorizer kafka.security.auth.SimpleAclAuthorizer \
+    --authorizer-properties zookeeper.connect={1} \
+    --add \
+    --allow-principal User:{2} \
+    --group {3}"""
+
+    for group in groups:
+        Logger.info("Creating ACL for group '{0}'".format(group))
+        Execute(acl_template.format(params.kafka_bin_dir,
+                                    params.zookeeper_quorum,
+                                    params.metron_user,
+                                    group),
+                user=params.kafka_user, tries=3, try_sleep=5, logoutput=True)
+
+def execute(cmd, user, err_msg=None, tries=3, try_sleep=5, logoutput=True, path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin'):
+    """
+    Executes a command and raises an appropriate error message if the command
+    fails.
+    :param cmd: The command to execute.
+    :param user: The user to execute the command as.
+    :param err_msg: The error message to display if the command fails.
+    :param tries: The number of attempts to execute the command.
+    :param try_sleep: The time between attempts.
+    :param logoutput: If true, log the command output.
+    :param path: The path use when running the command.
+    :return:
+    """
+    try:
+        Execute(cmd, tries=tries, try_sleep=try_sleep, logoutput=logoutput, user=user, path=path)
+    except:
+        if err_msg is None:
+            err_msg = "Execution failed: cmd={0}, user={1}".format(cmd, user)
+        raise Fail(err_msg)
+
+def check_kafka_topics(params, topics):
+    """
+    Validates that the Kafka topics exist.  An exception is raised if any of the
+    topics do not exist.
+    :param params:
+    :param topics: A list of topic names.
+    """
+
+    # if needed kinit as 'metron'
+    if params.security_enabled:
+        kinit(params.kinit_path_local,
+              params.metron_keytab_path,
+              params.metron_principal_name,
+              execute_user=params.metron_user)
+
+    template = """{0}/kafka-topics.sh \
+      --zookeeper {1} \
+      --list | \
+      awk 'BEGIN {{cnt=0;}} /{2}/ {{cnt++}} END {{if (cnt > 0) {{exit 0}} else {{exit 1}}}}'"""
+
+    for topic in topics:
+        Logger.info("Checking existence of Kafka topic '{0}'".format(topic))
+        cmd = template.format(params.kafka_bin_dir, params.zookeeper_quorum, topic)
+        err_msg = "Missing Kafka topic; topic={0}".format(topic)
+        execute(cmd, user=params.kafka_user, err_msg=err_msg)
+
+def check_hbase_table(params, table):
+    """
+    Validates that an HBase table exists.  An exception is raised if the table
+    does not exist.
+    :param params:
+    :param table: The name of the HBase table.
+    """
+    Logger.info("Checking HBase table '{0}'".format(table))
+
+    # if needed kinit as 'hbase'
+    if params.security_enabled:
+        kinit(params.kinit_path_local,
+              params.hbase_keytab_path,
+              params.hbase_principal_name,
+              execute_user=params.hbase_user)
+
+    template = "echo \"exists '{0}'\" | hbase shell -n | grep 'Table {1} does exist'"
+    cmd = template.format(table, table)
+    err_msg = "Missing HBase table; table={0}".format(table)
+    execute(cmd, user=params.hbase_user, err_msg=err_msg)
+
+def check_hbase_column_family(params, table, column_family):
+    """
+    Validates that an HBase column family exists.  An exception is raised if the
+    column family does not exist.
+    :param params:
+    :param table: The name of the HBase table.
+    :param column_family: The name of the HBase column family.
+    """
+    Logger.info("Checking column family '{0}:{1}'".format(table, column_family))
+
+    # if needed kinit as 'hbase'
+    if params.security_enabled:
+        kinit(params.kinit_path_local,
+              params.hbase_keytab_path,
+              params.hbase_principal_name,
+              execute_user=params.hbase_user)
+
+    template = "echo \"desc '{0}'\" | hbase shell -n | grep \"NAME => '{1}'\""
+    cmd = template.format(table, column_family)
+    err_msg = "Missing HBase column family; table={0}, cf={1}".format(table, column_family)
+    execute(cmd, user=params.hbase_user, err_msg=err_msg)
+
+def check_hbase_acls(params, table, user=None, permissions="READ,WRITE"):
+    """
+    Validates that HBase table permissions exist for a user. An exception is
+    raised if the permissions do not exist.
+    :param params:
+    :param table: The name of the HBase table.
+    :param user: The name of the user.
+    :param permissions: The permissions that should exist.
+    """
+    if user is None:
+        user = params.metron_user
+    Logger.info("Checking HBase ACLs; table={0}, user={1}, permissions={2}".format(table, user, permissions))
+
+    # if needed kinit as 'hbase'
+    if params.security_enabled:
+        kinit(params.kinit_path_local,
+              params.hbase_keytab_path,
+              params.hbase_principal_name,
+              execute_user=params.hbase_user)
+
+
+
+    template = """echo "user_permission '{0}'" | \
+      hbase shell -n | \
+      grep " {1} " | \
+      grep "actions={2}"
+    """
+    cmd = template.format(table, user, permissions)
+    err_msg = "Missing HBase access; table={0}, user={1}, permissions={2}".format(table, user, permissions)
+    execute(cmd, user=params.hbase_user, err_msg=err_msg)
+
+def check_hdfs_dir_exists(params, path, user=None):
+    """
+    Validate that a directory exists in HDFS.
+    :param params:
+    :param path: The directory path in HDFS.
+    :param user: The user to execute the check under.
+    """
+    if user is None:
+        user = params.metron_user
+    Logger.info("Checking HDFS; directory={0} user={1}".format(path, user))
+
+    # if needed kinit as 'metron'
+    if params.security_enabled:
+        kinit(params.kinit_path_local,
+              params.metron_keytab_path,
+              params.metron_principal_name,
+              execute_user=params.metron_user)
+
+    template = "{0}/hdfs dfs -test -d {1}"
+    cmd = template.format(params.hadoop_bin_dir, path)
+    err_msg = "Missing directory in HDFS: directory={0} user={1}".format(path, user)
+    execute(cmd, user=params.metron_user, err_msg=err_msg)
+
+def check_hdfs_file_exists(params, path, user=None):
+    """
+    Validate that a file exists in HDFS.
+    :param params:
+    :param path: The file path in HDFS.
+    :param user: The user to execute the check under.
+    """
+    if user is None:
+        user = params.metron_user
+    Logger.info("Checking HDFS; file={0}, user={1}".format(path, user))
+
+    # if needed kinit as 'metron'
+    if params.security_enabled:
+        kinit(params.kinit_path_local,
+              params.metron_keytab_path,
+              params.metron_principal_name,
+              execute_user=params.metron_user)
+
+    template = "{0}/hdfs dfs -test -f {1}"
+    cmd = template.format(params.hadoop_bin_dir, path)
+    err_msg = "Missing file in HDFS; file={0}".format(path)
+    execute(cmd, user=user, err_msg=err_msg)
+
+def check_kafka_acls(params, topics, user=None):
+    """
+    Validate that permissions have been granted for a list of Kakfa topics.
+    :param params:
+    :param topics: A list of topic names.
+    :param user: The user whose access is checked.
+    """
+    if user is None:
+        user = params.metron_user
+
+    template = """{0}/kafka-acls.sh \
+        --authorizer kafka.security.auth.SimpleAclAuthorizer \
+        --authorizer-properties zookeeper.connect={1} \
+        --topic {2} \
+        --list | grep 'User:{3}'"""
+
+    for topic in topics:
+        Logger.info("Checking ACL; topic={0}, user={1}'".format(topic, user))
+        cmd = template.format(params.kafka_bin_dir, params.zookeeper_quorum, topic, user)
+        err_msg = "Missing Kafka access; topic={0}, user={1}".format(topic, user)
+        execute(cmd, user=params.kafka_user, err_msg=err_msg)
+
+def check_kafka_acl_groups(params, groups, user=None):
+    """
+    Validate that Kafka group permissions have been granted.
+    :param params:
+    :param groups: A list of group name.
+    :param user: The user whose access is checked.
+    """
+    if user is None:
+        user = params.metron_user
+
+    template = """{0}/kafka-acls.sh \
+        --authorizer kafka.security.auth.SimpleAclAuthorizer \
+        --authorizer-properties zookeeper.connect={1} \
+        --group {2} \
+        --list | grep 'User:{3}'"""
+
+    for group in groups:
+        Logger.info("Checking group ACL for topic '{0}'".format(group))
+        cmd = template.format(params.kafka_bin_dir, params.zookeeper_quorum, group, user)
+        err_msg = "Missing Kafka group access; group={0}, user={1}".format(group, user)
+        execute(cmd, user=params.kafka_user, err_msg=err_msg)

http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
index 9483498..274306a 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
@@ -24,6 +24,7 @@ import subprocess
 import time
 
 from datetime import datetime
+from resource_management.core.exceptions import Fail
 from resource_management.core.logger import Logger
 from resource_management.core.resources.system import Execute, File
 
@@ -50,6 +51,17 @@ class ParserCommands:
     def __get_parsers(self, params):
         return params.parsers.replace(' ', '').split(',')
 
+    def __get_topics(self):
+        # All errors go to indexing topics, so create it here if it's not already
+        # Getting topics this way is a bit awkward, but I don't want to append to actual list, so copy it
+        topics = list(self.get_parser_list())
+        topics.append(self.__params.enrichment_error_topic)
+        return topics
+
+    def __get_kafka_acl_groups(self):
+        # Parser group is the parser name + '_parser'
+        return [parser + '_parser' for parser in self.get_parser_list()]
+
     def is_configured(self):
         return self.__configured
 
@@ -63,9 +75,13 @@ class ParserCommands:
         metron_service.set_configured(self.__params.metron_user, self.__params.parsers_acl_configured_flag_file, "Setting Parsers ACL configured to true")
 
     def init_parsers(self):
-        Logger.info(
-            "Copying grok patterns from local directory '{0}' to HDFS '{1}'".format(self.__params.local_grok_patterns_dir,
-                                                                                    self.__params.hdfs_grok_patterns_dir))
+        self.init_grok_patterns()
+        Logger.info("Done initializing parser configuration")
+
+    def init_grok_patterns(self):
+        Logger.info("Copying grok patterns from local directory '{0}' to HDFS '{1}'"
+            .format(self.__params.local_grok_patterns_dir,
+                    self.__params.hdfs_grok_patterns_dir))
 
         self.__params.HdfsResource(self.__params.hdfs_grok_patterns_dir,
                                    type="directory",
@@ -75,29 +91,17 @@ class ParserCommands:
                                    source=self.__params.local_grok_patterns_dir,
                                    recursive_chown = True)
 
-        Logger.info("Done initializing parser configuration")
-
     def get_parser_list(self):
         return self.__parser_list
 
     def init_kafka_topics(self):
-        Logger.info('Creating Kafka topics for parsers')
-        # All errors go to indexing topics, so create it here if it's not already
-        # Getting topics this way is a bit awkward, but I don't want to append to actual list, so copy it
-        topics = list(self.get_parser_list())
-        topics.append(self.__params.enrichment_error_topic)
-        metron_service.init_kafka_topics(self.__params, topics)
+        Logger.info('Creating Kafka topics for Parsers')
+        metron_service.init_kafka_topics(self.__params, self.__get_topics())
 
     def init_kafka_acls(self):
-        Logger.info('Creating Kafka ACLs for parsers')
-
-        # Getting topics this way is a bit awkward, but I don't want to modify the actual list, so copy it
-        topics = list(self.get_parser_list())
-        topics.append(self.__params.enrichment_error_topic)
-        # Parser group is the parser name + '_parser'
-        metron_service.init_kafka_acls(self.__params,
-                                       topics,
-                                       [parser + '_parser' for parser in self.get_parser_list()])
+        Logger.info('Creating Kafka ACLs for Parsers')
+        metron_service.init_kafka_acls(self.__params, self.__get_topics())
+        metron_service.init_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())
 
     def start_parser_topologies(self, env):
         Logger.info("Starting Metron parser topologies: {0}".format(self.get_parser_list()))
@@ -212,3 +216,25 @@ class ParserCommands:
 
     def __is_running(self, status):
         return status in ['ACTIVE', 'REBALANCING']
+
+    def service_check(self, env):
+        """
+        Performs a service check for the Parsers.
+        :param env: Environment
+        """
+        Logger.info("Checking for grok patterns in HDFS for Parsers")
+        metron_service.check_hdfs_dir_exists(self.__params, self.__params.hdfs_grok_patterns_dir)
+
+        Logger.info('Checking Kafka topics for Parsers')
+        metron_service.check_kafka_topics(self.__params, self.__get_topics())
+
+        if self.__params.security_enabled:
+            Logger.info('Checking Kafka ACLs for Parsers')
+            metron_service.check_kafka_acls(self.__params, self.__get_topics())
+            metron_service.check_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())
+
+        Logger.info("Checking for Parser topologies")
+        if not self.topologies_running(env):
+            raise Fail("Parser topologies not running")
+
+        Logger.info("Parser service check completed successfully")

http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py
index 21c1225..41cab06 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py
@@ -19,6 +19,7 @@ import os
 import time
 
 from datetime import datetime
+from resource_management.core.exceptions import Fail
 from resource_management.core.logger import Logger
 from resource_management.core.resources.system import Execute, File
 
@@ -47,6 +48,12 @@ class ProfilerCommands:
         self.__hbase_configured = os.path.isfile(self.__params.profiler_hbase_configured_flag_file)
         self.__hbase_acl_configured = os.path.isfile(self.__params.profiler_hbase_acl_configured_flag_file)
 
+    def __get_topics(self):
+        return [self.__profiler_topic]
+
+    def __get_kafka_acl_groups(self):
+        return ['profiler']
+
     def is_configured(self):
         return self.__configured
 
@@ -72,7 +79,7 @@ class ProfilerCommands:
         metron_service.set_configured(self.__params.metron_user, self.__params.profiler_hbase_acl_configured_flag_file, "Setting HBase ACL configured to True for profiler")
 
     def create_hbase_tables(self):
-        Logger.info("Creating HBase Tables for profiler")
+        Logger.info("Creating HBase table '{0}' for profiler".format(self.__params.profiler_hbase_table))
         if self.__params.security_enabled:
             metron_security.kinit(self.__params.kinit_path_local,
                   self.__params.hbase_keytab_path,
@@ -88,12 +95,13 @@ class ProfilerCommands:
                 user=self.__params.hbase_user
                 )
 
-        Logger.info("Done creating HBase Tables for profiler")
         self.set_hbase_configured()
+        Logger.info("Done creating HBase Tables for profiler")
 
     def init_kafka_acls(self):
         Logger.info('Creating Kafka ACls for profiler')
-        metron_service.init_kafka_acls(self.__params, [self.__profiler_topic], ['profiler'])
+        metron_service.init_kafka_acls(self.__params, self.__get_topics())
+        metron_service.init_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())
 
     def set_hbase_acls(self):
         Logger.info("Setting HBase ACLs for profiler")
@@ -102,6 +110,7 @@ class ProfilerCommands:
                   self.__params.hbase_keytab_path,
                   self.__params.hbase_principal_name,
                   execute_user=self.__params.hbase_user)
+                  
         cmd = "echo \"grant '{0}', 'RW', '{1}'\" | hbase shell -n"
         add_table_acl_cmd = cmd.format(self.__params.metron_user, self.__params.profiler_hbase_table)
         Execute(add_table_acl_cmd,
@@ -112,8 +121,8 @@ class ProfilerCommands:
                 user=self.__params.hbase_user
                 )
 
-        Logger.info("Done setting HBase ACLs for profiler")
         self.set_hbase_acl_configured()
+        Logger.info("Done setting HBase ACLs for profiler")
 
     def start_profiler_topology(self, env):
         Logger.info('Starting ' + self.__profiler_topology)
@@ -182,3 +191,30 @@ class ProfilerCommands:
             is_running = topologies[self.__profiler_topology] in ['ACTIVE', 'REBALANCING']
         active &= is_running
         return active
+
+    def service_check(self, env):
+        """
+        Performs a service check for the Profiler.
+        :param env: Environment
+        """
+        Logger.info('Checking Kafka topics for Profiler')
+        metron_service.check_kafka_topics(self.__params, [self.__params.profiler_input_topic])
+
+        Logger.info("Checking HBase table for profiler")
+        metron_service.check_hbase_table(self.__params, self.__params.profiler_hbase_table)
+        metron_service.check_hbase_column_family(self.__params, self.__params.profiler_hbase_table, self.__params.profiler_hbase_cf)
+
+        if self.__params.security_enabled:
+
+            Logger.info('Checking Kafka ACLs for Profiler')
+            metron_service.check_kafka_acls(self.__params, self.__get_topics())
+            metron_service.check_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())
+
+            Logger.info('Checking Kafka ACLs for Profiler')
+            metron_service.check_hbase_acls(self.__params, self.__params.profiler_hbase_table)
+
+        Logger.info("Checking for Profiler topology")
+        if not self.is_topology_active(env):
+            raise Fail("Profiler topology not running")
+
+        Logger.info("Profiler service check completed successfully")

http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
index 09d7106..542fc08 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
@@ -59,16 +59,22 @@ class RestCommands:
     def set_acl_configured(self):
         metron_service.set_configured(self.__params.metron_user, self.__params.rest_acl_configured_flag_file, "Setting REST ACL configured to true")
 
+    def __get_topics(self):
+        return [self.__params.metron_escalation_topic]
+
     def init_kafka_topics(self):
         Logger.info('Creating Kafka topics for rest')
-        topics = [self.__params.metron_escalation_topic]
-        metron_service.init_kafka_topics(self.__params, topics)
+        metron_service.init_kafka_topics(self.__params, self.__get_topics())
 
     def init_kafka_acls(self):
         Logger.info('Creating Kafka ACLs for rest')
+
         # The following topics must be permissioned for the rest application list operation
-        topics = [self.__params.ambari_kafka_service_check_topic, self.__params.consumer_offsets_topic, self.__params.metron_escalation_topic]
-        metron_service.init_kafka_acls(self.__params, topics, ['metron-rest'])
+        topics = self.__get_topics() + [self.__params.ambari_kafka_service_check_topic, self.__params.consumer_offsets_topic]
+        metron_service.init_kafka_acls(self.__params, topics)
+
+        groups = ['metron-rest']
+        metron_service.init_kafka_acl_groups(self.__params, groups)
 
     def start_rest_application(self):
         Logger.info('Starting REST application')
@@ -151,3 +157,18 @@ class RestCommands:
         self.stop_rest_application()
         self.start_rest_application()
         Logger.info('Done restarting the REST application')
+
+    def service_check(self, env):
+        """
+        Performs a service check for the Metron API.
+        :param env: Environment
+        """
+        Logger.info('Checking Kafka topics for the REST application')
+        metron_service.check_kafka_topics(self.__params, self.__get_topics())
+
+        if self.__params.security_enabled:
+
+            Logger.info('Checking Kafka topic ACL for the REST application')
+            metron_service.check_kafka_acls(self.__params, self.__get_topics())
+
+        Logger.info("REST application service check completed successfully")

http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/service_check.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/service_check.py
index 7dd9dfb..1aebecb 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/service_check.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/service_check.py
@@ -19,22 +19,53 @@ limitations under the License.
 """
 from __future__ import print_function
 
+from resource_management.core.logger import Logger
 from resource_management.libraries.script import Script
 
-from indexing_commands import IndexingCommands
 from parser_commands import ParserCommands
-
+from enrichment_commands import EnrichmentCommands
+from indexing_commands import IndexingCommands
+from profiler_commands import ProfilerCommands
+from rest_commands import RestCommands
+from management_ui_commands import ManagementUICommands
 
 class ServiceCheck(Script):
+
     def service_check(self, env):
         from params import params
-        parsercommands = ParserCommands(params)
-        indexingcommands = IndexingCommands(params)
-        all_found = parsercommands.topologies_running(env) and indexingcommands.is_topology_active(env)
-        if all_found:
-            exit(0)
-        else:
-            exit(1)
+
+        # check the parsers
+        Logger.info("Performing Parser service check")
+        parser_cmds = ParserCommands(params)
+        parser_cmds.service_check(env)
+
+        # check enrichment
+        Logger.info("Performing Enrichment service check")
+        enrichment_cmds = EnrichmentCommands(params)
+        enrichment_cmds.service_check(env)
+
+        # check indexing
+        Logger.info("Performing Indexing service check")
+        indexing_cmds = IndexingCommands(params)
+        indexing_cmds.service_check(env)
+
+        # check the profiler
+        Logger.info("Performing Profiler service check")
+        profiler_cmds = ProfilerCommands(params)
+        profiler_cmds.service_check(env)
+
+        # check the rest api
+        Logger.info("Performing REST application service check")
+        rest_cmds = RestCommands(params)
+        rest_cmds.service_check(env)
+
+        # check the management UI
+        Logger.info("Performing Management UI service check")
+        mgmt_cmds = ManagementUICommands(params)
+        mgmt_cmds.service_check(env)
+
+        Logger.info("Metron service check completed successfully")
+        exit(0)
 
 
 if __name__ == "__main__":


Mime
View raw message