incubator-tashi-commits mailing list archives

From: strou...@apache.org
Subject: svn commit: r1222909 - /incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/nodemanagerservice.py
Date: Sat, 24 Dec 2011 05:43:14 GMT
Author: stroucki
Date: Sat Dec 24 05:43:13 2011
New Revision: 1222909

URL: http://svn.apache.org/viewvc?rev=1222909&view=rev
Log:
nodemanagerservice: preparation for accounting turned into a full revision


Modified:
    incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/nodemanagerservice.py

Modified: incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/nodemanagerservice.py?rev=1222909&r1=1222908&r2=1222909&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/nodemanagerservice.py Sat Dec 24 05:43:13 2011
@@ -52,55 +52,71 @@ class NodeManagerService(object):
 		self.registerFrequency = float(config.get('NodeManagerService', 'registerFrequency'))
 		self.infoFile = self.config.get('NodeManagerService', 'infoFile')
 		self.statsInterval = float(self.config.get('NodeManagerService', 'statsInterval'))
-		self.id = None
+		self.registerHost = boolean(config.get('NodeManagerService', 'registerHost'))
+		try:
+			self.cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
+		except:
+			self.log.exception("Could not connect to CM")
+			return
+
 		self.notifyCM = []
+		
+		# XXXstroucki this fn could be in this level maybe?
+		self.host = self.vmm.getHostInfo(self)
 
+		# populate self.instances
+		self.__loadVmInfo()
 
-		# XXXstroucki: loadVmInfo probably once came from
-		# pickle store. Now the following check is redundant
-		# is there a need to maintain state on this level?
-
-		# get VMs from VMM
-		self.loadVmInfo()
-
-		# get VMs from VMM
-		vmList = self.vmm.listVms()
-		for vmId in vmList:
-			if (vmId not in self.instances):
-				self.log.warning('vmcontrol backend reports additional vmId %d' % (vmId))
-				self.instances[vmId] = Instance(d={'vmId':vmId,'id':-1})
-		for vmId in self.instances.keys():
-			if (vmId not in vmList):
-				self.log.warning('vmcontrol backend does not report %d' % (vmId))
-				self.vmStateChange(vmId, None, InstanceState.Exited)
+		self.__registerHost()
 
-		# XXXstroucki is there a config for this to have effect?
-		self.registerHost()
+		self.id = self.cm.registerNodeManager(self.host, self.instances.values())
 
+		# XXXstroucki cut cross check for NM/VMM state
 
 		# start service threads
-		threading.Thread(target=self.backupVmInfoAndFlushNotifyCM).start()
-		threading.Thread(target=self.registerWithClusterManager).start()
-		threading.Thread(target=self.statsThread).start()
+		threading.Thread(target=self.__registerWithClusterManager).start()
+		threading.Thread(target=self.__statsThread).start()
 	
-	# XXXstroucki is this still useful?
 	def loadVmInfo(self):
 		try:
 			self.instances = self.vmm.getInstances()
 		except Exception, e:
 			self.log.exception('Failed to obtain VM info')
 			self.instances = {}
-	
-	# XXXstroucki is this still useful?
-	def saveVmInfo(self):
+
+	# send data to CM
+	# XXXstroucki adapt this for accounting?
+	def __flushNotifyCM(self):
+		# send data to CM, adding message to buffer if
+		# it fails
 		try:
-			data = cPickle.dumps(self.instances)
-			f = open(self.infoFile, "w")
-			f.write(data)
-			f.close()
+			notifyCM = []
+			try:
+				while (len(self.notifyCM) > 0):
+					(instanceId, newInst, old, success) = self.notifyCM.pop(0)
+					try:
+						self.cm.vmUpdate(instanceId, newInst, old)
+					except TashiException, e:
+						notifyCM.append((instanceId, newInst, old, success))
+						if (e.errno != Errors.IncorrectVmState):
+							raise
+					except:
+						notifyCM.append((instanceId, newInst, old, success))
+						raise
+					else:
+						success()
+			finally:
+				self.notifyCM = self.notifyCM + notifyCM
 		except Exception, e:
-			self.log.exception('Failed to save VM info to %s' % (self.infoFile))
-	
+			self.log.exception('Failed to flush notifications to the CM')
+
+
+
+	# Called from VMM to update self.instances
+	# but only changes are Exited, MigrateTrans and Running
 	def vmStateChange(self, vmId, old, cur):
 		instance = self.getInstance(vmId)
 		if (old and instance.state != old):
@@ -116,101 +132,68 @@ class NodeManagerService(object):
 		instance.state = cur
 		newInst = Instance(d={'state':cur})
 		success = lambda: None
-		try:
-			cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
-			cm.vmUpdate(instance.id, newInst, old)
-		except Exception, e:
-			self.log.exception('RPC failed for vmUpdate on CM')
-			# XXXstroucki things are pretty messed if CM is unreachable
-			# should see if this fn is used purely imperatively or
-			# purely advisory
-			self.notifyCM.append((instance.id, newInst, old, success))
-		else:
-			success()
+		# send the state change up to the CM
+		self.notifyCM.append((instance.id, newInst, old, success))
+		self.__flushNotifyCM()
+
+		# XXXstroucki things are pretty messed if CM is unreachable
+		# qemu.py calls this in the matchSystemPids thread
+		# xenpv.py: i have no real idea why it is called there
+		
 		return True
 	
 	# service thread function
-	def backupVmInfoAndFlushNotifyCM(self):
-		cm = None
-		cmUseCount = 0
-		while True:
-			if cmUseCount > 10 or cm is None:
-				try:
-					# XXXstroucki hope rpyc handles destruction
-					cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
-					cmUseCount = 0
-				except Exception, e:
-					self.log.warning("Could not get a handle to the clustermanager")
-					time.sleep(60)
-					continue
-
-			cmUseCount = cmUseCount + 1
+	def __registerWithClusterManager(self):
 			start = time.time()
 			try:
-				self.saveVmInfo()
-			except Exception, e:
-				self.log.exception('Failed to save VM info')
-
-			# send data to the CM, adding message to buffer if
-			# it fails
-			try:
-				notifyCM = []
-				try:
-					while (len(self.notifyCM) > 0):
-						(instanceId, newInst, old, success) = self.notifyCM.pop(0)
-						try:
-							cm.vmUpdate(instanceId, newInst, old)
-						except TashiException, e:
-							notifyCM.append((instanceId, newInst, old, success))
-							if (e.errno != Errors.IncorrectVmState):
-								raise
-						except:
-							notifyCM.append((instanceId, newInst, old, success))
-							raise
-						else:
-							success()
-				finally:
-					self.notifyCM = self.notifyCM + notifyCM
+				instances = self.instances.values()
+				self.id = self.cm.registerNodeManager(self.host, instances)
 			except Exception, e:
 				self.log.exception('Failed to register with the CM')
 			toSleep = start - time.time() + self.registerFrequency
 			if (toSleep > 0):
 				time.sleep(toSleep)
-	
+
 	# service thread function
-	def registerWithClusterManager(self):
-		cm = None
-		cmUseCount = 0
+	def __statsThread(self):
+		if (self.statsInterval == 0):
+			return
 		while True:
-			if cmUseCount > 10 or cm is None:
-				try:
-					cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
-					cmUseCount = 0
-				except Exception, e:
-					self.log.warning("Could not get a handle to the clustermanager")
-					time.sleep(60)
-					continue
-			cmUseCount = cmUseCount + 1
-			start = time.time()
 			try:
-				# XXXstroucki this fn could be in this level maybe?
-				host = self.vmm.getHostInfo(self)
-				instances = self.instances.values()
-				#import pprint
-				#self.log.warning("Instances: " + pprint.saferepr(instances))
-				self.id = cm.registerNodeManager(host, instances)
-			except Exception, e:
-				self.log.exception('Failed to register with the CM')
-			toSleep = start - time.time() + self.registerFrequency
-			if (toSleep > 0):
-				time.sleep(toSleep)
-	
-	def getInstance(self, vmId):
+				publishList = []
+				for vmId in self.instances.keys():
+					try:
+						instance = self.instances.get(vmId, None)
+						if (not instance):
+							continue
+						id = instance.id
+						stats = self.vmm.getStats(vmId)
+						for stat in stats:
+							publishList.append({"vm_%d_%s" % (id, stat):stats[stat]})
+					except:
+						self.log.exception('statsThread threw an exception')
+				if (len(publishList) > 0):
+					tashi.publisher.publishList(publishList)
+			except:
+				self.log.exception('statsThread threw an exception')
+			time.sleep(self.statsInterval)
+
+        def __registerHost(self):
+                hostname = socket.gethostname()
+		# populate some defaults
+		# XXXstroucki: I think it's better if the nodemanager fills these in properly when registering with the clustermanager
+		memory = 0
+		cores = 0
+		version = "empty"
+                #self.cm.registerHost(hostname, memory, cores, version)
+
+	def __getInstance(self, vmId):
 		instance = self.instances.get(vmId, None)
 		if (instance is None):
 			raise TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this host" % (vmId)})
 		return instance
 	
+	# remote
 	def instantiateVm(self, instance):
 		vmId = self.vmm.instantiateVm(instance)
 		instance.vmId = vmId
@@ -218,25 +201,22 @@ class NodeManagerService(object):
 		self.instances[vmId] = instance
 		return vmId
 	
+	# remote
 	def suspendVm(self, vmId, destination):
 		instance = self.getInstance(vmId)
 		instance.state = InstanceState.Suspending
 		threading.Thread(target=self.vmm.suspendVm, args=(vmId, destination)).start()
 	
-	def resumeVmHelper(self, instance, name):
+	# called by resumeVm as thread
+	def __resumeVmHelper(self, instance, name):
 		self.vmm.resumeVmHelper(instance, name)
 		instance.state = InstanceState.Running
 		newInstance = Instance(d={'id':instance.id,'state':instance.state})
 		success = lambda: None
-		cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
-		try:
-			cm.vmUpdate(newInstance.id, newInstance, InstanceState.Resuming)
-		except Exception, e:
-			self.log.exception('vmUpdate failed in resumeVmHelper')
-			self.notifyCM.append((newInstance.id, newInstance, InstanceState.Resuming, success))
-		else:
-			success()
-	
+		self.notifyCM.append((newInstance.id, newInstance, InstanceState.Resuming, success))
+		self.__flushNotifyCM()
+
+	# remote
 	def resumeVm(self, instance, name):
 		instance.state = InstanceState.Resuming
 		instance.hostId = self.id
@@ -249,16 +229,19 @@ class NodeManagerService(object):
 			raise TashiException(d={'errno':Errors.UnableToResume,'msg':"resumeVm failed on the node manager"})
 		return instance.vmId
 	
+	# remote
 	def prepReceiveVm(self, instance, source):
 		instance.vmId = -1
 		transportCookie = self.vmm.prepReceiveVm(instance, source.name)
 		return transportCookie
 
+	# remote
 	def prepSourceVm(self, vmId):
 		instance = self.getInstance(vmId)
 		instance.state = InstanceState.MigratePrep
-	
-	def migrateVmHelper(self, instance, target, transportCookie):
+
+	# called by migrateVm as thread
+	def __migrateVmHelper(self, instance, target, transportCookie):
 		self.vmm.migrateVm(instance.vmId, target.name, transportCookie)
 		del self.instances[instance.vmId]
 		
@@ -268,6 +251,7 @@ class NodeManagerService(object):
 		threading.Thread(target=self.migrateVmHelper, args=(instance, target, transportCookie)).start()
 		return
 	
+	# called by receiveVm as thread
 	def receiveVmHelper(self, instance, transportCookie):
 		cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
 		vmId = self.vmm.receiveVm(transportCookie)
@@ -284,76 +268,52 @@ class NodeManagerService(object):
 			self.notifyCM.append((newInstance.id, newInstance, InstanceState.MigrateTrans, success))
 		else:
 			success()
-	
+	# remote
 	def receiveVm(self, instance, transportCookie):
 		instance.state = InstanceState.MigrateTrans
 		threading.Thread(target=self.receiveVmHelper, args=(instance, transportCookie)).start()
 		return
-	
+
+	# remote
 	def pauseVm(self, vmId):
 		instance = self.getInstance(vmId)
 		instance.state = InstanceState.Pausing
 		self.vmm.pauseVm(vmId)
 		instance.state = InstanceState.Paused
-	
+
+	# remote
 	def unpauseVm(self, vmId):
 		instance = self.getInstance(vmId)
 		instance.state = InstanceState.Unpausing
 		self.vmm.unpauseVm(vmId)
 		instance.state = InstanceState.Running
-	
+
+	# remote
 	def shutdownVm(self, vmId):
 		instance = self.getInstance(vmId)
 		instance.state = InstanceState.ShuttingDown
 		self.vmm.shutdownVm(vmId)
-	
+
+	# remote
 	def destroyVm(self, vmId):
 		instance = self.getInstance(vmId)
 		instance.state = InstanceState.Destroying
 		self.vmm.destroyVm(vmId)
-	
+
+	# remote
 	def getVmInfo(self, vmId):
 		instance = self.getInstance(vmId)
 		return instance
-	
+
+	# remote
 	def vmmSpecificCall(self, vmId, arg):
 		return self.vmm.vmmSpecificCall(vmId, arg)
-	
+
+	# remote
 	def listVms(self):
 		return self.instances.keys()
 
+	# remote
 	def liveCheck(self):
 		return "alive"
 	
-	def statsThread(self):
-		if (self.statsInterval == 0):
-			return
-		while True:
-			try:
-				publishList = []
-				for vmId in self.instances.keys():
-					try:
-						instance = self.instances.get(vmId, None)
-						if (not instance):
-							continue
-						id = instance.id
-						stats = self.vmm.getStats(vmId)
-						for stat in stats:
-							publishList.append({"vm_%d_%s" % (id, stat):stats[stat]})
-					except:
-						self.log.exception('statsThread threw an exception')
-				if (len(publishList) > 0):
-					tashi.publisher.publishList(publishList)
-			except:
-				self.log.exception('statsThread threw an exception')
-			time.sleep(self.statsInterval)
-
-        def registerHost(self):
-                cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
-                hostname = socket.gethostname()
-		# populate some defaults
-		# XXXstroucki: I think it's better if the nodemanager fills these in properly when registering with the clustermanager
-		memory = 0
-		cores = 0
-		version = "empty"
-                #cm.registerHost(hostname, memory, cores, version)
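The core of this revision is the buffer-and-flush pattern behind the new __flushNotifyCM(): vmUpdate notifications are queued on self.notifyCM and pushed to the cluster manager over the single self.cm handle opened in __init__, and anything that fails to send is put back on the queue for a later attempt. Below is a minimal, self-contained sketch of that pattern, not code from the commit; CMStub and NotifyBuffer are illustrative names, and the TashiException / Errors.IncorrectVmState handling of the real method is collapsed into a generic except clause.

class CMStub(object):
	"""Hypothetical stand-in for the RPC proxy returned by ConnectionManager."""
	def vmUpdate(self, instanceId, newInst, old):
		# a real proxy forwards this call to the cluster manager and may raise
		print("vmUpdate(%s, %s, %s)" % (instanceId, newInst, old))

class NotifyBuffer(object):
	def __init__(self, cm):
		self.cm = cm
		self.notifyCM = []	# queued (instanceId, newInst, old, success) tuples

	def queue(self, instanceId, newInst, old, success):
		# mirrors vmStateChange/__resumeVmHelper: enqueue, then try to flush
		self.notifyCM.append((instanceId, newInst, old, success))
		self.flush()

	def flush(self):
		# mirrors __flushNotifyCM: drain the queue, re-queue whatever fails
		unsent = []
		try:
			while len(self.notifyCM) > 0:
				(instanceId, newInst, old, success) = self.notifyCM.pop(0)
				try:
					self.cm.vmUpdate(instanceId, newInst, old)
				except Exception:
					unsent.append((instanceId, newInst, old, success))
					raise
				else:
					success()
		except Exception:
			pass	# the node manager logs here and carries on
		finally:
			# failed messages go back on the queue for the next flush
			self.notifyCM = self.notifyCM + unsent

buf = NotifyBuffer(CMStub())
buf.queue(12, {'state': 'Running'}, 'Activating', lambda: None)

Compared with the old backupVmInfoAndFlushNotifyCM thread, the flush now runs inline from vmStateChange and __resumeVmHelper, and the pickle-based saveVmInfo path disappears along with it.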


