ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jonathanhur...@apache.org
Subject ambari git commit: AMBARI-17694 - Kafka listeners property does not show SASL_PLAINTEXT protocol when Kerberos is enabled (Anita Jebaraj via jonathanhurley)
Date Mon, 22 Aug 2016 16:53:41 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk 169dd4b63 -> 4b9a81684


AMBARI-17694 - Kafka listeners property does not show SASL_PLAINTEXT protocol when Kerberos
is enabled (Anita Jebaraj via jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4b9a8168
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4b9a8168
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4b9a8168

Branch: refs/heads/trunk
Commit: 4b9a816846d56dc027fd3492211ec9ec057c7890
Parents: 169dd4b
Author: Jonathan Hurley <jhurley@hortonworks.com>
Authored: Mon Aug 22 12:53:18 2016 -0400
Committer: Jonathan Hurley <jhurley@hortonworks.com>
Committed: Mon Aug 22 12:53:18 2016 -0400

----------------------------------------------------------------------
 .../kerberos/VariableReplacementHelper.java     | 34 ++++++++++-
 .../server/upgrade/UpgradeCatalog240.java       | 25 ++++++++
 .../common-services/KAFKA/0.10.0/kerberos.json  |  3 +-
 .../KAFKA/0.8.1/package/scripts/kafka.py        | 15 ++---
 .../common-services/KAFKA/0.9.0/kerberos.json   |  3 +-
 .../stacks/HDP/2.3/services/stack_advisor.py    | 17 +++++-
 .../kerberos/VariableReplacementHelperTest.java |  8 ++-
 .../server/upgrade/UpgradeCatalog240Test.java   | 61 ++++++++++++++++++++
 8 files changed, 148 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/4b9a8168/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java
index 66be3bf..d472b79 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java
@@ -20,7 +20,6 @@ package org.apache.ambari.server.state.kerberos;
 
 import com.google.inject.Singleton;
 import org.apache.ambari.server.AmbariException;
-
 import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Matcher;
@@ -49,6 +48,7 @@ public class VariableReplacementHelper {
     {
       put("each", new EachFunction());
       put("toLower", new ToLowerFunction());
+      put("replace", new ReplaceValue());
     }
   };
 
@@ -226,7 +226,37 @@ public class VariableReplacementHelper {
       return "";
     }
   }
-
+  /**
+   * ReplaceValue is a Function implementation that replaces the value in the string
+   * <p/>
+   * This function expects the following arguments (in order) within the args array:
+   * <ol>
+   * <li>regular expression that should be replaced</li>
+   * <li>replacement value for the string</li>
+   * </ol>
+   */ 
+  private static class ReplaceValue implements Function {
+    
+    @Override
+    public String perform(String[] args, String data) {
+      if ((args == null) || (args.length != 2)) {
+        throw new IllegalArgumentException("Invalid number of arguments encountered");
+      }
+      if (data != null) {
+        StringBuffer builder = new StringBuffer();
+        String regex = args[0];
+        String replacement = args[1];
+        Pattern pattern = Pattern.compile(regex);
+        Matcher matcher = pattern.matcher(data);
+        while(matcher.find()) {
+          matcher.appendReplacement(builder, replacement);
+        }
+        matcher.appendTail(builder);
+        return builder.toString();
+      }
+      return "";
+    }
+  }
   /**
    * ToLowerFunction is a Function implementation that converts a String to lowercase
    */

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b9a8168/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
index 12553a5..5cd8685 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
@@ -191,6 +191,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
   protected static final String EXTENSION_ID_COLUMN = "extension_id";
   protected static final String EXTENSION_LINK_TABLE = "extensionlink";
   protected static final String EXTENSION_LINK_ID_COLUMN = "link_id";
+  protected static final String KAFKA_BROKER_CONFIG = "kafka-broker";
 
   private static final Map<String, Integer> ROLE_ORDER;
   private static final String AMS_HBASE_SITE = "ams-hbase-site";
@@ -390,6 +391,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
     addManageUserPersistedDataPermission();
     allowClusterOperatorToManageCredentials();
     updateHDFSConfigs();
+    updateKAFKAConfigs();
     updateHIVEConfigs();
     updateAMSConfigs();
     updateClusterEnv();
@@ -1932,7 +1934,30 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
       }
     }
   }
+ 
+  protected void updateKAFKAConfigs() throws AmbariException {
+    AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
+    Clusters clusters = ambariManagementController.getClusters();
+    if (clusters != null) {
+      Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters);
+      if (clusterMap != null && !clusterMap.isEmpty()) {
+        for (final Cluster cluster : clusterMap.values()) {
+          Set<String> installedServices = cluster.getServices().keySet();
 
+          if (installedServices.contains("KAFKA") && cluster.getSecurityType() == SecurityType.KERBEROS) {
+            Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER_CONFIG);
+            if (kafkaBroker != null) {
+              String listenersPropertyValue = kafkaBroker.getProperties().get("listeners");
+              if (StringUtils.isNotEmpty(listenersPropertyValue)) {
+                String newListenersPropertyValue = listenersPropertyValue.replaceAll("\\bPLAINTEXT\\b", "PLAINTEXTSASL");
+                updateConfigurationProperties(KAFKA_BROKER_CONFIG, Collections.singletonMap("listeners", newListenersPropertyValue), true, false);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
 
   protected void updateHIVEConfigs() throws AmbariException {
     AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b9a8168/ambari-server/src/main/resources/common-services/KAFKA/0.10.0/kerberos.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0/kerberos.json b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0/kerberos.json
index e1e6461..1f02092 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0/kerberos.json
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0/kerberos.json
@@ -14,7 +14,8 @@
               "principal.to.local.class":"kafka.security.auth.KerberosPrincipalToLocal",
               "super.users": "user:${kafka-env/kafka_user}",
               "security.inter.broker.protocol": "PLAINTEXTSASL",
-              "zookeeper.set.acl": "true"
+              "zookeeper.set.acl": "true",
+              "listeners": "${kafka-broker/listeners|replace(\\bPLAINTEXT\\b, PLAINTEXTSASL)}"
           }
         },
         {

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b9a8168/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
index 9066ab5..6cc85f4 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
@@ -80,21 +80,16 @@ def kafka(upgrade_type=None):
 
        listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
        Logger.info(format("Kafka listeners: {listeners}"))
+       kafka_server_config['listeners'] = listeners       
 
        if params.security_enabled and params.kafka_kerberos_enabled:
          Logger.info("Kafka kerberos security is enabled.")
-         if "SASL" not in listeners:
-           listeners = listeners.replace("PLAINTEXT", "PLAINTEXTSASL")
-
-         kafka_server_config['listeners'] = listeners
          kafka_server_config['advertised.listeners'] = listeners
          Logger.info(format("Kafka advertised listeners: {listeners}"))
-       else:
-         kafka_server_config['listeners'] = listeners
-         if 'advertised.listeners' in kafka_server_config:
-           advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
-           kafka_server_config['advertised.listeners'] = advertised_listeners
-           Logger.info(format("Kafka advertised listeners: {advertised_listeners}"))
+       elif 'advertised.listeners' in kafka_server_config:
+         advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
+         kafka_server_config['advertised.listeners'] = advertised_listeners
+         Logger.info(format("Kafka advertised listeners: {advertised_listeners}"))
     else:
       kafka_server_config['host.name'] = params.hostname
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b9a8168/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
index 2b1c01b..ab1ed1f 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
@@ -14,7 +14,8 @@
               "principal.to.local.class":"kafka.security.auth.KerberosPrincipalToLocal",
               "super.users": "user:${kafka-env/kafka_user}",
               "security.inter.broker.protocol": "PLAINTEXTSASL",
-              "zookeeper.set.acl": "true"
+              "zookeeper.set.acl": "true",
+              "listeners": "${kafka-broker/listeners|replace(\\bPLAINTEXT\\b, PLAINTEXTSASL)}"
           }
         }
       ],

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b9a8168/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
index 64e8e03..ee96cf8 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
@@ -914,13 +914,13 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
  def validateKAFKAConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
     kafka_broker = properties
     validationItems = []
-
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+ 
     #Adding Ranger Plugin logic here
     ranger_plugin_properties = getSiteProperties(configurations, "ranger-kafka-plugin-properties")
-    ranger_plugin_enabled = ranger_plugin_properties['ranger-kafka-plugin-enabled']
+    ranger_plugin_enabled = ranger_plugin_properties['ranger-kafka-plugin-enabled'] if ranger_plugin_properties else 'No'
     prop_name = 'authorizer.class.name'
     prop_val = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer"
-    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
     if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()):
       if kafka_broker[prop_name] != prop_val:
         validationItems.append({"config-name": prop_name,
@@ -928,6 +928,17 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
                                 "If Ranger Kafka Plugin is enabled."\
                                 "{0} needs to be set to {1}".format(prop_name,prop_val))})
 
+    if 'KERBEROS' in servicesList and 'security.inter.broker.protocol' in properties:
+      interBrokerValue = properties['security.inter.broker.protocol']
+      prop_name = 'listeners'
+      prop_value =  properties[prop_name]
+      if interBrokerValue and interBrokerValue not in prop_value:
+        validationItems.append({"config-name": "listeners",
+                                "item": self.getWarnItem("If kerberos is enabled "\
+                                "{0}  need to contain {1} as one of "\
+                                "the protocol".format(prop_name, interBrokerValue))})
+
+
     return self.toConfigurationValidationProblems(validationItems, "kafka-broker")
 
  def validateYARNConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b9a8168/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java
index ee2a671..8be0eb9 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java
@@ -158,6 +158,10 @@ public class VariableReplacementHelperTest {
           put("realm", "UNIT.TEST");
         }});
 
+        put("kafka-broker", new HashMap<String, String>() {{
+          put("listeners", "PLAINTEXT://localhost:6667");
+        }});
+        
         put("clusterHostInfo", new HashMap<String, String>() {{
          put("hive_metastore_host", "host1.unit.test, host2.unit.test , host3.unit.test"); // spaces are there on purpose.
         }});
@@ -171,6 +175,8 @@ public class VariableReplacementHelperTest {
        helper.replaceVariables("hive.metastore.local=false,hive.metastore.uris=${clusterHostInfo/hive_metastore_host | each(thrift://%s:9083, \\\\,, \\s*\\,\\s*)},hive.metastore.sasl.enabled=true,hive.metastore.execute.setugi=true,hive.metastore.warehouse.dir=/apps/hive/warehouse,hive.exec.mode.local.auto=false,hive.metastore.kerberos.principal=hive/_HOST@${realm}",
        configurations));
 
     Assert.assertEquals("test=unit.test", helper.replaceVariables("test=${realm|toLower()}", configurations));
+  
+    Assert.assertEquals("PLAINTEXTSASL://localhost:6667", helper.replaceVariables("${kafka-broker/listeners|replace(\\bPLAINTEXT\\b,PLAINTEXTSASL)}", configurations));
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b9a8168/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
index 854ce7d..099af7e 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
@@ -585,6 +585,7 @@ public class UpgradeCatalog240Test {
     Method updateRecoveryConfigurationDML = UpgradeCatalog240.class.getDeclaredMethod("updateRecoveryConfigurationDML");
     Method removeAtlasMetaserverAlert = UpgradeCatalog240.class.getDeclaredMethod("removeAtlasMetaserverAlert");
     Method updateRangerHbasePluginProperties = UpgradeCatalog240.class.getDeclaredMethod("updateRangerHbasePluginProperties");
+    Method updateKAFKAConfigs = UpgradeCatalog240.class.getDeclaredMethod("updateKAFKAConfigs");
 
     Capture<String> capturedStatements = newCapture(CaptureType.ALL);
 
@@ -634,6 +635,7 @@ public class UpgradeCatalog240Test {
             .addMockedMethod(updateRecoveryConfigurationDML)
             .addMockedMethod(removeAtlasMetaserverAlert)
             .addMockedMethod(updateRangerHbasePluginProperties)
+            .addMockedMethod(updateKAFKAConfigs)
             .createMock();
 
     Field field = AbstractUpgradeCatalog.class.getDeclaredField("dbAccessor");
@@ -675,6 +677,7 @@ public class UpgradeCatalog240Test {
     upgradeCatalog240.removeAtlasMetaserverAlert();
     upgradeCatalog240.updateRangerHbasePluginProperties();
     upgradeCatalog240.adjustHiveJobTimestamps();
+    upgradeCatalog240.updateKAFKAConfigs();
 
     replay(upgradeCatalog240, dbAccessor);
 
@@ -1160,6 +1163,64 @@ public class UpgradeCatalog240Test {
   }
 
   @Test
+  public void testUpdateKAFKAConfigs() throws Exception{
+    EasyMockSupport easyMockSupport = new EasyMockSupport();
+    final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class);
+    final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class);
+    final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class);
+
+    final Config kafkaBroker = easyMockSupport.createNiceMock(Config.class);
+    expect(kafkaBroker.getProperties()).andReturn(new HashMap<String, String>(){{
+      put("listeners", "PLAINTEXT://localhost:6667,SSL://localhost:6666");
+    }}
+    ).anyTimes();
+
+    final Injector mockInjector = Guice.createInjector(new AbstractModule() {
+      @Override
+      protected void configure() {
+        bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
+        bind(Clusters.class).toInstance(mockClusters);
+        bind(EntityManager.class).toInstance(entityManager);
+        bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
+        bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+        bind(PasswordEncoder.class).toInstance(createNiceMock(PasswordEncoder.class));
+      }
+    });
+
+    expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).once();
+    expect(mockClusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{
+      put("normal", mockClusterExpected);
+    }}).atLeastOnce();
+    expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(kafkaBroker).atLeastOnce();
+    expect(mockClusterExpected.getSecurityType()).andReturn(SecurityType.KERBEROS);
+    expect(mockClusterExpected.getServices()).andReturn(new HashMap<String, Service>() {
+      {
+        put("KAFKA", null);
+      }
+    }).atLeastOnce();
+
+    UpgradeCatalog240 upgradeCatalog240 = createMockBuilder(UpgradeCatalog240.class)
+            .withConstructor(Injector.class)
+            .withArgs(mockInjector)
+            .addMockedMethod("updateConfigurationProperties", String.class,
+                    Map.class, boolean.class, boolean.class)
+            .createMock();
+
+    Map<String, String> expectedUpdates = new HashMap<>();
+    expectedUpdates.put("listeners", "PLAINTEXTSASL://localhost:6667,SSL://localhost:6666");
+
+    upgradeCatalog240.updateConfigurationProperties("kafka-broker", expectedUpdates,
+            true, false);
+    expectLastCall().once();
+
+    easyMockSupport.replayAll();
+    replay(upgradeCatalog240);
+    upgradeCatalog240.updateKAFKAConfigs();
+    easyMockSupport.verifyAll();
+  }
+
+
+  @Test
   public void testSparkConfigUpdate() throws Exception{
 
    Map<String, String> oldPropertiesSparkDefaults = new HashMap<String, String>() {


Mime
View raw message