From: yusaku@apache.org
Reply-To: ambari-dev@incubator.apache.org
To: ambari-commits@incubator.apache.org
Date: Wed, 20 Mar 2013 20:44:50 -0000
Subject: svn commit: r1459041 [7/18] - in /incubator/ambari/branches/branch-1.2: ./ ambari-agent/ ambari-agent/conf/unix/ ambari-agent/src/main/puppet/modules/hdp-ganglia/files/ ambari-agent/src/main/puppet/modules/hdp-ganglia/manifests/ ambari-agent/src/main/p...
Message-Id: <20130320204510.9ADAF2388A91@eris.apache.org>

Modified: incubator/ambari/branches/branch-1.2/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java?rev=1459041&r1=1459040&r2=1459041&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java (original)
+++ incubator/ambari/branches/branch-1.2/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java Wed Mar 20 20:44:43 2013
@@ -138,54 +138,62 @@ public class ClusterImpl implements Clus
 * Make sure we load all the service host components.
 * We need this for live status checks.
*/ - public synchronized void loadServiceHostComponents() { + public void loadServiceHostComponents() { loadServices(); - LOG.info("Loading Service Host Components"); if (svcHostsLoaded) return; - if (services != null) { - for (Map.Entry serviceKV: services.entrySet()) { - /* get all the service component hosts **/ - Service service = serviceKV.getValue(); - if (!serviceComponentHosts.containsKey(service.getName())) { - serviceComponentHosts.put(service.getName(), new HashMap>()); - } - for (Map.Entry svcComponent: - service.getServiceComponents().entrySet()) { - ServiceComponent comp = svcComponent.getValue(); - String componentName = svcComponent.getKey(); - if (!serviceComponentHosts.get(service.getName()).containsKey(componentName)) { - serviceComponentHosts.get(service.getName()).put(componentName, - new HashMap()); + writeLock.lock(); + try { + if (svcHostsLoaded) return; + LOG.info("Loading Service Host Components"); + if (svcHostsLoaded) return; + if (services != null) { + for (Entry serviceKV: services.entrySet()) { + /* get all the service component hosts **/ + Service service = serviceKV.getValue(); + if (!serviceComponentHosts.containsKey(service.getName())) { + serviceComponentHosts.put(service.getName(), new HashMap>()); + } + for (Entry svcComponent: + service.getServiceComponents().entrySet()) { + ServiceComponent comp = svcComponent.getValue(); + String componentName = svcComponent.getKey(); + if (!serviceComponentHosts.get(service.getName()).containsKey(componentName)) { + serviceComponentHosts.get(service.getName()).put(componentName, + new HashMap()); + } + /** Get Service Host Components **/ + for (Entry svchost: + comp.getServiceComponentHosts().entrySet()) { + String hostname = svchost.getKey(); + ServiceComponentHost svcHostComponent = svchost.getValue(); + if (!serviceComponentHostsByHost.containsKey(hostname)) { + serviceComponentHostsByHost.put(hostname, + new ArrayList()); + } + List compList = serviceComponentHostsByHost.get(hostname); + compList.add(svcHostComponent); + + if (!serviceComponentHosts.get(service.getName()).get(componentName) + .containsKey(hostname)) { + serviceComponentHosts.get(service.getName()).get(componentName) + .put(hostname, svcHostComponent); + } + } } - /** Get Service Host Components **/ - for (Map.Entry svchost: - comp.getServiceComponentHosts().entrySet()) { - String hostname = svchost.getKey(); - ServiceComponentHost svcHostComponent = svchost.getValue(); - if (!serviceComponentHostsByHost.containsKey(hostname)) { - serviceComponentHostsByHost.put(hostname, - new ArrayList()); - } - List compList = serviceComponentHostsByHost.get(hostname); - compList.add(svcHostComponent); - - if (!serviceComponentHosts.get(service.getName()).get(componentName) - .containsKey(hostname)) { - serviceComponentHosts.get(service.getName()).get(componentName) - .put(hostname, svcHostComponent); - } - } } } + svcHostsLoaded = true; + } finally { + writeLock.unlock(); } - svcHostsLoaded = true; } private void loadServices() { LOG.info("clusterEntity " + clusterEntity.getClusterServiceEntities() ); if (services == null) { - synchronized (this) { + writeLock.lock(); + try { if (services == null) { services = new TreeMap(); if (!clusterEntity.getClusterServiceEntities().isEmpty()) { @@ -194,6 +202,8 @@ public class ClusterImpl implements Clus } } } + } finally { + writeLock.unlock(); } } } @@ -201,22 +211,27 @@ public class ClusterImpl implements Clus public ServiceComponentHost getServiceComponentHost(String serviceName, String serviceComponentName, String 
hostname) throws AmbariException { loadServiceHostComponents(); - if (!serviceComponentHosts.containsKey(serviceName) - || !serviceComponentHosts.get(serviceName) - .containsKey(serviceComponentName) - || !serviceComponentHosts.get(serviceName).get(serviceComponentName) - .containsKey(hostname)) { - throw new ServiceComponentHostNotFoundException(getClusterName(), serviceName, - serviceComponentName, hostname); + readLock.lock(); + try { + if (!serviceComponentHosts.containsKey(serviceName) + || !serviceComponentHosts.get(serviceName) + .containsKey(serviceComponentName) + || !serviceComponentHosts.get(serviceName).get(serviceComponentName) + .containsKey(hostname)) { + throw new ServiceComponentHostNotFoundException(getClusterName(), serviceName, + serviceComponentName, hostname); + } + return serviceComponentHosts.get(serviceName).get(serviceComponentName) + .get(hostname); + } finally { + readLock.unlock(); } - return serviceComponentHosts.get(serviceName).get(serviceComponentName) - .get(hostname); } @Override public String getClusterName() { + readLock.lock(); try { - readLock.lock(); return clusterEntity.getClusterName(); } finally { readLock.unlock(); @@ -225,8 +240,8 @@ public class ClusterImpl implements Clus @Override public void setClusterName(String clusterName) { + writeLock.lock(); try { - writeLock.lock(); String oldName = clusterEntity.getClusterName(); clusterEntity.setClusterName(clusterName); clusterDAO.merge(clusterEntity); //RollbackException possibility if UNIQUE constraint violated @@ -236,35 +251,37 @@ public class ClusterImpl implements Clus } } - public synchronized void addServiceComponentHost( + public void addServiceComponentHost( ServiceComponentHost svcCompHost) throws AmbariException { loadServiceHostComponents(); - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to add ServiceComponentHost to ClusterHostMap cache" - + ", serviceName=" + svcCompHost.getServiceName() - + ", componentName=" + svcCompHost.getServiceComponentName() - + ", hostname=" + svcCompHost.getHostName()); - } - - final String hostname = svcCompHost.getHostName(); - final String serviceName = svcCompHost.getServiceName(); - final String componentName = svcCompHost.getServiceComponentName(); - Set cs = clusters.getClustersForHost(hostname); - boolean clusterFound = false; - Iterator iter = cs.iterator(); - while (iter.hasNext()) { - Cluster c = iter.next(); - if (c.getClusterId() == this.getClusterId()) { - clusterFound = true; - break; - } - } - if (!clusterFound) { - throw new AmbariException("Host does not belong this cluster" - + ", hostname=" + hostname - + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId()); - } + writeLock.lock(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to add ServiceComponentHost to ClusterHostMap cache" + + ", serviceName=" + svcCompHost.getServiceName() + + ", componentName=" + svcCompHost.getServiceComponentName() + + ", hostname=" + svcCompHost.getHostName()); + } + + final String hostname = svcCompHost.getHostName(); + final String serviceName = svcCompHost.getServiceName(); + final String componentName = svcCompHost.getServiceComponentName(); + Set cs = clusters.getClustersForHost(hostname); + boolean clusterFound = false; + Iterator iter = cs.iterator(); + while (iter.hasNext()) { + Cluster c = iter.next(); + if (c.getClusterId() == this.getClusterId()) { + clusterFound = true; + break; + } + } + if (!clusterFound) { + throw new AmbariException("Host does not belong this cluster" + + ", hostname=" + hostname + + ", 
clusterName=" + getClusterName() + + ", clusterId=" + getClusterId()); + } if (!serviceComponentHosts.containsKey(serviceName)) { serviceComponentHosts.put(serviceName, @@ -275,278 +292,400 @@ public class ClusterImpl implements Clus new HashMap()); } - if (serviceComponentHosts.get(serviceName).get(componentName). - containsKey(hostname)) { - throw new AmbariException("Duplicate entry for ServiceComponentHost" - + ", serviceName=" + serviceName - + ", serviceComponentName" + componentName - + ", hostname= " + hostname); - } + if (serviceComponentHosts.get(serviceName).get(componentName). + containsKey(hostname)) { + throw new AmbariException("Duplicate entry for ServiceComponentHost" + + ", serviceName=" + serviceName + + ", serviceComponentName" + componentName + + ", hostname= " + hostname); + } - if (!serviceComponentHostsByHost.containsKey(hostname)) { - serviceComponentHostsByHost.put(hostname, - new ArrayList()); - } + if (!serviceComponentHostsByHost.containsKey(hostname)) { + serviceComponentHostsByHost.put(hostname, + new ArrayList()); + } - if (LOG.isDebugEnabled()) { - LOG.debug("Adding a new ServiceComponentHost" - + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId() - + ", serviceName=" + serviceName - + ", serviceComponentName" + componentName - + ", hostname= " + hostname); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Adding a new ServiceComponentHost" + + ", clusterName=" + getClusterName() + + ", clusterId=" + getClusterId() + + ", serviceName=" + serviceName + + ", serviceComponentName" + componentName + + ", hostname= " + hostname); + } - serviceComponentHosts.get(serviceName).get(componentName).put(hostname, - svcCompHost); - serviceComponentHostsByHost.get(hostname).add(svcCompHost); + serviceComponentHosts.get(serviceName).get(componentName).put(hostname, + svcCompHost); + serviceComponentHostsByHost.get(hostname).add(svcCompHost); + } finally { + writeLock.unlock(); + } } @Override public long getClusterId() { - return clusterEntity.getClusterId(); + readLock.lock(); + try { + return clusterEntity.getClusterId(); + } finally { + readLock.unlock(); + } } @Override - public synchronized List getServiceComponentHosts( + public List getServiceComponentHosts( String hostname) { loadServiceHostComponents(); - if (serviceComponentHostsByHost.containsKey(hostname)) { - return Collections.unmodifiableList( - serviceComponentHostsByHost.get(hostname)); + readLock.lock(); + try { + if (serviceComponentHostsByHost.containsKey(hostname)) { + return Collections.unmodifiableList( + serviceComponentHostsByHost.get(hostname)); + } + return new ArrayList(); + } finally { + readLock.unlock(); } - return new ArrayList(); } @Override - public synchronized void addService(Service service) + public void addService(Service service) throws AmbariException { loadServices(); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding a new Service" - + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId() - + ", serviceName=" + service.getName()); - } - if (services.containsKey(service.getName())) { - throw new AmbariException("Service already exists" - + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId() - + ", serviceName=" + service.getName()); + writeLock.lock(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding a new Service" + + ", clusterName=" + getClusterName() + + ", clusterId=" + getClusterId() + + ", serviceName=" + service.getName()); + } + if (services.containsKey(service.getName())) { + throw new 
AmbariException("Service already exists" + + ", clusterName=" + getClusterName() + + ", clusterId=" + getClusterId() + + ", serviceName=" + service.getName()); + } + this.services.put(service.getName(), service); + } finally { + writeLock.unlock(); } - this.services.put(service.getName(), service); } @Override - public synchronized Service addService(String serviceName) throws AmbariException{ + public Service addService(String serviceName) throws AmbariException{ loadServices(); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding a new Service" - + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId() - + ", serviceName=" + serviceName); - } - if (services.containsKey(serviceName)) { - throw new AmbariException("Service already exists" - + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId() - + ", serviceName=" + serviceName); + writeLock.lock(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding a new Service" + + ", clusterName=" + getClusterName() + + ", clusterId=" + getClusterId() + + ", serviceName=" + serviceName); + } + if (services.containsKey(serviceName)) { + throw new AmbariException("Service already exists" + + ", clusterName=" + getClusterName() + + ", clusterId=" + getClusterId() + + ", serviceName=" + serviceName); + } + Service s = serviceFactory.createNew(this, serviceName); + this.services.put(s.getName(), s); + return s; + } finally { + writeLock.unlock(); } - Service s = serviceFactory.createNew(this, serviceName); - this.services.put(s.getName(), s); - return s; } @Override - public synchronized Service getService(String serviceName) + public Service getService(String serviceName) throws AmbariException { loadServices(); - if (!services.containsKey(serviceName)) { - throw new ServiceNotFoundException(getClusterName(), serviceName); + readLock.lock(); + try { + if (!services.containsKey(serviceName)) { + throw new ServiceNotFoundException(getClusterName(), serviceName); + } + return services.get(serviceName); + } finally { + readLock.unlock(); } - return services.get(serviceName); } @Override - public synchronized Map getServices() { + public Map getServices() { loadServices(); - return Collections.unmodifiableMap(services); + readLock.lock(); + try { + return Collections.unmodifiableMap(services); + } finally { + readLock.unlock(); + } } @Override - public synchronized StackId getDesiredStackVersion() { - return desiredStackVersion; + public StackId getDesiredStackVersion() { + readLock.lock(); + try { + return desiredStackVersion; + } finally { + readLock.unlock(); + } } @Override - public synchronized void setDesiredStackVersion(StackId stackVersion) { - if (LOG.isDebugEnabled()) { - LOG.debug("Changing DesiredStackVersion of Cluster" - + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId() - + ", currentDesiredStackVersion=" + this.desiredStackVersion - + ", newDesiredStackVersion=" + stackVersion); + public void setDesiredStackVersion(StackId stackVersion) { + readWriteLock.writeLock().lock(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Changing DesiredStackVersion of Cluster" + + ", clusterName=" + getClusterName() + + ", clusterId=" + getClusterId() + + ", currentDesiredStackVersion=" + this.desiredStackVersion + + ", newDesiredStackVersion=" + stackVersion); + } + this.desiredStackVersion = stackVersion; + clusterEntity.setDesiredStackVersion(gson.toJson(stackVersion)); + clusterDAO.merge(clusterEntity); + } finally { + readWriteLock.writeLock().unlock(); } - this.desiredStackVersion = 
stackVersion; - clusterEntity.setDesiredStackVersion(gson.toJson(stackVersion)); - clusterDAO.merge(clusterEntity); + } - public synchronized StackId getDesiredState() { + public StackId getDesiredState() { //TODO separate implementation, mapped to StackVersion for now // return desiredState; for separate implementation - return getDesiredStackVersion(); + readWriteLock.readLock().lock(); + try { + return getDesiredStackVersion(); + } finally { + readWriteLock.readLock().unlock(); + } + } - public synchronized void setDesiredState(StackId desiredState) { + public void setDesiredState(StackId desiredState) { //TODO separate implementation, mapped to StackVersion for now // LOG.debug("Changing desired state of cluster, clusterName={}, clusterId={}, oldState={}, newState={}", // getClusterName(), getClusterId(), this.desiredState, desiredState); // clusterEntity.setDesiredClusterState(gson.toJson(desiredState)); // clusterDAO.merge(clusterEntity); // this.desiredState = desiredState; - setDesiredStackVersion(desiredState); + readWriteLock.writeLock().lock(); + try { + setDesiredStackVersion(desiredState); + } finally { + readWriteLock.writeLock().unlock(); + } + } @Override - public synchronized Map getDesiredConfigsByType(String configType) { - if (!configs.containsKey(configType)) - return null; + public Map getDesiredConfigsByType(String configType) { + readWriteLock.writeLock().lock(); + try { + if (!configs.containsKey(configType)) + return null; + + return Collections.unmodifiableMap(configs.get(configType)); + } finally { + readWriteLock.writeLock().unlock(); + } - return Collections.unmodifiableMap(configs.get(configType)); } @Override - public synchronized Config getDesiredConfig(String configType, String versionTag) { - if (!configs.containsKey(configType) - || !configs.get(configType).containsKey(versionTag)) { - return null; + public Config getDesiredConfig(String configType, String versionTag) { + readWriteLock.readLock().lock(); + try { + if (!configs.containsKey(configType) + || !configs.get(configType).containsKey(versionTag)) { + return null; + } + return configs.get(configType).get(versionTag); + } finally { + readWriteLock.readLock().unlock(); } - return configs.get(configType).get(versionTag); + } @Override - public synchronized void addDesiredConfig(Config config) { - if (config.getType() == null - || config.getType().isEmpty() - || config.getVersionTag() == null - || config.getVersionTag().isEmpty()) { - // TODO throw error - } - if (!configs.containsKey(config.getType())) { - configs.put(config.getType(), new HashMap()); + public void addDesiredConfig(Config config) { + readWriteLock.writeLock().lock(); + try { + if (config.getType() == null + || config.getType().isEmpty() + || config.getVersionTag() == null + || config.getVersionTag().isEmpty()) { + // TODO throw error + } + if (!configs.containsKey(config.getType())) { + configs.put(config.getType(), new HashMap()); + } + + configs.get(config.getType()).put(config.getVersionTag(), config); + } finally { + readWriteLock.writeLock().unlock(); } - configs.get(config.getType()).put(config.getVersionTag(), config); } - public synchronized Collection getAllConfigs() { - List list = new ArrayList(); - for (Entry> entry : configs.entrySet()) { - for (Config config : entry.getValue().values()) { - list.add(config); + public Collection getAllConfigs() { + readWriteLock.readLock().lock(); + try { + List list = new ArrayList(); + for (Entry> entry : configs.entrySet()) { + for (Config config : entry.getValue().values()) { + 
list.add(config); + } } + return Collections.unmodifiableList(list); + } finally { + readWriteLock.readLock().unlock(); } - return Collections.unmodifiableList(list); + } @Override - public synchronized ClusterResponse convertToResponse() + public ClusterResponse convertToResponse() throws AmbariException { - ClusterResponse r = new ClusterResponse(getClusterId(), getClusterName(), - clusters.getHostsForCluster(getClusterName()).keySet(), - getDesiredStackVersion().getStackId()); - return r; + readWriteLock.readLock().lock(); + try { + ClusterResponse r = new ClusterResponse(getClusterId(), getClusterName(), + clusters.getHostsForCluster(getClusterName()).keySet(), + getDesiredStackVersion().getStackId()); + return r; + } finally { + readWriteLock.readLock().unlock(); + } + } - public synchronized void debugDump(StringBuilder sb) { + public void debugDump(StringBuilder sb) { loadServices(); - sb.append("Cluster={ clusterName=" + getClusterName() - + ", clusterId=" + getClusterId() - + ", desiredStackVersion=" + desiredStackVersion.getStackId() - + ", services=[ "); - boolean first = true; - for(Service s : services.values()) { - if (!first) { - sb.append(" , "); - first = false; - } - sb.append("\n "); - s.debugDump(sb); - sb.append(" "); + readWriteLock.readLock().lock(); + try { + sb.append("Cluster={ clusterName=" + getClusterName() + + ", clusterId=" + getClusterId() + + ", desiredStackVersion=" + desiredStackVersion.getStackId() + + ", services=[ "); + boolean first = true; + for (Service s : services.values()) { + if (!first) { + sb.append(" , "); + first = false; + } + sb.append("\n "); + s.debugDump(sb); + sb.append(" "); + } + sb.append(" ] }"); + } finally { + readWriteLock.readLock().unlock(); } - sb.append(" ] }"); + } @Override @Transactional - public synchronized void refresh() { - clusterEntity = clusterDAO.findById(clusterEntity.getClusterId()); - clusterDAO.refresh(clusterEntity); + public void refresh() { + readWriteLock.writeLock().lock(); + try { + clusterEntity = clusterDAO.findById(clusterEntity.getClusterId()); + clusterDAO.refresh(clusterEntity); + } finally { + readWriteLock.writeLock().unlock(); + } + } @Override @Transactional - public synchronized void deleteAllServices() throws AmbariException { + public void deleteAllServices() throws AmbariException { loadServices(); - LOG.info("Deleting all services for cluster" - + ", clusterName=" + getClusterName()); - for (Service service : services.values()) { - if (!service.canBeRemoved()) { - throw new AmbariException("Found non removable service when trying to" - + " all services from cluster" - + ", clusterName=" + getClusterName() - + ", serviceName=" + service.getName()); + readWriteLock.writeLock().lock(); + try { + LOG.info("Deleting all services for cluster" + + ", clusterName=" + getClusterName()); + for (Service service : services.values()) { + if (!service.canBeRemoved()) { + throw new AmbariException("Found non removable service when trying to" + + " all services from cluster" + + ", clusterName=" + getClusterName() + + ", serviceName=" + service.getName()); + } } - } - for (Service service : services.values()) { - service.delete(); + for (Service service : services.values()) { + service.delete(); + } + + services.clear(); + } finally { + readWriteLock.writeLock().unlock(); } - services.clear(); } @Override - public synchronized void deleteService(String serviceName) + public void deleteService(String serviceName) throws AmbariException { loadServices(); - Service service = getService(serviceName); - 
LOG.info("Deleting service for cluster" - + ", clusterName=" + getClusterName() - + ", serviceName=" + service.getName()); - // FIXME check dependencies from meta layer - if (!service.canBeRemoved()) { - throw new AmbariException("Could not delete service from cluster" + readWriteLock.writeLock().lock(); + try { + Service service = getService(serviceName); + LOG.info("Deleting service for cluster" + ", clusterName=" + getClusterName() + ", serviceName=" + service.getName()); + // FIXME check dependencies from meta layer + if (!service.canBeRemoved()) { + throw new AmbariException("Could not delete service from cluster" + + ", clusterName=" + getClusterName() + + ", serviceName=" + service.getName()); + } + service.delete(); + services.remove(serviceName); + } finally { + readWriteLock.writeLock().unlock(); } - service.delete(); - services.remove(serviceName); + } @Override public boolean canBeRemoved() { loadServices(); - boolean safeToRemove = true; - for (Service service : services.values()) { - if (!service.canBeRemoved()) { - safeToRemove = false; - LOG.warn("Found non removable service" - + ", clusterName=" + getClusterName() - + ", serviceName=" + service.getName()); + readWriteLock.readLock().lock(); + try { + boolean safeToRemove = true; + for (Service service : services.values()) { + if (!service.canBeRemoved()) { + safeToRemove = false; + LOG.warn("Found non removable service" + + ", clusterName=" + getClusterName() + + ", serviceName=" + service.getName()); + } } + return safeToRemove; + } finally { + readWriteLock.readLock().unlock(); } - return safeToRemove; + } @Override @Transactional public void delete() throws AmbariException { - deleteAllServices(); - removeEntities(); - configs.clear(); + readWriteLock.writeLock().lock(); + try { + deleteAllServices(); + removeEntities(); + configs.clear(); + } finally { + readWriteLock.writeLock().unlock(); + } + } @Transactional Modified: incubator/ambari/branches/branch-1.2/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java?rev=1459041&r1=1459040&r2=1459041&view=diff ============================================================================== --- incubator/ambari/branches/branch-1.2/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java (original) +++ incubator/ambari/branches/branch-1.2/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java Wed Mar 20 20:44:43 2013 @@ -25,6 +25,9 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.persistence.RollbackException; @@ -56,11 +59,17 @@ public class ClustersImpl implements Clu private static final Logger LOG = LoggerFactory.getLogger( ClustersImpl.class); - private Map clusters; - private Map clustersById; - private Map hosts; - private Map> hostClusterMap; - private Map> clusterHostMap; + private ConcurrentHashMap clusters; + private ConcurrentHashMap clustersById; + private ConcurrentHashMap hosts; + private ConcurrentHashMap> hostClusterMap; + private ConcurrentHashMap> clusterHostMap; + + private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); + private final Lock r = rwl.readLock(); + private final Lock w = 
rwl.writeLock(); + + volatile boolean clustersLoaded = false; @Inject ClusterDAO clusterDAO; @@ -77,159 +86,194 @@ public class ClustersImpl implements Clu @Inject public ClustersImpl() { - clusters = new HashMap(); - clustersById = new HashMap(); - hosts = new HashMap(); - hostClusterMap = new HashMap>(); - clusterHostMap = new HashMap>(); + clusters = new ConcurrentHashMap(); + clustersById = new ConcurrentHashMap(); + hosts = new ConcurrentHashMap(); + hostClusterMap = new ConcurrentHashMap>(); + clusterHostMap = new ConcurrentHashMap>(); + LOG.info("Initializing the ClustersImpl"); } + @Transactional + void loadClustersAndHosts() { + if (!clustersLoaded) { + w.lock(); + try { + if (!clustersLoaded) { + for (ClusterEntity clusterEntity : clusterDAO.findAll()) { + Cluster currentCluster = clusterFactory.create(clusterEntity); + clusters.put(clusterEntity.getClusterName(), currentCluster); + clustersById.put(currentCluster.getClusterId(), currentCluster); + clusterHostMap.put(currentCluster.getClusterName(), Collections.newSetFromMap(new ConcurrentHashMap())); + } + + for (HostEntity hostEntity : hostDAO.findAll()) { + Host host = hostFactory.create(hostEntity, true); + hosts.put(hostEntity.getHostName(), host); + Set cSet = Collections.newSetFromMap(new ConcurrentHashMap()); + hostClusterMap.put(hostEntity.getHostName(), cSet); + + for (ClusterEntity clusterEntity : hostEntity.getClusterEntities()) { + clusterHostMap.get(clusterEntity.getClusterName()).add(host); + cSet.add(clusters.get(clusterEntity.getClusterName())); + } + } + } + clustersLoaded = true; + } finally { + w.unlock(); + } + } + } + @Override - public synchronized void addCluster(String clusterName) + public void addCluster(String clusterName) throws AmbariException { + loadClustersAndHosts(); + if (clusters.containsKey(clusterName)) { throw new DuplicateResourceException("Attempted to create a Cluster which already exists" + ", clusterName=" + clusterName); } - // retrieve new cluster id - // add cluster id -> cluster mapping into clustersById - ClusterEntity clusterEntity = new ClusterEntity(); - clusterEntity.setClusterName(clusterName); - clusterEntity.setDesiredStackVersion(gson.toJson(new StackId())); + w.lock(); try { - clusterDAO.create(clusterEntity); - clusterEntity = clusterDAO.merge(clusterEntity); - Cluster cluster = clusterFactory.create(clusterEntity); + if (clusters.containsKey(clusterName)) { + throw new DuplicateResourceException("Attempted to create a Cluster which already exists" + + ", clusterName=" + clusterName); + } + // retrieve new cluster id + // add cluster id -> cluster mapping into clustersById + ClusterEntity clusterEntity = new ClusterEntity(); + clusterEntity.setClusterName(clusterName); + clusterEntity.setDesiredStackVersion(gson.toJson(new StackId())); + try { + clusterDAO.create(clusterEntity); + clusterEntity = clusterDAO.merge(clusterEntity); + } catch (RollbackException e) { + LOG.warn("Unable to create cluster " + clusterName, e); + throw new AmbariException("Unable to create cluster " + clusterName, e); + } + + Cluster cluster = clusterFactory.create(clusterEntity); clusters.put(clusterName, cluster); clustersById.put(cluster.getClusterId(), cluster); clusterHostMap.put(clusterName, new HashSet()); - } catch (RollbackException e) { - LOG.warn("Unable to create cluster " + clusterName, e); - throw new AmbariException("Unable to create cluster " + clusterName, e); + } finally { + w.unlock(); } } @Override - @Transactional - public synchronized Cluster getCluster(String clusterName) + 
public Cluster getCluster(String clusterName) throws AmbariException { - if (!clusters.containsKey(clusterName)) { - ClusterEntity clusterEntity = clusterDAO.findByName(clusterName); - if (clusterEntity != null) { - Cluster cl = getClusterById(clusterEntity.getClusterId()); - clustersById.put(cl.getClusterId(), cl); - clusters.put(cl.getClusterName(), cl); - if (!clusterHostMap.containsKey(clusterEntity.getClusterName())) - clusterHostMap.put(clusterEntity.getClusterName(), new HashSet()); - } else { + loadClustersAndHosts(); + r.lock(); + try { + if (!clusters.containsKey(clusterName)) { throw new ClusterNotFoundException(clusterName); } + return clusters.get(clusterName); + } finally { + r.unlock(); } - return clusters.get(clusterName); } @Override - @Transactional - public synchronized Cluster getClusterById(long id) throws AmbariException { - if (!clustersById.containsKey(id)) { - ClusterEntity clusterEntity = clusterDAO.findById(id); - if (clusterEntity != null) { - Cluster cluster = clusterFactory.create(clusterEntity); - clustersById.put(cluster.getClusterId(), cluster); - clusters.put(clusterEntity.getClusterName(), cluster); - if (!clusterHostMap.containsKey(clusterEntity.getClusterName())) - clusterHostMap.put(clusterEntity.getClusterName(), new HashSet()); - } else { + public Cluster getClusterById(long id) throws AmbariException { + loadClustersAndHosts(); + r.lock(); + try { + if (!clustersById.containsKey(id)) { throw new ClusterNotFoundException("clusterID=" + id); } + return clustersById.get(id); + } finally { + r.unlock(); } - return clustersById.get(id); } @Override @Transactional - public synchronized List getHosts() { - List hostList = new ArrayList(hosts.size()); - hostList.addAll(hosts.values()); - - for (HostEntity hostEntity : hostDAO.findAll()) { - if (!hosts.containsKey(hostEntity.getHostName())) { - try { - hostList.add(getHost(hostEntity.getHostName())); - } catch (AmbariException ignored) { - LOG.error("Database externally modified?"); - } - } - } + public List getHosts() { + loadClustersAndHosts(); + r.lock(); - return hostList; + try { + List hostList = new ArrayList(hosts.size()); + hostList.addAll(hosts.values()); + return hostList; + } finally { + r.unlock(); + } } @Override - public synchronized Set getClustersForHost(String hostname) + public Set getClustersForHost(String hostname) throws AmbariException { - if (!hostClusterMap.containsKey(hostname)) { - getHost(hostname); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Looking up clusters for hostname" - + ", hostname=" + hostname - + ", mappedClusters=" + hostClusterMap.get(hostname).size()); + loadClustersAndHosts(); + r.lock(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Looking up clusters for hostname" + + ", hostname=" + hostname + + ", mappedClusters=" + hostClusterMap.get(hostname).size()); + } + return Collections.unmodifiableSet(hostClusterMap.get(hostname)); + } finally { + r.unlock(); } - return Collections.unmodifiableSet(hostClusterMap.get(hostname)); } @Override @Transactional - public synchronized Host getHost(String hostname) throws AmbariException { - if (!hosts.containsKey(hostname)) { - HostEntity hostEntity = hostDAO.findByName(hostname); - if (hostEntity != null) { - Host host = hostFactory.create(hostEntity, true); - Set cSet = new HashSet(); - hosts.put(hostname, host); - hostClusterMap.put(hostname, cSet); - - for (ClusterEntity clusterEntity : hostEntity.getClusterEntities()) { - if (clustersById.containsKey(clusterEntity.getClusterId())) { - 
cSet.add(clustersById.get(clusterEntity.getClusterId())); - } else { - cSet.add(getClusterById(clusterEntity.getClusterId())); - } - } - } else { + public Host getHost(String hostname) throws AmbariException { + loadClustersAndHosts(); + r.lock(); + try { + if (!hosts.containsKey(hostname)) { throw new HostNotFoundException(hostname); } + return hosts.get(hostname); + } finally { + r.unlock(); } - return hosts.get(hostname); } @Override - public synchronized void addHost(String hostname) throws AmbariException { + public void addHost(String hostname) throws AmbariException { + loadClustersAndHosts(); + String duplicateMessage = "Duplicate entry for Host" + + ", hostName= " + hostname; + if (hosts.containsKey(hostname)) { - throw new AmbariException("Duplicate entry for Host" - + ", hostName= " + hostname); + throw new AmbariException(duplicateMessage); } - HostEntity hostEntity = new HostEntity(); - hostEntity.setHostName(hostname); - hostEntity.setClusterEntities(new ArrayList()); - //not stored to DB - Host host = hostFactory.create(hostEntity, false); - host.setAgentVersion(new AgentVersion("")); - List emptyDiskList = new ArrayList(); - host.setDisksInfo(emptyDiskList); - host.setHealthStatus(new HostHealthStatus(HealthStatus.UNKNOWN, "")); - host.setHostAttributes(new HashMap()); - host.setState(HostState.INIT); - - hosts.put(hostname, host); - hostClusterMap.put(hostname, new HashSet()); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding a host to Clusters" - + ", hostname=" + hostname); + r.lock(); + + try { + HostEntity hostEntity = new HostEntity(); + hostEntity.setHostName(hostname); + hostEntity.setClusterEntities(new ArrayList()); + //not stored to DB + Host host = hostFactory.create(hostEntity, false); + host.setAgentVersion(new AgentVersion("")); + List emptyDiskList = new ArrayList(); + host.setDisksInfo(emptyDiskList); + host.setHealthStatus(new HostHealthStatus(HealthStatus.UNKNOWN, "")); + host.setHostAttributes(new HashMap()); + host.setState(HostState.INIT); + hosts.put(hostname, host); + hostClusterMap.put(hostname, Collections.newSetFromMap(new ConcurrentHashMap())); + + if (LOG.isDebugEnabled()) { + LOG.debug("Adding a host to Clusters" + + ", hostname=" + hostname); + } + } finally { + r.unlock(); } } @@ -244,46 +288,46 @@ public class ClustersImpl implements Clu } @Override - public synchronized void mapHostToCluster(String hostname, - String clusterName) throws AmbariException { - Cluster cluster = getCluster(clusterName); - HostImpl host = (HostImpl) getHost(hostname); + public void mapHostToCluster(String hostname, + String clusterName) throws AmbariException { + loadClustersAndHosts(); + w.lock(); - if (!hostClusterMap.containsKey(hostname)) { - throw new HostNotFoundException(hostname); - } + try { + Host host = getHost(hostname); + Cluster cluster = getCluster(clusterName); - for (Cluster c : hostClusterMap.get(hostname)) { - if (c.getClusterName().equals(clusterName)) { - throw new DuplicateResourceException("Attempted to create a host which already exists: clusterName=" + - clusterName + ", hostName=" + hostname); + for (Cluster c : hostClusterMap.get(hostname)) { + if (c.getClusterName().equals(clusterName)) { + throw new DuplicateResourceException("Attempted to create a host which already exists: clusterName=" + + clusterName + ", hostName=" + hostname); + } } - } - if (!isOsSupportedByClusterStack(cluster, host)) { - String message = "Trying to map host to cluster where stack does not" - + " support host's os type" - + ", clusterName=" + clusterName - + 
", clusterStackId=" + cluster.getDesiredStackVersion().getStackId() - + ", hostname=" + hostname - + ", hostOsType=" + host.getOsType(); - LOG.warn(message); - throw new AmbariException(message); - } - - mapHostClusterEntities(hostname, cluster.getClusterId()); + if (!isOsSupportedByClusterStack(cluster, host)) { + String message = "Trying to map host to cluster where stack does not" + + " support host's os type" + + ", clusterName=" + clusterName + + ", clusterStackId=" + cluster.getDesiredStackVersion().getStackId() + + ", hostname=" + hostname + + ", hostOsType=" + host.getOsType(); + LOG.warn(message); + throw new AmbariException(message); + } - hostClusterMap.get(hostname).add(cluster); - clusterHostMap.get(clusterName).add(host); + mapHostClusterEntities(hostname, cluster.getClusterId()); - cluster.refresh(); - host.refresh(); + hostClusterMap.get(hostname).add(cluster); + clusterHostMap.get(clusterName).add(host); - if (LOG.isDebugEnabled()) { - LOG.debug("Mapping a host to a cluster" - + ", clusterName=" + clusterName - + ", clusterId=" + cluster.getClusterId() - + ", hostname=" + hostname); + if (LOG.isDebugEnabled()) { + LOG.debug("Mapping a host to a cluster" + + ", clusterName=" + clusterName + + ", clusterId=" + cluster.getClusterId() + + ", hostname=" + hostname); + } + } finally { + w.unlock(); } } @@ -301,81 +345,105 @@ public class ClustersImpl implements Clu @Override @Transactional - public synchronized Map getClusters() { - for (ClusterEntity clusterEntity : clusterDAO.findAll()) { - try { - if (!clustersById.containsKey(clusterEntity.getClusterId())) { - getClusterById(clusterEntity.getClusterId()); - } - } catch (AmbariException ignored) { - - } + public Map getClusters() { + loadClustersAndHosts(); + r.lock(); + try { + return Collections.unmodifiableMap(clusters); + } finally { + r.unlock(); } - return Collections.unmodifiableMap(clusters); } @Override - public synchronized void mapHostsToCluster(Set hostnames, - String clusterName) throws AmbariException { - for (String hostname : hostnames) { - mapHostToCluster(hostname, clusterName); + public void mapHostsToCluster(Set hostnames, + String clusterName) throws AmbariException { + loadClustersAndHosts(); + w.lock(); + try { + for (String hostname : hostnames) { + mapHostToCluster(hostname, clusterName); + } + } finally { + w.unlock(); } } @Override - public synchronized void updateClusterName(String oldName, String newName) { - clusters.put(newName, clusters.remove(oldName)); + public void updateClusterName(String oldName, String newName) { + w.lock(); + try { + clusters.put(newName, clusters.remove(oldName)); + clusterHostMap.put(newName, clusterHostMap.remove(oldName)); + } finally { + w.unlock(); + } } public void debugDump(StringBuilder sb) { - sb.append("Clusters=[ "); - boolean first = true; - for(Cluster c : clusters.values()) { - if (!first) { - sb.append(" , "); - first = false; - } - sb.append("\n "); - c.debugDump(sb); - sb.append(" "); + r.lock(); + try { + sb.append("Clusters=[ "); + boolean first = true; + for (Cluster c : clusters.values()) { + if (!first) { + sb.append(" , "); + first = false; + } + sb.append("\n "); + c.debugDump(sb); + sb.append(" "); + } + sb.append(" ]"); + } finally { + r.unlock(); } - sb.append(" ]"); } @Override @Transactional public Map getHostsForCluster(String clusterName) throws AmbariException { + loadClustersAndHosts(); + r.lock(); - getCluster(clusterName); + try { + Map hosts = new HashMap(); - Map hosts = new HashMap(); + for (Host h : clusterHostMap.get(clusterName)) 
{ + hosts.put(h.getHostName(), h); + } - for (Host h : clusterHostMap.get(clusterName)) { - hosts.put(h.getHostName(), h); + return hosts; + } finally { + r.unlock(); } - - return hosts; } @Override public synchronized void deleteCluster(String clusterName) throws AmbariException { - Cluster cluster = getCluster(clusterName); - if (!cluster.canBeRemoved()) { - throw new AmbariException("Could not delete cluster" - + ", clusterName=" + clusterName); - } - LOG.info("Deleting cluster "+ cluster.getClusterName()); - cluster.delete(); + loadClustersAndHosts(); + w.lock(); + try { + Cluster cluster = getCluster(clusterName); + if (!cluster.canBeRemoved()) { + throw new AmbariException("Could not delete cluster" + + ", clusterName=" + clusterName); + } + LOG.info("Deleting cluster " + cluster.getClusterName()); + cluster.delete(); - //clear maps - for (Set clusterSet : hostClusterMap.values()) { - clusterSet.remove(cluster); + //clear maps + for (Set clusterSet : hostClusterMap.values()) { + clusterSet.remove(cluster); + } + clusterHostMap.remove(cluster.getClusterName()); + clusters.remove(clusterName); + } finally { + w.unlock(); } - clusterHostMap.remove(cluster.getClusterName()); - clusters.remove(clusterName); } } Modified: incubator/ambari/branches/branch-1.2/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java?rev=1459041&r1=1459040&r2=1459041&view=diff ============================================================================== --- incubator/ambari/branches/branch-1.2/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java (original) +++ incubator/ambari/branches/branch-1.2/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java Wed Mar 20 20:44:43 2013 @@ -61,6 +61,7 @@ public class HostImpl implements Host { private static final Type hostAttributesType = new TypeToken>() {}.getType(); + ReadWriteLock rwLock; private final Lock readLock; private final Lock writeLock; @@ -185,7 +186,7 @@ public class HostImpl implements Host { public HostImpl(@Assisted HostEntity hostEntity, @Assisted boolean persisted, Injector injector) { this.stateMachine = stateMachineFactory.make(this); - ReadWriteLock rwLock = new ReentrantReadWriteLock(); + rwLock = new ReentrantReadWriteLock(); this.readLock = rwLock.readLock(); this.writeLock = rwLock.writeLock(); @@ -961,8 +962,8 @@ public class HostImpl implements Host { */ @Override public void persist() { + writeLock.lock(); try { - writeLock.lock(); if (!persisted) { persistEntities(); refresh(); @@ -984,7 +985,7 @@ public class HostImpl implements Host { } @Transactional - protected void persistEntities() { + void persistEntities() { hostDAO.create(hostEntity); hostStateDAO.create(hostStateEntity); if (!hostEntity.getClusterEntities().isEmpty()) { @@ -998,8 +999,8 @@ public class HostImpl implements Host { @Override @Transactional public void refresh() { + writeLock.lock(); try { - writeLock.lock(); if (isPersisted()) { hostEntity = hostDAO.findByName(hostEntity.getHostName()); hostStateEntity = hostEntity.getHostStateEntity(); @@ -1012,7 +1013,7 @@ public class HostImpl implements Host { } @Transactional - private void saveIfPersisted() { + void saveIfPersisted() { if (isPersisted()) { hostDAO.merge(hostEntity); hostStateDAO.merge(hostStateEntity); Modified: 
incubator/ambari/branches/branch-1.2/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java?rev=1459041&r1=1459040&r2=1459041&view=diff ============================================================================== --- incubator/ambari/branches/branch-1.2/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java (original) +++ incubator/ambari/branches/branch-1.2/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java Wed Mar 20 20:44:43 2013 @@ -987,13 +987,6 @@ public class ServiceComponentHostImpl im try { writeLock.lock(); - Set deletedTypes = new HashSet(); - for (String type : this.desiredConfigs.keySet()) { - if (!configs.containsKey(type)) { - deletedTypes.add(type); - } - } - for (Entry entry : configs.entrySet()) { boolean contains = false; @@ -1022,34 +1015,6 @@ public class ServiceComponentHostImpl im this.desiredConfigs.put(entry.getKey(), entry.getValue().getVersionTag()); } - if (!deletedTypes.isEmpty()) { - if (persisted) { - List deleteEntities = - hostComponentDesiredConfigMappingDAO.findByHostComponentAndType( - stateEntity.getClusterId(), stateEntity.getServiceName(), - stateEntity.getComponentName(), - stateEntity.getHostName(), deletedTypes); - for (HostComponentDesiredConfigMappingEntity deleteEntity : deleteEntities) { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting desired config to ServiceComponentHost" - + ", clusterId=" + stateEntity.getClusterId() - + ", serviceName=" + stateEntity.getServiceName() - + ", componentName=" + stateEntity.getComponentName() - + ", hostname=" + stateEntity.getHostName() - + ", configType=" + deleteEntity.getConfigType() - + ", configVersionTag=" + deleteEntity.getVersionTag()); - } - desiredStateEntity.getHostComponentDesiredConfigMappingEntities().remove( - deleteEntity); - hostComponentDesiredConfigMappingDAO.remove(deleteEntity); - } - } else { - for (String deletedType : deletedTypes) { - desiredConfigs.remove(deletedType); - } - } - } - saveIfPersisted(); } finally { Modified: incubator/ambari/branches/branch-1.2/ambari-server/src/main/python/ambari-server.py URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-server/src/main/python/ambari-server.py?rev=1459041&r1=1459040&r2=1459041&view=diff ============================================================================== --- incubator/ambari/branches/branch-1.2/ambari-server/src/main/python/ambari-server.py (original) +++ incubator/ambari/branches/branch-1.2/ambari-server/src/main/python/ambari-server.py Wed Mar 20 20:44:43 2013 @@ -45,6 +45,7 @@ START_ACTION = "start" STOP_ACTION = "stop" RESET_ACTION = "reset" UPGRADE_ACTION = "upgrade" +UPGRADE_STACK_ACTION = "upgradestack" # selinux commands GET_SE_LINUX_ST_CMD = "/usr/sbin/sestatus" @@ -65,13 +66,17 @@ IP_TBLS_SRVC_NT_FND = "iptables: unrecog ambari_provider_module_option = "" ambari_provider_module = os.environ.get('AMBARI_PROVIDER_MODULE') +# constants +STACK_NAME_VER_SEP = "-" + if ambari_provider_module is not None: ambari_provider_module_option = "-Dprovider.module.class=" +\ ambari_provider_module + " " SERVER_START_CMD="{0}" + os.sep + "bin" + os.sep +\ - "java -server -XX:NewRatio=2 "\ + "java -server -XX:NewRatio=3 "\ "-XX:+UseConcMarkSweepGC " +\ + "-XX:-UseGCOverheadLimit 
-XX:CMSInitiatingOccupancyFraction=60 " +\ ambari_provider_module_option +\ os.getenv('AMBARI_JVM_ARGS','-Xms512m -Xmx2048m') +\ " -cp {1}"+ os.pathsep + "{2}" +\ @@ -96,6 +101,8 @@ AMBARI_PROPERTIES_FILE="ambari.propertie SETUP_DB_CMD = ['su', 'postgres', '--command=psql -f {0} -v username=\'"{1}"\' -v password="\'{2}\'"'] +UPGRADE_STACK_CMD = ['su', 'postgres', + '--command=psql -f {0} -v stack_name="\'{1}\'" -v stack_version="\'{2}\'"'] PG_ST_CMD = "/sbin/service postgresql status" PG_START_CMD = "/sbin/service postgresql start" PG_RESTART_CMD = "/sbin/service postgresql restart" @@ -230,8 +237,6 @@ def write_property(key, value): return 0 - - def setup_db(args): #password access to ambari-server and mapred configure_postgres_username_password(args) @@ -247,12 +252,24 @@ def setup_db(args): return retcode +def execute_db_script(args, file): + #password access to ambari-server and mapred + configure_postgres_username_password(args) + dbname = args.postgredbname + username = args.postgres_username + password = args.postgres_password + command = SETUP_DB_CMD[:] + command[-1] = command[-1].format(file, username, password) + retcode, outdata, errdata = run_os_command(command) + if not retcode == 0: + print errdata + return retcode + -def upgrade_db(args): +def check_db_consistency(args, file): #password access to ambari-server and mapred configure_postgres_username_password(args) dbname = args.postgredbname - file = args.upgrade_script_file username = args.postgres_username password = args.postgres_password command = SETUP_DB_CMD[:] @@ -260,6 +277,27 @@ def upgrade_db(args): retcode, outdata, errdata = run_os_command(command) if not retcode == 0: print errdata + return retcode + else: + # Assumes that the output is of the form ...\n + print_info_msg("Parsing output: " + outdata) + lines = outdata.splitlines() + if (lines[-1] == '3'): + return 0 + return -1 + + +def upgrade_stack(args, stack_id): + #password access to ambari-server and mapred + configure_postgres_username_password(args) + dbname = args.postgredbname + file = args.upgrade_stack_script_file + stack_name, stack_version = stack_id.split(STACK_NAME_VER_SEP) + command = UPGRADE_STACK_CMD[:] + command[-1] = command[-1].format(file, stack_name, stack_version) + retcode, outdata, errdata = run_os_command(command) + if not retcode == 0: + print errdata return retcode # @@ -420,7 +458,7 @@ def download_jdk(args): try: jdk_url = properties['jdk.url'] - resources_dir = properties['resources.dir'] + resources_dir = properties['resources.dir'] except (KeyError), e: print 'Property ' + str(e) + ' is not defined at ' + conf_file return -1 @@ -432,7 +470,7 @@ def download_jdk(args): #Get Header from url,to get file size then retcode, out, err = run_os_command(size_command) if out.find("Content-Length") == -1: - print "Request headr doesn't contain Content-Length"; + print "Request header doesn't contain Content-Length"; return -1 start_with = int(out.find("Content-Length") + len("Content-Length") + 2) end_with = out.find("\r\n", start_with) @@ -458,7 +496,43 @@ def download_jdk(args): return -1 else: print "JDK already exists using " + dest_file + + try: + out = install_jdk(dest_file) + jdk_version = re.search('Creating (jdk.*)/jre', out).group(1) + except Exception, e: + print "Installation of JDK was failed: %s\n" % e.message + file_exists = os.path.isfile(dest_file) + if file_exists: + ok = get_YN_input("JDK found at "+dest_file+". " + "Would you like to re-download the JDK [y/n] (y)? 
", True) + if (ok == False): + print "Unable to install JDK. Please remove JDK file found at "+ dest_file +" and re-run Ambari Server setup" + return -1 + else: + track_jdk(JDK_LOCAL_FILENAME, jdk_url, dest_file) + print 'Successfully re-downloaded JDK distribution to ' + dest_file + try: + out = install_jdk(dest_file) + jdk_version = re.search('Creating (jdk.*)/jre', out).group(1) + except Exception, e: + print "Installation of JDK was failed: %s\n" % e.message + print "Unable to install JDK. Please remove JDK, file found at "+ dest_file +" and re-run Ambari Server setup" + return -1 + + else: + print "Unable to install JDK. File "+ dest_file +"does not exist, please re-run Ambari Server setup" + return -1 + + print "Successfully installed JDK to {0}/{1}".\ + format(JDK_INSTALL_DIR, jdk_version) + write_property(JAVA_HOME_PROPERTY, "{0}/{1}". + format(JDK_INSTALL_DIR, jdk_version)) + return 0 +class RetCodeException(Exception): pass + +def install_jdk(dest_file): ok = get_YN_input("To install the Oracle JDK you must accept the " "license terms found at " "http://www.oracle.com/technetwork/java/javase/" @@ -475,14 +549,9 @@ def download_jdk(args): retcode, out, err = run_os_command(MAKE_FILE_EXECUTABLE_CMD.format(dest_file)) retcode, out, err = run_os_command(dest_file + ' -noregister') os.chdir(savedPath) - jdk_version = re.search('Creating (jdk.*)/jre', out).group(1) - print "Successfully installed JDK to {0}/{1}".\ - format(JDK_INSTALL_DIR, jdk_version) - write_property(JAVA_HOME_PROPERTY, "{0}/{1}". - format(JDK_INSTALL_DIR, jdk_version)) - return 0 - - + if (retcode != 0): + raise RetCodeException("Installation JDK returned code %s" % retcode) + return out def get_postgre_status(): retcode, out, err = run_os_command(PG_ST_CMD) @@ -753,19 +822,33 @@ def stop(args): # Upgrades the Ambari Server. # def upgrade(args): - print 'Checking PostgreSQL...' retcode = check_postgre_up() if not retcode == 0: - printErrorMsg ('PostgreSQL server not running. Exiting') + printErrorMsg('PostgreSQL server not running. Exiting') sys.exit(retcode) + file = args.upgrade_script_file print 'Upgrading database...' - retcode = upgrade_db(args) + retcode = execute_db_script(args, file) if not retcode == 0: - printErrorMsg ('Database upgrade script has failed. Exiting.') + printErrorMsg('Database upgrade script has failed. Exiting.') sys.exit(retcode) + print 'Checking database integrity...' + check_file = file[:-3] + "Check" + file[-4:] + retcode = check_db_consistency(args, check_file) + + if not retcode == 0: + print 'Found inconsistency. Trying to fix...' + fix_file = file[:-3] + "Fix" + file[-4:] + retcode = execute_db_script(args, fix_file) + + if not retcode == 0: + printErrorMsg('Database cannot be fixed. Exiting.') + sys.exit(retcode) + else: + print 'Database is consistent.' print "Ambari Server 'upgrade' finished successfully" # @@ -877,7 +960,7 @@ def configure_postgres_username_password try: properties.load(open(conf_file)) except Exception, e: - print 'Could not read "%s": %s' % (conf_file, e) + print 'Could not read ambari config file "%s": %s' % (conf_file, e) return -1 username = properties[JDBC_USER_NAME_PROPERTY] @@ -928,7 +1011,7 @@ def configure_postgres_username_password # Main. 
# def main(): - parser = optparse.OptionParser(usage="usage: %prog [options] action",) + parser = optparse.OptionParser(usage="usage: %prog [options] action [stack_id]",) parser.add_option('-d', '--postgredbname', default='ambari', help="Database name in postgresql") parser.add_option('-f', '--init-script-file', @@ -941,8 +1024,12 @@ def main(): help="File with drop script") parser.add_option('-u', '--upgrade-script-file', default="/var/lib/" "ambari-server/resources/upgrade/ddl/" - "Ambari-DDL-Postgres-UPGRADE-1.2.1.sql", + "Ambari-DDL-Postgres-UPGRADE-1.2.2.sql", help="File with upgrade script") + parser.add_option('-t', '--upgrade-stack-script-file', default="/var/lib/" + "ambari-server/resources/upgrade/dml/" + "Ambari-DML-Postgres-UPGRADE_STACK.sql", + help="File with stack upgrade script") parser.add_option('-j', '--java-home', default=None, help="Use specified java_home. Must be valid on all hosts") parser.add_option("-v", "--verbose", @@ -962,12 +1049,23 @@ def main(): global SILENT SILENT = options.silent - if not len(args) == 1: + + + if len(args) == 0: print parser.print_help() - parser.error("Invalid number of arguments") + parser.error("No action entered") action = args[0] + if action == UPGRADE_STACK_ACTION: + args_number_required = 2 + else: + args_number_required = 1 + + if len(args) < args_number_required: + print parser.print_help() + parser.error("Invalid number of arguments. Entered: " + str(len(args)) + ", required: " + str(args_number_required)) + if action == SETUP_ACTION: setup(options) elif action == START_ACTION: @@ -978,6 +1076,9 @@ def main(): reset(options) elif action == UPGRADE_ACTION: upgrade(options) + elif action == UPGRADE_STACK_ACTION: + stack_id = args[1] + upgrade_stack(options, stack_id) else: parser.error("Invalid action") Modified: incubator/ambari/branches/branch-1.2/ambari-server/src/main/resources/META-INF/persistence.xml URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-server/src/main/resources/META-INF/persistence.xml?rev=1459041&r1=1459040&r2=1459041&view=diff ============================================================================== --- incubator/ambari/branches/branch-1.2/ambari-server/src/main/resources/META-INF/persistence.xml (original) +++ incubator/ambari/branches/branch-1.2/ambari-server/src/main/resources/META-INF/persistence.xml Wed Mar 20 20:44:43 2013 @@ -12,87 +12,58 @@ - + + org.eclipse.persistence.jpa.PersistenceProvider org.apache.ambari.server.orm.entities.ClusterEntity - org.apache.ambari.server.orm.entities.ClusterConfigEntity - - org.apache.ambari.server.orm.entities.ClusterServiceEntity - - org.apache.ambari.server.orm.entities.ClusterStateEntity - - org.apache.ambari.server.orm.entities.ComponentConfigMappingEntity - - org.apache.ambari.server.orm.entities.HostComponentConfigMappingEntity - - org.apache.ambari.server.orm.entities.HostComponentDesiredConfigMappingEntity - - org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity - - org.apache.ambari.server.orm.entities.HostComponentStateEntity - + org.apache.ambari.server.orm.entities.ClusterConfigEntity + org.apache.ambari.server.orm.entities.ClusterServiceEntity + org.apache.ambari.server.orm.entities.ClusterStateEntity + org.apache.ambari.server.orm.entities.ComponentConfigMappingEntity + org.apache.ambari.server.orm.entities.HostComponentConfigMappingEntity + org.apache.ambari.server.orm.entities.HostComponentDesiredConfigMappingEntity + org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity + 
org.apache.ambari.server.orm.entities.HostComponentStateEntity org.apache.ambari.server.orm.entities.HostEntity org.apache.ambari.server.orm.entities.HostStateEntity - org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity - - org.apache.ambari.server.orm.entities.ServiceConfigMappingEntity - - org.apache.ambari.server.orm.entities.ServiceDesiredStateEntity - + org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity + org.apache.ambari.server.orm.entities.ServiceConfigMappingEntity + org.apache.ambari.server.orm.entities.ServiceDesiredStateEntity org.apache.ambari.server.orm.entities.RoleEntity org.apache.ambari.server.orm.entities.UserEntity - org.apache.ambari.server.orm.entities.ExecutionCommandEntity - - org.apache.ambari.server.orm.entities.HostRoleCommandEntity - - org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity - + org.apache.ambari.server.orm.entities.ExecutionCommandEntity + org.apache.ambari.server.orm.entities.HostRoleCommandEntity + org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity org.apache.ambari.server.orm.entities.StageEntity org.apache.ambari.server.orm.entities.KeyValueEntity - - - - + - + + org.eclipse.persistence.jpa.PersistenceProvider org.apache.ambari.server.orm.entities.ClusterEntity - org.apache.ambari.server.orm.entities.ClusterConfigEntity - - org.apache.ambari.server.orm.entities.ClusterServiceEntity - - org.apache.ambari.server.orm.entities.ClusterStateEntity - - org.apache.ambari.server.orm.entities.ComponentConfigMappingEntity - - org.apache.ambari.server.orm.entities.HostComponentConfigMappingEntity - - org.apache.ambari.server.orm.entities.HostComponentDesiredConfigMappingEntity - - org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity - - org.apache.ambari.server.orm.entities.HostComponentStateEntity - + org.apache.ambari.server.orm.entities.ClusterConfigEntity + org.apache.ambari.server.orm.entities.ClusterServiceEntity + org.apache.ambari.server.orm.entities.ClusterStateEntity + org.apache.ambari.server.orm.entities.ComponentConfigMappingEntity + org.apache.ambari.server.orm.entities.HostComponentConfigMappingEntity + org.apache.ambari.server.orm.entities.HostComponentDesiredConfigMappingEntity + org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity + org.apache.ambari.server.orm.entities.HostComponentStateEntity org.apache.ambari.server.orm.entities.HostEntity org.apache.ambari.server.orm.entities.HostStateEntity - org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity - - org.apache.ambari.server.orm.entities.ServiceConfigMappingEntity - - org.apache.ambari.server.orm.entities.ServiceDesiredStateEntity - + org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity + org.apache.ambari.server.orm.entities.ServiceConfigMappingEntity + org.apache.ambari.server.orm.entities.ServiceDesiredStateEntity org.apache.ambari.server.orm.entities.RoleEntity org.apache.ambari.server.orm.entities.UserEntity - org.apache.ambari.server.orm.entities.ExecutionCommandEntity - - org.apache.ambari.server.orm.entities.HostRoleCommandEntity - - org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity - + org.apache.ambari.server.orm.entities.ExecutionCommandEntity + org.apache.ambari.server.orm.entities.HostRoleCommandEntity + org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity org.apache.ambari.server.orm.entities.StageEntity org.apache.ambari.server.orm.entities.KeyValueEntity @@ -102,11 +73,8 @@ - - - - - + +
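
The central pattern in this commit replaces coarse `synchronized` methods with a `ReentrantReadWriteLock` plus a double-checked `volatile` flag for lazy loading (`ClusterImpl.loadServiceHostComponents`, `ClustersImpl.loadClustersAndHosts`). Below is a minimal, self-contained sketch of that pattern; the class and field names are illustrative, not the commit's own code.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative only: a lazily populated cache guarded the way
    // loadClustersAndHosts() guards its maps in this commit.
    public class LazyCache {
      private final ReadWriteLock rwl = new ReentrantReadWriteLock();
      private final Map<String, List<String>> byHost =
          new HashMap<String, List<String>>();

      // volatile so the "already loaded" fast path is safe without a lock
      private volatile boolean loaded = false;

      void load() {
        if (loaded) return;          // cheap unlocked check
        rwl.writeLock().lock();
        try {
          if (loaded) return;        // re-check: another thread may have won
          byHost.put("host1", new ArrayList<String>());
          loaded = true;             // publish only after the data is in place
        } finally {
          rwl.writeLock().unlock();
        }
      }

      List<String> get(String host) {
        load();
        rwl.readLock().lock();
        try {
          return byHost.get(host);
        } finally {
          rwl.readLock().unlock();
        }
      }
    }

The fast path costs one volatile read once loading is done; the write lock is taken at most once per process lifetime, and the re-check under the lock prevents two threads from loading twice.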
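Several hunks also move the `lock()` call out of the `try` block (for example `getClusterName`, `setClusterName`, and `HostImpl.persist`/`refresh`). The reason: if acquisition itself fails inside the `try`, the `finally` block would call `unlock()` on a lock the thread never held, and the resulting `IllegalMonitorStateException` would mask the original failure. A sketch of the two shapes, with hypothetical names:

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockIdiom {
      private final Lock readLock = new ReentrantReadWriteLock().readLock();
      private String name = "c1";

      // Fragile shape (what the commit removes): if lock() fails,
      // finally unlocks a lock this thread never acquired.
      String getNameFragile() {
        try {
          readLock.lock();
          return name;
        } finally {
          readLock.unlock();
        }
      }

      // Shape the commit adopts: acquire first, then enter try, so
      // finally runs only once the lock is definitely held.
      String getNameSafe() {
        readLock.lock();
        try {
          return name;
        } finally {
          readLock.unlock();
        }
      }
    }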
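`ClustersImpl` additionally switches its plain `HashMap` fields to `ConcurrentHashMap` and builds its host/cluster sets with `Collections.newSetFromMap(new ConcurrentHashMap(...))`, since the JDK offers no `ConcurrentHashSet`. A minimal sketch of that construction, with illustrative names:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class ConcurrentSets {
      public static void main(String[] args) {
        // A thread-safe set backed by a ConcurrentHashMap, as used for the
        // hostClusterMap / clusterHostMap values in this commit.
        Set<String> clustersForHost =
            Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        clustersForHost.add("cluster1");

        // The enclosing map is itself concurrent, so readers holding only
        // the read lock never observe it mid-rehash.
        Map<String, Set<String>> hostClusterMap =
            new ConcurrentHashMap<String, Set<String>>();
        hostClusterMap.put("host1", clustersForHost);

        System.out.println(hostClusterMap);
      }
    }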
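Finally, methods such as `getServiceComponentHosts` and `getServices` return `Collections.unmodifiable*` wrappers while holding the read lock. A wrapper is a live view of the underlying collection, so callers can still observe mutations made after the lock is released; a defensive copy trades memory for isolation. A small sketch of the difference, assuming a list guarded elsewhere:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class ViewVsCopy {
      public static void main(String[] args) {
        List<String> backing = new ArrayList<String>();
        backing.add("a");

        // Wrapper: read-only for the caller, but still reflects later
        // writes made to the backing list by other code.
        List<String> view = Collections.unmodifiableList(backing);

        // Copy: a snapshot taken while the lock is (conceptually) held;
        // later writes to the backing list are not visible through it.
        List<String> snapshot = new ArrayList<String>(backing);

        backing.add("b");
        System.out.println(view);      // [a, b]
        System.out.println(snapshot);  // [a]
      }
    }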