incubator-tashi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From strou...@apache.org
Subject svn commit: r1225071 - in /incubator/tashi/branches/stroucki-accounting/src/tashi: nodemanager/nodemanagerservice.py version.py
Date Wed, 28 Dec 2011 02:38:41 GMT
Author: stroucki
Date: Wed Dec 28 02:38:40 2011
New Revision: 1225071

URL: http://svn.apache.org/viewvc?rev=1225071&view=rev
Log:
nodemanagerservice: add accounting messages

Modified:
    incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/nodemanagerservice.py
    incubator/tashi/branches/stroucki-accounting/src/tashi/version.py

Modified: incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/nodemanagerservice.py?rev=1225071&r1=1225070&r2=1225071&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/nodemanagerservice.py Wed Dec 28 02:38:40 2011
@@ -60,6 +60,9 @@ class NodeManagerService(object):
 			return
 
 		self.notifyCM = []
+
+		self.accountLines = 0
+		self.accountBuffer = []
 		
 		self.id = None
 		# XXXstroucki this fn could be in this level maybe?
@@ -116,34 +119,34 @@ class NodeManagerService(object):
 		if (toSleep > 0):
 			time.sleep(toSleep)
 
+        def __ACCOUNTFLUSH(self):
+                print "Called account flush"
+                self.accountLines = 0
+                self.accountBuffer = []
 
-	# Called from VMM to update self.instances
-	# but only changes are Exited, MigrateTrans and Running
-	def vmStateChange(self, vmId, old, cur):
-		instance = self.__getInstance(vmId)
-		if (old and instance.state != old):
-			self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
-		if (cur == InstanceState.Exited):
-			del self.instances[vmId]
-			return True
 
-		if (instance.state == cur):
-			# Don't do anything if state is what it should be
-			return True
+        def __ACCOUNT(self, text, instance=None, host=None):
+                now = self.__now()
+                instanceText = None
+                hostText = None
+
+                if instance is not None:
+                        instanceText = 'Instance(id %d host %d vmId %d user %d cores %d memory %d)' % (instance.id, instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory))
+
+                if host is not None:
+                        hostText = "Host(id %d memory %d cores %d)" % (host.id, host.memory, host.cores)
+
+                secondary = ','.join(hostText, instanceText)
+
+                line = "%s|%s|%s" % (now, text, secondary)
+
+                self.accountBuffer.append(line)
+                self.accountLines += 1
+
+                if (self.accountLines > 5):
+                        self.__ACCOUNTFLUSH()
 
-		instance.state = cur
-		newInst = Instance(d={'state':cur})
-		success = lambda: None
-		# send the state change up to the CM
-		self.notifyCM.append((instance.id, newInst, old, success))
-		self.__flushNotifyCM()
 
-		# XXXstroucki things are pretty messed if CM is unreachable
-		# qemu.py calls this in the matchSystemPids thread
-		# xenpv.py: i have no real idea why it is called there
-		
-		return True
-	
 	# service thread function
 	def __registerWithClusterManager(self):
 			start = time.time()
@@ -196,17 +199,62 @@ class NodeManagerService(object):
 		return instance
 	
 	# remote
-	def instantiateVm(self, instance):
-		vmId = self.vmm.instantiateVm(instance)
-		instance.vmId = vmId
-		instance.state = InstanceState.Running
+	# Called from VMM to update self.instances
+	# but only changes are Exited, MigrateTrans and Running
+	# qemu.py calls this in the matchSystemPids thread
+	# xenpv.py: i have no real idea why it is called there
+	def vmStateChange(self, vmId, old, cur):
+		instance = self.__getInstance(vmId)
+
+		if (instance.state == cur):
+			# Don't do anything if state is what it should be
+			return True
+
+		if (old and instance.state != old):
+			# make a note of mismatch, but go on.
+			# the VMM should know best
+			self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
+                        
+		instance.state = cur
+
+		self._ACCOUNT("NM VM STATE CHANGE", instance=instance)
+			      
+		newInst = Instance(d={'state':cur})
+		success = lambda: None
+		# send the state change up to the CM
+		self.notifyCM.append((instance.id, newInst, old, success))
+		self.__flushNotifyCM()
+
+		# cache change locally
 		self.instances[vmId] = instance
-		return vmId
+
+		if (cur == InstanceState.Exited):
+			# At this point, the VMM will clean up,
+			# so forget about this instance
+			del self.instances[vmId]
+			return True
+
+		return True
+	
+	# remote
+	def instantiateVm(self, instance):
+		self.__ACCOUNT("NM VM INSTANTIATE", instance=instance)
+		try:
+			vmId = self.vmm.instantiateVm(instance)
+			instance.vmId = vmId
+			instance.state = InstanceState.Running
+			self.instances[vmId] = instance
+			return vmId
+		except:
+			self.log.exception("Failed to start instance")
 	
 	# remote
 	def suspendVm(self, vmId, destination):
 		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM SUSPEND", instance=instance)
+
 		instance.state = InstanceState.Suspending
+		self.instances[vmId] = instance
 		threading.Thread(target=self.vmm.suspendVm, args=(vmId, destination)).start()
 	
 	# called by resumeVm as thread
@@ -220,6 +268,7 @@ class NodeManagerService(object):
 
 	# remote
 	def resumeVm(self, instance, name):
+		self.__ACCOUNT("NM VM RESUME", instance=instance)
 		instance.state = InstanceState.Resuming
 		instance.hostId = self.id
 		try:
@@ -233,6 +282,7 @@ class NodeManagerService(object):
 	
 	# remote
 	def prepReceiveVm(self, instance, source):
+		self.__ACCOUNT("NM VM MIGRATE RECEIVE PREP")
 		instance.vmId = -1
 		transportCookie = self.vmm.prepReceiveVm(instance, source.name)
 		return transportCookie
@@ -240,22 +290,29 @@ class NodeManagerService(object):
 	# remote
 	def prepSourceVm(self, vmId):
 		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM MIGRATE SOURCE PREP", instance=instance)
 		instance.state = InstanceState.MigratePrep
+		self.instance[vmId] = instance
 
 	# called by migrateVm as thread
+	# XXXstroucki migrate out?
 	def __migrateVmHelper(self, instance, target, transportCookie):
 		self.vmm.migrateVm(instance.vmId, target.name, transportCookie)
 		del self.instances[instance.vmId]
-		
+
+	# remote
+	# XXXstroucki migrate out?
 	def migrateVm(self, vmId, target, transportCookie):
 		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM MIGRATE", instance=instance)
 		instance.state = InstanceState.MigrateTrans
-		threading.Thread(target=self.migrateVmHelper, args=(instance, target, transportCookie)).start()
+		self.instances[vmId] = instance
+		threading.Thread(target=self.__migrateVmHelper, args=(instance, target, transportCookie)).start()
 		return
 	
 	# called by receiveVm as thread
-	def receiveVmHelper(self, instance, transportCookie):
-		cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
+	# XXXstroucki migrate in?
+	def __receiveVmHelper(self, instance, transportCookie):
 		vmId = self.vmm.receiveVm(transportCookie)
 		instance.state = InstanceState.Running
 		instance.hostId = self.id
@@ -263,43 +320,52 @@ class NodeManagerService(object):
 		self.instances[vmId] = instance
 		newInstance = Instance(d={'id':instance.id,'state':instance.state,'vmId':instance.vmId,'hostId':instance.hostId})
 		success = lambda: None
-		try:
-			cm.vmUpdate(newInstance.id, newInstance, InstanceState.MigrateTrans)
-		except Exception, e:
-			self.log.exception('vmUpdate failed in receiveVmHelper')
-			self.notifyCM.append((newInstance.id, newInstance, InstanceState.MigrateTrans, success))
-		else:
-			success()
+		self.notifyCM.append((newInstance.id, newInstance, InstanceState.Running, success))
+		self.__flushNotifyCM()
+
 	# remote
+	# XXXstroucki migrate in?
 	def receiveVm(self, instance, transportCookie):
 		instance.state = InstanceState.MigrateTrans
-		threading.Thread(target=self.receiveVmHelper, args=(instance, transportCookie)).start()
+		self.instances[vmId] = instance
+		self.__ACCOUNT("NM VM MIGRATE RECEIVE", instance=instance)
+		threading.Thread(target=self.__receiveVmHelper, args=(instance, transportCookie)).start()
 		return
 
 	# remote
 	def pauseVm(self, vmId):
 		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM PAUSE", instance=instance)
 		instance.state = InstanceState.Pausing
+		self.instance[vmId] = instance
 		self.vmm.pauseVm(vmId)
 		instance.state = InstanceState.Paused
+		self.instance[vmId] = instance
 
 	# remote
 	def unpauseVm(self, vmId):
 		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM UNPAUSE", instance=instance)
 		instance.state = InstanceState.Unpausing
+		self.instance[vmId] = instance
 		self.vmm.unpauseVm(vmId)
 		instance.state = InstanceState.Running
+		self.instance[vmId] = instance
 
 	# remote
 	def shutdownVm(self, vmId):
 		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM SHUTDOWN", instance=instance)
 		instance.state = InstanceState.ShuttingDown
+		self.instance[vmId] = instance
 		self.vmm.shutdownVm(vmId)
 
 	# remote
 	def destroyVm(self, vmId):
 		instance = self.__getInstance(vmId)
+		self.ACCOUNT("NM VM DESTROY", instance=instance)
 		instance.state = InstanceState.Destroying
+		self.instance[vmId] = instance
 		self.vmm.destroyVm(vmId)
 
 	# remote

Modified: incubator/tashi/branches/stroucki-accounting/src/tashi/version.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-accounting/src/tashi/version.py?rev=1225071&r1=1225070&r2=1225071&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-accounting/src/tashi/version.py (original)
+++ incubator/tashi/branches/stroucki-accounting/src/tashi/version.py Wed Dec 28 02:38:40 2011
@@ -16,4 +16,3 @@
 # under the License.    
 
 version = "HEAD"
-#blorf



Mime
View raw message