incubator-tashi-commits mailing list archives

From: strou...@apache.org
Subject: svn commit: r1150543 - /incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
Date: Mon, 25 Jul 2011 04:42:11 GMT
Author: stroucki
Date: Mon Jul 25 04:42:11 2011
New Revision: 1150543

URL: http://svn.apache.org/viewvc?rev=1150543&view=rev
Log:
clustermanagerservice: reorganize the cluster-management aspect of the CM. Collect the "decay"
handlers into one spot. Return a success or failure value to the scheduler.
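
In outline, the change collects the periodic host and instance checks into a single
monitorCluster loop. The following is a minimal sketch paraphrasing the committed code
shown in the diff below (comments added here for orientation; not part of the commit):

    def monitorCluster(self):
        # Background thread: reconcile CM state with the node managers on a fixed cadence.
        while True:
            sleepFor = min(self.expireHostTime, self.allowDecayed)
            try:
                self.__checkHosts()      # mark hosts up/down, orphan instances on dead hosts
                self.__checkInstances()  # refresh state of decayed hosts and instances
            except:
                self.log.exception('monitorCluster iteration failed')
            time.sleep(sleepFor)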

Modified:
    incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py

Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py?rev=1150543&r1=1150542&r2=1150543&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py Mon Jul 25 04:42:11 2011
@@ -42,9 +42,9 @@ class ClusterManagerService(object):
 		self.dfs = dfs
 		self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
 		self.log = logging.getLogger(__name__)
-		self.lastContacted = {}
-		self.decayedHosts = {}
-		self.decayedInstances = {}
+		self.hostLastContactTime = {}
+		#self.hostLastUpdateTime = {}
+		self.instanceLastContactTime = {}
 		self.expireHostTime = float(self.config.get('ClusterManagerService', 'expireHostTime'))
 		self.allowDecayed = float(self.config.get('ClusterManagerService', 'allowDecayed'))
 		self.allowMismatchedVersions = boolean(self.config.get('ClusterManagerService', 'allowMismatchedVersions'))
@@ -64,101 +64,161 @@ class ClusterManagerService(object):
 			host.up = False
 			host.decayed = False
 			self.data.releaseHost(host)
-		self.decayLock = threading.Lock()
-		threading.Thread(target=self.monitorHosts).start()
+		threading.Thread(target=self.monitorCluster).start()
 
 	def stateTransition(self, instance, old, cur):
+		if cur == InstanceState.Running:
+			self.log.exception("%d was made running here" % instance.id)
 		if (old and instance.state != old):
 			raise TashiException(d={'errno':Errors.IncorrectVmState,'msg':"VmState is not %s - it is %s" % (vmStates[old], vmStates[instance.state])})
 		instance.state = cur
 
-	def updateDecay(self, set, obj):
+	def __downHost(self, host):
+		self.log.warning('Host %s is down' % (host.name))
+		host.up = False
+		host.decayed = False
+
+		self.__orphanInstances(host)
+
+	def __upHost(self, host):
+		self.log.warning('Host %s is up' % (host.name))
+		host.up = True
+		host.decayed = True
+
+	def __orphanInstances(self, host):
+		# expects lock to be held on host
+		instances = [instance.id for instance in self.data.getInstances().itervalues() if instance.hostId == host.id]
+
+		for instanceId in instances:
+			instance = self.data.acquireInstance(instanceId)
+			if instance.hostId == host.id:
+				instance.decayed = True
+				self.stateTransition(instance, None, InstanceState.Orphaned)
+
+			self.data.releaseInstance(instance)
+
+	def __checkHosts(self):
+		# Check if hosts have been heard from recently
+		# Otherwise, see if it is alive
+
 		now = time.time()
-		self.decayLock.acquire()
-		if (obj.decayed and obj.id not in set):
-			set[obj.id] = now
-		elif (not obj.decayed and obj.id in set):
-			del set[obj.id]
-		self.decayLock.release()
+		for hostId in self.hostLastContactTime.keys():
+			if (self.hostLastContactTime[hostId] < (now-self.expireHostTime)):
+				host = self.data.acquireHost(hostId)
+				string = None
+				try:
+					string = self.proxy[host.name].liveCheck()
+				except:
+					pass
+
+				if string != "alive":
+					self.__downHost(host)
+					del self.hostLastContactTime[hostId]
+				else:
+					self.__upHost(host)
+					self.hostLastContactTime[hostId] = now
+
+				self.data.releaseHost(host)
+
+	def __checkInstances(self):
+		# Reconcile instances with nodes
+
+		# obtain a list of instances I know about
+		myInstancesError = False
+		try:
+			myInstances = self.data.getInstances()
+		except:
+			myInstancesError = True
+			self.log.warning('Failure communicating with my database')
+
+		if myInstancesError == True:
+			return
+
+		now = time.time()
+
+		# iterate through all hosts I believe are up
+		for hostId in self.hostLastContactTime.keys():
+			#self.log.warning("iterate %d" % hostId)
+			host = self.data.acquireHost(hostId)
+			if (self.hostLastContactTime[hostId] < (now - self.allowDecayed)):
+				host.decayed = True
+
+				self.log.info('Fetching state from host %s because it is decayed' % (host.name))
+				
+				myInstancesThisHost = [i for i in myInstances.values() if i.hostId == host.id]
+
+				# get a list of VMs running on host
+				try:
+					hostProxy = self.proxy[host.name]
+					remoteInstances = [hostProxy.getVmInfo(vmId) for vmId in hostProxy.listVms()]
+				except:
+					self.log.warning('Failure getting instances from host %s' % (host.name))
+					self.data.releaseHost(host)
+					continue
+
+				# register instances I don't know about
+				for instance in remoteInstances:
+					if (instance.id not in myInstances):
+						instance.hostId = host.id
+						instance = self.data.registerInstance(instance)
+						self.data.releaseInstance(instance)
+				remoteInstanceIds = [i.id for i in remoteInstances]
+				# remove instances that shouldn't be running
+				for instance in myInstancesThisHost:
+					if (instance.id not in remoteInstanceIds):
+						instance = self.data.acquireInstance(instance.id)
+						# XXXstroucki destroy?
+						del self.instanceLastContactTime[instance.id]
+						self.data.removeInstance(instance)
+
+				self.hostLastContactTime[hostId] = now
+				host.decayed = False
+
+			self.data.releaseHost(host)
+			#self.log.warning("iterate %d done" % hostId)
 		
+		# iterate through all VMs I believe are active
+		for instanceId in self.instanceLastContactTime.keys():
+			instance = self.data.acquireInstance(instanceId)
+			if (self.instanceLastContactTime[instanceId] < (now - self.allowDecayed)):
+				instance.decayed = True
+				self.log.info('Fetching state on instance %s because it is decayed' % (instance.name))
+				if instance.hostId is None: raise AssertionError
 
-	def monitorHosts(self):
-		# XXX: retry multiple hosts (iterate through them even with an exception)
+				# XXXstroucki check if host is down?
+				host = self.data.getHost(instance.hostId)
+
+				# get updated state on VM
+				try:
+					hostProxy = self.proxy[host.name]
+					newInstance = hostProxy.getVmInfo(instance.vmId)
+				except:
+					self.log.warning('Failure getting data for instance %s from host %s' % (instance.name, host.name))
+					self.data.releaseInstance(instance)
+					continue
+
+				# replace existing state with new state
+				# XXXstroucki more?
+				instance.state = newInstance.state
+				self.instanceLastContactTime[instanceId] = now
+				instance.decayed = False
+
+			self.data.releaseInstance(instance)
+
+
+	def monitorCluster(self):
 		while True:
-			now = time.time()
 			sleepFor = min(self.expireHostTime, self.allowDecayed)
-			try:
-				for k in self.lastContacted.keys():
-					if (self.lastContacted[k] < (now-self.expireHostTime)):
-						host = self.data.acquireHost(k)
-						try: 
-							self.log.warning('Host %s has expired after %f seconds' % (host.name, self.expireHostTime))
-							for instanceId in [instance.id for instance in self.data.getInstances().itervalues() if instance.hostId == host.id]:
-								instance = self.data.acquireInstance(instanceId)
-								instance.decayed = True
-								self.stateTransition(instance, None, InstanceState.Orphaned)
-								self.data.releaseInstance(instance)
-							host.up = False
-							host.decayed = False
-						finally:
-							self.data.releaseHost(host)
-						del self.lastContacted[k]
-					else:
-						sleepFor = min(self.lastContacted[k] + self.expireHostTime - now, sleepFor)
-				for hostId in self.decayedHosts.keys():
-					# XXXstroucki: what if the host undecays here?
-					# XXXstroucki: exceptions every so often; should get a lock
-					if (self.decayedHosts[hostId] < (now-self.allowDecayed)):
-						host = self.data.getHost(hostId)
-						self.log.warning('Fetching state from host %s because it is decayed' % (host.name))
-						hostProxy = self.proxy[host.name]
-						oldInstances = [i for i in self.data.getInstances().values() if i.hostId == host.id]
-						instances = [hostProxy.getVmInfo(vmId) for vmId in hostProxy.listVms()]
-						instanceIds = [i.id for i in instances]
-						for instance in instances:
-							if (instance.id not in self.data.getInstances()):
-								instance.hostId = host.id
-								instance = self.data.registerInstance(instance)
-								self.data.releaseInstance(instance)
-						for instance in oldInstances:
-							if (instance.id not in instanceIds):
-								instance = self.data.acquireInstance(instance.id)
-								self.data.removeInstance(instance)
-						self.decayedHosts[hostId] = now
-					else:
-						sleepFor = min(self.decayedHosts[hostId] + self.allowDecayed - now, sleepFor)
-				for instanceId in self.decayedInstances.keys():
-					try:
-						if (self.decayedInstances[instanceId] < (now-self.allowDecayed)):
-							self.log.warning('Fetching state on instance %d because it is decayed' % (instanceId))
-							try:
-								instance = self.data.getInstance(instanceId)
-								if instance.hostId is None: raise AssertionError
-							except TashiException, e:
-								if (e.errno == Errors.NoSuchInstanceId):
-									del self.decayedInstances[instanceId]
-									continue
-								else:
-									raise
-							host = self.data.getHost(instance.hostId)
-							hostProxy = self.proxy[host.name]
-							instance = hostProxy.getVmInfo(instance.vmId)
-							oldInstance = self.data.acquireInstance(instanceId)
-							oldInstance.state = instance.state
-							self.data.releaseInstance(oldInstance)
-							self.decayedInstances[instanceId] = now
-						else:
-							sleepFor = min(self.decayedInstances[instanceId] + self.allowDecayed - now, sleepFor)
-					except (KeyError, TashiException):
-						self.log.warning("Don't know about instance %d anymore." % instanceId)
-						self.data.removeInstance(instance)
-					except Exception, e:
-						self.log.exception('Exception in monitorHosts trying to get instance information')
-			except Exception, e:
-				self.log.exception('Exception in monitorHosts')
 
+			try:
+				self.__checkHosts()
+				self.__checkInstances()
+			except:
+				self.log.exception('monitorCluster iteration failed')
+			self.log.info("Sleeping for %d seconds" % sleepFor)
 			time.sleep(sleepFor)
-	
+
+
 	def normalize(self, instance):
 		instance.id = None
 		instance.vmId = None
@@ -166,7 +226,7 @@ class ClusterManagerService(object):
 		instance.decayed = False
 		instance.name = scrubString(instance.name, allowed="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-.")
 		instance.state = InstanceState.Pending
-		# At some point, check userId
+		# XXXstroucki At some point, check userId
 		if (not self.allowDuplicateNames):
 			for i in self.data.getInstances().itervalues():
 				if (i.name == instance.name):
@@ -181,7 +241,7 @@ class ClusterManagerService(object):
 			raise TashiException(d={'errno':Errors.InvalidInstance,'msg':"Amount of memory must be <= %d" % (self.maxMemory)})
 		# Make sure disk spec is valid
 		# Make sure network spec is valid
-		# Ignore hints
+		# Ignore internal hints
 		for hint in instance.hints:
 			if (hint.startswith("__")):
 				del instance.hints[hint]
@@ -225,7 +285,7 @@ class ClusterManagerService(object):
 					if hostname is not None:
 						self.proxy[hostname].destroyVm(instance.vmId)
 						self.data.releaseInstance(instance)
-				except Exception:
+				except:
 					self.log.exception('destroyVm failed on host %s vmId %d' % (hostname, instance.vmId))
 					self.data.removeInstance(instance)
 
@@ -352,108 +412,73 @@ class ClusterManagerService(object):
 #	@timed
 	def registerNodeManager(self, host, instances):
 		"""Called by the NM every so often as a keep-alive/state polling -- state changes here
are NOT AUTHORITATIVE"""
+
+		# Handle a new registration
 		if (host.id == None):
 			hostList = [h for h in self.data.getHosts().itervalues() if h.name == host.name]
 			if (len(hostList) != 1):
 			raise TashiException(d={'errno':Errors.NoSuchHost, 'msg':'A host with name %s is not identifiable' % (host.name)})
 			host.id = hostList[0].id
+
+		# Check if remote host information matches mine
 		oldHost = self.data.acquireHost(host.id)
 		if (oldHost.name != host.name):
 			self.data.releaseHost(oldHost)
 			raise TashiException(d={'errno':Errors.NoSuchHostId, 'msg':'Host id and hostname mismatch'})
-		try:
-			try:
-				self.lastContacted[host.id] = time.time()
-				oldHost.version = host.version
-				oldHost.memory = host.memory
-				oldHost.cores = host.cores
-				oldHost.up = True
-				oldHost.decayed = False
-
-# compare whether CM / NM versions are compatible
-				if (host.version != version and not self.allowMismatchedVersions):
-					oldHost.state = HostState.VersionMismatch
-				if (host.version == version and oldHost.state == HostState.VersionMismatch):
-					oldHost.state = HostState.Normal
-				for instance in instances:
-					string = ""
-					string = string + "id %d " % instance.id
-					string = string + "host %d " % host.id
-					string = string + "vmId %d " % instance.vmId
-					string = string + "user %d " % instance.userId
-					string = string + "cores %d " % instance.cores
-					string = string + "memory %d " % instance.memory
-					self.log.info('Accounting: ' + string)
-					#self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id, host.id, instance.vmId, instance.userId, instance.cores, instance.memory))
-					try:
-						oldInstance = self.data.acquireInstance(instance.id)
-					except TashiException, e:
-						if (e.errno == Errors.NoSuchInstanceId):
-							self.log.info('Host %s reported an instance %d that did not previously exist (decay)' % (host.name, instance.id))
-							oldHost.decayed = True
-							continue
-							#oldInstance = self.data.registerInstance(instance)
-						else:
-							self.log.exception("failed to acquire instance")
-							raise
-
-					try:
-						if (oldInstance.hostId != host.id):
-							self.log.info('Host %s is claiming instance %d actually owned by hostId %s (decay)' % (host.name, oldInstance.id, str(oldInstance.hostId)))
-							oldHost.decayed = True
-							continue
-						oldInstance.decayed = (oldInstance.state != instance.state)
-						self.updateDecay(self.decayedInstances, oldInstance)
-						if (oldInstance.decayed):
-							self.log.info('State reported as %s instead of %s for instance %d on host %s (decay)' % (vmStates[instance.state], vmStates[oldInstance.state], instance.id, host.name))
-					finally:
-						self.data.releaseInstance(oldInstance)
-				instanceIds = [instance.id for instance in instances]
-				for instanceId in [instance.id for instance in self.data.getInstances().itervalues() if instance.hostId == host.id]:
-					if (instanceId not in instanceIds):
-						self.log.info('instance %d was not reported by host %s as expected (decay)' % (instanceId, host.name))
-						instance = self.data.acquireInstance(instanceId)
-						instance.decayed = True
-						self.updateDecay(self.decayedInstances, instance)
-						oldHost.decayed = True
-						self.data.releaseInstance(instance)
-			except Exception, e:
-				self.log.exception("Exception in RegisterNodeManager")
-				oldHost.decayed = True
-				raise
-		finally:
-			self.updateDecay(self.decayedHosts, oldHost)
-			self.data.releaseHost(oldHost)
 
+		if oldHost.up == False:
+			self.__upHost(oldHost)
+		self.hostLastContactTime[host.id] = time.time()
+		#self.hostLastUpdateTime[host.id] = time.time()
+		oldHost.version = host.version
+		oldHost.memory = host.memory
+		oldHost.cores = host.cores
+
+		# compare whether CM / NM versions are compatible
+		if (host.version != version and not self.allowMismatchedVersions):
+			oldHost.state = HostState.VersionMismatch
+		if (host.version == version and oldHost.state == HostState.VersionMismatch):
+			oldHost.state = HostState.Normal
+
+		# let the host communicate what it is running
+		for instance in instances:
+			self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id, host.id, instance.vmId, instance.userId, instance.cores, instance.memory))
+			self.instanceLastContactTime.setdefault(instance.id, 0)
+
+		self.data.releaseHost(oldHost)
 		return host.id
 	
 	def vmUpdate(self, instanceId, instance, oldState):
 		try:
 			oldInstance = self.data.acquireInstance(instanceId)
 		except TashiException, e:
-			self.data.releaseInstance(oldInstance)
+			# shouldn't have a lock to clean up after here
 			if (e.errno == Errors.NoSuchInstanceId):
-				self.log.exception('Got vmUpdate for unknown instanceId %d' % (instanceId))
+				self.log.warning('Got vmUpdate for unknown instanceId %d' % (instanceId))
 				return
-			else:
-				self.log.exception("Could not acquire instance")
-				raise
+		except:
+			self.log.exception("Could not acquire instance")
+			raise
+
+		self.instanceLastContactTime[instanceId] = time.time()
+		oldInstance.decayed = False
 
 		if (instance.state == InstanceState.Exited):
-			oldInstance.decayed = False
-			self.updateDecay(self.decayedInstances, oldInstance)
+			# determine why a VM has exited
 			hostname = self.data.getHost(oldInstance.hostId).name
 			if (oldInstance.state not in [InstanceState.ShuttingDown, InstanceState.Destroying, InstanceState.Suspending]):
-				self.log.warning('Unexpected exit on %s of instance %d (vmId %d)' % (hostname, instanceId, oldInstance.vmId))
+				self.log.warning('Unexpected exit on %s of instance %s (vmId %d)' % (hostname, oldInstance.name, oldInstance.vmId))
 			if (oldInstance.state == InstanceState.Suspending):
 				self.stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
 				oldInstance.hostId = None
 				oldInstance.vmId = None
 				self.data.releaseInstance(oldInstance)
 			else:
+				del self.instanceLastContactTime[oldInstance.id]
 				self.data.removeInstance(oldInstance)
 		else:
 			if (instance.state):
+				# XXXstroucki does this matter?
 				if (oldState and oldInstance.state != oldState):
 					self.log.warning('Got vmUpdate of state from %s to %s, but the instance was previously %s' % (vmStates[oldState], vmStates[instance.state], vmStates[oldInstance.state]))
 				oldInstance.state = instance.state
@@ -467,13 +492,13 @@ class ClusterManagerService(object):
 						for oldNic in oldInstance.nics:
 							if (oldNic.mac == nic.mac):
 								oldNic.ip = nic.ip
-			oldInstance.decayed = False
-			self.updateDecay(self.decayedInstances, oldInstance)
-			self.data.releaseInstance(oldInstance)
-		return
+
+		self.data.releaseInstance(oldInstance)
+		return "success"
 	
 	def activateVm(self, instanceId, host):
 		dataHost = self.data.acquireHost(host.id)
+
 		if (dataHost.name != host.name):
 			self.data.releaseHost(dataHost)
 			raise TashiException(d={'errno':Errors.HostNameMismatch,'msg':"Mismatched target host"})
@@ -483,14 +508,18 @@ class ClusterManagerService(object):
 		if (dataHost.state != HostState.Normal):
 			self.data.releaseHost(dataHost)
 			raise TashiException(d={'errno':Errors.HostStateError,'msg':"Target host state is not normal"})
+
 		self.data.releaseHost(dataHost)
 		instance = self.data.acquireInstance(instanceId)
+
 		if ('__resume_source' in instance.hints):
 			self.stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
 		else:
 			self.stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
+
 		instance.hostId = host.id
 		self.data.releaseInstance(instance)
+
 		try:
 			if ('__resume_source' in instance.hints):
 				vmId = self.proxy[host.name].resumeVm(instance, instance.hints['__resume_source'])
@@ -501,24 +530,32 @@ class ClusterManagerService(object):
 			if (instance.state is InstanceState.Destroying): # Special case for if destroyVm is called during initialization and initialization fails
 				self.data.removeInstance(instance)
 			else:
+				# XXXstroucki what can we do about pending hosts in the scheduler?
+				# put them at the end of the queue and keep trying?
 				self.stateTransition(instance, None, InstanceState.Held)
 				instance.hostId = None
 				self.data.releaseInstance(instance)
-			raise
+			return "failure"
+
 		instance = self.data.acquireInstance(instanceId)
 		instance.vmId = vmId
+
 		if (instance.state is InstanceState.Destroying): # Special case for if destroyVm is called during initialization
-			self.data.releaseInstance(instance)
 			try:
 				self.proxy[host.name].destroyVm(vmId)
+				self.data.removeInstance(instance)
 			except Exception:
 				self.log.exception('destroyVm failed for host %s vmId %d' % (host.name, instance.vmId))
-				raise
+				self.data.releaseInstance(instance)
+				return "failure"
 		else:
 			if ('__resume_source' not in instance.hints):
-				self.stateTransition(instance, InstanceState.Activating, InstanceState.Running)
-			self.data.releaseInstance(instance)
-		return
+				# XXXstroucki should we just wait for NM to update?
+				#self.stateTransition(instance, InstanceState.Activating, InstanceState.Running)
+				pass
+
+		self.data.releaseInstance(instance)
+		return "success"
 
         def registerHost(self, hostname, memory, cores, version):
                  hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)


