ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vbrodets...@apache.org
Subject ambari git commit: AMBARI-17929. Kafka brokers went down after Ambari upgrade due to IllegalArgumentException.(vbrodetskyi)
Date Wed, 27 Jul 2016 20:08:44 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk cb1109d69 -> d6b861716


AMBARI-17929. Kafka brokers went down after Ambari upgrade due to IllegalArgumentException. (vbrodetskyi)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d6b86171
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d6b86171
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d6b86171

Branch: refs/heads/trunk
Commit: d6b8617167484d22ceccfc6f1eb71d0b246392f4
Parents: cb1109d
Author: Vitaly Brodetskyi <vbrodetskyi@hortonworks.com>
Authored: Wed Jul 27 23:08:03 2016 +0300
Committer: Vitaly Brodetskyi <vbrodetskyi@hortonworks.com>
Committed: Wed Jul 27 23:08:03 2016 +0300

----------------------------------------------------------------------
 .../server/upgrade/UpgradeCatalog240.java       | 23 +++++++
 .../server/upgrade/UpgradeCatalog240Test.java   | 70 ++++++++++++++++++--
 2 files changed, 88 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/d6b86171/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
index 5495655..a3d9c89 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
@@ -176,6 +176,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
   protected static final String SLIDER_SERVICE_NAME = "SLIDER";
 
   private static final String OOZIE_ENV_CONFIG = "oozie-env";
+  protected static final String KAFKA_BROKER_CONFIG = "kafka-broker";
   private static final String SLIDER_CLIENT_CONFIG = "slider-client";
   private static final String HIVE_ENV_CONFIG = "hive-env";
   private static final String AMS_SITE = "ams-site";
@@ -393,6 +394,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
     addManageUserPersistedDataPermission();
     allowClusterOperatorToManageCredentials();
     updateHDFSConfigs();
+    updateKAFKAConfigs();
     updateHIVEConfigs();
     updateAMSConfigs();
     updateClusterEnv();
@@ -1900,6 +1902,27 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
     }
   }
 
+  protected void updateKAFKAConfigs() throws AmbariException {
+    AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
+    Clusters clusters = ambariManagementController.getClusters();
+    Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters);
+
+    for (final Cluster cluster : clusterMap.values()) {
+      Set<String> installedServices = cluster.getServices().keySet();
+
+      if (installedServices.contains("KAFKA") && cluster.getSecurityType() == SecurityType.KERBEROS)
{
+        Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER_CONFIG);
+        if (kafkaBroker != null) {
+          String listenersPropertyValue = kafkaBroker.getProperties().get("listeners");
+          if (StringUtils.isNotEmpty(listenersPropertyValue)) {
+            String newListenersPropertyValue = listenersPropertyValue.replaceAll("PLAINTEXT",
"PLAINTEXTSASL");
+            updateConfigurationProperties(KAFKA_BROKER_CONFIG, Collections.singletonMap("listeners",
newListenersPropertyValue), true, false);
+          }
+        }
+      }
+    }
+  }
+
 
   protected void updateHIVEConfigs() throws AmbariException {
     AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);

http://git-wip-us.apache.org/repos/asf/ambari/blob/d6b86171/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
index 34ca199..5bbfebd 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
@@ -56,8 +56,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
-import javax.persistence.EntityManager;
-
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
@@ -118,8 +116,12 @@ import org.easymock.Capture;
 import org.easymock.CaptureType;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.springframework.security.crypto.password.PasswordEncoder;
 
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
@@ -130,8 +132,6 @@ import com.google.inject.Injector;
 import com.google.inject.Module;
 import com.google.inject.Provider;
 
-import org.springframework.security.crypto.password.PasswordEncoder;
-
 public class UpgradeCatalog240Test {
   private static final String CAPACITY_SCHEDULER_CONFIG_TYPE = "capacity-scheduler";
   private static final String WEBHCAT_SITE_CONFIG_TYPE = "webhcat-site";
@@ -584,6 +584,7 @@ public class UpgradeCatalog240Test {
     Method updateRecoveryConfigurationDML = UpgradeCatalog240.class.getDeclaredMethod("updateRecoveryConfigurationDML");
     Method removeAtlasMetaserverAlert = UpgradeCatalog240.class.getDeclaredMethod("removeAtlasMetaserverAlert");
     Method updateRangerHbasePluginProperties = UpgradeCatalog240.class.getDeclaredMethod("updateRangerHbasePluginProperties");
+    Method updateKAFKAConfigs = UpgradeCatalog240.class.getDeclaredMethod("updateKAFKAConfigs");
 
     Capture<String> capturedStatements = newCapture(CaptureType.ALL);
 
@@ -633,6 +634,7 @@ public class UpgradeCatalog240Test {
             .addMockedMethod(updateRecoveryConfigurationDML)
             .addMockedMethod(removeAtlasMetaserverAlert)
             .addMockedMethod(updateRangerHbasePluginProperties)
+            .addMockedMethod(updateKAFKAConfigs)
             .createMock();
 
     Field field = AbstractUpgradeCatalog.class.getDeclaredField("dbAccessor");
@@ -674,6 +676,7 @@ public class UpgradeCatalog240Test {
     upgradeCatalog240.updateRecoveryConfigurationDML();
     upgradeCatalog240.removeAtlasMetaserverAlert();
     upgradeCatalog240.updateRangerHbasePluginProperties();
+    upgradeCatalog240.updateKAFKAConfigs();
 
     replay(upgradeCatalog240, dbAccessor);
 
@@ -1108,6 +1111,63 @@ public class UpgradeCatalog240Test {
     assertTrue(Maps.difference(newPropertiesYarnEnv, updatedProperties).areEqual());
   }
 
+  @Test
+  public void testUpdateKAFKAConfigs() throws Exception{
+    EasyMockSupport easyMockSupport = new EasyMockSupport();
+    final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class);
+    final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class);
+    final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class);
+
+    final Config kafkaBroker = easyMockSupport.createNiceMock(Config.class);
+    expect(kafkaBroker.getProperties()).andReturn(new HashMap<String, String>(){{
+      put("listeners", "PLAINTEXT://localhost:PLAINTEXT6667,PLAINTEXTSSL://localhost:6666PLAINTEXT");
+    }}
+    ).anyTimes();
+
+    final Injector mockInjector = Guice.createInjector(new AbstractModule() {
+      @Override
+      protected void configure() {
+        bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
+        bind(Clusters.class).toInstance(mockClusters);
+        bind(EntityManager.class).toInstance(entityManager);
+        bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
+        bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+        bind(PasswordEncoder.class).toInstance(createNiceMock(PasswordEncoder.class));
+      }
+    });
+
+    expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).once();
+    expect(mockClusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{
+      put("normal", mockClusterExpected);
+    }}).atLeastOnce();
+    expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(kafkaBroker).atLeastOnce();
+    expect(mockClusterExpected.getSecurityType()).andReturn(SecurityType.KERBEROS);
+    expect(mockClusterExpected.getServices()).andReturn(new HashMap<String, Service>()
{
+      {
+        put("KAFKA", null);
+      }
+    }).atLeastOnce();
+
+    UpgradeCatalog240 upgradeCatalog240 = createMockBuilder(UpgradeCatalog240.class)
+            .withConstructor(Injector.class)
+            .withArgs(mockInjector)
+            .addMockedMethod("updateConfigurationProperties", String.class,
+                    Map.class, boolean.class, boolean.class)
+            .createMock();
+
+    Map<String, String> expectedUpdates = new HashMap<>();
+    expectedUpdates.put("listeners", "PLAINTEXTSASL://localhost:PLAINTEXTSASL6667,PLAINTEXTSASLSSL://localhost:6666PLAINTEXTSASL");
+
+    upgradeCatalog240.updateConfigurationProperties("kafka-broker", expectedUpdates,
+            true, false);
+    expectLastCall().once();
+
+    easyMockSupport.replayAll();
+    replay(upgradeCatalog240);
+    upgradeCatalog240.updateKAFKAConfigs();
+    easyMockSupport.verifyAll();
+  }
+
   /**
    * Test that queue names updated in mapred-site, webhcat-site, tez-site, yarn-env
    * @throws Exception


Mime
View raw message