ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rle...@apache.org
Subject ambari git commit: AMBARI-17479. authorizer.class.name not being set on secure kafka clusters (rlevas)
Date Thu, 07 Jul 2016 16:01:18 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk 629a893aa -> 2913de53c


AMBARI-17479. authorizer.class.name not being set on secure kafka clusters (rlevas)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2913de53
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2913de53
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2913de53

Branch: refs/heads/trunk
Commit: 2913de53c82524ae44270ce52b561f457f47a37c
Parents: 629a893
Author: Robert Levas <rlevas@hortonworks.com>
Authored: Thu Jul 7 12:01:05 2016 -0400
Committer: Robert Levas <rlevas@hortonworks.com>
Committed: Thu Jul 7 12:01:13 2016 -0400

----------------------------------------------------------------------
 .../stacks/HDP/2.0.6/services/stack_advisor.py  |  14 +++
 .../stacks/HDP/2.3/services/stack_advisor.py    | 118 ++++++++++++-------
 .../stacks/2.3/common/test_stack_advisor.py     |  17 ++-
 3 files changed, 106 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/2913de53/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
index 8358438..21594d1 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
@@ -1649,6 +1649,20 @@ def getOldValue(self, services, configType, propertyName):
   return None
 
 # Validation helper methods
+def isSecurityEnabled(services):
+  """
+  Determines if security is enabled by testing the value of cluster-env/security enabled.
+
+  If the property exists and is equal to "true", then is it enabled; otherwise is it assumed
to be
+  disabled.
+
+  :param services: the services structure containing the current configurations
+  :return: true if security is enabled; otherwise false
+  """
+  return "cluster-env" in services["configurations"] \
+         and "security_enabled" in services["configurations"]["cluster-env"]["properties"]
\
+         and services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower()
== "true"
+
 def getSiteProperties(configurations, siteName):
   siteConfig = configurations.get(siteName)
   if siteConfig is None:

http://git-wip-us.apache.org/repos/asf/ambari/blob/2913de53/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
index 879008b..858fe16 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
@@ -293,27 +293,85 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
       putHdfsSitePropertyAttribute('dfs.namenode.inode.attributes.provider.class', 'delete',
'true')
 
   def recommendKAFKAConfigurations(self, configurations, clusterData, services, hosts):
+
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
     kafka_broker = getServicesSiteProperties(services, "kafka-broker")
 
-    # kerberos security for kafka is decided from `security.inter.broker.protocol` property
value
-    security_enabled = (kafka_broker is not None and 'security.inter.broker.protocol' in
 kafka_broker
-                        and 'SASL' in kafka_broker['security.inter.broker.protocol'])
+    security_enabled = isSecurityEnabled(services)
+
     putKafkaBrokerProperty = self.putProperty(configurations, "kafka-broker", services)
     putKafkaLog4jProperty = self.putProperty(configurations, "kafka-log4j", services)
     putKafkaBrokerAttributes = self.putPropertyAttribute(configurations, "kafka-broker")
 
+    if security_enabled:
+      kafka_env = getServicesSiteProperties(services, "kafka-env")
+      kafka_user = kafka_env.get('kafka_user') if kafka_env is not None else None
+
+      if kafka_user is not None:
+        kafka_super_users = kafka_broker.get('super.users') if kafka_broker is not None else
None
+
+        # kafka_super_super_users is expected to be formatted as:  User:user1;User:user2
+        if kafka_super_users is not None and kafka_super_users != '':
+          # Parse kafka_super_users to get a set of unique user names and rebuild the property
value
+          user_names = set()
+          user_names.add(kafka_user)
+          for match in re.findall('User:([^;]*)', kafka_super_users):
+            user_names.add(match)
+          kafka_super_users = 'User:' + ";User:".join(user_names)
+        else:
+          kafka_super_users = 'User:' + kafka_user
+
+        putKafkaBrokerProperty("super.users", kafka_super_users)
+
+      putKafkaBrokerProperty("principal.to.local.class", "kafka.security.auth.KerberosPrincipalToLocal")
+      putKafkaBrokerProperty("security.inter.broker.protocol", "PLAINTEXTSASL")
+      putKafkaBrokerProperty("zookeeper.set.acl", "true")
+
+    else:  # not security_enabled
+      # remove unneeded properties
+      putKafkaBrokerAttributes('super.users', 'delete', 'true')
+      putKafkaBrokerAttributes('principal.to.local.class', 'delete', 'true')
+      putKafkaBrokerAttributes('security.inter.broker.protocol', 'delete', 'true')
+
+    # Update ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled to match ranger-env/ranger-kafka-plugin-enabled
+    if "ranger-env" in services["configurations"] \
+      and "ranger-kafka-plugin-properties" in services["configurations"] \
+      and "ranger-kafka-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]:
+      putKafkaRangerPluginProperty = self.putProperty(configurations, "ranger-kafka-plugin-properties",
services)
+      ranger_kafka_plugin_enabled = services["configurations"]["ranger-env"]["properties"]["ranger-kafka-plugin-enabled"]
+      putKafkaRangerPluginProperty("ranger-kafka-plugin-enabled", ranger_kafka_plugin_enabled)
+
+    # Determine if the Ranger/Kafka Plugin is enabled
+    ranger_plugin_enabled = "RANGER" in servicesList
+    # Only if the RANGER service is installed....
+    if ranger_plugin_enabled:
+      # If ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled,
+      # determine if the Ranger/Kafka plug-in enabled enabled or not
+      if 'ranger-kafka-plugin-properties' in configurations and \
+          'ranger-kafka-plugin-enabled' in configurations['ranger-kafka-plugin-properties']['properties']:
+        ranger_plugin_enabled = configurations['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled'].lower()
== 'yes'
+      # If ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled was not changed,
+      # determine if the Ranger/Kafka plug-in enabled enabled or not
+      elif 'ranger-kafka-plugin-properties' in services['configurations'] and \
+          'ranger-kafka-plugin-enabled' in services['configurations']['ranger-kafka-plugin-properties']['properties']:
+        ranger_plugin_enabled = services['configurations']['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled'].lower()
== 'yes'
+
+    # Determine the value for kafka-broker/authorizer.class.name
+    if ranger_plugin_enabled:
+      # If the Ranger plugin for Kafka is enabled, set authorizer.class.name to
+      # "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer" whether
Kerberos is
+      # enabled or not.
+      putKafkaBrokerProperty("authorizer.class.name", 'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer')
+    elif security_enabled:
+      putKafkaBrokerProperty("authorizer.class.name", 'kafka.security.auth.SimpleAclAuthorizer')
+    else:
+      putKafkaBrokerAttributes('authorizer.class.name', 'delete', 'true')
+
     #If AMS is part of Services, use the KafkaTimelineMetricsReporter for metric reporting.
Default is ''.
-    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
     if "AMBARI_METRICS" in servicesList:
       putKafkaBrokerProperty('kafka.metrics.reporters', 'org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter')
 
-    if "ranger-env" in services["configurations"] and "ranger-kafka-plugin-properties" in
services["configurations"] and \
-        "ranger-kafka-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]:
-      putKafkaRangerPluginProperty = self.putProperty(configurations, "ranger-kafka-plugin-properties",
services)
-      rangerEnvKafkaPluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-kafka-plugin-enabled"]
-      putKafkaRangerPluginProperty("ranger-kafka-plugin-enabled", rangerEnvKafkaPluginProperty)
-
-    if 'ranger-kafka-plugin-properties' in services['configurations'] and ('ranger-kafka-plugin-enabled'
in services['configurations']['ranger-kafka-plugin-properties']['properties']):
+    if ranger_plugin_enabled:
       kafkaLog4jRangerLines = [{
         "name": "log4j.appender.rangerAppender",
         "value": "org.apache.log4j.DailyRollingFileAppender"
@@ -339,37 +397,13 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
           "value": "INFO, rangerAppender"
         }]
 
-      rangerPluginEnabled=''
-      if 'ranger-kafka-plugin-properties' in configurations and 'ranger-kafka-plugin-enabled'
in  configurations['ranger-kafka-plugin-properties']['properties']:
-        rangerPluginEnabled = configurations['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled']
-      elif 'ranger-kafka-plugin-properties' in services['configurations'] and 'ranger-kafka-plugin-enabled'
in services['configurations']['ranger-kafka-plugin-properties']['properties']:
-        rangerPluginEnabled = services['configurations']['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled']
-
-      if  rangerPluginEnabled and rangerPluginEnabled.lower() == "Yes".lower():
-        # recommend authorizer.class.name
-        putKafkaBrokerProperty("authorizer.class.name", 'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer')
-        # change kafka-log4j when ranger plugin is installed
-
-        if 'kafka-log4j' in services['configurations'] and 'content' in services['configurations']['kafka-log4j']['properties']:
-          kafkaLog4jContent = services['configurations']['kafka-log4j']['properties']['content']
-          for item in range(len(kafkaLog4jRangerLines)):
-            if kafkaLog4jRangerLines[item]["name"] not in kafkaLog4jContent:
-              kafkaLog4jContent+= '\n' + kafkaLog4jRangerLines[item]["name"] + '=' + kafkaLog4jRangerLines[item]["value"]
-          putKafkaLog4jProperty("content",kafkaLog4jContent)
-
-
-      else:
-        # Kerberized Cluster with Ranger plugin disabled
-        if security_enabled and 'kafka-broker' in services['configurations'] and 'authorizer.class.name'
in services['configurations']['kafka-broker']['properties'] and \
-          services['configurations']['kafka-broker']['properties']['authorizer.class.name']
== 'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer':
-          putKafkaBrokerProperty("authorizer.class.name", 'kafka.security.auth.SimpleAclAuthorizer')
-        # Non-kerberos Cluster with Ranger plugin disabled
-        else:
-          putKafkaBrokerAttributes('authorizer.class.name', 'delete', 'true')
-
-    # Non-Kerberos Cluster without Ranger
-    elif not security_enabled:
-      putKafkaBrokerAttributes('authorizer.class.name', 'delete', 'true')
+      # change kafka-log4j when ranger plugin is installed
+      if 'kafka-log4j' in services['configurations'] and 'content' in services['configurations']['kafka-log4j']['properties']:
+        kafkaLog4jContent = services['configurations']['kafka-log4j']['properties']['content']
+        for item in range(len(kafkaLog4jRangerLines)):
+          if kafkaLog4jRangerLines[item]["name"] not in kafkaLog4jContent:
+            kafkaLog4jContent+= '\n' + kafkaLog4jRangerLines[item]["name"] + '=' + kafkaLog4jRangerLines[item]["value"]
+        putKafkaLog4jProperty("content",kafkaLog4jContent)
 
   def recommendRangerKMSConfigurations(self, configurations, clusterData, services, hosts):
     servicesList = [service["StackServices"]["service_name"] for service in services["services"]]

http://git-wip-us.apache.org/repos/asf/ambari/blob/2913de53/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py b/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py
index 2d98558..94ca579 100644
--- a/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py
+++ b/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py
@@ -294,6 +294,12 @@ class TestHDP23StackAdvisor(TestCase):
           },
           {
             "StackServices": {
+              "service_name": "RANGER",
+              "service_version": "0.5.0.2.3"
+            }
+          },
+          {
+            "StackServices": {
               "service_name": "AMBARI_METRICS"
             },
             "components": [{
@@ -318,6 +324,12 @@ class TestHDP23StackAdvisor(TestCase):
         "core-site": {
           "properties": { },
         },
+        "cluster-env": {
+          "properties": {
+            "security_enabled" : "true"
+          },
+          "property_attributes": {}
+        },
         "kafka-broker": {
           "properties": {
             "authorizer.class.name" : "kafka.security.auth.SimpleAclAuthorizer"
@@ -338,10 +350,12 @@ class TestHDP23StackAdvisor(TestCase):
     }
 
     # Test authorizer.class.name with Ranger Kafka plugin disabled in non-kerberos environment
+    services['configurations']['cluster-env']['properties']['security_enabled'] = "false"
     self.stackAdvisor.recommendKAFKAConfigurations(configurations, clusterData, services,
None)
     self.assertEquals(configurations['kafka-broker']['property_attributes']['authorizer.class.name'],
{'delete': 'true'}, "Test authorizer.class.name with Ranger Kafka plugin is disabled in non-kerberos
environment")
 
     # Test authorizer.class.name with Ranger Kafka plugin disabled in kerberos environment
+    services['configurations']['cluster-env']['properties']['security_enabled'] = "true"
     configurations['kafka-broker']['properties'] = {}
     configurations['kafka-broker']['property_attributes'] = {}
     services['configurations']['kafka-broker']['properties']['security.inter.broker.protocol']
= 'PLAINTEXTSASL'
@@ -350,6 +364,7 @@ class TestHDP23StackAdvisor(TestCase):
     self.assertEquals(configurations['kafka-broker']['properties']['authorizer.class.name'],
'kafka.security.auth.SimpleAclAuthorizer' , "Test authorizer.class.name with Ranger Kafka
plugin disabled in kerberos environment")
 
     # Test authorizer.class.name with Ranger Kafka plugin enabled in non-kerberos environment
+    services['configurations']['cluster-env']['properties']['security_enabled'] = "false"
     configurations['kafka-broker']['properties'] = {}
     configurations['kafka-broker']['property_attributes'] = {}
     del services['configurations']['kafka-broker']['properties']['security.inter.broker.protocol']
@@ -358,7 +373,7 @@ class TestHDP23StackAdvisor(TestCase):
     self.stackAdvisor.recommendKAFKAConfigurations(configurations, clusterData, services,
None)
     self.assertEquals(configurations['kafka-broker']['properties']['authorizer.class.name'],
'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer', "Test authorizer.class.name
with Ranger Kafka plugin enabled in kerberos environment")
 
-    # Test authorizer.class.name with Ranger Kafka plugin enabled in kerberos environment
+    services['configurations']['cluster-env']['properties']['security_enabled'] = "false"
     configurations['kafka-broker']['properties'] = {}
     configurations['kafka-broker']['property_attributes'] = {}
     services['configurations']['kafka-broker']['properties']['security.inter.broker.protocol']
= 'PLAINTEXTSASL'


Mime
View raw message