incubator-tashi-commits mailing list archives

From strou...@apache.org
Subject svn commit: r1230043 - in /incubator/tashi/trunk: ./ etc/ src/tashi/ src/tashi/accounting/ src/tashi/agents/ src/tashi/clustermanager/ src/tashi/clustermanager/data/ src/tashi/nodemanager/ src/tashi/nodemanager/vmcontrol/ src/tashi/rpycservices/
Date Wed, 11 Jan 2012 14:00:57 GMT
Author: stroucki
Date: Wed Jan 11 14:00:56 2012
New Revision: 1230043

URL: http://svn.apache.org/viewvc?rev=1230043&view=rev
Log:
Merge stroucki-accounting into trunk

Changes:
accounting component now runs as a standalone RPC server and records information reported by remote services
all services now have a clear naming distinction between private and public functions
additional constants moved to top for eventual configurability
use scratch directory consistently for VMM
separate out VMM-specific functions
rework inter-service communications
clean up NM startup
general code cleanup
security fixes

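A minimal sketch of the client side of the reworked accounting path, in Python, using only the calls visible in this diff (rpycservices.client(host, port) and its record(list) method); the class and method names below are illustrative, not the exact helpers added to the managers:

    # Sketch only: buffer accounting lines and push them to the accounting server.
    import time

    class AccountingSender(object):                  # illustrative name
        def __init__(self, host, port, clientFactory):
            # clientFactory is e.g. tashi.rpycservices.rpycservices.client
            self.buffer = []
            self.client = clientFactory(host, port)

        def account(self, text, detail=""):
            # same "timestamp|event|details" line format the CM and NM now emit
            self.buffer.append("%s|%s|%s" % (time.time(), text, detail))
            self.flush()

        def flush(self):
            try:
                self.client.record(self.buffer)
                self.buffer = []
            except Exception:
                pass                                 # keep the buffer; retry on the next event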

Added:
    incubator/tashi/trunk/src/tashi/accounting/__init__.py
      - copied unchanged from r1229894, incubator/tashi/branches/stroucki-accounting/src/tashi/accounting/__init__.py
    incubator/tashi/trunk/src/tashi/accounting/accountingservice.py
      - copied unchanged from r1229894, incubator/tashi/branches/stroucki-accounting/src/tashi/accounting/accountingservice.py
Modified:
    incubator/tashi/trunk/   (props changed)
    incubator/tashi/trunk/etc/Accounting.cfg
    incubator/tashi/trunk/etc/NodeManager.cfg
    incubator/tashi/trunk/etc/TashiDefaults.cfg
    incubator/tashi/trunk/src/tashi/accounting/accounting.py
    incubator/tashi/trunk/src/tashi/agents/primitive.py
    incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py
    incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
    incubator/tashi/trunk/src/tashi/connectionmanager.py
    incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py
    incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
    incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
    incubator/tashi/trunk/src/tashi/parallel.py
    incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py
    incubator/tashi/trunk/src/tashi/util.py

Propchange: incubator/tashi/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 11 14:00:56 2012
@@ -1,2 +1,3 @@
 /incubator/tashi/branches/cmu:1178106-1187632
+/incubator/tashi/branches/stroucki-accounting:1221525-1229894
 /incubator/tashi/branches/zoni-dev/trunk:1034098-1177646

Modified: incubator/tashi/trunk/etc/Accounting.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/etc/Accounting.cfg?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/etc/Accounting.cfg (original)
+++ incubator/tashi/trunk/etc/Accounting.cfg Wed Jan 11 14:00:56 2012
@@ -1,3 +1,10 @@
+[Accounting]
+service = tashi.accounting.AccountingService
+
+[AccountingService]
+port = 2228
+pollSleep = 600
+
 [handler_fileHandler]
 class = FileHandler
 level = NOTSET

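For reference against the accounting.py changes further down: the [Accounting] service key is resolved via instantiateImplementation, and the [AccountingService] port is what the rpyc ThreadedServer binds to; pollSleep is introduced here but its consumer is not part of this diff. A stand-alone read of these keys with the stdlib ConfigParser (Python 2, as used elsewhere in Tashi) would look like:

    from ConfigParser import ConfigParser

    config = ConfigParser()
    config.read("Accounting.cfg")
    serviceName = config.get("Accounting", "service")      # tashi.accounting.AccountingService
    port = int(config.get("AccountingService", "port"))    # 2228, bound by the ThreadedServer
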
Modified: incubator/tashi/trunk/etc/NodeManager.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/etc/NodeManager.cfg?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/etc/NodeManager.cfg (original)
+++ incubator/tashi/trunk/etc/NodeManager.cfg Wed Jan 11 14:00:56 2012
@@ -79,6 +79,8 @@ infoFile = /var/tmp/nm.dat
 clusterManagerHost = localhost ; Clustermanager hostname
 clusterManagerPort = 9882
 statsInterval = 0.0
+accountingHost = clustermanager
+accountingPort = 2228
 ;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
 
 [Security]

Modified: incubator/tashi/trunk/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/etc/TashiDefaults.cfg?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/trunk/etc/TashiDefaults.cfg Wed Jan 11 14:00:56 2012
@@ -55,6 +55,8 @@ allowMismatchedVersions = False
 maxMemory = 8192
 maxCores = 8
 allowDuplicateNames = False
+accountingHost = clustermanager
+accountingPort = 2228
 ;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
 
 [GetentOverride]

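Both NodeManager.cfg and TashiDefaults.cfg gain the same pair of keys, and both manager services read them the same way (see clustermanagerservice.py and nodemanagerservice.py below): the lookup is wrapped in try/except, so accounting is silently disabled when the keys are absent. A condensed restatement of that pattern:

    # Condensed from the manager-service changes below; the section name is
    # 'ClusterManagerService' or 'NodeManagerService' depending on the caller.
    accountingHost = None
    accountingPort = None
    try:
        accountingHost = config.get('ClusterManagerService', 'accountingHost')
        accountingPort = config.get('ClusterManagerService', 'accountingPort')
    except:
        pass    # accounting stays disabled if the keys are missing
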
Modified: incubator/tashi/trunk/src/tashi/accounting/accounting.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/accounting/accounting.py?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/accounting/accounting.py (original)
+++ incubator/tashi/trunk/src/tashi/accounting/accounting.py Wed Jan 11 14:00:56 2012
@@ -17,17 +17,16 @@
 # specific language governing permissions and limitations
 # under the License.    
 
-from socket import gethostname
-import os
-import socket
 import sys
-import threading
-import time
-import random
+import signal
 import logging.config
 
-from tashi.rpycservices.rpyctypes import *
-from tashi.util import getConfig, createClient, instantiateImplementation, boolean
+from tashi.rpycservices import rpycservices
+from rpyc.utils.server import ThreadedServer
+#from rpyc.utils.authenticators import TlsliteVdbAuthenticator
+
+#from tashi.rpycservices.rpyctypes import *
+from tashi.util import getConfig, createClient, instantiateImplementation, boolean, debugConsole, signalHandler
 import tashi
 
 class Accounting(object):
@@ -48,20 +47,30 @@ class Accounting(object):
 				except:
 					self.log.exception("Failed to load hook %s" % (value))
 					
-	def start(self):
-		while True:
-			try:
-				instances = self.cm.getInstances()
-				for instance in instances:
-					# XXXstroucki this currently duplicates what the CM was doing.
-					# perhaps implement a diff-like log?
-					self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id, instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory))
-			except:
-				self.log.warning("Accounting iteration failed")
-
-			# wait to do the next iteration
-			# XXXstroucki make this configurable?
-			time.sleep(60)
+	def initAccountingServer(self):
+		service = instantiateImplementation(self.config.get("Accounting", "service"), self.config)
+
+		#if boolean(self.config.get("Security", "authAndEncrypt")):
+		if False:
+			pass
+		else:
+			t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(self.config.get('AccountingService', 'port')), auto_register=False)
+
+		t.logger.setLevel(logging.ERROR)
+		t.service.service = service
+		t.service._type = 'AccountingService'
+
+		debugConsole(globals())
+
+		try:
+			t.start()
+		except KeyboardInterrupt:
+			self.handleSIGTERM(signal.SIGTERM, None)
+
+	@signalHandler(signal.SIGTERM)
+	def handleSIGTERM(self, signalNumber, stackFrame):
+		self.log.info('Exiting accounting server after receiving a signal')
+		sys.exit(0)
 
 def main():
 	(config, configFiles) = getConfig(["Accounting"])
@@ -70,7 +79,8 @@ def main():
 	cmclient = createClient(config)
 	logging.config.fileConfig(configFiles)
 	accounting = Accounting(config, cmclient)
-	accounting.start()
+
+	accounting.initAccountingServer()
 
 if __name__ == "__main__":
 	main()

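accountingservice.py is copied unchanged from the branch, so its body does not appear in this diff. Given how accounting.py wires it in above (instantiateImplementation, t.service.service = service, clients calling record()), a service of roughly this shape would fit; this is a hypothetical sketch, not the actual implementation:

    # Hypothetical sketch of a record() sink compatible with the wiring above.
    import logging

    class AccountingService(object):
        def __init__(self, config):
            self.config = config
            self.log = logging.getLogger(__name__)

        def record(self, lines):
            # each line arrives as "timestamp|event|details", built by the CM/NM
            for line in lines:
                self.log.info(line)
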
Modified: incubator/tashi/trunk/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/primitive.py?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/primitive.py Wed Jan 11 14:00:56 2012
@@ -17,16 +17,11 @@
 # specific language governing permissions and limitations
 # under the License.    
 
-from socket import gethostname
-import os
-import socket
-import sys
-import threading
 import time
-import random
 import logging.config
 
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Errors, HostState, InstanceState, TashiException
+
 from tashi.util import getConfig, createClient, instantiateImplementation, boolean
 import tashi
 
@@ -274,10 +269,10 @@ class Primitive(object):
 					# end for unassigned vms
 
 
-			except TashiException, e:
+			except TashiException:
 				self.log.exception("Tashi exception")
 
-			except Exception, e:
+			except Exception:
 				self.log.warning("Scheduler iteration failed")
 
 

Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py Wed Jan 11 14:00:56 2012
@@ -17,13 +17,9 @@
 # specific language governing permissions and limitations
 # under the License.    
 
-import os
 import sys
-import threading
 import signal
 import logging.config
-from getopt import getopt, GetoptError
-from ConfigParser import ConfigParser
 
 from tashi.util import signalHandler, boolean, instantiateImplementation, getConfig, debugConsole
 import tashi

Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py Wed Jan 11 14:00:56 2012
@@ -15,15 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.    
 
-from datetime import datetime
-from random import randint
-from socket import gethostname
 import logging
 import threading
 import time
 
+from tashi.rpycservices import rpycservices             
 from tashi.rpycservices.rpyctypes import Errors, InstanceState, HostState, TashiException
-from tashi import boolean, convertExceptions, ConnectionManager, vmStates, timed, version, scrubString
+from tashi import boolean, ConnectionManager, vmStates, version, scrubString
 
 class ClusterManagerService(object):
 	"""RPC service for the ClusterManager"""
@@ -42,6 +40,7 @@ class ClusterManagerService(object):
 		self.dfs = dfs
 		self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
 		self.log = logging.getLogger(__name__)
+		self.log.setLevel(logging.ERROR)
 		self.hostLastContactTime = {}
 		#self.hostLastUpdateTime = {}
 		self.instanceLastContactTime = {}
@@ -51,27 +50,97 @@ class ClusterManagerService(object):
 		self.maxMemory = int(self.config.get('ClusterManagerService', 'maxMemory'))
 		self.maxCores = int(self.config.get('ClusterManagerService', 'maxCores'))
 		self.allowDuplicateNames = boolean(self.config.get('ClusterManagerService', 'allowDuplicateNames'))
-		now = self.__now()
+
+		self.accountingHost = None
+		self.accountingPort = None
+		try:
+			self.accountingHost = self.config.get('ClusterManagerService', 'accountingHost')
+			self.accountingPort = self.config.get('ClusterManagerService', 'accountingPort')
+		except:
+			pass
+
+		self.__initAccounting()
+		self.__initCluster()
+
+		threading.Thread(target=self.__monitorCluster).start()
+
+	def __initAccounting(self):
+		self.accountBuffer = []
+		self.accountLines = 0
+		self.accountingClient = None
+		try:
+			if (self.accountingHost is not None) and \
+				    (self.accountingPort is not None):
+
+				self.accountingClient=rpycservices.client(self.accountingHost, self.accountingPort)
+		except:
+			self.log.exception("Could not init accounting")
+
+	def __initCluster(self):
+		# initialize state of VMs if restarting
 		for instance in self.data.getInstances().itervalues():
 			instanceId = instance.id
 			instance = self.data.acquireInstance(instanceId)
 			instance.decayed = False
 
 			if instance.hostId is None:
-				self.stateTransition(instance, None, InstanceState.Pending)
+				self.__stateTransition(instance, None, InstanceState.Pending)
 			else:
-				self.stateTransition(instance, None, InstanceState.Orphaned)
+				self.__stateTransition(instance, None, InstanceState.Orphaned)
 
 			self.data.releaseInstance(instance)
+
+		# initialize state of hosts if restarting
 		for host in self.data.getHosts().itervalues():
 			hostId = host.id
 			host = self.data.acquireHost(hostId)
 			host.up = False
 			host.decayed = False
 			self.data.releaseHost(host)
-		threading.Thread(target=self.monitorCluster).start()
 
-	def stateTransition(self, instance, old, cur):
+
+
+	def __ACCOUNTFLUSH(self):
+		try:
+			if (self.accountingClient is not None):
+				self.accountingClient.record(self.accountBuffer)
+			self.accountLines = 0
+			self.accountBuffer = []
+		except:
+			self.log.exception("Failed to flush accounting data")
+
+
+	def __ACCOUNT(self, text, instance=None, host=None):
+		now = self.__now()
+		instanceText = None
+		hostText = None
+
+		if instance is not None:
+			try:
+				instanceText = 'Instance(%s)' % (instance)
+			except:
+				self.log.exception("Invalid instance data")
+
+		if host is not None:
+			try:
+				hostText = "Host(%s)" % (host)
+			except:
+				self.log.exception("Invalid host data")
+
+                secondary = ','.join(filter(None, (hostText, instanceText)))
+
+		line = "%s|%s|%s" % (now, text, secondary)
+
+		self.accountBuffer.append(line)
+		self.accountLines += 1
+
+		# XXXstroucki think about autoflush by time
+		if (self.accountLines > 0):
+			self.__ACCOUNTFLUSH()
+
+
+
+	def __stateTransition(self, instance, old, cur):
 		if (old and instance.state != old):
 			raise TashiException(d={'errno':Errors.IncorrectVmState,'msg':"VmState is not %s - it is %s" % (vmStates[old], vmStates[instance.state])})
 		if (instance.state == cur):
@@ -104,7 +173,7 @@ class ClusterManagerService(object):
 			instance = self.data.acquireInstance(instanceId)
 			if instance.hostId == host.id:
 				instance.decayed = True
-				self.stateTransition(instance, None, InstanceState.Orphaned)
+				self.__stateTransition(instance, None, InstanceState.Orphaned)
 
 			self.data.releaseInstance(instance)
 
@@ -148,10 +217,16 @@ class ClusterManagerService(object):
 		for hostId in self.hostLastContactTime.keys():
 			#self.log.warning("iterate %d" % hostId)
 			host = self.data.acquireHost(hostId)
-			if (self.hostLastContactTime[hostId] < (self.__now() - self.allowDecayed)):
+			# XXXstroucki: timing has changed with the message
+			# buffering in the NM, so this wasn't being run any-
+			# more because the time check was passing.
+			# I should think a bit more about this, but
+			# the "if True" is probably appropriate.
+			#if (self.hostLastContactTime[hostId] < (self.__now() - self.allowDecayed)):
+			if True:
 				host.decayed = True
 
-				self.log.info('Fetching state from host %s because it is decayed' % (host.name))
+				self.log.debug('Fetching state from host %s because it is decayed' % (host.name))
 				
 				myInstancesThisHost = [i for i in myInstances.values() if i.hostId == host.id]
 
@@ -202,7 +277,7 @@ class ClusterManagerService(object):
 					continue
 
 				instance.decayed = True
-				self.log.info('Fetching state on instance %s because it is decayed' % (instance.name))
+				self.log.debug('Fetching state on instance %s because it is decayed' % (instance.name))
 				if instance.hostId is None: raise AssertionError
 
 				# XXXstroucki check if host is down?
@@ -225,23 +300,6 @@ class ClusterManagerService(object):
 				self.data.releaseInstance(instance)
 
 
-
-	def monitorCluster(self):
-		while True:
-			sleepFor = min(self.expireHostTime, self.allowDecayed)
-
-			try:
-				self.__checkHosts()
-				self.__checkInstances()
-			except:
-				self.log.exception('monitorCluster iteration failed')
-			#  XXXrgass too chatty.  Remove
-			# XXXstroucki the risk is that a deadlock in obtaining
-			# data could prevent this loop from continuing.
-			#self.log.info("Sleeping for %d seconds" % sleepFor)
-			time.sleep(sleepFor)
-
-
 	def normalize(self, instance):
 		instance.id = None
 		instance.vmId = None
@@ -276,12 +334,14 @@ class ClusterManagerService(object):
 		instance = self.normalize(instance)
 		instance = self.data.registerInstance(instance)
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM REQUEST", instance=instance)
 		return instance
 	
 	def shutdownVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
+		self.__stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM SHUTDOWN", instance=instance)
 		hostname = self.data.getHost(instance.hostId).name
 		try:
 			self.proxy[hostname].shutdownVm(instance.vmId)
@@ -293,14 +353,17 @@ class ClusterManagerService(object):
 	def destroyVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
 		if (instance.state is InstanceState.Pending or instance.state is InstanceState.Held):
+			self.__ACCOUNT("CM VM DESTROY UNSTARTED", instance=instance)
 			self.data.removeInstance(instance)
 		elif (instance.state is InstanceState.Activating):
-			self.stateTransition(instance, InstanceState.Activating, InstanceState.Destroying)
+			self.__ACCOUNT("CM VM DESTROY STARTING", instance=instance)
+			self.__stateTransition(instance, InstanceState.Activating, InstanceState.Destroying)
 			self.data.releaseInstance(instance)
 		else:
 			# XXXstroucki: This is a problem with keeping
 			# clean state.
-			self.stateTransition(instance, None, InstanceState.Destroying)
+			self.__ACCOUNT("CM VM DESTROY", instance=instance)
+			self.__stateTransition(instance, None, InstanceState.Destroying)
 			if instance.hostId is None:
 				self.data.removeInstance(instance)
 			else:
@@ -318,8 +381,9 @@ class ClusterManagerService(object):
 	
 	def suspendVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
+		self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM SUSPEND", instance=instance)
 		hostname = self.data.getHost(instance.hostId).name
 		destination = "suspend/%d_%s" % (instance.id, instance.name)
 		try:
@@ -331,14 +395,16 @@ class ClusterManagerService(object):
 	
 	def resumeVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
+		self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
 		source = "suspend/%d_%s" % (instance.id, instance.name)
 		instance.hints['__resume_source'] = source
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM RESUME", instance=instance)
 		return instance
 	
 	def migrateVm(self, instanceId, targetHostId):
 		instance = self.data.acquireInstance(instanceId)
+		self.__ACCOUNT("CM VM MIGRATE", instance=instance)
 		try:
 			# FIXME: should these be acquire/release host?
 			targetHost = self.data.getHost(targetHostId)
@@ -347,7 +413,7 @@ class ClusterManagerService(object):
 		except:
 			self.data.releaseInstance(instance)
 			raise
-		self.stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
+		self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
 		self.data.releaseInstance(instance)
 		try:
 			# Prepare the target
@@ -355,16 +421,16 @@ class ClusterManagerService(object):
 			self.proxy[sourceHost.name].prepSourceVm(instance.vmId)
 			self.log.info("migrateVm: Calling prepReceiveVm on target host %s" % targetHost.name)
 			cookie = self.proxy[targetHost.name].prepReceiveVm(instance, sourceHost)
-		except Exception, e:
+		except Exception:
 			self.log.exception('prepReceiveVm failed')
 			raise
 		instance = self.data.acquireInstance(instance.id)
-		self.stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
+		self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
 		self.data.releaseInstance(instance)
 		try:
 			# Send the VM
 			self.proxy[sourceHost.name].migrateVm(instance.vmId, targetHost, cookie)
-		except Exception, e:
+		except Exception:
 			self.log.exception('migrateVm failed')
 			raise
 		try:
@@ -376,15 +442,16 @@ class ClusterManagerService(object):
 		try:
 			# Notify the target
 			vmId = self.proxy[targetHost.name].receiveVm(instance, cookie)
-		except Exception, e:
+		except Exception:
 			self.log.exception('receiveVm failed')
 			raise
 		return
 	
 	def pauseVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
+		self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM PAUSE", instance=instance)
 		hostname = self.data.getHost(instance.hostId).name
 		try:
 			self.proxy[hostname].pauseVm(instance.vmId)
@@ -392,14 +459,15 @@ class ClusterManagerService(object):
 			self.log.exception('pauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
 			raise
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
+		self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
 		self.data.releaseInstance(instance)
 		return
 
 	def unpauseVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
+		self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM UNPAUSE", instance=instance)
 		hostname = self.data.getHost(instance.hostId).name
 		try:
 			self.proxy[hostname].unpauseVm(instance.vmId)
@@ -407,7 +475,7 @@ class ClusterManagerService(object):
 			self.log.exception('unpauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
 			raise
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
+		self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
 		self.data.releaseInstance(instance)
 		return
 	
@@ -442,6 +510,7 @@ class ClusterManagerService(object):
 	def vmmSpecificCall(self, instanceId, arg):
 		instance = self.data.getInstance(instanceId)
 		hostname = self.data.getHost(instance.hostId).name
+		self.__ACCOUNT("CM VM SPECIFIC CALL", instance=instance)
 		try:
 			res = self.proxy[hostname].vmmSpecificCall(instance.vmId, arg)
 		except Exception:
@@ -468,8 +537,7 @@ class ClusterManagerService(object):
 
 		if oldHost.up == False:
 			self.__upHost(oldHost)
-		self.hostLastContactTime[host.id] = time.time()
-		#self.hostLastUpdateTime[host.id] = time.time()
+		self.hostLastContactTime[host.id] = self.__now()
 		oldHost.version = host.version
 		oldHost.memory = host.memory
 		oldHost.cores = host.cores
@@ -500,8 +568,9 @@ class ClusterManagerService(object):
 			self.log.exception("Could not acquire instance")
 			raise
 
-		self.instanceLastContactTime[instanceId] = time.time()
+		self.instanceLastContactTime[instanceId] = self.__now()
 		oldInstance.decayed = False
+		self.__ACCOUNT("CM VM UPDATE", instance=oldInstance)
 
 		if (instance.state == InstanceState.Exited):
 			# determine why a VM has exited
@@ -509,7 +578,7 @@ class ClusterManagerService(object):
 			if (oldInstance.state not in [InstanceState.ShuttingDown, InstanceState.Destroying, InstanceState.Suspending]):
 				self.log.warning('Unexpected exit on %s of instance %s (vmId %d)' % (hostname, oldInstance.name, oldInstance.vmId))
 			if (oldInstance.state == InstanceState.Suspending):
-				self.stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
+				self.__stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
 				oldInstance.hostId = None
 				oldInstance.vmId = None
 				self.data.releaseInstance(oldInstance)
@@ -552,13 +621,14 @@ class ClusterManagerService(object):
 
 		self.data.releaseHost(dataHost)
 		instance = self.data.acquireInstance(instanceId)
+		self.__ACCOUNT("CM VM ACTIVATE", instance=instance)
 
 		if ('__resume_source' in instance.hints):
-			self.stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
+			self.__stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
 		else:
 			# XXXstroucki should held VMs be continually tried? Or be explicitly set back to pending?
-			#self.stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
-			self.stateTransition(instance, None, InstanceState.Activating)
+			#self.__stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
+			self.__stateTransition(instance, None, InstanceState.Activating)
 
 		instance.hostId = host.id
 		self.data.releaseInstance(instance)
@@ -568,14 +638,14 @@ class ClusterManagerService(object):
 				vmId = self.proxy[host.name].resumeVm(instance, instance.hints['__resume_source'])
 			else:
 				vmId = self.proxy[host.name].instantiateVm(instance)
-		except Exception, e:
+		except Exception:
 			instance = self.data.acquireInstance(instanceId)
 			if (instance.state is InstanceState.Destroying): # Special case for if destroyVm is called during initialization and initialization fails
 				self.data.removeInstance(instance)
 			else:
 				# XXXstroucki what can we do about pending hosts in the scheduler?
 				# put them at the end of the queue and keep trying?
-				self.stateTransition(instance, None, InstanceState.Held)
+				self.__stateTransition(instance, None, InstanceState.Held)
 				instance.hostId = None
 				self.data.releaseInstance(instance)
 			return "failure"
@@ -594,7 +664,7 @@ class ClusterManagerService(object):
 		else:
 			if ('__resume_source' not in instance.hints):
 				# XXXstroucki should we just wait for NM to update?
-				#self.stateTransition(instance, InstanceState.Activating, InstanceState.Running)
+				#self.__stateTransition(instance, InstanceState.Activating, InstanceState.Running)
 				pass
 
 		self.data.releaseInstance(instance)
@@ -606,9 +676,41 @@ class ClusterManagerService(object):
                         self.log.info("Host %s is already registered, it was updated now" % hostname)
                 else:
                         self.log.info("A host was registered - hostname: %s, version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
+
+		try:
+			host = self.data.getHost(hostId)
+			self.__ACCOUNT("CM HOST REGISTER", host=host)
+		except:
+			self.log.warning("Failed to lookup host %s" % hostId)
+
                 return hostId
 
         def unregisterHost(self, hostId):
+		try:
+			host = self.data.getHost(hostId)
+			self.__ACCOUNT("CM HOST UNREGISTER", host=host)
+		except:
+			self.log.warning("Failed to lookup host %s" % hostId)
+			return
+
                 self.data.unregisterHost(hostId)
                 self.log.info("Host %s was unregistered" % hostId)
                 return
+
+	# service thread
+	def __monitorCluster(self):
+		while True:
+			sleepFor = min(self.expireHostTime, self.allowDecayed)
+
+			try:
+				self.__checkHosts()
+				self.__checkInstances()
+			except:
+				self.log.exception('monitorCluster iteration failed')
+			#  XXXrgass too chatty.  Remove
+			# XXXstroucki the risk is that a deadlock in obtaining
+			# data could prevent this loop from continuing.
+			#self.log.info("Sleeping for %d seconds" % sleepFor)
+			time.sleep(sleepFor)
+
+

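One detail in the __ACCOUNT helper above: filter(None, (hostText, instanceText)) drops whichever entry is None, so the third field of the accounting line only contains the parts that were actually supplied. A quick stand-alone illustration (values are made up):

    hostText = "Host(h1)"            # illustrative; the real value is '%s' % host
    instanceText = None
    secondary = ','.join(filter(None, (hostText, instanceText)))
    print secondary                  # -> Host(h1)   (the None entry is dropped)
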
Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py Wed Jan 11 14:00:56 2012
@@ -20,7 +20,7 @@ import threading
 import os
 import ConfigParser
 
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Host, Network, User, TashiException, Errors, HostState
 from tashi.clustermanager.data import DataInterface
 
 class FromConfig(DataInterface):
@@ -109,7 +109,7 @@ class FromConfig(DataInterface):
 	def releaseInstance(self, instance):
 		try:
 			if (instance.id not in self.instances): # MPR: should never be true, but good to check
-				raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (instanceId)})
+				raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (instance.id)})
 		finally:
 			self.releaseLock(instance._lock)
 	

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py Wed Jan 11 14:00:56 2012
@@ -18,7 +18,7 @@
 import cPickle
 import os
 import threading
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Instance, Host
 from tashi.clustermanager.data import FromConfig, DataInterface
 
 class Pickled(FromConfig):

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py Wed Jan 11 14:00:56 2012
@@ -17,11 +17,9 @@
 
 import logging
 import threading
-import time
-import types
 # XXXstroucki getImages needs os?
 import os
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Errors, Network, Host, User, Instance, TashiException, LocalImages, DiskConfiguration, NetworkConfiguration
 from tashi.clustermanager.data.datainterface import DataInterface
 from tashi.util import stringPartition, boolean, instantiateImplementation, humanReadable
 
@@ -45,7 +43,7 @@ class SQL(DataInterface):
 			self.password = self.config.get('SQL', 'password')
 			self.conn = MySQLdb.connect(host=host, user=user, passwd=self.password, db=db)
 		else:
-			raise ValueException, 'Unknown SQL database engine by URI: %s' % (self.uri)
+			raise TashiException, 'Unknown SQL database engine by URI: %s' % (self.uri)
 
 		self.instanceOrder = ['id', 'vmId', 'hostId', 'decayed', 'state', 'userId', 'name', 'cores', 'memory', 'disks', 'nics', 'hints']
 		self.hostOrder = ['id', 'name', 'up', 'decayed', 'state', 'memory', 'cores', 'version']
@@ -319,7 +317,7 @@ class SQL(DataInterface):
 		for r in res:
 			if r[1] == hostname:
 				id = r[0]
-				print "Host %s already registered, update will be done" % id
+				self.log.warning("Host %s already registered, update will be done" % id)
 				s = ""
 				host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version})
 				l = self.makeHostList(host)

Modified: incubator/tashi/trunk/src/tashi/connectionmanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/connectionmanager.py?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/connectionmanager.py (original)
+++ incubator/tashi/trunk/src/tashi/connectionmanager.py Wed Jan 11 14:00:56 2012
@@ -15,9 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.    
 
-import rpyc
 from tashi.rpycservices import rpycservices
-from tashi.rpycservices.rpyctypes import *
+#from tashi.rpycservices.rpyctypes import *
 
 class ConnectionManager(object):
 	def __init__(self, username, password, port, timeout=10000.0):

Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py Wed Jan 11 14:00:56 2012
@@ -22,7 +22,6 @@ import signal
 import sys
 
 from tashi.util import instantiateImplementation, getConfig, debugConsole, signalHandler
-from tashi import ConnectionManager
 import tashi
 from tashi import boolean
 

Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py Wed Jan 11 14:00:56 2012
@@ -15,17 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.    
 
-import cPickle
 import logging
-import os
 import socket
-import sys
 import threading
 import time
 
-from tashi.rpycservices.rpyctypes import Host, HostState, InstanceState, TashiException, Errors, Instance
-from tashi.nodemanager import RPC
-from tashi import boolean, vmStates, logged, ConnectionManager, timed
+from tashi.rpycservices import rpycservices
+from tashi.rpycservices.rpyctypes import InstanceState, TashiException, Errors, Instance
+from tashi import boolean, vmStates, ConnectionManager
 import tashi
 
 
@@ -52,167 +49,263 @@ class NodeManagerService(object):
 		self.registerFrequency = float(config.get('NodeManagerService', 'registerFrequency'))
 		self.infoFile = self.config.get('NodeManagerService', 'infoFile')
 		self.statsInterval = float(self.config.get('NodeManagerService', 'statsInterval'))
-		self.id = None
+		self.registerHost = boolean(config.get('NodeManagerService', 'registerHost'))
+		try:
+			self.cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
+		except:
+			self.log.exception("Could not connect to CM")
+			return
+
+		self.accountingHost = None
+		self.accountingPort = None
+		try:
+			self.accountingHost = self.config.get('NodeManagerService', 'accountingHost')
+			self.accountingPort = self.config.get('NodeManagerService', 'accountingPort')
+		except:
+			pass
+
 		self.notifyCM = []
-		self.loadVmInfo()
-		vmList = self.vmm.listVms()
-		for vmId in vmList:
-			if (vmId not in self.instances):
-				self.log.warning('vmcontrol backend reports additional vmId %d' % (vmId))
-				self.instances[vmId] = Instance(d={'vmId':vmId,'id':-1})
-		for vmId in self.instances.keys():
-			if (vmId not in vmList):
-				self.log.warning('vmcontrol backend does not report %d' % (vmId))
-				self.vmStateChange(vmId, None, InstanceState.Exited)
-		self.registerHost()
-		threading.Thread(target=self.backupVmInfoAndFlushNotifyCM).start()
-		threading.Thread(target=self.registerWithClusterManager).start()
-		threading.Thread(target=self.statsThread).start()
-	
-	def loadVmInfo(self):
+
+		self.__initAccounting()
+
+		self.id = None
+		# XXXstroucki this fn could be in this level maybe?
+		self.host = self.vmm.getHostInfo(self)
+
+		# populate self.instances
+		self.__loadVmInfo()
+
+		self.__registerHost()
+
+		self.id = self.cm.registerNodeManager(self.host, self.instances.values())
+
+		# XXXstroucki cut cross check for NM/VMM state
+
+		# start service threads
+		threading.Thread(target=self.__registerWithClusterManager).start()
+		threading.Thread(target=self.__statsThread).start()
+	
+	def __initAccounting(self):
+                self.accountBuffer = []
+                self.accountLines = 0
+                self.accountingClient = None
+                try:
+                        if (self.accountingHost is not None) and \
+                                    (self.accountingPort is not None):
+                                self.accountingClient=rpycservices.client(self.accountingHost, self.accountingPort)
+                except:
+                        self.log.exception("Could not init accounting")
+			
+	def __loadVmInfo(self):
 		try:
 			self.instances = self.vmm.getInstances()
-		except Exception, e:
+		except Exception:
 			self.log.exception('Failed to obtain VM info')
 			self.instances = {}
-	
-	def saveVmInfo(self):
+
+	# send data to CM
+	# XXXstroucki adapt this for accounting?
+	def __flushNotifyCM(self):
+		start = time.time()
+		# send data to CM, adding message to buffer if
+		# it fails
 		try:
-			data = cPickle.dumps(self.instances)
-			f = open(self.infoFile, "w")
-			f.write(data)
-			f.close()
+			notifyCM = []
+			try:
+				while (len(self.notifyCM) > 0):
+					value = self.notifyCM.pop(0)
+					(instanceId, newInst, old, success) = value
+					try:
+						self.cm.vmUpdate(instanceId, newInst, old)
+					except TashiException, e:
+						notifyCM.append((instanceId, newInst, old, success))
+						if (e.errno != Errors.IncorrectVmState):
+							raise
+					except:
+						notifyCM.append((instanceId, newInst, old, success))
+						raise
+					else:
+						success()
+			finally:
+				if len(notifyCM) > 0:
+					self.notifyCM.append(notifyCM)
 		except Exception, e:
-			self.log.exception('Failed to save VM info to %s' % (self.infoFile))
-	
-	def vmStateChange(self, vmId, old, cur):
-		instance = self.getInstance(vmId)
-		if (old and instance.state != old):
-			self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
-		if (cur == InstanceState.Exited):
-			del self.instances[vmId]
-			return True
+			self.log.exception('Failed to send data to the CM')
 
-		if (instance.state == cur):
-			# Don't do anything if state is what it should be
-			return True
+		toSleep = start - time.time() + self.registerFrequency
+		if (toSleep > 0):
+			time.sleep(toSleep)
 
-		instance.state = cur
-		newInst = Instance(d={'state':cur})
-		success = lambda: None
+        def __ACCOUNTFLUSH(self):
 		try:
-			cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
-			cm.vmUpdate(instance.id, newInst, old)
-		except Exception, e:
-			self.log.exception('RPC failed for vmUpdate on CM')
-			self.notifyCM.append((instance.id, newInst, old, success))
-		else:
-			success()
-		return True
-	
-	def backupVmInfoAndFlushNotifyCM(self):
-		cm = None
-		cmUseCount = 0
-		while True:
-			if cmUseCount > 10 or cm is None:
-				try:
-					# XXXstroucki hope rpyc handles destruction
-					cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
-					cmUseCount = 0
-				except Exception, e:
-					self.log.warning("Could not get a handle to the clustermanager")
-					time.sleep(60)
-					continue
+			if (self.accountingClient is not None):
+				self.accountingClient.record(self.accountBuffer)
+			self.accountLines = 0
+			self.accountBuffer = []
+		except:
+			self.log.exception("Failed to flush accounting data")
 
-			cmUseCount = cmUseCount + 1
-			start = time.time()
+
+        def __ACCOUNT(self, text, instance=None, host=None):
+                now = time.time()
+                instanceText = None
+                hostText = None
+
+                if instance is not None:
 			try:
-				self.saveVmInfo()
-			except Exception, e:
-				self.log.exception('Failed to save VM info')
+                        	instanceText = 'Instance(%s)' % (instance)
+			except:
+				self.log.exception("Invalid instance data")
+
+                if host is not None:
 			try:
-				notifyCM = []
-				try:
-					while (len(self.notifyCM) > 0):
-						(instanceId, newInst, old, success) = self.notifyCM.pop(0)
-						try:
-							cm.vmUpdate(instanceId, newInst, old)
-						except TashiException, e:
-							notifyCM.append((instanceId, newInst, old, success))
-							if (e.errno != Errors.IncorrectVmState):
-								raise
-						except:
-							notifyCM.append((instanceId, newInst, old, success))
-							raise
-						else:
-							success()
-				finally:
-					self.notifyCM = self.notifyCM + notifyCM
-			except Exception, e:
-				self.log.exception('Failed to register with the CM')
-			toSleep = start - time.time() + self.registerFrequency
-			if (toSleep > 0):
-				time.sleep(toSleep)
-	
-	def registerWithClusterManager(self):
-		cm = None
-		cmUseCount = 0
+                        	hostText = "Host(%s)" % (host)
+			except:
+				self.log.exception("Invalid host data")
+
+                secondary = ','.join(filter(None, (hostText, instanceText)))
+
+                line = "%s|%s|%s" % (now, text, secondary)
+
+                self.accountBuffer.append(line)
+                self.accountLines += 1
+
+		# XXXstroucki think about force flush every so often
+                if (self.accountLines > 0):
+                        self.__ACCOUNTFLUSH()
+
+
+	# service thread function
+	def __registerWithClusterManager(self):
 		while True:
-			if cmUseCount > 10 or cm is None:
-				try:
-					cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
-					cmUseCount = 0
-				except Exception, e:
-					self.log.warning("Could not get a handle to the clustermanager")
-					time.sleep(60)
-					continue
-			cmUseCount = cmUseCount + 1
+			#self.__ACCOUNT("TESTING")
 			start = time.time()
 			try:
-				host = self.vmm.getHostInfo(self)
 				instances = self.instances.values()
-				#import pprint
-				#self.log.warning("Instances: " + pprint.saferepr(instances))
-				self.id = cm.registerNodeManager(host, instances)
-			except Exception, e:
+				self.id = self.cm.registerNodeManager(self.host, instances)
+			except Exception:
 				self.log.exception('Failed to register with the CM')
+
 			toSleep = start - time.time() + self.registerFrequency
 			if (toSleep > 0):
 				time.sleep(toSleep)
-	
-	def getInstance(self, vmId):
+
+	# service thread function
+	def __statsThread(self):
+		if (self.statsInterval == 0):
+			return
+		while True:
+			try:
+				publishList = []
+				for vmId in self.instances.keys():
+					try:
+						instance = self.instances.get(vmId, None)
+						if (not instance):
+							continue
+						id = instance.id
+						stats = self.vmm.getStats(vmId)
+						for stat in stats:
+							publishList.append({"vm_%d_%s" % (id, stat):stats[stat]})
+					except:
+						self.log.exception('statsThread threw an exception')
+				if (len(publishList) > 0):
+					tashi.publisher.publishList(publishList)
+			except:
+				self.log.exception('statsThread threw an exception')
+			time.sleep(self.statsInterval)
+
+        def __registerHost(self):
+                hostname = socket.gethostname()
+		# populate some defaults
+		# XXXstroucki: I think it's better if the nodemanager fills these in properly when registering with the clustermanager
+		memory = 0
+		cores = 0
+		version = "empty"
+                #self.cm.registerHost(hostname, memory, cores, version)
+
+	def __getInstance(self, vmId):
 		instance = self.instances.get(vmId, None)
 		if (instance is None):
 			raise TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this host" % (vmId)})
 		return instance
 	
-	def instantiateVm(self, instance):
-		vmId = self.vmm.instantiateVm(instance)
-		instance.vmId = vmId
-		instance.state = InstanceState.Running
+	# remote
+	# Called from VMM to update self.instances
+	# but only changes are Exited, MigrateTrans and Running
+	# qemu.py calls this in the matchSystemPids thread
+	# xenpv.py: i have no real idea why it is called there
+	def vmStateChange(self, vmId, old, cur):
+		instance = self.__getInstance(vmId)
+
+		if (instance.state == cur):
+			# Don't do anything if state is what it should be
+			return True
+
+		if (old and instance.state != old):
+			# make a note of mismatch, but go on.
+			# the VMM should know best
+			self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
+                        
+		instance.state = cur
+
+		self.__ACCOUNT("NM VM STATE CHANGE", instance=instance)
+			      
+		newInst = Instance(d={'state':cur})
+		success = lambda: None
+		# send the state change up to the CM
+		self.notifyCM.append((instance.id, newInst, old, success))
+		self.__flushNotifyCM()
+
+		# cache change locally
+		self.instances[vmId] = instance
+
+		if (cur == InstanceState.Exited):
+			# At this point, the VMM will clean up,
+			# so forget about this instance
+			del self.instances[vmId]
+			return True
+
+		return True
+
+	# remote
+	def createInstance(self, instance):
+		vmId = instance.vmId
 		self.instances[vmId] = instance
-		return vmId
+		
 	
+	# remote
+	def instantiateVm(self, instance):
+		self.__ACCOUNT("NM VM INSTANTIATE", instance=instance)
+		try:
+			vmId = self.vmm.instantiateVm(instance)
+			#instance.vmId = vmId
+			#instance.state = InstanceState.Running
+			#self.instances[vmId] = instance
+			return vmId
+		except:
+			self.log.exception("Failed to start instance")
+	
+	# remote
 	def suspendVm(self, vmId, destination):
-		instance = self.getInstance(vmId)
+		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM SUSPEND", instance=instance)
+
 		instance.state = InstanceState.Suspending
+		self.instances[vmId] = instance
 		threading.Thread(target=self.vmm.suspendVm, args=(vmId, destination)).start()
 	
-	def resumeVmHelper(self, instance, name):
+	# called by resumeVm as thread
+	def __resumeVmHelper(self, instance, name):
 		self.vmm.resumeVmHelper(instance, name)
 		instance.state = InstanceState.Running
 		newInstance = Instance(d={'id':instance.id,'state':instance.state})
 		success = lambda: None
-		cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
-		try:
-			cm.vmUpdate(newInstance.id, newInstance, InstanceState.Resuming)
-		except Exception, e:
-			self.log.exception('vmUpdate failed in resumeVmHelper')
-			self.notifyCM.append((newInstance.id, newInstance, InstanceState.Resuming, success))
-		else:
-			success()
-	
+		self.notifyCM.append((newInstance.id, newInstance, InstanceState.Resuming, success))
+		self.__flushNotifyCM()
+
+	# remote
 	def resumeVm(self, instance, name):
+		self.__ACCOUNT("NM VM RESUME", instance=instance)
 		instance.state = InstanceState.Resuming
 		instance.hostId = self.id
 		try:
@@ -224,27 +317,39 @@ class NodeManagerService(object):
 			raise TashiException(d={'errno':Errors.UnableToResume,'msg':"resumeVm failed on the node manager"})
 		return instance.vmId
 	
+	# remote
 	def prepReceiveVm(self, instance, source):
+		self.__ACCOUNT("NM VM MIGRATE RECEIVE PREP")
 		instance.vmId = -1
 		transportCookie = self.vmm.prepReceiveVm(instance, source.name)
 		return transportCookie
 
+	# remote
 	def prepSourceVm(self, vmId):
-		instance = self.getInstance(vmId)
+		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM MIGRATE SOURCE PREP", instance=instance)
 		instance.state = InstanceState.MigratePrep
-	
-	def migrateVmHelper(self, instance, target, transportCookie):
+		self.instances[vmId] = instance
+
+	# called by migrateVm as thread
+	# XXXstroucki migrate out?
+	def __migrateVmHelper(self, instance, target, transportCookie):
 		self.vmm.migrateVm(instance.vmId, target.name, transportCookie)
 		del self.instances[instance.vmId]
-		
+
+	# remote
+	# XXXstroucki migrate out?
 	def migrateVm(self, vmId, target, transportCookie):
-		instance = self.getInstance(vmId)
+		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM MIGRATE", instance=instance)
 		instance.state = InstanceState.MigrateTrans
-		threading.Thread(target=self.migrateVmHelper, args=(instance, target, transportCookie)).start()
+		self.instances[vmId] = instance
+		threading.Thread(target=self.__migrateVmHelper, args=(instance, target, transportCookie)).start()
 		return
 	
-	def receiveVmHelper(self, instance, transportCookie):
-		cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
+	# called by receiveVm as thread
+	# XXXstroucki migrate in?
+	def __receiveVmHelper(self, instance, transportCookie):
 		vmId = self.vmm.receiveVm(transportCookie)
 		instance.state = InstanceState.Running
 		instance.hostId = self.id
@@ -252,83 +357,69 @@ class NodeManagerService(object):
 		self.instances[vmId] = instance
 		newInstance = Instance(d={'id':instance.id,'state':instance.state,'vmId':instance.vmId,'hostId':instance.hostId})
 		success = lambda: None
-		try:
-			cm.vmUpdate(newInstance.id, newInstance, InstanceState.MigrateTrans)
-		except Exception, e:
-			self.log.exception('vmUpdate failed in receiveVmHelper')
-			self.notifyCM.append((newInstance.id, newInstance, InstanceState.MigrateTrans, success))
-		else:
-			success()
-	
+		self.notifyCM.append((newInstance.id, newInstance, InstanceState.Running, success))
+		self.__flushNotifyCM()
+
+	# remote
+	# XXXstroucki migrate in?
 	def receiveVm(self, instance, transportCookie):
 		instance.state = InstanceState.MigrateTrans
-		threading.Thread(target=self.receiveVmHelper, args=(instance, transportCookie)).start()
+		vmId = instance.vmId
+		self.instances[vmId] = instance
+		self.__ACCOUNT("NM VM MIGRATE RECEIVE", instance=instance)
+		threading.Thread(target=self.__receiveVmHelper, args=(instance, transportCookie)).start()
 		return
-	
+
+	# remote
 	def pauseVm(self, vmId):
-		instance = self.getInstance(vmId)
+		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM PAUSE", instance=instance)
 		instance.state = InstanceState.Pausing
+		self.instances[vmId] = instance
 		self.vmm.pauseVm(vmId)
 		instance.state = InstanceState.Paused
-	
+		self.instances[vmId] = instance
+
+	# remote
 	def unpauseVm(self, vmId):
-		instance = self.getInstance(vmId)
+		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM UNPAUSE", instance=instance)
 		instance.state = InstanceState.Unpausing
+		self.instances[vmId] = instance
 		self.vmm.unpauseVm(vmId)
 		instance.state = InstanceState.Running
-	
+		self.instances[vmId] = instance
+
+	# remote
 	def shutdownVm(self, vmId):
-		instance = self.getInstance(vmId)
+		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM SHUTDOWN", instance=instance)
 		instance.state = InstanceState.ShuttingDown
+		self.instances[vmId] = instance
 		self.vmm.shutdownVm(vmId)
-	
+
+	# remote
 	def destroyVm(self, vmId):
-		instance = self.getInstance(vmId)
+		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM DESTROY", instance=instance)
 		instance.state = InstanceState.Destroying
+		self.instances[vmId] = instance
 		self.vmm.destroyVm(vmId)
-	
+
+	# remote
 	def getVmInfo(self, vmId):
-		instance = self.getInstance(vmId)
+		instance = self.__getInstance(vmId)
 		return instance
-	
+
+	# remote
 	def vmmSpecificCall(self, vmId, arg):
 		return self.vmm.vmmSpecificCall(vmId, arg)
-	
+
+	# remote
 	def listVms(self):
 		return self.instances.keys()
 
+	# remote
 	def liveCheck(self):
 		return "alive"
 	
-	def statsThread(self):
-		if (self.statsInterval == 0):
-			return
-		while True:
-			try:
-				publishList = []
-				for vmId in self.instances.keys():
-					try:
-						instance = self.instances.get(vmId, None)
-						if (not instance):
-							continue
-						id = instance.id
-						stats = self.vmm.getStats(vmId)
-						for stat in stats:
-							publishList.append({"vm_%d_%s" % (id, stat):stats[stat]})
-					except:
-						self.log.exception('statsThread threw an exception')
-				if (len(publishList) > 0):
-					tashi.publisher.publishList(publishList)
-			except:
-				self.log.exception('statsThread threw an exception')
-			time.sleep(self.statsInterval)
-
-        def registerHost(self):
-                cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
-                hostname = socket.gethostname()
-		# populate some defaults
-		# XXXstroucki: I think it's better if the nodemanager fills these in properly when registering with the clustermanager
-		memory = 0
-		cores = 0
-		version = "empty"
-                #cm.registerHost(hostname, memory, cores, version)

Modified: incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py Wed Jan 11 14:00:56 2012
@@ -27,11 +27,9 @@ import subprocess
 import sys
 import time
 
-# for scratch space support
-from os import system
-
-from tashi.rpycservices.rpyctypes import *
-from tashi.util import broken, logged, scrubString, boolean
+#from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import InstanceState, Host
+from tashi.util import scrubString, boolean
 from tashi import version, stringPartition
 from vmcontrolinterface import VmControlInterface
 
@@ -47,7 +45,7 @@ def controlConsole(child, port):
 		try:
 			listenSocket.listen(5)
 			ls = listenSocket.fileno()
-			input = child.monitorFd
+			#input = child.monitorFd
 			output = child.monitorFd
 			#print "listen"
 			select.select([ls], [], [])
@@ -96,6 +94,10 @@ class Qemu(VmControlInterface):
 		self.migrateTimeout = float(self.config.get("Qemu", "migrateTimeout"))
 		self.useMigrateArgument = boolean(self.config.get("Qemu", "useMigrateArgument"))
 		self.statsInterval = float(self.config.get("Qemu", "statsInterval"))
+		# XXXstroucki amount of reserved memory could be configurable
+		self.reservedMem = 512
+		# XXXstroucki perhaps make this configurable
+		self.ifPrefix = "tashi"
 		self.controlledVMs = {}
 		self.usedPorts = []
 		self.usedPortsLock = threading.Lock()
@@ -115,8 +117,10 @@ class Qemu(VmControlInterface):
 			os.mkdir(self.INFO_DIR)
 		except:
 			pass
-		self.scanInfoDir()
-		threading.Thread(target=self.pollVMsLoop).start()
+
+		self.__scanInfoDir()
+
+		threading.Thread(target=self.__pollVMsLoop).start()
 		if (self.statsInterval > 0):
 			threading.Thread(target=self.statsThread).start()
 	
@@ -124,7 +128,7 @@ class Qemu(VmControlInterface):
 		def __init__(self, **attrs):
 			self.__dict__.update(attrs)
 
-	def getSystemPids(self):
+	def __getHostPids(self):
 		"""Utility function to get a list of system PIDs that match the QEMU_BIN specified (/proc/nnn/exe)"""
 		pids = []
 		for f in os.listdir("/proc"):
@@ -136,83 +140,101 @@ class Qemu(VmControlInterface):
 				pass
 		return pids
 
+	# extern
 	def getInstances(self):
 		"""Will return a dict of instances by vmId to the caller"""
 		return dict((x, self.controlledVMs[x].instance) for x in self.controlledVMs.keys())
 
-	def matchSystemPids(self, controlledVMs):
+	def __matchHostPids(self, controlledVMs):
 		"""This is run in a separate polling thread and it must do things that are thread safe"""
-		if self.nm is None:
-			#XXXstroucki log may not be there yet either
-			#self.log.info("NM hook not yet available")
-			return
 
 		vmIds = controlledVMs.keys()
-		pids = self.getSystemPids()
+		pids = self.__getHostPids()
+
 		for vmId in vmIds:
 			child = controlledVMs[vmId]
+			name = child.instance.name
 
 			if vmId not in pids:
+				# VM is no longer running, but is still
+				# considered controlled
+
+				# remove info file
 				os.unlink(self.INFO_DIR + "/%d"%(vmId))
+
+				# XXXstroucki why not use self.controlledVMs
+				# argument, so why modify this fn's formal?
 				del controlledVMs[vmId]
+
+				# remove any stats (appropriate?)
 				try:
 					del self.stats[vmId]
 				except:
 					pass
+
 				if (child.vncPort >= 0):
 					self.vncPortLock.acquire()
 					self.vncPorts.remove(child.vncPort)
 					self.vncPortLock.release()
-				log.info("Removing vmId %d" % (vmId))
+
+				log.info("Removing vmId %d because it is no longer running" % (vmId))
+
+				# if the VM was started from this process,
+				# wait on it
 				if (child.OSchild):
 					try:
 						os.waitpid(vmId, 0)
 					except:
-						log.exception("waitpid failed")
+						log.exception("waitpid failed for vmId %d" % (vmId))
+				# recover the child's stderr and monitor
+				# output if possible
 				if (child.errorBit):
 					if (child.OSchild):
 						f = open("/tmp/%d.err" % (vmId), "w")
 						f.write(child.stderr.read())
 						f.close()
+
 					f = open("/tmp/%d.pty" % (vmId), "w")
 					for i in child.monitorHistory:
 						f.write(i)
 					f.close()
-				#XXXstroucki remove scratch storage
+
+				# remove scratch storage
 				try:
 					if self.scratchVg is not None:
-						scratch_name = child.instance.name
-						log.info("Removing scratch for " + scratch_name)
+						log.info("Removing scratch for %s" % (name))
 						cmd = "/sbin/lvremove -f %s" % self.scratchVg
     						result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
 				except:
 					pass
 
+				# let the NM know
 				try:
 					if (not child.migratingOut):
 						self.nm.vmStateChange(vmId, None, InstanceState.Exited)
-				except Exception, e:
-					log.exception("vmStateChange failed")
+				except Exception:
+					log.exception("vmStateChange failed for VM %s" % (name))
 			else:
+				# VM is still running
 				try:
 					if (child.migratingOut):
 						self.nm.vmStateChange(vmId, None, InstanceState.MigrateTrans)
 					else:
 						self.nm.vmStateChange(vmId, None, InstanceState.Running)
 				except:
-					#XXXstroucki nm is initialised at different time
-					log.exception("vmStateChange failed")
+					log.exception("vmStateChange failed for VM %s" % (name))
 						
-	
-	def scanInfoDir(self):
+
+	# called once on startup
+	def __scanInfoDir(self):
 		"""This is not thread-safe and must only be used during class initialization"""
 		controlledVMs = {}
 		controlledVMs.update(map(lambda x: (int(x), self.anonClass(OSchild=False, errorBit=False, migratingOut=False)), os.listdir(self.INFO_DIR + "/")))
 		if (len(controlledVMs) == 0):
-			log.info("No vm information found in %s", self.INFO_DIR)
+			log.info("No VM information found in %s" % (self.INFO_DIR))
 		for vmId in controlledVMs:
 			try:
-				child = self.loadChildInfo(vmId)
+				child = self.__loadChildInfo(vmId)
 				self.vncPortLock.acquire()
 				if (child.vncPort >= 0):
 					self.vncPorts.append(child.vncPort)
@@ -223,37 +245,46 @@ class Qemu(VmControlInterface):
 				#XXXstroucki ensure instance has vmId
 				child.instance.vmId = vmId
 				
-				self.controlledVMs[child.pid] = child
-				log.info("Adding vmId %d" % (child.pid))
-			except Exception, e:
+				self.controlledVMs[vmId] = child
+			except Exception:
 				log.exception("Failed to load VM info for %d", vmId)
 			else:
 				log.info("Loaded VM info for %d", vmId)
-		# XXXstroucki NM may not be available yet here.
-		self.matchSystemPids(self.controlledVMs)
-	
-	def pollVMsLoop(self):
+	# service thread
+	def __pollVMsLoop(self):
 		"""Infinite loop that checks for dead VMs"""
+
+		# As of 2011-12-30, nm is None when this is called, and
+		# is set later by the NM. Things further down require
+		# access to the NM, so wait until it is set.
+		# This wait lives in __pollVMsLoop so that init can
+		# complete and nm can actually be set.
+
+		while self.nm is None:
+			log.info("Waiting for NM initialization")
+			time.sleep(2)
+
 		while True:
 			try:
 				time.sleep(self.POLL_DELAY)
-				self.matchSystemPids(self.controlledVMs)
+				self.__matchHostPids(self.controlledVMs)
 			except:
 				log.exception("Exception in poolVMsLoop")
 	
-	def waitForExit(self, vmId):
+	def __waitForExit(self, vmId):
 		"""This waits until an element is removed from the dictionary -- the polling thread must detect an exit"""
 		while vmId in self.controlledVMs:
 			time.sleep(self.POLL_DELAY)
 	
-	def getChildFromPid(self, pid):
+	def __getChildFromPid(self, pid):
 		"""Do a simple dictionary lookup, but raise a unique exception if the key doesn't exist"""
 		child = self.controlledVMs.get(pid, None)
 		if (not child):
 			raise Exception, "Uncontrolled vmId %d" % (pid)
 		return child
 	
-	def consumeAvailable(self, child):
+	def __consumeAvailable(self, child):
 		"""Consume characters one-by-one until they stop coming"""
 		monitorFd = child.monitorFd
 		buf = ""
@@ -296,9 +327,9 @@ class Qemu(VmControlInterface):
 			child.monitorHistory.append(buf[len(needle):])
 		return buf[len(needle):]
 		
-	def enterCommand(self, child, command, expectPrompt = True, timeout = -1):
+	def __enterCommand(self, child, command, expectPrompt = True, timeout = -1):
 		"""Enter a command on the qemu monitor"""
-		res = self.consumeAvailable(child)
+		res = self.__consumeAvailable(child)
 		os.write(child.monitorFd, command + "\n")
 		if (expectPrompt):
 			# XXXstroucki: receiving a vm can take a long time
@@ -306,7 +337,7 @@ class Qemu(VmControlInterface):
 			res = self.consumeUntil(child, "(qemu) ", timeout=timeout)
 		return res
 
-	def loadChildInfo(self, vmId):
+	def __loadChildInfo(self, vmId):
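+		"""Load a VM's pickled (instance, pid, ptyFile) info from INFO_DIR"""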
 		child = self.anonClass(pid=vmId)
 		info = open(self.INFO_DIR + "/%d"%(child.pid), "r")
 		(instance, pid, ptyFile) = cPickle.load(info)
@@ -328,11 +359,12 @@ class Qemu(VmControlInterface):
 			child.vncPort = -1
 		return child
 	
-	def saveChildInfo(self, child):
+	def __saveChildInfo(self, child):
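+		"""Pickle the child's (instance, pid, ptyFile) info into INFO_DIR"""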
 		info = open(self.INFO_DIR + "/%d"%(child.pid), "w")
 		cPickle.dump((child.instance, child.pid, child.ptyFile), info)
 		info.close()
 	
+	# extern
 	def getHostInfo(self, service):
 		host = Host()
 		host.id = service.id
@@ -341,7 +373,7 @@ class Qemu(VmControlInterface):
 		memoryStr = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).stdout.read().strip().split()
 		if (memoryStr[2] == "kB"):
 			# XXXstroucki should have parameter for reserved mem
-			host.memory = (int(memoryStr[1])/1024) - 512
+			host.memory = (int(memoryStr[1])/1024) - self.reservedMem
 		else:
 			log.warning('Unable to determine amount of physical memory - reporting 0')
 			host.memory = 0
@@ -350,22 +382,39 @@ class Qemu(VmControlInterface):
 		host.decayed = False
 		host.version = version
 		return host
-	
-	def startVm(self, instance, source):
+
+	def __stripSpace(self, s):
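+		"""Remove all whitespace from a user-supplied hint string"""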
+		return "".join(s.split())
+
+	def __startVm(self, instance, source):
 		"""Universal function to start a VM -- used by instantiateVM, resumeVM, and prepReceiveVM"""
 
-		#  Capture startVm Hints
+		#  Capture __startVm Hints
 		#  CPU hints
 		cpuModel = instance.hints.get("cpumodel")
+
 		cpuString = ""
 		if cpuModel:
+			# clean off whitespace
+			cpuModel = self.__stripSpace(cpuModel)
 			cpuString = "-cpu " + cpuModel 
 
 		#  Clock hints
 		clockString = instance.hints.get("clock", "dynticks")
+		# clean off whitespace
+		clockString = self.__stripSpace(clockString)
 
 		#  Disk hints
+		# XXXstroucki: insert commentary on jcipar's performance
+		# measurements
+		# virtio is recommended, but linux will name devices
+		# vdX instead of sdX. This adds a trap for someone who
+		# converts a physical machine or other virtualization
+		# layer's image to run under Tashi.
 		diskInterface = instance.hints.get("diskInterface", "ide")
+		# clean off whitespace
+		diskInterface = self.__stripSpace(diskInterface)
+
 		diskString = ""
 
 		for index in range(0, len(instance.disks)):
@@ -395,10 +444,10 @@ class Qemu(VmControlInterface):
 
 			diskString = diskString + "-drive " + ",".join(thisDiskList) + " "
 
-		# scratch disk (should be integrated better)
+		# scratch disk
 		scratchSize = instance.hints.get("scratchSpace", "0")
 		scratchSize = int(scratchSize)
-		scratch_file = None
+		scratchName = None
 
 		try:
 			if scratchSize > 0:
@@ -407,18 +456,21 @@ class Qemu(VmControlInterface):
 				# create scratch disk
 				# XXXstroucki: needs to be cleaned somewhere
 				# XXXstroucki: clean user provided instance name
-				scratch_name = "lv" + instance.name
+				scratchName = "lv%s" % instance.name
 				# XXXstroucki hold lock
 				# XXXstroucki check for capacity
-				cmd = "/sbin/lvcreate -n" + scratch_name + " -L" + str(scratchSize) + "G " + self.scratchVg
+				cmd = "/sbin/lvcreate -n%s -L %dG %s" % (scratchName, scratchSize, self.scratchVg)
+				# XXXstroucki check result
 				result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
 				index += 1
 
-				thisDiskList = [ "file=/dev/%s/%s" % (self.scratchVg, scratch_name) ]
+				thisDiskList = [ "file=/dev/%s/%s" % (self.scratchVg, scratchName) ]
 				thisDiskList.append("if=%s" % diskInterface)
 				thisDiskList.append("index=%d" % index)
 				thisDiskList.append("cache=off")
 				
+				# XXXstroucki force scratch disk to be
+				# persistent
 				if (True or disk.persistent):
 					snapshot = "off"
 					migrate = "off"
@@ -431,18 +483,21 @@ class Qemu(VmControlInterface):
 				if (self.useMigrateArgument):
 					thisDiskList.append("migrate=%s" % migrate)
 
-				diskString = diskString + "-drive " + ",".join(thisDiskList) + " "
+				diskString = "%s-drive %s " % (diskString, ",".join(thisDiskList))
 
 		except:
-			print 'caught exception'
-			raise 'exception'
+			log.exception('caught exception in scratch disk formation')
+			raise
 	
 		#  Nic hints
 		nicModel = instance.hints.get("nicModel", "virtio")
+		# clean off whitespace
+		nicModel = self.__stripSpace(nicModel)
+
 		nicString = ""
 		for i in range(0, len(instance.nics)):
 			nic = instance.nics[i]
-			nicString = nicString + "-net nic,macaddr=%s,model=%s,vlan=%d -net tap,ifname=tashi%d.%d,vlan=%d,script=/etc/qemu-ifup.%d " % (nic.mac, nicModel, nic.network, instance.id, i, nic.network, nic.network)
+			nicString = nicString + "-net nic,macaddr=%s,model=%s,vlan=%d -net tap,ifname=%s%d.%d,vlan=%d,script=/etc/qemu-ifup.%d " % (nic.mac, nicModel, nic.network, self.ifPrefix, instance.id, i, nic.network, nic.network)
 
 		#  ACPI
 		if (boolean(instance.hints.get("noAcpi", False))):
@@ -452,14 +507,15 @@ class Qemu(VmControlInterface):
 
 		#  Construct the qemu command
 		strCmd = "%s %s %s -clock %s %s %s -m %d -smp %d -serial null -vnc none -monitor pty" % (self.QEMU_BIN, noAcpiString, cpuString, clockString, diskString, nicString, instance.memory, instance.cores)
-		cmd = strCmd.split()
 		if (source):
-			cmd = cmd + ["-incoming", source]
-			strCmd = strCmd + " -incoming %s" % (source)
-		log.info("QEMU command: %s" % (strCmd))
+			strCmd = "%s -incoming %s" % (strCmd, source)
+		cmd = strCmd.split()
+
+		log.info("Executing command: %s" % (strCmd))
 		(pipe_r, pipe_w) = os.pipe()
 		pid = os.fork()
 		if (pid == 0):
+			# child process
 			pid = os.getpid()
 			os.setpgid(pid, pid)
 			os.close(pipe_r)
@@ -474,146 +530,192 @@ class Qemu(VmControlInterface):
 					os.close(i)
 				except:
 					pass
+
 			# XXXstroucki unfortunately no kvm option yet
+			# to direct COW differences elsewhere, so change
+			# this process' TMPDIR, which kvm will honour
 			os.environ['TMPDIR'] = self.scratchDir
 			os.execl(self.QEMU_BIN, *cmd)
 			sys.exit(-1)
+
+		# parent process
 		os.close(pipe_w)
 		child = self.anonClass(pid=pid, instance=instance, stderr=os.fdopen(pipe_r, 'r'), migratingOut = False, monitorHistory=[], errorBit = True, OSchild = True)
 		child.ptyFile = None
 		child.vncPort = -1
 		child.instance.vmId = child.pid
-		self.saveChildInfo(child)
+		self.__saveChildInfo(child)
 		self.controlledVMs[child.pid] = child
 		log.info("Adding vmId %d" % (child.pid))
 		return (child.pid, cmd)
 
-	def getPtyInfo(self, child, issueContinue):
+	def __getPtyInfo(self, child, issueContinue):
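+		"""Wait for qemu to report its monitor pty, open it, and optionally issue 'c' to continue the VM"""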
 		ptyFile = None
 		while not ptyFile:
-			l = child.stderr.readline()
-			if (l == ""):
+			line = child.stderr.readline()
+			if (line == ""):
 				try:
 					os.waitpid(child.pid, 0)
 				except:
 					log.exception("waitpid failed")
 				raise Exception, "Failed to start VM -- ptyFile not found"
-			if (l.find("char device redirected to ") != -1):
-				ptyFile=l[26:].strip()
+			redirLine = "char device redirected to "
+			if (line.find(redirLine) != -1):
+				ptyFile=line[len(redirLine):].strip()
 				break
 		child.ptyFile = ptyFile
 		child.monitorFd = os.open(child.ptyFile, os.O_RDWR | os.O_NOCTTY)
 		child.monitor = os.fdopen(child.monitorFd)
-		self.saveChildInfo(child)
+		self.__saveChildInfo(child)
 		if (issueContinue):
 			# XXXstroucki: receiving a vm can take a long time
-			self.enterCommand(child, "c", timeout=None)
+			self.__enterCommand(child, "c", timeout=None)
 	
-	def stopVm(self, vmId, target, stopFirst):
+	def __stopVm(self, vmId, target, stopFirst):
 		"""Universal function to stop a VM -- used by suspendVM, migrateVM """
-		child = self.getChildFromPid(vmId)
+		child = self.__getChildFromPid(vmId)
 		if (stopFirst):
-			self.enterCommand(child, "stop")
+			self.__enterCommand(child, "stop")
 		if (target):
 			retry = self.migrationRetries
 			while (retry > 0):
-				res = self.enterCommand(child, "migrate -i %s" % (target), timeout=self.migrateTimeout)
+				# migrate in foreground respecting cow backed
+				# images
+				# XXXstroucki if we're doing this in the
+				# foreground, the migration may still be ongoing
+				# when the timeout happens, with no way to
+				# interrupt it. Retrying it by running the
+				# command again (once qemu is listening again)
+				# is probably not helpful.
+				res = self.__enterCommand(child, "migrate -i %s" % (target), timeout=self.migrateTimeout)
 				retry = retry - 1
 				if (res.find("migration failed") == -1):
-					retry = -1
+					retry = 0
+					break
 				else:
 					log.error("Migration (transiently) failed: %s\n", res)
 			if (retry == 0):
 				log.error("Migration failed: %s\n", res)
 				child.errorBit = True
 				raise RuntimeError
-		self.enterCommand(child, "quit", expectPrompt=False)
+		# XXXstroucki what if migration is still ongoing, and
+		# qemu is not listening?
+		self.__enterCommand(child, "quit", expectPrompt=False)
 		return vmId
-	
+
+	# extern	
 	def instantiateVm(self, instance):
-		(vmId, cmd) = self.startVm(instance, None)
-		child = self.getChildFromPid(vmId)
-		self.getPtyInfo(child, False)
-		child.cmd = cmd
-		self.saveChildInfo(child)
-		return vmId
+		try:
+			(vmId, cmd) = self.__startVm(instance, None)
+			child = self.__getChildFromPid(vmId)
+			self.__getPtyInfo(child, False)
+			child.cmd = cmd
+			self.nm.createInstance(child.instance)
+			self.nm.vmStateChange(vmId, None, InstanceState.Running)
+			# XXXstroucki Should make sure Running state is saved
+			# otherwise on restart it will appear as Activating
+			# until we update the state in __matchHostPids
+			child.instance.state = InstanceState.Running
+			self.__saveChildInfo(child)
+			return vmId
+		except:
+			log.exception("instantiateVm failed")
+			raise
 	
+	# extern
 	def suspendVm(self, vmId, target):
-		tmpTarget = "/tmp/tashi_qemu_suspend_%d_%d" % (os.getpid(), vmId)
+		tmpTarget = "/%s/tashi_qemu_suspend_%d_%d" % (self.scratchDir, os.getpid(), vmId)
 		# XXX: Use fifo to improve performance
-		vmId = self.stopVm(vmId, "\"exec:gzip -c > %s\"" % (tmpTarget), True)
+		vmId = self.__stopVm(vmId, "\"exec:gzip -c > %s\"" % (tmpTarget), True)
 		self.dfs.copyTo(tmpTarget, target)
 		return vmId
 	
+	# extern
 	def resumeVmHelper(self, instance, source):
-		child = self.getChildFromPid(instance.vmId)
+		child = self.__getChildFromPid(instance.vmId)
 		try:
-			self.getPtyInfo(child, True)
-		except RuntimeError, e:
+			self.__getPtyInfo(child, True)
+		except RuntimeError:
 			log.error("Failed to get pty info -- VM likely died")
 			child.errorBit = True
 			raise
 		status = "paused"
 		while ("running" not in status):
-			status = self.enterCommand(child, "info status")
+			status = self.__enterCommand(child, "info status")
 			time.sleep(1)
+		child.instance.state = InstanceState.Running
+		self.__saveChildInfo(child)
 	
+	# extern
 	def resumeVm(self, instance, source):
 		fn = self.dfs.getLocalHandle("%s" % (source))
-		(vmId, cmd) = self.startVm(instance, "exec:zcat %s" % (fn))
-		child = self.getChildFromPid(vmId)
+		(vmId, cmd) = self.__startVm(instance, "exec:zcat %s" % (fn))
+		child = self.__getChildFromPid(vmId)
 		child.cmd = cmd
 		return vmId
-	
+
+	def __checkPortListening(self, port):
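+		"""Return True if something is listening on 0.0.0.0:port, according to netstat"""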
+		lc = 0
+		# XXXpipe: find whether something is listening yet on the port
+		(stdin, stdout) = os.popen2("netstat -ln | grep 0.0.0.0:%d | wc -l" % (port))
+		stdin.close()
+		r = stdout.read()
+		lc = int(r.strip())
+		if (lc < 1):
+			return False
+		else:
+			return True
+
+	# extern
 	def prepReceiveVm(self, instance, source):
 		self.usedPortsLock.acquire()
-		port = int(random.random()*1000+19000)
-		while port in self.usedPorts:
-			port = int(random.random()*1000+19000)
+		while True:
+			port = random.randint(19000, 20000)
+			if port not in self.usedPorts:
+				break
+
 		self.usedPorts.append(port)
 		self.usedPortsLock.release()
-		(vmId, cmd) = self.startVm(instance, "tcp:0.0.0.0:%d" % (port))
+		(vmId, cmd) = self.__startVm(instance, "tcp:0.0.0.0:%d" % (port))
 		transportCookie = cPickle.dumps((port, vmId, socket.gethostname()))
-		child = self.getChildFromPid(vmId)
+		child = self.__getChildFromPid(vmId)
 		child.cmd = cmd
 		child.transportCookie = transportCookie
-		self.saveChildInfo(child)
-		# XXX: Cleanly wait until the port is open
-		lc = 0
-		while (lc < 1):
-# XXXpipe: find whether something is listening yet on the port
-			(stdin, stdout) = os.popen2("netstat -ln | grep 0.0.0.0:%d | wc -l" % (port))
-			stdin.close()
-			r = stdout.read()
-			lc = int(r.strip())
-			if (lc < 1):
-				time.sleep(1.0)
+		self.__saveChildInfo(child)
+		# XXX: Cleanly wait until the port is listening
+		while self.__checkPortListening(port) is not True:
+			time.sleep(1)
+
 		return transportCookie
 	
+	# extern
 	def migrateVm(self, vmId, target, transportCookie):
 		self.migrationSemaphore.acquire()
 		try:
 			(port, _vmId, _hostname) = cPickle.loads(transportCookie)
-			child = self.getChildFromPid(vmId)
+			child = self.__getChildFromPid(vmId)
 			child.migratingOut = True
-			res = self.stopVm(vmId, "tcp:%s:%d" % (target, port), False)
+			# tell the VM to live-migrate out
+			res = self.__stopVm(vmId, "tcp:%s:%d" % (target, port), False)
 			# XXX: Some sort of feedback would be nice
 			# XXX: Should we block?
-			self.waitForExit(vmId)
+			# XXXstroucki: isn't this what __waitForExit does?
+			self.__waitForExit(vmId)
 		finally:
 			self.migrationSemaphore.release()
 		return res
 	
+	# extern
 	def receiveVm(self, transportCookie):
 		(port, vmId, _hostname) = cPickle.loads(transportCookie)
 		try:
-			child = self.getChildFromPid(vmId)
+			child = self.__getChildFromPid(vmId)
 		except:
 			log.error("Failed to get child info; transportCookie = %s; hostname = %s" % (str(cPickle.loads(transportCookie)), socket.hostname()))
 			raise
 		try:
-			self.getPtyInfo(child, True)
-		except RuntimeError, e:
+			self.__getPtyInfo(child, True)
+		except RuntimeError:
 			log.error("Failed to get pty info -- VM likely died")
 			child.errorBit = True
 			raise
@@ -622,79 +724,121 @@ class Qemu(VmControlInterface):
 		self.usedPortsLock.release()
 		return vmId
 	
+	# extern
 	def pauseVm(self, vmId):
-		child = self.getChildFromPid(vmId)
-		self.enterCommand(child, "stop")
+		child = self.__getChildFromPid(vmId)
+		self.__enterCommand(child, "stop")
+		# XXXstroucki we have no Stopped state, so consider
+		# the VM still Running?
 	
+	# extern
 	def unpauseVm(self, vmId):
-		child = self.getChildFromPid(vmId)
-		self.enterCommand(child, "c")
+		child = self.__getChildFromPid(vmId)
+		self.__enterCommand(child, "c")
+		# XXXstroucki as above, should this be a state change
+		# or not?
 	
+	# extern
 	def shutdownVm(self, vmId):
 		"""'system_powerdown' doesn't seem to actually shutdown the VM on some versions of KVM with some versions of Linux"""
-		child = self.getChildFromPid(vmId)
-		self.enterCommand(child, "system_powerdown")
+		# If a clean shutdown is desired, try it from within the
+		# VM first, then shutdownVm, and if that does not work,
+		# use destroyVm
+		child = self.__getChildFromPid(vmId)
+		self.__enterCommand(child, "system_powerdown")
 	
+	# extern
 	def destroyVm(self, vmId):
-		child = self.getChildFromPid(vmId)
+		child = self.__getChildFromPid(vmId)
 		child.migratingOut = False
 		# XXX: the child could have exited between these two points, but I don't know how to fix that since it might not be our child process
 		os.kill(child.pid, signal.SIGKILL)
 	
+	def __specificStartVnc(self, vmId):
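+		"""Enable VNC for the VM, allocating a display number if one is not already assigned"""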
+		child = self.__getChildFromPid(vmId)
+		hostname = socket.gethostname()
+		if (child.vncPort == -1):
+			self.vncPortLock.acquire()
+			port = 0
+			while (port in self.vncPorts):
+				port += 1
+
+			self.vncPorts.append(port)
+			self.vncPortLock.release()
+			self.__enterCommand(child, "change vnc :%d" % (port))
+			child.vncPort = port
+			self.__saveChildInfo(child)
+		port = child.vncPort
+		return "VNC running on %s:%d" % (hostname, port + 5900)
+
+	def __specificStopVnc(self, vmId):
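+		"""Disable VNC for the VM and release its display number"""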
+		child = self.__getChildFromPid(vmId)
+		self.__enterCommand(child, "change vnc none")
+		if (child.vncPort != -1):
+			self.vncPortLock.acquire()
+			self.vncPorts.remove(child.vncPort)
+			self.vncPortLock.release()
+			child.vncPort = -1
+			self.__saveChildInfo(child)
+		return "VNC halted"
+
+	def __specificChangeCdRom(self, vmId, iso):
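+		"""Change the VM's ide1-cd0 device to the given ISO image"""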
+		child = self.__getChildFromPid(vmId)
+		imageLocal = self.dfs.getLocalHandle("images/" + iso)
+		self.__enterCommand(child, "change ide1-cd0 %s" % (imageLocal))
+		return "Changed ide1-cd0 to %s" % (iso)
+
+	def __specificStartConsole(self, vmId):
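+		"""Start a control console thread for the VM on a new port"""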
+		child = self.__getChildFromPid(vmId)
+		hostname = socket.gethostname()
+		self.consolePortLock.acquire()
+		# XXXstroucki why not use the existing ports scheme?
+		consolePort = self.consolePort
+		self.consolePort += 1
+		self.consolePortLock.release()
+		threading.Thread(target=controlConsole, args=(child,consolePort)).start()
+		return "Control console listening on %s:%d" % (hostname, consolePort)
+
+	# extern
 	def vmmSpecificCall(self, vmId, arg):
 		arg = arg.lower()
+		changeCdText = "changecdrom:"
+
 		if (arg == "startvnc"):
-			child = self.getChildFromPid(vmId)
-			hostname = socket.gethostname()
-			if (child.vncPort == -1):
-				self.vncPortLock.acquire()
-				port = 0
-				while (port in self.vncPorts):
-					port = port + 1
-				self.vncPorts.append(port)
-				self.vncPortLock.release()
-				self.enterCommand(child, "change vnc :%d" % (port))
-				child.vncPort = port
-				self.saveChildInfo(child)
-			port = child.vncPort
-			return "VNC started on %s:%d" % (hostname, port+5900)
+			return self.__specificStartVnc(vmId)
+
 		elif (arg == "stopvnc"):
-			child = self.getChildFromPid(vmId)
-			self.enterCommand(child, "change vnc none")
-			if (child.vncPort != -1):
-				self.vncPortLock.acquire()
-				self.vncPorts.remove(child.vncPort)
-				self.vncPortLock.release()
-				child.vncPort = -1
-				self.saveChildInfo(child)
-			return "VNC halted"
-		elif (arg.startswith("changecdrom:")):
-			child = self.getChildFromPid(vmId)
-			iso = scrubString(arg[12:])
-			imageLocal = self.dfs.getLocalHandle("images/" + iso)
-			self.enterCommand(child, "change ide1-cd0 %s" % (imageLocal))
-			return "Changed ide1-cd0 to %s" % (iso)
+			return self.__specificStopVnc(vmId)
+
+		elif (arg.startswith(changeCdText)):
+			iso = scrubString(arg[len(changeCdText):])
+			return self.__specificChangeCdRom(vmId, iso)
+
 		elif (arg == "startconsole"):
-			child = self.getChildFromPid(vmId)
-			hostname = socket.gethostname()
-			self.consolePortLock.acquire()
-			consolePort = self.consolePort
-			self.consolePort = self.consolePort+1
-			self.consolePortLock.release()
-			threading.Thread(target=controlConsole, args=(child,consolePort)).start()
-			return "Control console listenting on %s:%d" % (hostname, consolePort)
+			return self.__specificStartConsole(vmId)
+
 		elif (arg == "list"):
-			return "startVnc\nstopVnc\nchangeCdrom:<image.iso>\nstartConsole"
+			commands = [
+				"startVnc",
+				"stopVnc",
+				"changeCdrom:<image.iso>",
+				"startConsole",
+				]
+			return "\n".join(commands)
+				
 		else:
-			return "Unknown arg %s" % (arg)
+			return "Unknown command %s" % (arg)
 	
+	# extern
 	def listVms(self):
 		return self.controlledVMs.keys()
-	
+
+	# thread
 	def statsThread(self):
 		ticksPerSecond = float(os.sysconf('SC_CLK_TCK'))
 		netStats = {}
 		cpuStats = {}
+		# XXXstroucki be more exact here?
 		last = time.time() - self.statsInterval
 		while True:
 			now = time.time()
@@ -703,7 +847,7 @@ class Qemu(VmControlInterface):
 				netData = f.readlines()
 				f.close()
 				for l in netData:
-					if (l.find("tashi") != -1):
+					if (l.find(self.ifPrefix) != -1):
 						(dev, sep, ld) = stringPartition(l, ":")
 						dev = dev.strip()
 						ws = ld.split()
@@ -711,6 +855,9 @@ class Qemu(VmControlInterface):
 						sendBytes = float(ws[8])
 						(recvMBs, sendMBs, lastRecvBytes, lastSendBytes) = netStats.get(dev, (0.0, 0.0, recvBytes, sendBytes))
 						if (recvBytes < lastRecvBytes):
+							# We seem to have overflowed
+							# XXXstroucki How likely is this to happen?
+
 							if (lastRecvBytes > 2**32):
 								lastRecvBytes = lastRecvBytes - 2**64
 							else:
@@ -740,13 +887,13 @@ class Qemu(VmControlInterface):
 					child = self.controlledVMs[vmId]
 					(recvMBs, sendMBs, recvBytes, sendBytes) = (0.0, 0.0, 0.0, 0.0)
 					for i in range(0, len(child.instance.nics)):
-						netDev = "tashi%d.%d" % (child.instance.id, i)
+						netDev = "%s%d.%d" % (self.ifPrefix, child.instance.id, i)
 						(tmpRecvMBs, tmpSendMBs, tmpRecvBytes, tmpSendBytes) = netStats.get(netDev, (0.0, 0.0, 0.0, 0.0))
 						(recvMBs, sendMBs, recvBytes, sendBytes) = (recvMBs + tmpRecvMBs, sendMBs + tmpSendMBs, recvBytes + tmpRecvBytes, sendBytes + tmpSendBytes)
 					self.stats[vmId] = self.stats.get(vmId, {})
 					child = self.controlledVMs.get(vmId, None)
 					if (child):
-						res = self.enterCommand(child, "info blockstats")
+						res = self.__enterCommand(child, "info blockstats")
 						for l in res.split("\n"):
 							(device, sep, data) = stringPartition(l, ": ")
 							if (data != ""):
@@ -764,6 +911,7 @@ class Qemu(VmControlInterface):
 				log.exception("statsThread threw an exception")
 			last = now
 			time.sleep(self.statsInterval)
-	
+
+	# extern	
 	def getStats(self, vmId):
 		return self.stats.get(vmId, {})

Modified: incubator/tashi/trunk/src/tashi/parallel.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/parallel.py?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/parallel.py (original)
+++ incubator/tashi/trunk/src/tashi/parallel.py Wed Jan 11 14:00:56 2012
@@ -116,8 +116,8 @@ def synchronizedmethod(func):
 # Test Code
 ##############################
 import unittest
-import sys
-import time
+#import sys
+#import time
 
 class TestThreadPool(unittest.TestCase):
 	def setUp(self):

Modified: incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py (original)
+++ incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py Wed Jan 11 14:00:56 2012
@@ -16,11 +16,12 @@
 # under the License.
 
 import rpyc
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Instance, Host, User
 import cPickle
 
 clusterManagerRPCs = ['createVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'migrateVm', 'pauseVm', 'unpauseVm', 'getHosts', 'getNetworks', 'getUsers', 'getInstances', 'vmmSpecificCall', 'registerNodeManager', 'vmUpdate', 'activateVm', 'registerHost', 'getImages', 'copyImage']
 nodeManagerRPCs = ['instantiateVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'prepReceiveVm', 'prepSourceVm', 'migrateVm', 'receiveVm', 'pauseVm', 'unpauseVm', 'getVmInfo', 'listVms', 'vmmSpecificCall', 'getHostInfo', 'liveCheck']
+accountingRPCs = ['record']
 
 def clean(args):
 	"""Cleans the object so cPickle can be used."""
@@ -61,7 +62,7 @@ class client:
 		"""Returns a function that makes the RPC call. No keyword arguments allowed when calling this function."""
 		if self.conn.closed == True:
 			self.conn = self.createConn()
-		if name not in clusterManagerRPCs and name not in nodeManagerRPCs:
+		if name not in clusterManagerRPCs and name not in nodeManagerRPCs and name not in accountingRPCs:
 			return None
 		def connectWrap(*args):
 			args = cPickle.dumps(clean(args))
@@ -81,6 +82,8 @@ class ManagerService(rpyc.Service):
 	# Note: self.service and self._type are set before rpyc.utils.server.ThreadedServer is started.
 	def checkValidUser(self, functionName, clientUsername, args):
 		"""Checks whether the operation requested by the user is valid based on clientUsername. An exception will be thrown if not valid."""
+		if self._type == 'AccountingService':
+			return
 		if self._type == 'NodeManagerService':
 			return
 		if clientUsername in ['nodeManager', 'agent', 'root']:
@@ -114,4 +117,7 @@ class ManagerService(rpyc.Service):
 			return makeCall
 		if self._type == 'NodeManagerService' and name in nodeManagerRPCs:
 			return makeCall
+		if self._type == 'AccountingService' and name in accountingRPCs:
+			return makeCall
+
 		raise AttributeError('RPC does not exist')

Modified: incubator/tashi/trunk/src/tashi/util.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/util.py?rev=1230043&r1=1230042&r2=1230043&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/util.py (original)
+++ incubator/tashi/trunk/src/tashi/util.py Wed Jan 11 14:00:56 2012
@@ -16,11 +16,11 @@
 # under the License.    
 
 import ConfigParser
-import cPickle
+#import cPickle
 import os
-import select
+#import select
 import signal
-import struct
+#import struct
 import sys
 import threading
 import time
@@ -28,7 +28,6 @@ import traceback
 import types
 import getpass
 
-import rpyc
 from tashi.rpycservices import rpycservices
 from tashi.rpycservices.rpyctypes import TashiException, Errors, InstanceState, HostState
 
@@ -109,7 +108,7 @@ class failsafe(object):
 		def newFunc(*args, **kw):
 			try:
 				return cur(*args, **kw)
-			except Exception, e:
+			except:
 				self.__dict__['__current_obj__'] = self.__dict__['__failsafe_obj__']
 				return fail(*args, **kw)
 		return newFunc
@@ -197,9 +196,9 @@ def convertExceptions(oldFunc):
 	def newFunc(*args, **kw):
 		try:
 			return oldFunc(*args, **kw)
-		except TashiException, e:
+		except TashiException:
 			raise
-		except Exception, e:
+		except:
 			self = args[0]
 			if (self.convertExceptions):
 				raise TashiException(d={'errno':Errors.ConvertedException, 'msg': traceback.format_exc(10)})
@@ -264,10 +263,12 @@ def scrubString(s, allowed="ABCDEFGHIJKL
 def createClient(config):
 	cfgHost = config.get('Client', 'clusterManagerHost')
 	cfgPort = config.get('Client', 'clusterManagerPort')
-	cfgTimeout = config.get('Client', 'clusterManagerTimeout')
+	#XXXstroucki nothing uses timeout right now
+	#cfgTimeout = config.get('Client', 'clusterManagerTimeout')
 	host = os.getenv('TASHI_CM_HOST', cfgHost)
 	port = os.getenv('TASHI_CM_PORT', cfgPort)
-	timeout = float(os.getenv('TASHI_CM_TIMEOUT', cfgTimeout)) * 1000.0
+	#XXXstroucki nothing uses timeout right now
+	#timeout = float(os.getenv('TASHI_CM_TIMEOUT', cfgTimeout)) * 1000.0
 
 	authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
 	if authAndEncrypt:


