Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8715D200BFB for ; Wed, 11 Jan 2017 19:37:34 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 85A6F160B50; Wed, 11 Jan 2017 18:37:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5E0FD160B3B for ; Wed, 11 Jan 2017 19:37:33 +0100 (CET) Received: (qmail 78883 invoked by uid 500); 11 Jan 2017 18:37:32 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 78874 invoked by uid 99); 11 Jan 2017 18:37:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Jan 2017 18:37:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 58314DFAEB; Wed, 11 Jan 2017 18:37:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ncole@apache.org To: commits@ambari.apache.org Message-Id: <757e9a8796b6411d98e90515e2c4b7df@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ambari git commit: AMBARI-19450. Host version incorrectly reported as OUT_OF_SYNC (ncole) Date: Wed, 11 Jan 2017 18:37:32 +0000 (UTC) archived-at: Wed, 11 Jan 2017 18:37:34 -0000 Repository: ambari Updated Branches: refs/heads/branch-2.5 0378a6f04 -> 947b7191b AMBARI-19450. Host version incorrectly reported as OUT_OF_SYNC (ncole) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/947b7191 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/947b7191 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/947b7191 Branch: refs/heads/branch-2.5 Commit: 947b7191bdc9badd8576c66c22c22189e33c0c03 Parents: 0378a6f Author: Nate Cole Authored: Tue Jan 10 18:52:32 2017 -0500 Committer: Nate Cole Committed: Wed Jan 11 13:37:16 2017 -0500 ---------------------------------------------------------------------- .../actionmanager/ExecutionCommandWrapper.java | 2 +- .../upgrade/HostVersionOutOfSyncListener.java | 101 +++++++++++++++- .../HostVersionOutOfSyncListenerTest.java | 121 +++++++++++++++++-- .../server/state/cluster/ClusterTest.java | 5 +- 4 files changed, 207 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/947b7191/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java index 0562c15..26c39fc 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java @@ -183,7 +183,7 @@ public class ExecutionCommandWrapper { // it's possible that there are commands without clusters; in such cases, // just return the de-serialized command and don't try to read configs LOG.warn( - "Unable to lookup the cluster byt ID; assuming that there is no cluster and therefore no configs for this execution command: {}", + "Unable to lookup the cluster by ID; assuming that there is no cluster and therefore no configs for this execution command: {}", cnfe.getMessage()); return executionCommand; http://git-wip-us.apache.org/repos/asf/ambari/blob/947b7191/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/HostVersionOutOfSyncListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/HostVersionOutOfSyncListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/HostVersionOutOfSyncListener.java index dabd348..51ee285 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/HostVersionOutOfSyncListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/HostVersionOutOfSyncListener.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.Lock; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.EagerSingleton; @@ -31,11 +32,14 @@ import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.events.HostsAddedEvent; import org.apache.ambari.server.events.HostsRemovedEvent; import org.apache.ambari.server.events.ServiceComponentInstalledEvent; +import org.apache.ambari.server.events.ServiceComponentUninstalledEvent; import org.apache.ambari.server.events.ServiceInstalledEvent; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; +import org.apache.ambari.server.logging.LockFactory; import org.apache.ambari.server.orm.dao.HostDAO; import org.apache.ambari.server.orm.dao.HostVersionDAO; import org.apache.ambari.server.orm.entities.ClusterVersionEntity; +import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity; import org.apache.ambari.server.orm.entities.HostEntity; import org.apache.ambari.server.orm.entities.HostVersionEntity; import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; @@ -83,9 +87,17 @@ public class HostVersionOutOfSyncListener { @Inject private Provider ami; + /** + * The publisher may be an asynchronous, multi-threaded one, so to avoid the (rare, but possible) case + * of both an Install and Uninstall event occurring at the same time, we use a Lock. + */ + private final Lock m_lock; + @Inject - public HostVersionOutOfSyncListener(AmbariEventPublisher ambariEventPublisher) { + public HostVersionOutOfSyncListener(AmbariEventPublisher ambariEventPublisher, LockFactory lockFactory) { ambariEventPublisher.register(this); + + m_lock = lockFactory.newLock("hostVersionOutOfSyncListenerLock"); } @Subscribe @@ -95,6 +107,8 @@ public class HostVersionOutOfSyncListener { LOG.debug(event.toString()); } + m_lock.lock(); + try { Cluster cluster = clusters.get().getClusterById(event.getClusterId()); List hostVersionEntities = @@ -102,7 +116,7 @@ public class HostVersionOutOfSyncListener { for (HostVersionEntity hostVersionEntity : hostVersionEntities) { StackEntity hostStackEntity = hostVersionEntity.getRepositoryVersion().getStack(); - StackId hostStackId = new StackId(hostStackEntity.getStackName(), hostStackEntity.getStackVersion()); + StackId hostStackId = new StackId(hostStackEntity); // If added components do not advertise version, it makes no sense to mark version OUT_OF_SYNC // We perform check per-stack version, because component may be not versionAdvertised in current @@ -111,21 +125,96 @@ public class HostVersionOutOfSyncListener { String componentName = event.getComponentName(); ComponentInfo component = ami.get().getComponent(hostStackId.getStackName(), hostStackId.getStackVersion(), serviceName, componentName); + if (!component.isVersionAdvertised()) { + RepositoryVersionState state = checkAllHostComponents(hostStackId, hostVersionEntity.getHostEntity()); + if (null != state) { + hostVersionEntity.setState(state); + hostVersionDAO.get().merge(hostVersionEntity); + } continue; } - if (hostVersionEntity.getState().equals(RepositoryVersionState.INSTALLED)) { - hostVersionEntity.setState(RepositoryVersionState.OUT_OF_SYNC); - hostVersionDAO.get().merge(hostVersionEntity); - cluster.recalculateClusterVersionState(hostVersionEntity.getRepositoryVersion()); + switch (hostVersionEntity.getState()) { + case INSTALLED: + case NOT_REQUIRED: + hostVersionEntity.setState(RepositoryVersionState.OUT_OF_SYNC); + hostVersionDAO.get().merge(hostVersionEntity); + cluster.recalculateClusterVersionState(hostVersionEntity.getRepositoryVersion()); + break; + default: + break; } } } catch (AmbariException e) { LOG.error("Can not update hosts about out of sync", e); + } finally { + m_lock.unlock(); + } + } + + @Subscribe + @Transactional + public void onServiceComponentHostEvent(ServiceComponentUninstalledEvent event) { + + m_lock.lock(); + + try { + Cluster cluster = clusters.get().getClusterById(event.getClusterId()); + List hostVersionEntities = + hostVersionDAO.get().findByClusterAndHost(cluster.getClusterName(), event.getHostName()); + + for (HostVersionEntity hostVersionEntity : hostVersionEntities) { + HostEntity hostEntity = hostVersionEntity.getHostEntity(); + RepositoryVersionEntity repoVersionEntity = hostVersionEntity.getRepositoryVersion(); + StackId stackId = repoVersionEntity.getStackId(); + + if (null == stackId) { + LOG.info("Stack id could not be loaded for host version {}, repo {}", hostVersionEntity.getHostName(), + repoVersionEntity.getVersion()); + continue; + } + + RepositoryVersionState repoState = checkAllHostComponents(stackId, hostEntity); + if (null != repoState) { + hostVersionEntity.setState(repoState); + hostVersionDAO.get().merge(hostVersionEntity); + } + } + + } catch (AmbariException e) { + LOG.error("Cannot update states after a component was uninstalled: {}", event, e); + } finally { + m_lock.unlock(); } } + /** + * Checks if all the components advertise version. If additional states need to be + * computed, add on to the logic of this method; make sure the usages are checked for + * correctness. + * + * @param stackId the stack id + * @param host the host entity to find components + * @return {@code null} if there should be no state change. non-{@code null} to change it + */ + private RepositoryVersionState checkAllHostComponents(StackId stackId, + HostEntity host) throws AmbariException { + + Collection hostComponents = host.getHostComponentDesiredStateEntities(); + + for (HostComponentDesiredStateEntity hostComponent : hostComponents) { + ComponentInfo ci = ami.get().getComponent(stackId.getStackName(), stackId.getStackVersion(), + hostComponent.getServiceName(), hostComponent.getComponentName()); + + if (ci.isVersionAdvertised()) { + return null; + } + } + + return RepositoryVersionState.NOT_REQUIRED; + } + @Subscribe @Transactional public void onServiceEvent(ServiceInstalledEvent event) { http://git-wip-us.apache.org/repos/asf/ambari/blob/947b7191/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/HostVersionOutOfSyncListenerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/HostVersionOutOfSyncListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/HostVersionOutOfSyncListenerTest.java index 3592668..5abcb94 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/HostVersionOutOfSyncListenerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/HostVersionOutOfSyncListenerTest.java @@ -29,9 +29,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.events.ServiceComponentInstalledEvent; +import org.apache.ambari.server.events.ServiceComponentUninstalledEvent; import org.apache.ambari.server.events.ServiceInstalledEvent; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.GuiceJpaInitializer; @@ -48,15 +50,16 @@ import org.apache.ambari.server.state.HostState; import org.apache.ambari.server.state.RepositoryVersionState; import org.apache.ambari.server.state.Service; import org.apache.ambari.server.state.ServiceComponent; +import org.apache.ambari.server.state.ServiceComponentHost; import org.apache.ambari.server.state.ServiceComponentHostFactory; import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.utils.EventBusSynchronizer; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; @@ -64,7 +67,6 @@ import com.google.inject.persist.PersistService; import com.google.inject.persist.UnitOfWork; public class HostVersionOutOfSyncListenerTest { - private static final Logger LOG = LoggerFactory.getLogger(HostVersionOutOfSyncListenerTest.class); private final String stackId = "HDP-2.2.0"; private final String yetAnotherStackId = "HDP-2.1.1"; @@ -151,7 +153,7 @@ public class HostVersionOutOfSyncListenerTest { // Register and install new version RepositoryVersionEntity repositoryVersionEntity = helper.getOrCreateRepositoryVersion(stackId, INSTALLED_VERSION); - HostVersionEntity hv1 = helper.createHostVersion("h1", repositoryVersionEntity, RepositoryVersionState.INSTALLED); + helper.createHostVersion("h1", repositoryVersionEntity, RepositoryVersionState.INSTALLED); c1.recalculateAllClusterVersionStates(); assertRepoVersionState(stackId.getStackId(), INSTALLED_VERSION, RepositoryVersionState.INSTALLED); @@ -179,7 +181,7 @@ public class HostVersionOutOfSyncListenerTest { // Register and install new version RepositoryVersionEntity repositoryVersionEntity = helper.getOrCreateRepositoryVersion(stackId, INSTALLED_VERSION); - HostVersionEntity hv2 = helper.createHostVersion("h1", repositoryVersionEntity, RepositoryVersionState.INSTALLED); + helper.createHostVersion("h1", repositoryVersionEntity, RepositoryVersionState.INSTALLED); c1.recalculateAllClusterVersionStates(); assertRepoVersionState(stackId.getStackId(), INSTALLED_VERSION, RepositoryVersionState.INSTALLED); @@ -345,7 +347,6 @@ public class HostVersionOutOfSyncListenerTest { h1.setState(HostState.HEALTHY); StackId stackId = new StackId(this.stackId); - StackId yaStackId = new StackId(yetAnotherStackId); RepositoryVersionEntity repositoryVersionEntity = helper.getOrCreateRepositoryVersion(stackId,"2.2.0-1000"); RepositoryVersionEntity repositoryVersionEntity2 = helper.getOrCreateRepositoryVersion(stackId,"2.2.0-2000"); c1.createClusterVersion(stackId, "2.2.0-1000", "admin", RepositoryVersionState.INSTALLING); @@ -354,8 +355,8 @@ public class HostVersionOutOfSyncListenerTest { assertRepoVersionState(stackId.getStackId(), "2.2.0-1000", RepositoryVersionState.INSTALLING); assertRepoVersionState(stackId.getStackId(), "2.2.0-2086", RepositoryVersionState.CURRENT); - HostVersionEntity hv1 = helper.createHostVersion("h1", repositoryVersionEntity, RepositoryVersionState.INSTALLED); - HostVersionEntity hv2 = helper.createHostVersion("h1", repositoryVersionEntity2, RepositoryVersionState.INSTALLED); + helper.createHostVersion("h1", repositoryVersionEntity, RepositoryVersionState.INSTALLED); + helper.createHostVersion("h1", repositoryVersionEntity2, RepositoryVersionState.INSTALLED); c1.recalculateAllClusterVersionStates(); assertRepoVersionState(stackId.getStackId(), "2.2.0-1000", RepositoryVersionState.INSTALLED); assertRepoVersionState(stackId.getStackId(), "2.2.0-2000", RepositoryVersionState.INSTALLED); @@ -395,10 +396,6 @@ public class HostVersionOutOfSyncListenerTest { c1.setCurrentStackVersion(stackId); c1.recalculateAllClusterVersionStates(); - for (ClusterVersionEntity cve : c1.getAllClusterVersions()) { - System.out.println(cve.getRepositoryVersion().getDisplayName()); - } - assertRepoVersionState(stackId.getStackId(), "2.2.0", RepositoryVersionState.CURRENT); assertRepoVersionState(stackId.getStackId(), "2.2.9-9999", RepositoryVersionState.INSTALLING); @@ -433,6 +430,100 @@ public class HostVersionOutOfSyncListenerTest { assertRepoVersionState(stackId.getStackId(), "2.2.9-9999", RepositoryVersionState.INSTALLED); } + @Test + public void testComponentHostVersionNotRequired() throws Exception { + String clusterName = UUID.randomUUID().toString(); + String host1 = "host1"; + String host2 = "host2"; + String host3 = "host3"; + + List allHosts = Lists.newArrayList(host1, host2, host3); + + // create cluster and hosts + StackId stackId = new StackId(this.stackId); + clusters.addCluster(clusterName, stackId); + c1 = clusters.getCluster(clusterName); + addHost(host1); + addHost(host2); + addHost(host3); + + // create repo version + RepositoryVersionEntity repo = helper.getOrCreateRepositoryVersion(stackId, stackId.getStackVersion()); + c1.createClusterVersion(stackId, stackId.getStackVersion(), "admin", RepositoryVersionState.INSTALLING); + clusters.mapHostToCluster(host1, clusterName); + clusters.mapHostToCluster(host2, clusterName); + clusters.mapHostToCluster(host3, clusterName); + + helper.createHostVersion(host1, repo, RepositoryVersionState.INSTALLED); + helper.createHostVersion(host2, repo, RepositoryVersionState.INSTALLED); + helper.createHostVersion(host3, repo, RepositoryVersionState.INSTALLED); + + // add host1 with versionable component + non-versionable + // add host2 with versionable component + // add host3 with non-versionable component + + Map> topology = new ImmutableMap.Builder>() + .put("NAMENODE", Lists.newArrayList(0)) + .put("DATANODE", Lists.newArrayList(1)) + .build(); + addService(c1, allHosts, topology, "HDFS"); + + topology = new ImmutableMap.Builder>() + .put("GANGLIA_SERVER", Lists.newArrayList(0)) + .put("GANGLIA_MONITOR", Lists.newArrayList(2)) + .build(); + addService(c1, allHosts, topology, "GANGLIA"); + + List hostVersions = hostVersionDAO.findAll(); + assertEquals(3, hostVersions.size()); + + // assert host1 is OUT_OF_SYNC + // assert host2 is OUT_OF_SYNC + // assert host3 is NOT_REQUIRED + for (HostVersionEntity hve : hostVersions) { + if (hve.getHostName().equals(host3)) { + assertEquals(RepositoryVersionState.NOT_REQUIRED, hve.getState()); + } else { + assertEquals(RepositoryVersionState.OUT_OF_SYNC, hve.getState()); + } + } + + // add versionable component to host3 + addServiceComponent(c1, Collections.singletonList(host3), "HDFS", "DATANODE"); + + // assert host3 is OUT_OF_SYNC + hostVersions = hostVersionDAO.findAll(); + for (HostVersionEntity hve : hostVersions) { + assertEquals(RepositoryVersionState.OUT_OF_SYNC, hve.getState()); + } + + // remove versionable component from host3 + List hostComponents = c1.getServiceComponentHosts(host3); + for (ServiceComponentHost sch : hostComponents) { + if (sch.getServiceName().equals("HDFS")) { + sch.delete(); + + StackId clusterStackId = c1.getDesiredStackVersion(); + + ServiceComponentUninstalledEvent event = new ServiceComponentUninstalledEvent( + c1.getClusterId(), clusterStackId.getStackName(), clusterStackId.getStackVersion(), + "HDFS", "DATANODE", sch.getHostName(), false); + + m_eventPublisher.publish(event); + } + } + + // assert host3 is back to NOT_REQUIRED + hostVersions = hostVersionDAO.findAll(); + for (HostVersionEntity hve : hostVersions) { + if (hve.getHostName().equals(host3)) { + assertEquals(RepositoryVersionState.NOT_REQUIRED, hve.getState()); + } else { + assertEquals(RepositoryVersionState.OUT_OF_SYNC, hve.getState()); + } + } + } + private void addHost(String hostname) throws AmbariException { clusters.addHost(hostname); @@ -476,7 +567,11 @@ public class HostVersionOutOfSyncListenerTest { String serviceName, String componentName) throws AmbariException { StackId stackIdObj = new StackId(stackId); Service service = cl.getService(serviceName); - service.addServiceComponent(componentName); + + if (!service.getServiceComponents().containsKey(componentName)) { + service.addServiceComponent(componentName); + } + ServiceComponent component = service.getServiceComponent(componentName); for(String hostName : hostList) { http://git-wip-us.apache.org/repos/asf/ambari/blob/947b7191/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java index bbf1478..cb5e520 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java @@ -2033,7 +2033,8 @@ public class ClusterTest { Assert.assertNotNull(cv2); Assert.assertEquals(cv2.getState(), RepositoryVersionState.INSTALLED); - // Add one more Host, with only Ganglia on it. It should have a HostVersion in OUT_OF_SYNC for v2 + // Add one more Host, with only Ganglia on it. It should have a HostVersion in NOT_REQUIRED for v2, + // as Ganglia isn't versionable addHost("h-5", hostAttributes); clusters.mapAndPublishHostsToCluster(Collections.singleton("h-5"), clusterName); ServiceComponentHost schHost5Serv3CompB = serviceComponentHostFactory.createNew(sc3CompB, "h-5"); @@ -2042,7 +2043,7 @@ public class ClusterTest { // Host 5 will be in OUT_OF_SYNC, so redistribute bits to it so that it reaches a state of INSTALLED HostVersionEntity h5Version2 = hostVersionDAO.findByClusterStackVersionAndHost(clusterName, stackId, v2, "h-5"); Assert.assertNotNull(h5Version2); - Assert.assertEquals(h5Version2.getState(), RepositoryVersionState.OUT_OF_SYNC); + Assert.assertEquals(RepositoryVersionState.NOT_REQUIRED, h5Version2.getState()); h5Version2.setState(RepositoryVersionState.INSTALLED); hostVersionDAO.merge(h5Version2);