incubator-tashi-commits mailing list archives

From strou...@apache.org
Subject svn commit: r1222012 - /incubator/tashi/branches/stroucki-accounting/src/tashi/clustermanager/clustermanagerservice.py
Date Thu, 22 Dec 2011 03:00:14 GMT
Author: stroucki
Date: Thu Dec 22 03:00:13 2011
New Revision: 1222012

URL: http://svn.apache.org/viewvc?rev=1222012&view=rev
Log:
Start adding push-accounting to CM
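
For reference, a minimal standalone sketch of the buffered push-accounting pattern this change introduces: __ACCOUNT appends timestamped records to an in-memory buffer, and __ACCOUNTFLUSH empties it once a small threshold is exceeded. The class and parameter names below (AccountingBuffer, flushThreshold) are illustrative only, and printing on flush is an assumption; in this revision the flush simply clears the buffer.

    import time

    class AccountingBuffer(object):
        """Illustrative stand-in for the CM's __ACCOUNT/__ACCOUNTFLUSH pair."""

        def __init__(self, flushThreshold=5):
            # Mirrors __initAccounting(): an empty buffer and a line counter.
            self.accountBuffer = []
            self.accountLines = 0
            self.flushThreshold = flushThreshold

        def flush(self):
            # Hypothetical sink: this revision's __ACCOUNTFLUSH only clears
            # the buffer, so printing each record here is an assumption.
            for line in self.accountBuffer:
                print(line)
            self.accountLines = 0
            self.accountBuffer = []

        def account(self, text, secondary=""):
            # Same record shape as __ACCOUNT: timestamp|event|details.
            line = "%s|%s|%s" % (time.time(), text, secondary)
            self.accountBuffer.append(line)
            self.accountLines += 1
            if self.accountLines > self.flushThreshold:
                self.flush()

    if __name__ == "__main__":
        buf = AccountingBuffer()
        for i in range(12):
            buf.account("CM VM REQUEST", "Instance(id %s)" % i)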

Modified:
    incubator/tashi/branches/stroucki-accounting/src/tashi/clustermanager/clustermanagerservice.py

Modified: incubator/tashi/branches/stroucki-accounting/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-accounting/src/tashi/clustermanager/clustermanagerservice.py?rev=1222012&r1=1222011&r2=1222012&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-accounting/src/tashi/clustermanager/clustermanagerservice.py	(original)
+++ incubator/tashi/branches/stroucki-accounting/src/tashi/clustermanager/clustermanagerservice.py	Thu Dec 22 03:00:13 2011
@@ -51,27 +51,71 @@ class ClusterManagerService(object):
 		self.maxMemory = int(self.config.get('ClusterManagerService', 'maxMemory'))
 		self.maxCores = int(self.config.get('ClusterManagerService', 'maxCores'))
 		self.allowDuplicateNames = boolean(self.config.get('ClusterManagerService', 'allowDuplicateNames'))
-		now = self.__now()
+		
+		self.__initAccounting()
+		self.__initCluster()
+
+		threading.Thread(target=self.monitorCluster).start()
+
+	def __initAccounting(self):
+		self.accountBuffer = []
+		self.accountLines = 0
+
+
+	def __initCluster(self):
+		# initialize state of VMs if restarting
 		for instance in self.data.getInstances().itervalues():
 			instanceId = instance.id
 			instance = self.data.acquireInstance(instanceId)
 			instance.decayed = False
 
 			if instance.hostId is None:
-				self.stateTransition(instance, None, InstanceState.Pending)
+				self.__stateTransition(instance, None, InstanceState.Pending)
 			else:
-				self.stateTransition(instance, None, InstanceState.Orphaned)
+				self.__stateTransition(instance, None, InstanceState.Orphaned)
 
 			self.data.releaseInstance(instance)
+
+		# initialize state of hosts if restarting
 		for host in self.data.getHosts().itervalues():
 			hostId = host.id
 			host = self.data.acquireHost(hostId)
 			host.up = False
 			host.decayed = False
 			self.data.releaseHost(host)
-		threading.Thread(target=self.monitorCluster).start()
 
-	def stateTransition(self, instance, old, cur):
+
+
+	def __ACCOUNTFLUSH(self):
+		print "Called account flush"
+		self.accountLines = 0
+		self.accountBuffer = []
+
+
+	def __ACCOUNT(self, text, instance=None, host=None):
+		now = self.__now()
+		instanceText = None
+		hostText = None
+
+		if instance is not None:
+			instanceText = 'Instance(id %d host %s vmId %s user %d cores %d memory %d)' % (instance.id, instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory)
+
+		if host is not None:
+			hostText = "Host(id %d memory %d cores %d)" % (host.id, host.memory, host.cores)
+
+		secondary = ','.join([i for i in (hostText, instanceText) if i is not None])
+
+		line = "%s|%s|%s" % (now, text, secondary)
+
+		self.accountBuffer.append(line)
+		self.accountLines += 1
+
+		if (self.accountLines > 5):
+			self.__ACCOUNTFLUSH()
+
+
+
+	def __stateTransition(self, instance, old, cur):
 		if (old and instance.state != old):
 			raise TashiException(d={'errno':Errors.IncorrectVmState,'msg':"VmState is not %s - it is %s" % (vmStates[old], vmStates[instance.state])})
 		if (instance.state == cur):
@@ -104,7 +148,7 @@ class ClusterManagerService(object):
 			instance = self.data.acquireInstance(instanceId)
 			if instance.hostId == host.id:
 				instance.decayed = True
-				self.stateTransition(instance, None, InstanceState.Orphaned)
+				self.__stateTransition(instance, None, InstanceState.Orphaned)
 
 			self.data.releaseInstance(instance)
 
@@ -225,23 +269,6 @@ class ClusterManagerService(object):
 				self.data.releaseInstance(instance)
 
 
-
-	def monitorCluster(self):
-		while True:
-			sleepFor = min(self.expireHostTime, self.allowDecayed)
-
-			try:
-				self.__checkHosts()
-				self.__checkInstances()
-			except:
-				self.log.exception('monitorCluster iteration failed')
-			#  XXXrgass too chatty.  Remove
-			# XXXstroucki the risk is that a deadlock in obtaining
-			# data could prevent this loop from continuing.
-			#self.log.info("Sleeping for %d seconds" % sleepFor)
-			time.sleep(sleepFor)
-
-
 	def normalize(self, instance):
 		instance.id = None
 		instance.vmId = None
@@ -276,12 +303,14 @@ class ClusterManagerService(object):
 		instance = self.normalize(instance)
 		instance = self.data.registerInstance(instance)
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM REQUEST", instance=instance)
 		return instance
 	
 	def shutdownVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
+		self.__stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM SHUTDOWN", instance=instance)
 		hostname = self.data.getHost(instance.hostId).name
 		try:
 			self.proxy[hostname].shutdownVm(instance.vmId)
@@ -293,14 +322,17 @@ class ClusterManagerService(object):
 	def destroyVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
 		if (instance.state is InstanceState.Pending or instance.state is InstanceState.Held):
+			self.__ACCOUNT("CM VM DESTROY UNSTARTED", instance=instance)
 			self.data.removeInstance(instance)
 		elif (instance.state is InstanceState.Activating):
-			self.stateTransition(instance, InstanceState.Activating, InstanceState.Destroying)
+			self.__ACCOUNT("CM VM DESTROY STARTING", instance=instance)
+			self.__stateTransition(instance, InstanceState.Activating, InstanceState.Destroying)
 			self.data.releaseInstance(instance)
 		else:
 			# XXXstroucki: This is a problem with keeping
 			# clean state.
-			self.stateTransition(instance, None, InstanceState.Destroying)
+			self.__ACCOUNT("CM VM DESTROY", instance=instance)
+			self.__stateTransition(instance, None, InstanceState.Destroying)
 			if instance.hostId is None:
 				self.data.removeInstance(instance)
 			else:
@@ -318,8 +350,9 @@ class ClusterManagerService(object):
 	
 	def suspendVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
+		self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM SUSPEND", instance=instance)
 		hostname = self.data.getHost(instance.hostId).name
 		destination = "suspend/%d_%s" % (instance.id, instance.name)
 		try:
@@ -331,14 +364,16 @@ class ClusterManagerService(object):
 	
 	def resumeVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
+		self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
 		source = "suspend/%d_%s" % (instance.id, instance.name)
 		instance.hints['__resume_source'] = source
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM RESUME", instance=instance)
 		return instance
 	
 	def migrateVm(self, instanceId, targetHostId):
 		instance = self.data.acquireInstance(instanceId)
+		self.__ACCOUNT("CM VM MIGRATE", instance=instance)
 		try:
 			# FIXME: should these be acquire/release host?
 			targetHost = self.data.getHost(targetHostId)
@@ -347,7 +382,7 @@ class ClusterManagerService(object):
 		except:
 			self.data.releaseInstance(instance)
 			raise
-		self.stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
+		self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
 		self.data.releaseInstance(instance)
 		try:
 			# Prepare the target
@@ -359,7 +394,7 @@ class ClusterManagerService(object):
 			self.log.exception('prepReceiveVm failed')
 			raise
 		instance = self.data.acquireInstance(instance.id)
-		self.stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
+		self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
 		self.data.releaseInstance(instance)
 		try:
 			# Send the VM
@@ -383,8 +418,9 @@ class ClusterManagerService(object):
 	
 	def pauseVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
+		self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM PAUSE", instance=instance)
 		hostname = self.data.getHost(instance.hostId).name
 		try:
 			self.proxy[hostname].pauseVm(instance.vmId)
@@ -392,14 +428,15 @@ class ClusterManagerService(object):
 			self.log.exception('pauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
 			raise
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
+		self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
 		self.data.releaseInstance(instance)
 		return
 
 	def unpauseVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
+		self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM UNPAUSE", instance=instance)
 		hostname = self.data.getHost(instance.hostId).name
 		try:
 			self.proxy[hostname].unpauseVm(instance.vmId)
@@ -407,7 +444,7 @@ class ClusterManagerService(object):
 			self.log.exception('unpauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
 			raise
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
+		self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
 		self.data.releaseInstance(instance)
 		return
 	
@@ -442,6 +479,7 @@ class ClusterManagerService(object):
 	def vmmSpecificCall(self, instanceId, arg):
 		instance = self.data.getInstance(instanceId)
 		hostname = self.data.getHost(instance.hostId).name
+		self.__ACCOUNT("CM VM SPECIFIC CALL", instance=instance)
 		try:
 			res = self.proxy[hostname].vmmSpecificCall(instance.vmId, arg)
 		except Exception:
@@ -500,8 +538,9 @@ class ClusterManagerService(object):
 			self.log.exception("Could not acquire instance")
 			raise
 
-		self.instanceLastContactTime[instanceId] = time.time()
+		self.instanceLastContactTime[instanceId] = self.__now()
 		oldInstance.decayed = False
+		self.__ACCOUNT("CM VM UPDATE", instance=instance)
 
 		if (instance.state == InstanceState.Exited):
 			# determine why a VM has exited
@@ -509,7 +548,7 @@ class ClusterManagerService(object):
 			if (oldInstance.state not in [InstanceState.ShuttingDown, InstanceState.Destroying, InstanceState.Suspending]):
 				self.log.warning('Unexpected exit on %s of instance %s (vmId %d)' % (hostname, oldInstance.name, oldInstance.vmId))
 			if (oldInstance.state == InstanceState.Suspending):
-				self.stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
+				self.__stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
 				oldInstance.hostId = None
 				oldInstance.vmId = None
 				self.data.releaseInstance(oldInstance)
@@ -552,13 +591,14 @@ class ClusterManagerService(object):
 
 		self.data.releaseHost(dataHost)
 		instance = self.data.acquireInstance(instanceId)
+		self.__ACCOUNT("CM VM ACTIVATE", instance=instance)
 
 		if ('__resume_source' in instance.hints):
-			self.stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
+			self.__stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
 		else:
 			# XXXstroucki should held VMs be continually tried? Or be explicitly set back to pending?
-			#self.stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
-			self.stateTransition(instance, None, InstanceState.Activating)
+			#self.__stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
+			self.__stateTransition(instance, None, InstanceState.Activating)
 
 		instance.hostId = host.id
 		self.data.releaseInstance(instance)
@@ -575,7 +615,7 @@ class ClusterManagerService(object):
 			else:
 				# XXXstroucki what can we do about pending hosts in the scheduler?
 				# put them at the end of the queue and keep trying?
-				self.stateTransition(instance, None, InstanceState.Held)
+				self.__stateTransition(instance, None, InstanceState.Held)
 				instance.hostId = None
 				self.data.releaseInstance(instance)
 			return "failure"
@@ -594,7 +634,7 @@ class ClusterManagerService(object):
 		else:
 			if ('__resume_source' not in instance.hints):
 				# XXXstroucki should we just wait for NM to update?
-				#self.stateTransition(instance, InstanceState.Activating, InstanceState.Running)
+				#self.__stateTransition(instance, InstanceState.Activating, InstanceState.Running)
 				pass
 
 		self.data.releaseInstance(instance)
@@ -606,9 +646,42 @@ class ClusterManagerService(object):
                         self.log.info("Host %s is already registered, it was updated now" % hostname)
                 else:
                         self.log.info("A host was registered - hostname: %s, version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
+
+		try:
+			host = self.data.getHost(hostId)
+			self.__ACCOUNT("CM HOST REGISTER", host=host)
+		except:
+			self.log.warning("Failed to lookup host %s" % hostId)
+
                 return hostId
 
         def unregisterHost(self, hostId):
+		try:
+			host = self.data.getHost(hostId)
+			self.__ACCOUNT("CM HOST UNREGISTER", host=host)
+		except:
+			self.log.warning("Failed to lookup host %s" % hostId)
+			return
+
                 self.data.unregisterHost(hostId)
                 self.log.info("Host %s was unregistered" % hostId)
                 return
+
+	# This function runs in a separate thread to monitor the state
+	# of the cluster
+	def monitorCluster(self):
+		while True:
+			sleepFor = min(self.expireHostTime, self.allowDecayed)
+
+			try:
+				self.__checkHosts()
+				self.__checkInstances()
+			except:
+				self.log.exception('monitorCluster iteration failed')
+			#  XXXrgass too chatty.  Remove
+			# XXXstroucki the risk is that a deadlock in obtaining
+			# data could prevent this loop from continuing.
+			#self.log.info("Sleeping for %d seconds" % sleepFor)
+			time.sleep(sleepFor)
+
+


