ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aonis...@apache.org
Subject ambari git commit: AMBARI-13355. Journal node went in critical state on Ambari : message on UI : Connection failed: [Errno 111] Connection refused to 0.0.0.0:8480 (aonishuk)
Date Mon, 12 Oct 2015 14:48:19 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk e8dbb1327 -> 9b0c24687


AMBARI-13355. Journal node went in critical state on Ambari : message on UI :  Connection
failed: [Errno 111] Connection refused to 0.0.0.0:8480 (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9b0c2468
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9b0c2468
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9b0c2468

Branch: refs/heads/trunk
Commit: 9b0c24687fe47624e8284daf3609b1134e330323
Parents: e8dbb13
Author: Andrew Onishuk <aonishuk@hortonworks.com>
Authored: Mon Oct 12 17:47:33 2015 +0300
Committer: Andrew Onishuk <aonishuk@hortonworks.com>
Committed: Mon Oct 12 17:48:05 2015 +0300

----------------------------------------------------------------------
 .../server/upgrade/AbstractUpgradeCatalog.java  |  14 ++
 .../server/upgrade/SchemaUpgradeHelper.java     |   3 +
 .../server/upgrade/UpgradeCatalog213.java       |  80 ++++++++++-
 .../server/utils/EventBusSynchronizer.java      | 139 +++++++++++++++++++
 .../common-services/HDFS/2.1.0.2.0/alerts.json  |  25 ++--
 .../server/upgrade/UpgradeCatalog213Test.java   |  25 +++-
 .../server/utils/EventBusSynchronizer.java      | 139 -------------------
 7 files changed, 274 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/9b0c2468/ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java
index 62cd868..ed68313 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java
@@ -341,6 +341,20 @@ public abstract class AbstractUpgradeCatalog implements UpgradeCatalog {
   }
 
   /**
+   * This method returns Map of clusters.
+   * Map can be empty or with some objects, but never be null.
+   */
+  protected Map<String, Cluster> getCheckedClusterMap(Clusters clusters) {
+    if (clusters != null) {
+      Map<String, Cluster> clusterMap = clusters.getClusters();
+      if (clusterMap != null) {
+        return clusterMap;
+      }
+    }
+    return new HashMap<>();
+  }
+
+  /**
    * Create a new cluster scoped configuration with the new properties added
    * with the values from the coresponding xml files.
    *

http://git-wip-us.apache.org/repos/asf/ambari/blob/9b0c2468/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
index ca7707b..c77b724 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
@@ -30,6 +30,7 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.ControllerModule;
 import org.apache.ambari.server.orm.DBAccessor;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
 import org.apache.ambari.server.utils.VersionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -182,6 +183,8 @@ public class SchemaUpgradeHelper {
       catalogBinder.addBinding().to(UpgradeCatalog213.class);
       catalogBinder.addBinding().to(UpgradeCatalog220.class);
       catalogBinder.addBinding().to(FinalUpgradeCatalog.class);
+
+      EventBusSynchronizer.synchronizeAmbariEventPublisher(binder());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/9b0c2468/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
index dcdbb85..2c152e4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
@@ -18,20 +18,28 @@
 
 package org.apache.ambari.server.upgrade;
 
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
 import org.apache.ambari.server.orm.dao.DaoUtils;
+import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.alert.SourceType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 
 /**
  * Upgrade catalog for version 2.1.3.
@@ -46,7 +54,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
   /**
    * Logger.
    */
-  private static final Logger LOG = LoggerFactory.getLogger(UpgradeCatalog212.class);
+  private static final Logger LOG = LoggerFactory.getLogger(UpgradeCatalog213.class);
 
   @Inject
   DaoUtils daoUtils;
@@ -104,6 +112,76 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
   protected void executeDMLUpdates() throws AmbariException, SQLException {
     addMissingConfigs();
     updateAMSConfigs();
+    updateAlertDefinitions();
+  }
+
+  /**
+   * Modifies the JSON of some of the alert definitions which have changed
+   * between Ambari versions.
+   */
+  protected void updateAlertDefinitions() {
+    LOG.info("Updating alert definitions.");
+    AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
+    AlertDefinitionDAO alertDefinitionDAO = injector.getInstance(AlertDefinitionDAO.class);
+    Clusters clusters = ambariManagementController.getClusters();
+
+    Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters);
+    for (final Cluster cluster : clusterMap.values()) {
+      final AlertDefinitionEntity alertDefinitionEntity = alertDefinitionDAO.findByName(
+          cluster.getClusterId(), "journalnode_process");
+
+      if (alertDefinitionEntity != null) {
+        String source = alertDefinitionEntity.getSource();
+
+        alertDefinitionEntity.setSource(modifyJournalnodeProcessAlertSource(source));
+        alertDefinitionEntity.setSourceType(SourceType.WEB);
+        alertDefinitionEntity.setHash(UUID.randomUUID().toString());
+
+        alertDefinitionDAO.merge(alertDefinitionEntity);
+        LOG.info("journalnode_process alert definition was updated.");
+      }
+    }
+  }
+
+  /**
+   * Modifies type of the journalnode_process alert to WEB.
+   * Changes reporting text and uri according to the WEB type.
+   * Removes default_port property.
+   */
+  String modifyJournalnodeProcessAlertSource(String source) {
+    JsonObject rootJson = new JsonParser().parse(source).getAsJsonObject();
+
+    rootJson.remove("type");
+    rootJson.addProperty("type", "WEB");
+
+    rootJson.remove("default_port");
+
+    rootJson.remove("uri");
+    JsonObject uriJson = new JsonObject();
+    uriJson.addProperty("http", "{{hdfs-site/dfs.journalnode.http-address}}");
+    uriJson.addProperty("https", "{{hdfs-site/dfs.journalnode.https-address}}");
+    uriJson.addProperty("kerberos_keytab", "{{hdfs-site/dfs.web.authentication.kerberos.keytab}}");
+    uriJson.addProperty("kerberos_principal", "{{hdfs-site/dfs.web.authentication.kerberos.principal}}");
+    uriJson.addProperty("https_property", "{{hdfs-site/dfs.http.policy}}");
+    uriJson.addProperty("https_property_value", "HTTPS_ONLY");
+    uriJson.addProperty("connection_timeout", 5.0);
+    rootJson.add("uri", uriJson);
+
+    rootJson.getAsJsonObject("reporting").getAsJsonObject("ok").remove("text");
+    rootJson.getAsJsonObject("reporting").getAsJsonObject("ok").addProperty(
+            "text", "HTTP {0} response in {2:.3f}s");
+
+    rootJson.getAsJsonObject("reporting").getAsJsonObject("warning").remove("text");
+    rootJson.getAsJsonObject("reporting").getAsJsonObject("warning").addProperty(
+            "text", "HTTP {0} response from {1} in {2:.3f}s ({3})");
+    rootJson.getAsJsonObject("reporting").getAsJsonObject("warning").remove("value");
+
+    rootJson.getAsJsonObject("reporting").getAsJsonObject("critical").remove("text");
+    rootJson.getAsJsonObject("reporting").getAsJsonObject("critical").addProperty("text",
+            "Connection failed to {1} ({3})");
+    rootJson.getAsJsonObject("reporting").getAsJsonObject("critical").remove("value");
+
+    return rootJson.toString();
   }
 
   protected void addMissingConfigs() throws AmbariException {

http://git-wip-us.apache.org/repos/asf/ambari/blob/9b0c2468/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
new file mode 100644
index 0000000..d9be622
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import java.lang.reflect.Field;
+
+import org.apache.ambari.server.events.listeners.alerts.AlertAggregateListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertLifecycleListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertMaintenanceModeListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
+import org.apache.ambari.server.events.listeners.upgrade.DistributeRepositoriesActionListener;
+import org.apache.ambari.server.events.listeners.upgrade.HostVersionOutOfSyncListener;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Binder;
+import com.google.inject.Injector;
+
+/**
+ * The {@link EventBusSynchronizer} is used to replace the {@link AsyncEventBus}
+ * used by Guava with a synchronous, serial {@link EventBus} instance. This
+ * enables testing that relies on testing the outcome of asynchronous events by
+ * executing the events on the current thread serially.
+ */
+public class EventBusSynchronizer {
+
+  /**
+   * Force the {@link EventBus} from {@link AmbariEventPublisher} to be serial
+   * and synchronous.
+   *
+   * @param binder
+   */
+  public static void synchronizeAmbariEventPublisher(Binder binder) {
+    EventBus synchronizedBus = new EventBus();
+    AmbariEventPublisher ambariEventPublisher = new AmbariEventPublisher();
+
+    replaceEventBus(AmbariEventPublisher.class, ambariEventPublisher,
+        synchronizedBus);
+
+    binder.bind(AmbariEventPublisher.class).toInstance(ambariEventPublisher);
+  }
+
+  /**
+   * Force the {@link EventBus} from {@link AlertEventPublisher} to be serial
+   * and synchronous. Also register the known listeners. Registering known
+   * listeners is necessary since the event bus was replaced.
+   *
+   * @param injector
+   */
+  public static EventBus synchronizeAmbariEventPublisher(Injector injector) {
+    EventBus synchronizedBus = new EventBus();
+    AmbariEventPublisher publisher = injector.getInstance(AmbariEventPublisher.class);
+
+    replaceEventBus(AmbariEventPublisher.class, publisher, synchronizedBus);
+
+    // register common ambari event listeners
+    registerAmbariListeners(injector, synchronizedBus);
+
+    return synchronizedBus;
+  }
+
+  /**
+   * Force the {@link EventBus} from {@link AlertEventPublisher} to be serial
+   * and synchronous. Also register the known listeners. Registering known
+   * listeners is necessary since the event bus was replaced.
+   *
+   * @param injector
+   */
+  public static EventBus synchronizeAlertEventPublisher(Injector injector) {
+    EventBus synchronizedBus = new EventBus();
+    AlertEventPublisher publisher = injector.getInstance(AlertEventPublisher.class);
+
+    replaceEventBus(AlertEventPublisher.class, publisher, synchronizedBus);
+
+    // register common alert event listeners
+    registerAlertListeners(injector, synchronizedBus);
+
+    return synchronizedBus;
+  }
+
+  /**
+   * Register the normal listeners with the replaced synchronous bus.
+   *
+   * @param injector
+   * @param synchronizedBus
+   */
+  private static void registerAmbariListeners(Injector injector,
+      EventBus synchronizedBus) {
+    synchronizedBus.register(injector.getInstance(AlertMaintenanceModeListener.class));
+    synchronizedBus.register(injector.getInstance(AlertLifecycleListener.class));
+    synchronizedBus.register(injector.getInstance(AlertServiceStateListener.class));
+    synchronizedBus.register(injector.getInstance(DistributeRepositoriesActionListener.class));
+    synchronizedBus.register(injector.getInstance(HostVersionOutOfSyncListener.class));
+  }
+
+  /**
+   * Register the normal listeners with the replaced synchronous bus.
+   *
+   * @param injector
+   * @param synchronizedBus
+   */
+  private static void registerAlertListeners(Injector injector,
+      EventBus synchronizedBus) {
+    synchronizedBus.register(injector.getInstance(AlertAggregateListener.class));
+    synchronizedBus.register(injector.getInstance(AlertReceivedListener.class));
+    synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
+  }
+
+  private static void replaceEventBus(Class<?> eventPublisherClass,
+      Object instance, EventBus eventBus) {
+
+    try {
+      Field field = eventPublisherClass.getDeclaredField("m_eventBus");
+      field.setAccessible(true);
+      field.set(instance, eventBus);
+    } catch (Exception exception) {
+      throw new RuntimeException(exception);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/9b0c2468/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/alerts.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/alerts.json b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/alerts.json
index 2ea9446..1eda00f 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/alerts.json
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/alerts.json
@@ -585,26 +585,31 @@
     "JOURNALNODE": [
       {
         "name": "journalnode_process",
-        "label": "JournalNode Process",
-        "description": "This host-level alert is triggered if the JournalNode process cannot be confirmed to be up and listening on the network.",
+        "label": "JournalNode Web UI",
+        "description": "This host-level alert is triggered if the JournalNode Web UI is unreachable.",
         "interval": 1,
         "scope": "HOST",
         "enabled": true,
         "source": {
-          "type": "PORT",        
-          "uri": "{{hdfs-site/dfs.journalnode.http-address}}",
-          "default_port": 8480,
+          "type": "WEB",
+          "uri": {
+            "http": "{{hdfs-site/dfs.journalnode.http-address}}",
+            "https": "{{hdfs-site/dfs.journalnode.https-address}}",
+            "kerberos_keytab": "{{hdfs-site/dfs.web.authentication.kerberos.keytab}}",
+            "kerberos_principal": "{{hdfs-site/dfs.web.authentication.kerberos.principal}}",
+            "https_property": "{{hdfs-site/dfs.http.policy}}",
+            "https_property_value": "HTTPS_ONLY",
+            "connection_timeout": 5.0
+          },
           "reporting": {
             "ok": {
-              "text": "TCP OK - {0:.3f}s response on port {1}"
+              "text": "HTTP {0} response in {2:.3f}s"
             },
             "warning": {
-              "text": "TCP OK - {0:.3f}s response on port {1}",
-              "value": 1.5
+              "text": "HTTP {0} response from {1} in {2:.3f}s ({3})"
             },
             "critical": {
-              "text": "Connection failed: {0} to {1}:{2}",
-              "value": 5.0
+              "text": "Connection failed to {1} ({3})"
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/ambari/blob/9b0c2468/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
index a54dee2..8063d33 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
@@ -97,16 +97,20 @@ public class UpgradeCatalog213Test {
   public void testExecuteDMLUpdates() throws Exception {
     Method addMissingConfigs = UpgradeCatalog213.class.getDeclaredMethod("addMissingConfigs");
     Method updateAMSConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateAMSConfigs");
+    Method updateAlertDefinitions = UpgradeCatalog213.class.getDeclaredMethod("updateAlertDefinitions");
 
     UpgradeCatalog213 upgradeCatalog213 = createMockBuilder(UpgradeCatalog213.class)
         .addMockedMethod(addMissingConfigs)
         .addMockedMethod(updateAMSConfigs)
+        .addMockedMethod(updateAlertDefinitions)
         .createMock();
 
     upgradeCatalog213.addMissingConfigs();
     expectLastCall().once();
     upgradeCatalog213.updateAMSConfigs();
     expectLastCall().once();
+    upgradeCatalog213.updateAlertDefinitions();
+    expectLastCall().once();
 
     replay(upgradeCatalog213);
 
@@ -258,6 +262,25 @@ public class UpgradeCatalog213Test {
     easyMockSupport.verifyAll();
   }
 
+  @Test
+  public void testModifyJournalnodeProcessAlertSource() throws Exception {
+    UpgradeCatalog213 upgradeCatalog213 = new UpgradeCatalog213(injector);
+    String alertSource = "{\"uri\":\"{{hdfs-site/dfs.journalnode.http-address}}\",\"default_port\":8480," +
+            "\"type\":\"PORT\",\"reporting\":{\"ok\":{\"text\":\"TCP OK - {0:.3f}s response on port {1}\"}," +
+            "\"warning\":{\"text\":\"TCP OK - {0:.3f}s response on port {1}\",\"value\":1.5}," +
+            "\"critical\":{\"text\":\"Connection failed: {0} to {1}:{2}\",\"value\":5.0}}}";
+    String expected = "{\"reporting\":{\"ok\":{\"text\":\"HTTP {0} response in {2:.3f}s\"}," +
+            "\"warning\":{\"text\":\"HTTP {0} response from {1} in {2:.3f}s ({3})\"}," +
+            "\"critical\":{\"text\":\"Connection failed to {1} ({3})\"}},\"type\":\"WEB\"," +
+            "\"uri\":{\"http\":\"{{hdfs-site/dfs.journalnode.http-address}}\"," +
+            "\"https\":\"{{hdfs-site/dfs.journalnode.https-address}}\"," +
+            "\"kerberos_keytab\":\"{{hdfs-site/dfs.web.authentication.kerberos.keytab}}\"," +
+            "\"kerberos_principal\":\"{{hdfs-site/dfs.web.authentication.kerberos.principal}}\"," +
+            "\"https_property\":\"{{hdfs-site/dfs.http.policy}}\"," +
+            "\"https_property_value\":\"HTTPS_ONLY\",\"connection_timeout\":5.0}}";
+    Assert.assertEquals(expected, upgradeCatalog213.modifyJournalnodeProcessAlertSource(alertSource));
+  }
+
   /**
    * @param dbAccessor
    * @return
@@ -290,4 +313,4 @@ public class UpgradeCatalog213Test {
 
     Assert.assertEquals("2.1.3", upgradeCatalog.getTargetVersion());
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/9b0c2468/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
deleted file mode 100644
index d9be622..0000000
--- a/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.server.utils;
-
-import java.lang.reflect.Field;
-
-import org.apache.ambari.server.events.listeners.alerts.AlertAggregateListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertLifecycleListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertMaintenanceModeListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
-import org.apache.ambari.server.events.listeners.upgrade.DistributeRepositoriesActionListener;
-import org.apache.ambari.server.events.listeners.upgrade.HostVersionOutOfSyncListener;
-import org.apache.ambari.server.events.publishers.AlertEventPublisher;
-import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
-
-import com.google.common.eventbus.AsyncEventBus;
-import com.google.common.eventbus.EventBus;
-import com.google.inject.Binder;
-import com.google.inject.Injector;
-
-/**
- * The {@link EventBusSynchronizer} is used to replace the {@link AsyncEventBus}
- * used by Guava with a synchronous, serial {@link EventBus} instance. This
- * enables testing that relies on testing the outcome of asynchronous events by
- * executing the events on the current thread serially.
- */
-public class EventBusSynchronizer {
-
-  /**
-   * Force the {@link EventBus} from {@link AmbariEventPublisher} to be serial
-   * and synchronous.
-   *
-   * @param binder
-   */
-  public static void synchronizeAmbariEventPublisher(Binder binder) {
-    EventBus synchronizedBus = new EventBus();
-    AmbariEventPublisher ambariEventPublisher = new AmbariEventPublisher();
-
-    replaceEventBus(AmbariEventPublisher.class, ambariEventPublisher,
-        synchronizedBus);
-
-    binder.bind(AmbariEventPublisher.class).toInstance(ambariEventPublisher);
-  }
-
-  /**
-   * Force the {@link EventBus} from {@link AlertEventPublisher} to be serial
-   * and synchronous. Also register the known listeners. Registering known
-   * listeners is necessary since the event bus was replaced.
-   *
-   * @param injector
-   */
-  public static EventBus synchronizeAmbariEventPublisher(Injector injector) {
-    EventBus synchronizedBus = new EventBus();
-    AmbariEventPublisher publisher = injector.getInstance(AmbariEventPublisher.class);
-
-    replaceEventBus(AmbariEventPublisher.class, publisher, synchronizedBus);
-
-    // register common ambari event listeners
-    registerAmbariListeners(injector, synchronizedBus);
-
-    return synchronizedBus;
-  }
-
-  /**
-   * Force the {@link EventBus} from {@link AlertEventPublisher} to be serial
-   * and synchronous. Also register the known listeners. Registering known
-   * listeners is necessary since the event bus was replaced.
-   *
-   * @param injector
-   */
-  public static EventBus synchronizeAlertEventPublisher(Injector injector) {
-    EventBus synchronizedBus = new EventBus();
-    AlertEventPublisher publisher = injector.getInstance(AlertEventPublisher.class);
-
-    replaceEventBus(AlertEventPublisher.class, publisher, synchronizedBus);
-
-    // register common alert event listeners
-    registerAlertListeners(injector, synchronizedBus);
-
-    return synchronizedBus;
-  }
-
-  /**
-   * Register the normal listeners with the replaced synchronous bus.
-   *
-   * @param injector
-   * @param synchronizedBus
-   */
-  private static void registerAmbariListeners(Injector injector,
-      EventBus synchronizedBus) {
-    synchronizedBus.register(injector.getInstance(AlertMaintenanceModeListener.class));
-    synchronizedBus.register(injector.getInstance(AlertLifecycleListener.class));
-    synchronizedBus.register(injector.getInstance(AlertServiceStateListener.class));
-    synchronizedBus.register(injector.getInstance(DistributeRepositoriesActionListener.class));
-    synchronizedBus.register(injector.getInstance(HostVersionOutOfSyncListener.class));
-  }
-
-  /**
-   * Register the normal listeners with the replaced synchronous bus.
-   *
-   * @param injector
-   * @param synchronizedBus
-   */
-  private static void registerAlertListeners(Injector injector,
-      EventBus synchronizedBus) {
-    synchronizedBus.register(injector.getInstance(AlertAggregateListener.class));
-    synchronizedBus.register(injector.getInstance(AlertReceivedListener.class));
-    synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
-  }
-
-  private static void replaceEventBus(Class<?> eventPublisherClass,
-      Object instance, EventBus eventBus) {
-
-    try {
-      Field field = eventPublisherClass.getDeclaredField("m_eventBus");
-      field.setAccessible(true);
-      field.set(instance, eventBus);
-    } catch (Exception exception) {
-      throw new RuntimeException(exception);
-    }
-  }
-}


Mime
View raw message