incubator-tashi-commits mailing list archives

From strou...@apache.org
Subject svn commit: r1349043 - in /incubator/tashi/trunk/src/tashi: clustermanager/clustermanagerservice.py nodemanager/nodemanagerservice.py
Date Mon, 11 Jun 2012 21:52:29 GMT
Author: stroucki
Date: Mon Jun 11 21:52:29 2012
New Revision: 1349043

URL: http://svn.apache.org/viewvc?rev=1349043&view=rev
Log:
clustermanagerservice: make sure we actually get an object of type Instance back from getVmInfo
clustermanagerservice: don't accept reports of Exited machines among a host's running instances
clustermanagerservice: clean up syntax in places
clustermanagerservice: process state updates via checkInstances the same way as if the NM had told us (i.e. pick up host and vmId changes)
clustermanagerservice: mark which functions are called externally
clustermanagerservice: make sure we copy the Instance object before mutating it for display in vmUpdate
nodemanagerservice: catch ValueError when unpacking CM notifications
nodemanagerservice: don't report the source VM's exit during migration to the CM
nodemanagerservice: when the VMM notices a VM exit, it tells the NM, which then removes its record of the VM
nodemanagerservice: the vmId is not known until the VM has been received
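
To make the first log entry concrete: rpyc can hand back an exception object, or something other than an Instance, instead of raising, so the new __getVmInfo validates the return value before trusting it. A minimal standalone sketch of that pattern (getVmInfoChecked and proxy are illustrative names, not part of the patch):

    from tashi.rpycservices.rpyctypes import Instance

    def getVmInfoChecked(proxy, vmId):
        # "proxy" stands in for self.proxy[host] in the patch
        rv = proxy.getVmInfo(vmId)
        if isinstance(rv, Exception):
            # the remote side returned an exception object rather than raising it
            raise rv
        if not isinstance(rv, Instance):
            # anything else violates the expected RPC contract
            raise ValueError("getVmInfo returned %r, expected Instance" % (rv,))
        return rv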

Modified:
    incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
    incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py

Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py?rev=1349043&r1=1349042&r2=1349043&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py Mon Jun 11 21:52:29 2012
@@ -20,7 +20,7 @@ import threading
 import time
 
 from tashi.rpycservices import rpycservices	     
-from tashi.rpycservices.rpyctypes import Errors, InstanceState, HostState, TashiException
+from tashi.rpycservices.rpyctypes import Errors, InstanceState, Instance, HostState, TashiException
 from tashi import boolean, ConnectionManager, vmStates, version, scrubString
 
 class ClusterManagerService(object):
@@ -235,7 +235,7 @@ class ClusterManagerService(object):
 				# get a list of VMs running on host
 				try:
 					hostProxy = self.proxy[host.name]
-					remoteInstances = [hostProxy.getVmInfo(vmId) for vmId in hostProxy.listVms()]
+					remoteInstances = [self.__getVmInfo(host.name, vmId) for vmId in hostProxy.listVms()]
 				except:
 					self.log.warning('Failure getting instances from host %s' % (host.name))
 					self.data.releaseHost(host)
@@ -244,6 +244,9 @@ class ClusterManagerService(object):
 				# register instances I don't know about
 				for instance in remoteInstances:
 					if (instance.id not in myInstances):
+						if instance.state == InstanceState.Exited:
+							self.log.warning("%s telling me about exited instance %s, ignoring." % (host.name, instance.id))
+							continue
 						instance.hostId = host.id
 						instance = self.data.registerInstance(instance)
 						self.data.releaseInstance(instance)
@@ -285,9 +288,7 @@ class ClusterManagerService(object):
 					# Don't query non-running VMs. eg. if a VM
 					# is suspended, and has no host, then there's
 					# no one to ask
-					if instance.state != InstanceState.Running and \
-					   instance.state != InstanceState.Activating and \
-					   instance.state != InstanceState.Orphaned:
+					if instance.state not in [InstanceState.Running, InstanceState.Activating, InstanceState.Orphaned]:
 						self.data.releaseInstance(instance)
 						continue
 				except:
@@ -302,22 +303,34 @@ class ClusterManagerService(object):
 
 				# get updated state on VM
 				try:
-					hostProxy = self.proxy[host.name]
-					newInstance = hostProxy.getVmInfo(instance.vmId)
+					newInstance = self.__getVmInfo(host.name, instance.vmId)
 				except:
 					self.log.warning('Failure getting data for instance %s from host %s' % (instance.name, host.name))
 					self.data.releaseInstance(instance)
 					continue
 
-				# replace existing state with new state
-				# XXXstroucki more?
-				instance.state = newInstance.state
-				self.instanceLastContactTime[instanceId] = self.__now()
-				instance.decayed = False
-				self.data.releaseInstance(instance)
+				# update the information we have on the vm
+				before = instance.state
+				rv = self.__vmUpdate(instance, newInstance, None)
+				if (rv == "release"):
+					self.data.releaseInstance(instance)
+
+				if (rv == "remove"):
+					self.data.removeInstance(instance)
+
+
+	def __getVmInfo(self, host, vmid):
+		hostProxy = self.proxy[host]
+		rv = hostProxy.getVmInfo(vmid)
+		if isinstance(rv, Exception):
+			raise rv
 
+		if not isinstance(rv, Instance):
+			raise ValueError
 
-	def normalize(self, instance):
+		return rv
+
+	def __normalize(self, instance):
 		instance.id = None
 		instance.vmId = None
 		instance.hostId = None
@@ -345,15 +358,17 @@ class ClusterManagerService(object):
 				del instance.hints[hint]
 		return instance
 	
+	# extern
 	def createVm(self, instance):
 		"""Function to add a VM to the list of pending VMs"""
 		# XXXstroucki: check for exception here
-		instance = self.normalize(instance)
+		instance = self.__normalize(instance)
 		instance = self.data.registerInstance(instance)
 		self.data.releaseInstance(instance)
 		self.__ACCOUNT("CM VM REQUEST", instance=instance)
 		return instance
-	
+
+	# extern
 	def shutdownVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
 		self.__stateTransition(instance, None, InstanceState.ShuttingDown)
@@ -366,7 +381,8 @@ class ClusterManagerService(object):
 			self.log.exception('shutdownVm failed for host %s vmId %d' % (instance.name, instance.vmId))
 			raise
 		return
-	
+
+	# extern
 	def destroyVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
 		if (instance.state is InstanceState.Pending or instance.state is InstanceState.Held):
@@ -396,6 +412,7 @@ class ClusterManagerService(object):
 
 		return
 	
+	# extern
 	def suspendVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
 		try:
@@ -415,6 +432,7 @@ class ClusterManagerService(object):
 			raise TashiException(d={'errno':Errors.UnableToSuspend, 'msg':'Failed to suspend %s' % (instance.name)})
 		return
 	
+	# extern
 	def resumeVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
 		try:
@@ -429,6 +447,7 @@ class ClusterManagerService(object):
 		self.__ACCOUNT("CM VM RESUME", instance=instance)
 		return instance
 	
+	# extern
 	def migrateVm(self, instanceId, targetHostId):
 		instance = self.data.acquireInstance(instanceId)
 		self.__ACCOUNT("CM VM MIGRATE", instance=instance)
@@ -483,8 +502,11 @@ class ClusterManagerService(object):
 		except Exception:
 			self.log.exception('receiveVm failed')
 			raise
+
+		self.log.info("migrateVM finished")
 		return
-	
+
+	# extern
 	def pauseVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
 		try:
@@ -511,6 +533,7 @@ class ClusterManagerService(object):
 		self.data.releaseInstance(instance)
 		return
 
+	# extern
 	def unpauseVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
 		try:
@@ -536,10 +559,12 @@ class ClusterManagerService(object):
 
 		self.data.releaseInstance(instance)
 		return
-	
+
+	# extern
 	def getHosts(self):
 		return self.data.getHosts().values()
 	
+	# extern
 	def getNetworks(self):
 		networks = self.data.getNetworks()
 		for network in networks:
@@ -548,16 +573,20 @@ class ClusterManagerService(object):
 
 		return networks.values()
 
-	
+
+	# extern
 	def getUsers(self):
 		return self.data.getUsers().values()
-	
+
+	# extern
 	def getInstances(self):
 		return self.data.getInstances().values()
 
+	# extern
 	def getImages(self):
 		return self.data.getImages()
 	
+	# extern
 	def copyImage(self, src, dst):
 		imageSrc = self.dfs.getLocalHandle("images/" + src)
 		imageDst = self.dfs.getLocalHandle("images/" + dst)
@@ -571,6 +600,7 @@ class ClusterManagerService(object):
 		except Exception, e:
 			self.log.exception('DFS image copy failed: %s (%s->%s)' % (e, imageSrc, imageDst))
 
+	# extern
 	def vmmSpecificCall(self, instanceId, arg):
 		instance = self.data.getInstance(instanceId)
 		hostname = self.data.getHost(instance.hostId).name
@@ -582,7 +612,7 @@ class ClusterManagerService(object):
 			raise
 		return res
 	
-#	@timed
+	# extern
 	def registerNodeManager(self, host, instances):
 		"""Called by the NM every so often as a keep-alive/state polling -- state changes here
are NOT AUTHORITATIVE"""
 
@@ -615,30 +645,21 @@ class ClusterManagerService(object):
 		# let the host communicate what it is running
 		# and note that the information is not stale
 		for instance in instances:
+			if instance.state == InstanceState.Exited:
+				self.log.warning("%s reporting exited instance %s, ignoring." % (host.name, instance.id))
+				continue
 			self.instanceLastContactTime.setdefault(instance.id, 0)
 
 		self.data.releaseHost(oldHost)
 		return host.id
 	
-	def vmUpdate(self, instanceId, instance, oldState):
-		try:
-			oldInstance = self.data.acquireInstance(instanceId)
-		except TashiException, e:
-			# shouldn't have a lock to clean up after here
-			if (e.errno == Errors.NoSuchInstanceId):
-				self.log.warning('Got vmUpdate for unknown instanceId %d' % (instanceId))
-				return
-		except:
-			self.log.exception("Could not acquire instance")
-			raise
+	def __vmUpdate(self, oldInstance, instance, oldState):
+		# this function assumes a lock is held on the instance
+		# already, and will be released elsewhere
 
-		self.instanceLastContactTime[instanceId] = self.__now()
+		self.instanceLastContactTime[oldInstance.id] = self.__now()
 		oldInstance.decayed = False
 
-		displayInstance = oldInstance
-		displayInstance.state = instance.state
-		self.__ACCOUNT("CM VM UPDATE", instance=displayInstance)
-
 		if (instance.state == InstanceState.Exited):
 			# determine why a VM has exited
 			hostname = self.data.getHost(oldInstance.hostId).name
@@ -650,21 +671,21 @@ class ClusterManagerService(object):
 				self.__stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
 				oldInstance.hostId = None
 				oldInstance.vmId = None
-				self.data.releaseInstance(oldInstance)
+				return "release"
 
 			if (oldInstance.state == InstanceState.MigrateTrans):
 				# Just await update from target host
-				self.data.releaseInstance(oldInstance)
+				return "release"
 
 			else:
 				del self.instanceLastContactTime[oldInstance.id]
-				self.data.removeInstance(oldInstance)
+				return "remove"
 
 		else:
 			if (instance.state):
 				# XXXstroucki does this matter?
 				if (oldState and oldInstance.state != oldState):
-					self.log.warning('Got vmUpdate of state from %s to %s, but the instance was previously %s' % (vmStates[oldState], vmStates[instance.state], vmStates[oldInstance.state]))
+					self.log.warning('Doing vmUpdate of state from %s to %s, but the instance was previously %s' % (vmStates[oldState], vmStates[instance.state], vmStates[oldInstance.state]))
 				oldInstance.state = instance.state
 			if (instance.vmId):
 				oldInstance.vmId = instance.vmId
@@ -677,10 +698,40 @@ class ClusterManagerService(object):
 							if (oldNic.mac == nic.mac):
 								oldNic.ip = nic.ip
 
+			return "release"
+
+
+		return "success"
+
+	# extern
+	def vmUpdate(self, instanceId, instance, oldState):
+		try:
+			oldInstance = self.data.acquireInstance(instanceId)
+		except TashiException, e:
+			# shouldn't have a lock to clean up after here
+			if (e.errno == Errors.NoSuchInstanceId):
+				self.log.warning('Got vmUpdate for unknown instanceId %d' % (instanceId))
+				return
+		except:
+			self.log.exception("Could not acquire instance")
+			raise
+
+		import copy
+		displayInstance = copy.copy(oldInstance)
+		displayInstance.state = instance.state
+		self.__ACCOUNT("CM VM UPDATE", instance=displayInstance)
+
+		rv = self.__vmUpdate(oldInstance, instance, oldState)
+
+		if (rv == "release"):
 			self.data.releaseInstance(oldInstance)
 
+		if (rv == "remove"):
+			self.data.removeInstance(oldInstance)
+
 		return "success"
-	
+
+	# extern
 	def activateVm(self, instanceId, host):
 		# XXXstroucki: check my idea of the host's capacity before
 		# trying.
@@ -748,6 +799,7 @@ class ClusterManagerService(object):
 		self.data.releaseInstance(instance)
 		return "success"
 
+	# extern
 	def registerHost(self, hostname, memory, cores, version):
 		hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)
 		if alreadyRegistered:
@@ -763,6 +815,7 @@ class ClusterManagerService(object):
 
 		return hostId
 
+	# extern
 	def unregisterHost(self, hostId):
 		try:
 			host = self.data.getHost(hostId)

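One detail of the refactored vmUpdate above is worth spelling out: the old code aliased the locked record when building the accounting snapshot, so setting displayInstance.state also clobbered oldInstance.state before __vmUpdate ran. A self-contained toy (the Obj class is illustrative, not tashi code) showing why the patch switches to copy.copy:

    import copy

    class Obj(object):
        pass

    locked = Obj()
    locked.state = 1             # the authoritative, locked record
    display = locked             # old behaviour: alias, not a copy
    display.state = 2            # ...so this clobbers the locked record too
    assert locked.state == 2

    locked.state = 1
    display = copy.copy(locked)  # new behaviour: shallow snapshot for accounting
    display.state = 2
    assert locked.state == 1     # the locked record is untouched
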
Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py?rev=1349043&r1=1349042&r2=1349043&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py Mon Jun 11 21:52:29 2012
@@ -112,7 +112,10 @@ class NodeManagerService(object):
 					# XXXstroucki ValueError: need more than 1 value to unpack
 					# observed here. How?
 					value = self.notifyCM.pop(0)
-					(instanceId, newInst, old, success) = value
+					try:
+						(instanceId, newInst, old, success) = value
+					except:
+						self.log.exception("problem with value: %s" % value)
 					try:
 						self.cm.vmUpdate(instanceId, newInst, old)
 					except TashiException, e:
@@ -256,6 +259,7 @@ class NodeManagerService(object):
 			else:
 				raise
 
+		before = instance.state
 		if (instance.state == cur):
 			# Don't do anything if state is what it should be
 			return True
@@ -271,9 +275,18 @@ class NodeManagerService(object):
 
 		newInst = Instance(d={'state':cur})
 		success = lambda: None
-		# send the state change up to the CM
-		self.notifyCM.append((instance.id, newInst, old, success))
-		self.__flushNotifyCM()
+
+		# if this instance was in MigrateTrans, and has exited
+		# then don't tell the CM; it is the source instance
+		# exiting, and the CM should have updated its information
+		# to the target instance's info.
+		# Otherwise, send the state change up to the CM
+
+		if before == InstanceState.MigrateTrans and cur == InstanceState.Exited:
+			pass
+		else:
+			self.notifyCM.append((instance.id, newInst, old, success))
+			self.__flushNotifyCM()
 
 		# cache change locally
 		self.instances[vmId] = instance
@@ -282,7 +295,6 @@ class NodeManagerService(object):
 			# At this point, the VMM will clean up,
 			# so forget about this instance
 			del self.instances[vmId]
-			return True
 
 		return True
 
@@ -358,7 +370,9 @@ class NodeManagerService(object):
 	# XXXstroucki migrate out?
 	def __migrateVmHelper(self, instance, target, transportCookie):
 		self.vmm.migrateVm(instance.vmId, target.name, transportCookie)
-		del self.instances[instance.vmId]
+		# removal from self.instances done by communication from
+		# VMM as part of above migrateVm function
+		return
 
 	# remote
 	# XXXstroucki migrate out?
@@ -367,7 +381,7 @@ class NodeManagerService(object):
 		self.__ACCOUNT("NM VM MIGRATE", instance=instance)
 		instance.state = InstanceState.MigrateTrans
 		self.instances[vmId] = instance
-		threading.Thread(target=self.__migrateVmHelper, args=(instance, target, transportCookie)).start()
+		threading.Thread(name="migrateVmHelper", target=self.__migrateVmHelper, args=(instance, target, transportCookie)).start()
 		return
 
 	# called by receiveVm as thread
@@ -380,15 +394,16 @@ class NodeManagerService(object):
 		self.instances[vmId] = instance
 		newInstance = Instance(d={'id':instance.id,'state':instance.state,'vmId':instance.vmId,'hostId':instance.hostId})
 		success = lambda: None
-		self.notifyCM.append((newInstance.id, newInstance, InstanceState.Running, success))
+		self.notifyCM.append((newInstance.id, newInstance, InstanceState.MigrateTrans, success))
 		self.__flushNotifyCM()
 
 	# remote
 	# XXXstroucki migrate in?
 	def receiveVm(self, instance, transportCookie):
 		instance.state = InstanceState.MigrateTrans
-		vmId = instance.vmId
-		self.instances[vmId] = instance
+		# XXXstroucki new vmId is not known yet until VM is received
+		#vmId = instance.vmId
+		#self.instances[vmId] = instance
 		self.__ACCOUNT("NM VM MIGRATE RECEIVE", instance=instance)
 		threading.Thread(target=self.__receiveVmHelper, args=(instance, transportCookie)).start()
 		return
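
Taken together, the migration changes above mean the source NM stays quiet when its half of a migration exits: the target NM's notification (now sent with oldState MigrateTrans rather than Running) is the one the CM should act on. A sketch of the rule as a standalone predicate (shouldNotifyCM is an illustrative name, not in the patch):

    from tashi.rpycservices.rpyctypes import InstanceState

    def shouldNotifyCM(before, cur):
        # A source VM leaving MigrateTrans via Exited is the normal end of a
        # migration; reporting it would override the target NM's update,
        # so the notification is suppressed.
        return not (before == InstanceState.MigrateTrans
                    and cur == InstanceState.Exited)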


