ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject ambari git commit: AMBARI-13390 Unable to set user value for kafka-broker/kafka.metrics.reporters (dsen)
Date Tue, 13 Oct 2015 13:11:58 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk 232829c50 -> 0da6327ac


AMBARI-13390 Unable to set user value for kafka-broker/kafka.metrics.reporters (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0da6327a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0da6327a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0da6327a

Branch: refs/heads/trunk
Commit: 0da6327ac2290dac608c86551cac485c59a7972d
Parents: 232829c
Author: Dmytro Sen <dsen@apache.org>
Authored: Tue Oct 13 16:11:43 2015 +0300
Committer: Dmytro Sen <dsen@apache.org>
Committed: Tue Oct 13 16:11:43 2015 +0300

----------------------------------------------------------------------
 .../server/upgrade/UpgradeCatalog213.java       | 38 ++++++++++++++
 .../0.8.1.2.2/configuration/kafka-broker.xml    |  2 +-
 .../KAFKA/0.8.1.2.2/package/scripts/kafka.py    |  5 +-
 .../KAFKA/0.8.1.2.2/package/scripts/params.py   | 10 ----
 .../server/upgrade/UpgradeCatalog213Test.java   | 53 ++++++++++++++++++++
 5 files changed, 93 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/0da6327a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
index c932d95..3eeb6b9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
@@ -40,6 +40,7 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 /**
@@ -48,6 +49,7 @@ import java.util.UUID;
 public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
 
   private static final String STORM_SITE = "storm-site";
+  private static final String KAFKA_BROKER = "kafka-broker";
   private static final String AMS_ENV = "ams-env";
   private static final String AMS_HBASE_ENV = "ams-hbase-env";
   private static final String HBASE_ENV_CONFIG = "hbase-env";
@@ -118,6 +120,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
     updateStormConfigs();
     updateAMSConfigs();
     updateHbaseEnvConfig();
+    updateKafkaConfigs();
   }
 
   /**
@@ -280,6 +283,41 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
 
   }
 
+  protected void updateKafkaConfigs() throws AmbariException {
+    AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
+    Clusters clusters = ambariManagementController.getClusters();
+
+    if (clusters != null) {
+      Map<String, Cluster> clusterMap = clusters.getClusters();
+      if (clusterMap != null && !clusterMap.isEmpty()) {
+        for (final Cluster cluster : clusterMap.values()) {
+          Set<String> installedServices =cluster.getServices().keySet();
+          Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER);
+          if (kafkaBroker != null) {
+            Map<String, String> newProperties = new HashMap<>();
+            Map<String, String> kafkaBrokerProperties = kafkaBroker.getProperties();
+            String kafkaMetricsReporters = kafkaBrokerProperties.get("kafka.metrics.reporters");
+            if (kafkaMetricsReporters == null ||
+              "{{kafka_metrics_reporters}}".equals(kafkaMetricsReporters)) {
+
+              if (installedServices.contains("AMBARI_METRICS")) {
+                newProperties.put("kafka.metrics.reporters", "org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter");
+              } else if (installedServices.contains("GANGLIA")) {
+                newProperties.put("kafka.metrics.reporters", "kafka.ganglia.KafkaGangliaMetricsReporter");
+              } else {
+                newProperties.put("kafka.metrics.reporters", " ");
+              }
+
+            }
+            if (!newProperties.isEmpty()) {
+              updateConfigurationPropertiesForCluster(cluster, KAFKA_BROKER, newProperties, true, true);
+            }
+          }
+        }
+      }
+    }
+  }
+
   protected String updateAmsEnvContent(String oldContent) {
     if (oldContent == null) {
       return null;

http://git-wip-us.apache.org/repos/asf/ambari/blob/0da6327a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml
index 1cbfade..6a98648 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml
@@ -271,7 +271,7 @@
   </property>
   <property>
     <name>kafka.metrics.reporters</name>
-    <value>{{kafka_metrics_reporters}}</value>
+    <value>org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter</value>
     <description>
       kafka ganglia metrics reporter and kafka timeline metrics reporter
     </description>

http://git-wip-us.apache.org/repos/asf/ambari/blob/0da6327a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
index 4d28c41..7f9b4ed 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
@@ -21,8 +21,7 @@ limitations under the License.
 from resource_management import *
 from resource_management.libraries.resources.properties_file import PropertiesFile
 from resource_management.libraries.resources.template_config import TemplateConfig
-import sys, os
-from copy import deepcopy
+import os
 
 def kafka():
     import params
@@ -53,8 +52,6 @@ def kafka():
     else:
         kafka_server_config['host.name'] = params.hostname
 
-
-    kafka_server_config['kafka.metrics.reporters'] = params.kafka_metrics_reporters
     if(params.has_metric_collector):
             kafka_server_config['kafka.timeline.metrics.host'] = params.metric_collector_host
             kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port

http://git-wip-us.apache.org/repos/asf/ambari/blob/0da6327a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
index 0a55504..dc0c087 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
@@ -85,13 +85,9 @@ if 'ganglia_server_host' in config['clusterHostInfo'] and \
 else:
   ganglia_installed = False
 
-kafka_metrics_reporters=""
 metric_collector_host = ""
 metric_collector_port = ""
 
-if ganglia_installed:
-  kafka_metrics_reporters = "kafka.ganglia.KafkaGangliaMetricsReporter"
-
 ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", [])
 has_metric_collector = not len(ams_collector_hosts) == 0
 
@@ -101,12 +97,6 @@ if has_metric_collector:
   if metric_collector_port and metric_collector_port.find(':') != -1:
     metric_collector_port = metric_collector_port.split(':')[1]
 
-  if not len(kafka_metrics_reporters) == 0:
-      kafka_metrics_reporters = kafka_metrics_reporters + ','
-
-  kafka_metrics_reporters = kafka_metrics_reporters + "org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter"
-
-
 # Security-related params
 security_enabled = config['configurations']['cluster-env']['security_enabled']
 kafka_kerberos_enabled = ('security.inter.broker.protocol' in config['configurations']['kafka-broker'] and

http://git-wip-us.apache.org/repos/asf/ambari/blob/0da6327a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
index b94fd64..c945186 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
@@ -37,6 +37,7 @@ import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
 import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.stack.OsFamily;
 import org.easymock.EasyMockSupport;
@@ -94,6 +95,7 @@ public class UpgradeCatalog213Test {
   @Test
   public void testExecuteDMLUpdates() throws Exception {
     Method updateAMSConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateAMSConfigs");
+    Method updateKafkaConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateKafkaConfigs");
     Method updateStormConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateStormConfigs");
     Method addNewConfigurationsFromXml = AbstractUpgradeCatalog.class.getDeclaredMethod("addNewConfigurationsFromXml");
     Method updateHbaseEnvConfig = UpgradeCatalog213.class.getDeclaredMethod("updateHbaseEnvConfig");
@@ -105,6 +107,7 @@ public class UpgradeCatalog213Test {
         .addMockedMethod(addNewConfigurationsFromXml)
         .addMockedMethod(updateHbaseEnvConfig)
         .addMockedMethod(updateAlertDefinitions)
+        .addMockedMethod(updateKafkaConfigs)
         .createMock();
 
     upgradeCatalog213.updateHbaseEnvConfig();
@@ -117,6 +120,8 @@ public class UpgradeCatalog213Test {
     expectLastCall().once();
     upgradeCatalog213.updateAlertDefinitions();
     expectLastCall().once();
+    upgradeCatalog213.updateKafkaConfigs();
+    expectLastCall().once();
 
     replay(upgradeCatalog213);
 
@@ -328,6 +333,54 @@ public class UpgradeCatalog213Test {
     Assert.assertEquals(expected, upgradeCatalog213.modifyJournalnodeProcessAlertSource(alertSource));
   }
 
+  @Test
+  public void testUpdateKafkaConfigs() throws Exception {
+    EasyMockSupport easyMockSupport = new EasyMockSupport();
+    final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class);
+    final ConfigHelper mockConfigHelper = easyMockSupport.createMock(ConfigHelper.class);
+
+    final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class);
+    final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class);
+    final Map<String, String> propertiesAmsEnv = new HashMap<String, String>() {
+      {
+        put("kafka.metrics.reporters", "{{kafka_metrics_reporters}}");
+      }
+    };
+    final Map<String, Service> installedServices = new HashMap<String, Service>() {
+      {
+        put("KAFKA", null);
+        put("AMBARI_METRICS", null);
+      }
+    };
+
+    final Config mockAmsEnv = easyMockSupport.createNiceMock(Config.class);
+
+    final Injector mockInjector = Guice.createInjector(new AbstractModule() {
+      @Override
+      protected void configure() {
+        bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
+        bind(ConfigHelper.class).toInstance(mockConfigHelper);
+        bind(Clusters.class).toInstance(mockClusters);
+
+        bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
+        bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+      }
+    });
+
+    expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).once();
+    expect(mockClusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{
+      put("normal", mockClusterExpected);
+    }}).once();
+
+    expect(mockClusterExpected.getServices()).andReturn(installedServices).atLeastOnce();
+    expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(mockAmsEnv).atLeastOnce();
+    expect(mockAmsEnv.getProperties()).andReturn(propertiesAmsEnv).atLeastOnce();
+
+    easyMockSupport.replayAll();
+    mockInjector.getInstance(UpgradeCatalog213.class).updateKafkaConfigs();
+    easyMockSupport.verifyAll();
+  }
+
   /**
    * @param dbAccessor
    * @return


Mime
View raw message