incubator-tashi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From strou...@apache.org
Subject svn commit: r1298170 [1/2] - in /incubator/tashi/branches/oldstable: ./ doc/ etc/ scripts/ src/tashi/ src/tashi/accounting/ src/tashi/agents/ src/tashi/client/ src/tashi/clustermanager/ src/tashi/clustermanager/data/ src/tashi/dfs/ src/tashi/nodemanage...
Date Wed, 07 Mar 2012 22:29:25 GMT
Author: stroucki
Date: Wed Mar  7 22:29:24 2012
New Revision: 1298170

URL: http://svn.apache.org/viewvc?rev=1298170&view=rev
Log:
merge oldstable from stable

Added:
    incubator/tashi/branches/oldstable/INSTALL
      - copied unchanged from r1298163, incubator/tashi/branches/stable/INSTALL
    incubator/tashi/branches/oldstable/doc/INSTALL2
      - copied unchanged from r1298163, incubator/tashi/branches/stable/doc/INSTALL2
    incubator/tashi/branches/oldstable/doc/RELEASES
      - copied unchanged from r1298163, incubator/tashi/branches/stable/doc/RELEASES
    incubator/tashi/branches/oldstable/etc/Accounting.cfg
      - copied unchanged from r1298163, incubator/tashi/branches/stable/etc/Accounting.cfg
    incubator/tashi/branches/oldstable/etc/Agent.cfg
      - copied unchanged from r1298163, incubator/tashi/branches/stable/etc/Agent.cfg
    incubator/tashi/branches/oldstable/src/tashi/accounting/
      - copied from r1298163, incubator/tashi/branches/stable/src/tashi/accounting/
Removed:
    incubator/tashi/branches/oldstable/scripts/
Modified:
    incubator/tashi/branches/oldstable/   (props changed)
    incubator/tashi/branches/oldstable/Makefile
    incubator/tashi/branches/oldstable/NOTICE
    incubator/tashi/branches/oldstable/doc/DEVELOPMENT
    incubator/tashi/branches/oldstable/etc/NodeManager.cfg
    incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg
    incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py
    incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py
    incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py
    incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py
    incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/datainterface.py
    incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/fromconfig.py
    incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py
    incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/pickled.py
    incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py
    incubator/tashi/branches/oldstable/src/tashi/connectionmanager.py
    incubator/tashi/branches/oldstable/src/tashi/dfs/vfs.py
    incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py
    incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py
    incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/qemu.py
    incubator/tashi/branches/oldstable/src/tashi/parallel.py
    incubator/tashi/branches/oldstable/src/tashi/rpycservices/rpycservices.py
    incubator/tashi/branches/oldstable/src/tashi/util.py
    incubator/tashi/branches/oldstable/src/tashi/version.py

Propchange: incubator/tashi/branches/oldstable/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Mar  7 22:29:24 2012
@@ -1,4 +1,9 @@
 /incubator/tashi/branches/cmu:1178106-1187632
+/incubator/tashi/branches/stable:1241774-1298163
 /incubator/tashi/branches/stablefix:1203848-1241771
+/incubator/tashi/branches/stroucki-accounting:1221525-1241770
+/incubator/tashi/branches/stroucki-accounting/branches/stroucki-accounting:1221525-1235607
+/incubator/tashi/branches/stroucki-irpbugs:1245857-1292894
+/incubator/tashi/branches/stroucki-slotsbug:1244839-1245041
 /incubator/tashi/branches/zoni-dev/trunk:1034098-1177646
-/incubator/tashi/trunk:1081943-1203835,1203845
+/incubator/tashi/trunk:1081943-1203835,1203845-1241774,1245045,1292488,1292541,1292543,1292895,1293348,1293401,1294310,1294409

Modified: incubator/tashi/branches/oldstable/Makefile
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/Makefile?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/Makefile (original)
+++ incubator/tashi/branches/oldstable/Makefile Wed Mar  7 22:29:24 2012
@@ -27,6 +27,16 @@ default: bin src/utils/nmd
 all: bin src/utils/nmd src/tags doc/html aws
 	@echo Done
 
+package: src DISCLAIMER INSTALL LICENSE NOTICE README
+	@echo "Building package in apache-tashi.tar.gz"
+	rm -rf apache-tashi.tar.gz apache-tashi
+	mkdir apache-tashi
+	cp -rp doc etc Makefile src DISCLAIMER INSTALL LICENSE NOTICE README apache-tashi/
+	find apache-tashi -type d -name ".svn"|xargs rm -rf
+	-chgrp -R incubator apache-tashi
+	tar zcf apache-tashi.tar.gz apache-tashi
+	rm -rf apache-tashi
+
 doc: rmdoc doc/html
 	@echo Done
 
@@ -35,7 +45,7 @@ clean: rmnmd rmbin rmtags rmdoc rmaws
 	@echo Done
 
 version:
-	sed -i "s/version = .*/version = \"`date`\"/" src/tashi/version.py
+	sed -i "s/version = .*/version = \"`date +%Y-%m-%d`\"/" src/tashi/version.py
 
 aws: src/tashi/aws/wsdl/AmazonEC2_services_types.py src/tashi/aws/wsdl/AmazonEC2_services_server.py
 
@@ -51,49 +61,48 @@ src/tashi/aws/wsdl/2009-04-04.ec2.wsdl:
 rmaws:
 	if test -e src/tashi/aws/wsdl/2009-04-04.ec2.wsdl; then echo Removing aws...; rm -f src/tashi/aws/wsdl/2009-04-04.ec2.wsdl; rm -f src/tashi/aws/wsdl/AmazonEC2_*.py; fi
 
-# Implicit builds
-# src/utils/nmd: src/utils/Makefile src/utils/nmd.c
-#	@echo Building nmd...
-#	(cd src/utils; make)
-#	ln -s ../src/utils/nmd ./bin/nmd
-
 src/utils/nmd: src/utils/nmd.py
-	ln -s ../src/utils/nmd.py ./bin/nmd.py
+	ln -s ../src/utils/nmd.py ./bin/nmd
 
 #rmnmd:
 #	if test -e src/utils/nmd; then echo Removing nmd...; (cd src/utils; make clean); rm -f bin/nmd; fi
 rmnmd:
-	echo Removing nmd...; rm -f bin/nmd.py
+	echo Removing nmd...; rm -f bin/nmd
 
-bin: bindir bin/clustermanager.py bin/nodemanager.py bin/tashi-client.py bin/primitive.py bin/zoni-cli.py
+bin: bindir bin/clustermanager bin/nodemanager bin/tashi-client bin/primitive bin/zoni-cli bin/accounting
 bindir:
 	if test ! -d bin; then mkdir bin; fi
-rmbin: rmclustermanager rmnodemanager rmtashi-client rmprimitive rmzoni-cli
+rmbin: rmclustermanager rmnodemanager rmtashi-client rmprimitive rmzoni-cli rmaccounting
 	if test -d bin; then rmdir bin; fi
 bin/getInstances: 
 	if test ! -e bin/getInstances; then (echo "Generating client symlinks..."; cd bin; PYTHONPATH=../src ../src/tashi/client/client.py --makesyms); fi
 rmclients:
 	if test -e bin/getInstances; then (echo Removing client symlinks...; cd bin; PYTHONPATH=../src ../src/tashi/client/client.py --rmsyms; cd ..); fi
-bin/clustermanager.py: src/tashi/clustermanager/clustermanager.py
+bin/accounting: src/tashi/accounting/accounting.py
+	@echo Symlinking in Accounting server...
+	(cd bin; ln -s ../src/tashi/accounting/accounting.py accounting)
+rmaccounting:
+	if test -e bin/accounting; then echo Removing Accounting server symlink...; rm bin/accounting; fi
+bin/clustermanager: src/tashi/clustermanager/clustermanager.py
 	@echo Symlinking in clustermanager...
-	(cd bin; ln -s ../src/tashi/clustermanager/clustermanager.py .)
+	(cd bin; ln -s ../src/tashi/clustermanager/clustermanager.py clustermanager)
 rmclustermanager:
-	if test -e bin/clustermanager.py; then echo Removing clustermanager symlink...; rm bin/clustermanager.py; fi
-bin/nodemanager.py: src/tashi/nodemanager/nodemanager.py
+	if test -e bin/clustermanager; then echo Removing clustermanager symlink...; rm bin/clustermanager; fi
+bin/nodemanager: src/tashi/nodemanager/nodemanager.py
 	@echo Symlinking in nodemanager...
-	(cd bin; ln -s ../src/tashi/nodemanager/nodemanager.py .)
+	(cd bin; ln -s ../src/tashi/nodemanager/nodemanager.py nodemanager)
 rmnodemanager:
-	if test -e bin/nodemanager.py; then echo Removing nodemanager symlink...; rm bin/nodemanager.py; fi
-bin/primitive.py: src/tashi/agents/primitive.py
+	if test -e bin/nodemanager; then echo Removing nodemanager symlink...; rm bin/nodemanager; fi
+bin/primitive: src/tashi/agents/primitive.py
 	@echo Symlinking in primitive...
-	(cd bin; ln -s ../src/tashi/agents/primitive.py .)
+	(cd bin; ln -s ../src/tashi/agents/primitive.py primitive)
 rmprimitive:
-	if test -e bin/primitive.py; then echo Removing primitve-agent symlink...; rm bin/primitive.py; fi
-bin/tashi-client.py:
+	if test -e bin/primitive; then echo Removing primitve-agent symlink...; rm bin/primitive; fi
+bin/tashi-client:
 	@echo Symlinking in tashi-client...
-	(cd bin; ln -s ../src/tashi/client/tashi-client.py .)
+	(cd bin; ln -s ../src/tashi/client/tashi-client.py tashi-client)
 rmtashi-client:
-	if test -e bin/tashi-client.py; then echo Removing tashi-client symlink...; rm bin/tashi-client.py; fi
+	if test -e bin/tashi-client; then echo Removing tashi-client symlink...; rm bin/tashi-client; fi
 src/tags:
 	@echo Generating tags...
 	(cd src; ctags-exuberant -R --c++-kinds=+p --fields=+iaS --extra=+q -f ./tags .)
@@ -107,14 +116,15 @@ rmdoc:
 	if test -d doc/html; then echo Removing HTML docs...; rm -rf ./doc/html; fi
 
 #  Zoni 
-bin/zoni-cli.py:
+bin/zoni-cli:
 	@echo Symlinking in zoni-cli...
-	(cd bin; ln -s ../src/zoni/client/zoni-cli.py .)
+	(cd bin; ln -s ../src/zoni/client/zoni-cli.py zoni-client)
+# why necessarily put this in /usr/local/bin like nothing else?
 usr/local/bin/zoni:
 	@echo Creating /usr/local/bin/zoni
 	(echo '#!/bin/bash\nPYTHONPATH=$(shell pwd)/src $(shell pwd)/bin/zoni-cli.py $$*' > /usr/local/bin/zoni; chmod 755 /usr/local/bin/zoni)
 rmzoni-cli:
-	if test -e bin/zoni-cli.py; then echo Removing zoni-cli symlink...; rm bin/zoni-cli.py; fi
+	if test -e bin/zoni-cli; then echo Removing zoni-cli symlink...; rm bin/zoni-cli; fi
 	if test -e /usr/local/bin/zoni; then echo Removing zoni...; rm /usr/local/bin/zoni; fi
 
 ## for now only print warnings having to do with bad indentation. pylint doesn't make it easy to enable only 1,2 checks

Modified: incubator/tashi/branches/oldstable/NOTICE
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/NOTICE?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/NOTICE (original)
+++ incubator/tashi/branches/oldstable/NOTICE Wed Mar  7 22:29:24 2012
@@ -1,5 +1,5 @@
 Apache Tashi
-Copyright 2008-2011 The Apache Software Foundation
+Copyright 2008-2012 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).

Modified: incubator/tashi/branches/oldstable/doc/DEVELOPMENT
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/doc/DEVELOPMENT?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/doc/DEVELOPMENT (original)
+++ incubator/tashi/branches/oldstable/doc/DEVELOPMENT Wed Mar  7 22:29:24 2012
@@ -1,16 +1,10 @@
 Current goals:
-   * integrate bug fixes from Tashi sites into SVN.
-   * Tashi is working well for us developers, so we should fork
-     a stable version as beta release. Xen startup was problematic
-     as of September, so advise on caveats there. Probably not include
-     Zoni for now.
+   * Add more hardware support for Zoni.
+   * Add virtual hardware to Zoni to allow users to
+      add support for their own hardware?
 
 Future goals:
-   * How should Tashi accounting be done? Database entries or
-     flat files?
-   * Support host auto-registration, but only by affirmative option.
    * Consider using libraries like libcloud to abstract VMM interactions.
-   * Add more hardware support for Zoni.
 
 Other ideas:
    * Make available a console aggregator for user's VMs.

Modified: incubator/tashi/branches/oldstable/etc/NodeManager.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/etc/NodeManager.cfg?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/etc/NodeManager.cfg (original)
+++ incubator/tashi/branches/oldstable/etc/NodeManager.cfg Wed Mar  7 22:29:24 2012
@@ -58,7 +58,7 @@ handlers = consoleHandler
 propagate = 1
 
 [Vfs]
-prefix = /dfs
+prefix = /tmp
 
 [XenPV]
 vmNamePrefix = tashi
@@ -75,10 +75,11 @@ convertExceptions = True
 port = 9883
 registerHost = False
 registerFrequency = 10.0
-infoFile = /var/tmp/nm.dat
 clusterManagerHost = localhost ; Clustermanager hostname
 clusterManagerPort = 9882
 statsInterval = 0.0
+;accountingHost = clustermanager
+;accountingPort = 2228
 ;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
 
 [Security]

Modified: incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg Wed Mar  7 22:29:24 2012
@@ -55,6 +55,8 @@ allowMismatchedVersions = False
 maxMemory = 8192
 maxCores = 8
 allowDuplicateNames = False
+;accountingHost = clustermanager
+;accountingPort = 2228
 ;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
 
 [GetentOverride]
@@ -87,6 +89,10 @@ file = /var/tmp/cm.dat
 uri = mysql://root@clustermanager/tashi
 password = changeme
 
+# Accounting portion
+[Accounting]
+publisher = tashi.messaging.GangliaPublisher
+
 # NodeManger portion
 [NodeManager]
 dfs = tashi.dfs.Vfs
@@ -100,7 +106,6 @@ convertExceptions = True
 port = 9883
 registerHost = False
 registerFrequency = 10.0
-infoFile = /var/tmp/nm.dat
 # Clustermanger hostname
 clusterManagerHost = localhost 
 clusterManagerPort = 9882
@@ -108,7 +113,7 @@ statsInterval = 0.0
 ;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
 
 [Qemu]
-qemuBin = /usr/local/bin/qemu-system-x86_64
+qemuBin = /usr/bin/kvm
 infoDir = /var/tmp/VmControlQemu/
 scratchDir = /tmp
 pollDelay = 1.0
@@ -131,7 +136,7 @@ defaultVmType = pygrub
 defaultDiskType=qcow
 
 [Vfs]
-prefix = /dfs
+prefix = /tmp
 
 [LocalityService]
 host = localityserverhostname
@@ -150,12 +155,12 @@ clusterManagerTimeout = 5.0
 publisher = tashi.messaging.GangliaPublisher
 
 [Primitive]
-hook1 = tashi.agents.DhcpDns
+#hook1 = tashi.agents.DhcpDns
 scheduleDelay = 2.0
 densePack = False
 
 [MauiWiki]
-hook1 = tashi.agents.DhcpDns
+#hook1 = tashi.agents.DhcpDns
 refreshTime = 5
 authuser = changeme
 authkey = 1111

Modified: incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py Wed Mar  7 22:29:24 2012
@@ -17,16 +17,11 @@
 # specific language governing permissions and limitations
 # under the License.    
 
-from socket import gethostname
-import os
-import socket
-import sys
-import threading
 import time
-import random
 import logging.config
 
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Errors, HostState, InstanceState, TashiException
+
 from tashi.util import getConfig, createClient, instantiateImplementation, boolean
 import tashi
 
@@ -274,10 +269,10 @@ class Primitive(object):
 					# end for unassigned vms
 
 
-			except TashiException, e:
+			except TashiException:
 				self.log.exception("Tashi exception")
 
-			except Exception, e:
+			except Exception:
 				self.log.warning("Scheduler iteration failed")
 
 

Modified: incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py Wed Mar  7 22:29:24 2012
@@ -166,7 +166,7 @@ def getSlots(cores, memory):
 			continue
 		countbycores = int((h.cores - h.usedCores) / cores)
 		countbymemory = int((h.memory - h.usedMemory) / memory)
-		count += min(countbycores, countbymemory)
+		count += max(0, min(countbycores, countbymemory))
 
 	print "%d" % (count),
 	print (lambda:"instances", lambda:"instance")[count == 1](),
@@ -211,6 +211,7 @@ extraViews = {
 'getSlots': (getSlots, None),
 'getImages': (None, ['id', 'imageName', 'imageSize']), 
 'copyImage': (None, None), 
+'createVm': (None, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
 'createMany': (createMany, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
 'destroyMany': (destroyMany, None),
 'getVmLayout': (getVmLayout, ['id', 'name', 'state', 'instances', 'usedMemory', 'memory', 'usedCores', 'cores']),
@@ -574,8 +575,11 @@ def main():
 			try:
 				if (type(res) == types.ListType):
 					makeTable(res, keys)
+				elif (type(res) == types.StringType):
+					print res
 				else:
-					pprint(res)
+					makeTable([res], keys)
+					
 			except IOError:
 				pass
 			except Exception, e:

Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py Wed Mar  7 22:29:24 2012
@@ -17,13 +17,9 @@
 # specific language governing permissions and limitations
 # under the License.    
 
-import os
 import sys
-import threading
 import signal
 import logging.config
-from getopt import getopt, GetoptError
-from ConfigParser import ConfigParser
 
 from tashi.util import signalHandler, boolean, instantiateImplementation, getConfig, debugConsole
 import tashi

Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py Wed Mar  7 22:29:24 2012
@@ -15,15 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.    
 
-from datetime import datetime
-from random import randint
-from socket import gethostname
 import logging
 import threading
 import time
 
+from tashi.rpycservices import rpycservices             
 from tashi.rpycservices.rpyctypes import Errors, InstanceState, HostState, TashiException
-from tashi import boolean, convertExceptions, ConnectionManager, vmStates, timed, version, scrubString
+from tashi import boolean, ConnectionManager, vmStates, version, scrubString
 
 class ClusterManagerService(object):
 	"""RPC service for the ClusterManager"""
@@ -42,6 +40,7 @@ class ClusterManagerService(object):
 		self.dfs = dfs
 		self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
 		self.log = logging.getLogger(__name__)
+		self.log.setLevel(logging.ERROR)
 		self.hostLastContactTime = {}
 		#self.hostLastUpdateTime = {}
 		self.instanceLastContactTime = {}
@@ -51,27 +50,96 @@ class ClusterManagerService(object):
 		self.maxMemory = int(self.config.get('ClusterManagerService', 'maxMemory'))
 		self.maxCores = int(self.config.get('ClusterManagerService', 'maxCores'))
 		self.allowDuplicateNames = boolean(self.config.get('ClusterManagerService', 'allowDuplicateNames'))
-		now = self.__now()
+
+		self.accountingHost = None
+		self.accountingPort = None
+		try:
+			self.accountingHost = self.config.get('ClusterManagerService', 'accountingHost')
+			self.accountingPort = self.config.getint('ClusterManagerService', 'accountingPort')
+		except:
+			pass
+
+		self.__initAccounting()
+		self.__initCluster()
+
+		threading.Thread(target=self.__monitorCluster).start()
+
+	def __initAccounting(self):
+		self.accountBuffer = []
+		self.accountLines = 0
+		self.accountingClient = None
+		try:
+			if (self.accountingHost is not None) and \
+				    (self.accountingPort is not None):
+				self.accountingClient=rpycservices.client(self.accountingHost, self.accountingPort)
+		except:
+			self.log.exception("Could not init accounting")
+
+	def __initCluster(self):
+		# initialize state of VMs if restarting
 		for instance in self.data.getInstances().itervalues():
 			instanceId = instance.id
 			instance = self.data.acquireInstance(instanceId)
 			instance.decayed = False
 
 			if instance.hostId is None:
-				self.stateTransition(instance, None, InstanceState.Pending)
+				self.__stateTransition(instance, None, InstanceState.Pending)
 			else:
-				self.stateTransition(instance, None, InstanceState.Orphaned)
+				self.__stateTransition(instance, None, InstanceState.Orphaned)
 
 			self.data.releaseInstance(instance)
+
+		# initialize state of hosts if restarting
 		for host in self.data.getHosts().itervalues():
 			hostId = host.id
 			host = self.data.acquireHost(hostId)
 			host.up = False
 			host.decayed = False
 			self.data.releaseHost(host)
-		threading.Thread(target=self.monitorCluster).start()
 
-	def stateTransition(self, instance, old, cur):
+
+
+	def __ACCOUNTFLUSH(self):
+		try:
+			if (self.accountingClient is not None):
+				self.accountingClient.record(self.accountBuffer)
+			self.accountLines = 0
+			self.accountBuffer = []
+		except:
+			self.log.exception("Failed to flush accounting data")
+
+
+	def __ACCOUNT(self, text, instance=None, host=None):
+		now = self.__now()
+		instanceText = None
+		hostText = None
+
+		if instance is not None:
+			try:
+				instanceText = 'Instance(%s)' % (instance)
+			except:
+				self.log.exception("Invalid instance data")
+
+		if host is not None:
+			try:
+				hostText = "Host(%s)" % (host)
+			except:
+				self.log.exception("Invalid host data")
+
+                secondary = ','.join(filter(None, (hostText, instanceText)))
+
+		line = "%s|%s|%s" % (now, text, secondary)
+
+		self.accountBuffer.append(line)
+		self.accountLines += 1
+
+		# XXXstroucki think about autoflush by time
+		if (self.accountLines > 0):
+			self.__ACCOUNTFLUSH()
+
+
+
+	def __stateTransition(self, instance, old, cur):
 		if (old and instance.state != old):
 			raise TashiException(d={'errno':Errors.IncorrectVmState,'msg':"VmState is not %s - it is %s" % (vmStates[old], vmStates[instance.state])})
 		if (instance.state == cur):
@@ -104,7 +172,7 @@ class ClusterManagerService(object):
 			instance = self.data.acquireInstance(instanceId)
 			if instance.hostId == host.id:
 				instance.decayed = True
-				self.stateTransition(instance, None, InstanceState.Orphaned)
+				self.__stateTransition(instance, None, InstanceState.Orphaned)
 
 			self.data.releaseInstance(instance)
 
@@ -148,10 +216,16 @@ class ClusterManagerService(object):
 		for hostId in self.hostLastContactTime.keys():
 			#self.log.warning("iterate %d" % hostId)
 			host = self.data.acquireHost(hostId)
-			if (self.hostLastContactTime[hostId] < (self.__now() - self.allowDecayed)):
+			# XXXstroucki: timing has changed with the message
+			# buffering in the NM, so this wasn't being run any-
+			# more because the time check was passing.
+			# I should think a bit more about this, but
+			# the "if True" is probably appropriate.
+			#if (self.hostLastContactTime[hostId] < (self.__now() - self.allowDecayed)):
+			if True:
 				host.decayed = True
 
-				self.log.info('Fetching state from host %s because it is decayed' % (host.name))
+				self.log.debug('Fetching state from host %s because it is decayed' % (host.name))
 				
 				myInstancesThisHost = [i for i in myInstances.values() if i.hostId == host.id]
 
@@ -195,14 +269,24 @@ class ClusterManagerService(object):
 		
 		# iterate through all VMs I believe are active
 		for instanceId in self.instanceLastContactTime.keys():
+
+			# XXXstroucki should lock instance here?
 			if (self.instanceLastContactTime[instanceId] < (self.__now() - self.allowDecayed)):
 				try:
 					instance = self.data.acquireInstance(instanceId)
+					# Don't query non-running VMs. eg. if a VM
+					# is suspended, and has no host, then there's
+					# no one to ask
+					if instance.state != InstanceState.Running and \
+					   instance.state != InstanceState.Activating and \
+					   instance.state != InstanceState.Orphaned:
+						self.data.releaseInstance(instance)
+						continue
 				except:
 					continue
 
 				instance.decayed = True
-				self.log.info('Fetching state on instance %s because it is decayed' % (instance.name))
+				self.log.debug('Fetching state on instance %s because it is decayed' % (instance.name))
 				if instance.hostId is None: raise AssertionError
 
 				# XXXstroucki check if host is down?
@@ -225,21 +309,6 @@ class ClusterManagerService(object):
 				self.data.releaseInstance(instance)
 
 
-
-	def monitorCluster(self):
-		while True:
-			sleepFor = min(self.expireHostTime, self.allowDecayed)
-
-			try:
-				self.__checkHosts()
-				self.__checkInstances()
-			except:
-				self.log.exception('monitorCluster iteration failed')
-			#  XXXrgass too chatty.  Remove
-			#self.log.info("Sleeping for %d seconds" % sleepFor)
-			time.sleep(sleepFor)
-
-
 	def normalize(self, instance):
 		instance.id = None
 		instance.vmId = None
@@ -274,12 +343,14 @@ class ClusterManagerService(object):
 		instance = self.normalize(instance)
 		instance = self.data.registerInstance(instance)
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM REQUEST", instance=instance)
 		return instance
 	
 	def shutdownVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
+		self.__stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM SHUTDOWN", instance=instance)
 		hostname = self.data.getHost(instance.hostId).name
 		try:
 			self.proxy[hostname].shutdownVm(instance.vmId)
@@ -291,14 +362,17 @@ class ClusterManagerService(object):
 	def destroyVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
 		if (instance.state is InstanceState.Pending or instance.state is InstanceState.Held):
+			self.__ACCOUNT("CM VM DESTROY UNSTARTED", instance=instance)
 			self.data.removeInstance(instance)
 		elif (instance.state is InstanceState.Activating):
-			self.stateTransition(instance, InstanceState.Activating, InstanceState.Destroying)
+			self.__ACCOUNT("CM VM DESTROY STARTING", instance=instance)
+			self.__stateTransition(instance, InstanceState.Activating, InstanceState.Destroying)
 			self.data.releaseInstance(instance)
 		else:
 			# XXXstroucki: This is a problem with keeping
 			# clean state.
-			self.stateTransition(instance, None, InstanceState.Destroying)
+			self.__ACCOUNT("CM VM DESTROY", instance=instance)
+			self.__stateTransition(instance, None, InstanceState.Destroying)
 			if instance.hostId is None:
 				self.data.removeInstance(instance)
 			else:
@@ -316,8 +390,9 @@ class ClusterManagerService(object):
 	
 	def suspendVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
+		self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM SUSPEND", instance=instance)
 		hostname = self.data.getHost(instance.hostId).name
 		destination = "suspend/%d_%s" % (instance.id, instance.name)
 		try:
@@ -329,14 +404,16 @@ class ClusterManagerService(object):
 	
 	def resumeVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
+		self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
 		source = "suspend/%d_%s" % (instance.id, instance.name)
 		instance.hints['__resume_source'] = source
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM RESUME", instance=instance)
 		return instance
 	
 	def migrateVm(self, instanceId, targetHostId):
 		instance = self.data.acquireInstance(instanceId)
+		self.__ACCOUNT("CM VM MIGRATE", instance=instance)
 		try:
 			# FIXME: should these be acquire/release host?
 			targetHost = self.data.getHost(targetHostId)
@@ -345,7 +422,7 @@ class ClusterManagerService(object):
 		except:
 			self.data.releaseInstance(instance)
 			raise
-		self.stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
+		self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
 		self.data.releaseInstance(instance)
 		try:
 			# Prepare the target
@@ -353,16 +430,16 @@ class ClusterManagerService(object):
 			self.proxy[sourceHost.name].prepSourceVm(instance.vmId)
 			self.log.info("migrateVm: Calling prepReceiveVm on target host %s" % targetHost.name)
 			cookie = self.proxy[targetHost.name].prepReceiveVm(instance, sourceHost)
-		except Exception, e:
+		except Exception:
 			self.log.exception('prepReceiveVm failed')
 			raise
 		instance = self.data.acquireInstance(instance.id)
-		self.stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
+		self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
 		self.data.releaseInstance(instance)
 		try:
 			# Send the VM
 			self.proxy[sourceHost.name].migrateVm(instance.vmId, targetHost, cookie)
-		except Exception, e:
+		except Exception:
 			self.log.exception('migrateVm failed')
 			raise
 		try:
@@ -374,15 +451,16 @@ class ClusterManagerService(object):
 		try:
 			# Notify the target
 			vmId = self.proxy[targetHost.name].receiveVm(instance, cookie)
-		except Exception, e:
+		except Exception:
 			self.log.exception('receiveVm failed')
 			raise
 		return
 	
 	def pauseVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
+		self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM PAUSE", instance=instance)
 		hostname = self.data.getHost(instance.hostId).name
 		try:
 			self.proxy[hostname].pauseVm(instance.vmId)
@@ -390,14 +468,15 @@ class ClusterManagerService(object):
 			self.log.exception('pauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
 			raise
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
+		self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
 		self.data.releaseInstance(instance)
 		return
 
 	def unpauseVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
+		self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
 		self.data.releaseInstance(instance)
+		self.__ACCOUNT("CM VM UNPAUSE", instance=instance)
 		hostname = self.data.getHost(instance.hostId).name
 		try:
 			self.proxy[hostname].unpauseVm(instance.vmId)
@@ -405,7 +484,7 @@ class ClusterManagerService(object):
 			self.log.exception('unpauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
 			raise
 		instance = self.data.acquireInstance(instanceId)
-		self.stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
+		self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
 		self.data.releaseInstance(instance)
 		return
 	
@@ -440,6 +519,7 @@ class ClusterManagerService(object):
 	def vmmSpecificCall(self, instanceId, arg):
 		instance = self.data.getInstance(instanceId)
 		hostname = self.data.getHost(instance.hostId).name
+		self.__ACCOUNT("CM VM SPECIFIC CALL", instance=instance)
 		try:
 			res = self.proxy[hostname].vmmSpecificCall(instance.vmId, arg)
 		except Exception:
@@ -466,8 +546,7 @@ class ClusterManagerService(object):
 
 		if oldHost.up == False:
 			self.__upHost(oldHost)
-		self.hostLastContactTime[host.id] = time.time()
-		#self.hostLastUpdateTime[host.id] = time.time()
+		self.hostLastContactTime[host.id] = self.__now()
 		oldHost.version = host.version
 		oldHost.memory = host.memory
 		oldHost.cores = host.cores
@@ -479,10 +558,8 @@ class ClusterManagerService(object):
 			oldHost.state = HostState.Normal
 
 		# let the host communicate what it is running
-		# XXXrgass - This is too chatty for the console, I think we should remove this.
-		# XXXstroucki - My install depends on this, but I output to log files. This should be handled by a separate accounting server in future.
+		# and note that the information is not stale
 		for instance in instances:
-			self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id, host.id, instance.vmId, instance.userId, instance.cores, instance.memory))
 			self.instanceLastContactTime.setdefault(instance.id, 0)
 
 		self.data.releaseHost(oldHost)
@@ -500,8 +577,9 @@ class ClusterManagerService(object):
 			self.log.exception("Could not acquire instance")
 			raise
 
-		self.instanceLastContactTime[instanceId] = time.time()
+		self.instanceLastContactTime[instanceId] = self.__now()
 		oldInstance.decayed = False
+		self.__ACCOUNT("CM VM UPDATE", instance=oldInstance)
 
 		if (instance.state == InstanceState.Exited):
 			# determine why a VM has exited
@@ -509,7 +587,7 @@ class ClusterManagerService(object):
 			if (oldInstance.state not in [InstanceState.ShuttingDown, InstanceState.Destroying, InstanceState.Suspending]):
 				self.log.warning('Unexpected exit on %s of instance %s (vmId %d)' % (hostname, oldInstance.name, oldInstance.vmId))
 			if (oldInstance.state == InstanceState.Suspending):
-				self.stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
+				self.__stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
 				oldInstance.hostId = None
 				oldInstance.vmId = None
 				self.data.releaseInstance(oldInstance)
@@ -552,13 +630,14 @@ class ClusterManagerService(object):
 
 		self.data.releaseHost(dataHost)
 		instance = self.data.acquireInstance(instanceId)
+		self.__ACCOUNT("CM VM ACTIVATE", instance=instance)
 
 		if ('__resume_source' in instance.hints):
-			self.stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
+			self.__stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
 		else:
 			# XXXstroucki should held VMs be continually tried? Or be explicitly set back to pending?
-			#self.stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
-			self.stateTransition(instance, None, InstanceState.Activating)
+			#self.__stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
+			self.__stateTransition(instance, None, InstanceState.Activating)
 
 		instance.hostId = host.id
 		self.data.releaseInstance(instance)
@@ -568,14 +647,14 @@ class ClusterManagerService(object):
 				vmId = self.proxy[host.name].resumeVm(instance, instance.hints['__resume_source'])
 			else:
 				vmId = self.proxy[host.name].instantiateVm(instance)
-		except Exception, e:
+		except Exception:
 			instance = self.data.acquireInstance(instanceId)
 			if (instance.state is InstanceState.Destroying): # Special case for if destroyVm is called during initialization and initialization fails
 				self.data.removeInstance(instance)
 			else:
 				# XXXstroucki what can we do about pending hosts in the scheduler?
 				# put them at the end of the queue and keep trying?
-				self.stateTransition(instance, None, InstanceState.Held)
+				self.__stateTransition(instance, None, InstanceState.Held)
 				instance.hostId = None
 				self.data.releaseInstance(instance)
 			return "failure"
@@ -594,7 +673,7 @@ class ClusterManagerService(object):
 		else:
 			if ('__resume_source' not in instance.hints):
 				# XXXstroucki should we just wait for NM to update?
-				#self.stateTransition(instance, InstanceState.Activating, InstanceState.Running)
+				#self.__stateTransition(instance, InstanceState.Activating, InstanceState.Running)
 				pass
 
 		self.data.releaseInstance(instance)
@@ -606,9 +685,41 @@ class ClusterManagerService(object):
                         self.log.info("Host %s is already registered, it was updated now" % hostname)
                 else:
                         self.log.info("A host was registered - hostname: %s, version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
+
+		try:
+			host = self.data.getHost(hostId)
+			self.__ACCOUNT("CM HOST REGISTER", host=host)
+		except:
+			self.log.warning("Failed to lookup host %s" % hostId)
+
                 return hostId
 
         def unregisterHost(self, hostId):
+		try:
+			host = self.data.getHost(hostId)
+			self.__ACCOUNT("CM HOST UNREGISTER", host=host)
+		except:
+			self.log.warning("Failed to lookup host %s" % hostId)
+			return
+
                 self.data.unregisterHost(hostId)
                 self.log.info("Host %s was unregistered" % hostId)
                 return
+
+	# service thread
+	def __monitorCluster(self):
+		while True:
+			sleepFor = min(self.expireHostTime, self.allowDecayed)
+
+			try:
+				self.__checkHosts()
+				self.__checkInstances()
+			except:
+				self.log.exception('monitorCluster iteration failed')
+			#  XXXrgass too chatty.  Remove
+			# XXXstroucki the risk is that a deadlock in obtaining
+			# data could prevent this loop from continuing.
+			#self.log.info("Sleeping for %d seconds" % sleepFor)
+			time.sleep(sleepFor)
+
+

Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/datainterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/datainterface.py?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/datainterface.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/datainterface.py Wed Mar  7 22:29:24 2012
@@ -45,6 +45,9 @@ class DataInterface(object):
 	
 	def getHost(self, id):
 		raise NotImplementedError
+
+	def getImages(self):
+		raise NotImplementedError
 	
 	def getInstances(self):
 		raise NotImplementedError

Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/fromconfig.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/fromconfig.py?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/fromconfig.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/fromconfig.py Wed Mar  7 22:29:24 2012
@@ -20,7 +20,7 @@ import threading
 import os
 import ConfigParser
 
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Host, Network, User, TashiException, Errors, HostState
 from tashi.clustermanager.data import DataInterface
 
 class FromConfig(DataInterface):
@@ -109,7 +109,7 @@ class FromConfig(DataInterface):
 	def releaseInstance(self, instance):
 		try:
 			if (instance.id not in self.instances): # MPR: should never be true, but good to check
-				raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (instanceId)})
+				raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (instance.id)})
 		finally:
 			self.releaseLock(instance._lock)
 	

Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py Wed Mar  7 22:29:24 2012
@@ -17,9 +17,12 @@
 
 import subprocess
 import time
+#XXXstroucki getImages requires os?
+import os
+from tashi.rpycservices.rpyctypes import Errors, Network, Host, User, Instance, TashiException, LocalImages, DiskConfiguration, NetworkConfiguration
+from tashi.util import stringPartition, boolean, instantiateImplementation, humanReadable
 from tashi.rpycservices.rpyctypes import User
 from tashi.clustermanager.data import DataInterface
-from tashi.util import instantiateImplementation
 
 class LdapOverride(DataInterface):
 	def __init__(self, config):
@@ -31,6 +34,7 @@ class LdapOverride(DataInterface):
 		self.nameKey = config.get("LdapOverride", "nameKey")
 		self.idKey = config.get("LdapOverride", "idKey")
 		self.ldapCommand = config.get("LdapOverride", "ldapCommand")
+		self.dfs = instantiateImplementation(config.get("ClusterManager", "dfs"), config)
 	
 	def registerInstance(self, instance):
 		return self.baseDataObject.registerInstance(instance)
@@ -68,6 +72,17 @@ class LdapOverride(DataInterface):
 	def getNetwork(self, id):
 		return self.baseDataObject.getNetwork(id)
 
+        def getImages(self):
+                count = 0
+                myList = []
+                for i in self.dfs.list("images"):
+                        myFile = self.dfs.getLocalHandle("images/" + i)
+                        if os.path.isfile(myFile):
+                                image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
+                                myList.append(image)
+                                count += 1
+                return myList
+
 	def fetchFromLdap(self):
 		now = time.time()
 		if (now - self.lastUserUpdate > self.fetchThreshold):

Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/pickled.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/pickled.py?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/pickled.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/pickled.py Wed Mar  7 22:29:24 2012
@@ -18,7 +18,7 @@
 import cPickle
 import os
 import threading
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Instance, Host
 from tashi.clustermanager.data import FromConfig, DataInterface
 
 class Pickled(FromConfig):
@@ -39,22 +39,27 @@ class Pickled(FromConfig):
 	
 	def cleanInstances(self):
 		ci = {}
-		for i in self.instances.itervalues():
+		for ignore, i in self.instances.items():
 			i2 = Instance(d=i.__dict__)
 			ci[i2.id] = i2
 		return ci
 	
 	def cleanHosts(self):
 		ch = {}
-		for h in self.hosts.itervalues():
+		for ignore, h in self.hosts.items():
 			h2 = Host(d=h.__dict__)
 			ch[h2.id] = h2
 		return ch
 	
 	def save(self):
-		file = open(self.file, "w")
+		filename = self.file
+		# XXXstroucki could be better
+		tempfile = "%s.new" % filename
+
+		file = open(tempfile, "w")
 		cPickle.dump((self.cleanHosts(), self.cleanInstances(), self.networks, self.users), file)
 		file.close()
+		os.rename(tempfile, filename)
 
 	def load(self):
 		if (os.access(self.file, os.F_OK)):
@@ -67,11 +72,11 @@ class Pickled(FromConfig):
 		self.instances = instances
 		self.networks = networks
 		self.users = users
-		for i in self.instances.itervalues():
+		for ignore, i in self.instances.items():
 			if (i.id >= self.maxInstanceId):
 				self.maxInstanceId = i.id + 1
 			i._lock = threading.Lock()
 			self.lockNames[i._lock] = "i%d" % (i.id)
-		for h in self.hosts.itervalues():
+		for ignore, h in self.hosts.items():
 			h._lock = threading.Lock()
 			self.lockNames[h._lock] = "h%d" % (h.id)

Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py Wed Mar  7 22:29:24 2012
@@ -17,11 +17,9 @@
 
 import logging
 import threading
-import time
-import types
 # XXXstroucki getImages needs os?
 import os
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Errors, Network, Host, User, Instance, TashiException, LocalImages, DiskConfiguration, NetworkConfiguration
 from tashi.clustermanager.data.datainterface import DataInterface
 from tashi.util import stringPartition, boolean, instantiateImplementation, humanReadable
 
@@ -45,7 +43,7 @@ class SQL(DataInterface):
 			self.password = self.config.get('SQL', 'password')
 			self.conn = MySQLdb.connect(host=host, user=user, passwd=self.password, db=db)
 		else:
-			raise ValueException, 'Unknown SQL database engine by URI: %s' % (self.uri)
+			raise TashiException, 'Unknown SQL database engine by URI: %s' % (self.uri)
 
 		self.instanceOrder = ['id', 'vmId', 'hostId', 'decayed', 'state', 'userId', 'name', 'cores', 'memory', 'disks', 'nics', 'hints']
 		self.hostOrder = ['id', 'name', 'up', 'decayed', 'state', 'memory', 'cores', 'version']
@@ -319,7 +317,7 @@ class SQL(DataInterface):
 		for r in res:
 			if r[1] == hostname:
 				id = r[0]
-				print "Host %s already registered, update will be done" % id
+				self.log.warning("Host %s already registered, update will be done" % id)
 				s = ""
 				host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version})
 				l = self.makeHostList(host)

Modified: incubator/tashi/branches/oldstable/src/tashi/connectionmanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/connectionmanager.py?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/connectionmanager.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/connectionmanager.py Wed Mar  7 22:29:24 2012
@@ -15,9 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.    
 
-import rpyc
 from tashi.rpycservices import rpycservices
-from tashi.rpycservices.rpyctypes import *
+#from tashi.rpycservices.rpyctypes import *
 
 class ConnectionManager(object):
 	def __init__(self, username, password, port, timeout=10000.0):

Modified: incubator/tashi/branches/oldstable/src/tashi/dfs/vfs.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/dfs/vfs.py?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/dfs/vfs.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/dfs/vfs.py Wed Mar  7 22:29:24 2012
@@ -27,52 +27,67 @@ class Vfs(DfsInterface):
 		DfsInterface.__init__(self, config)
 		self.prefix = self.config.get("Vfs", "prefix")
 
-# why do these three need to be separate?	
+	def __dfsToReal(self, dfspath):
+		realpath = os.path.join(self.prefix, dfspath)
+		return realpath
+
 	def copyTo(self, localSrc, dst):
-		shutil.copy(localSrc, os.path.join(self.prefix, dst))
-# just assuming this works
+		realdest = self.__dfsToReal(dst)
+		shutil.copy(localSrc, realdest)
+		# just assuming this works
 		return None
 	
 	def copyFrom(self, src, localDst):
-		shutil.copy(os.path.join(self.prefix, src), localDst)
-# just assuming this works
+		realsrc = self.__dfsToReal(src)
+		shutil.copy(realsrc, localDst)
+		# just assuming this works
 		return None
 
 	def copy(self, src, dst):
-		shutil.copy(os.path.join(self.prefix, src),
-			    os.path.join(self.prefix, dst))
-# just assuming this works
+		realsrc = self.__dfsToReal(src)
+		realdst = self.__dfsToReal(dst)
+		shutil.copy(realsrc, realdst)
+		# just assuming this works
 		return None
 	
 	def list(self, path):
 		try:
-			return os.listdir(os.path.join(self.prefix, path))
+			realpath = self.__dfsToReal(path)
+			return os.listdir(realpath)
 		except OSError, e:
+			# XXXstroucki error 20 = ENOTDIR
 			if (e.errno == 20):
 				return [path.split('/')[-1]]
 			else:
 				raise
 	
 	def stat(self, path):
-		return os.stat(os.path.join(self.prefix, path))
+		realpath = self.__dfsToReal(path)
+		return os.stat(realpath)
 	
 	def move(self, src, dst):
-		shutil.move(os.path.join(self.prefix, src), 
-			    os.path.join(self.prefix, dst))
-# just assuming this works
+		realsrc = self.__dfsToReal(src)
+		realdst = self.__dfsToReal(dst)
+		shutil.move(realsrc, realdst)
+		# just assuming this works
 		return None
 	
 	def mkdir(self, path):
-		return os.mkdir(os.path.join(self.prefix, path))
+		realpath = self.__dfsToReal(path)
+		return os.mkdir(realpath)
 	
 	def unlink(self, path):
-		return os.unlink(os.path.join(self.prefix, path))
+		realpath = self.__dfsToReal(path)
+		return os.unlink(realpath)
 	
 	def rmdir(self, path):
-		return os.rmdir(os.path.join(self.prefix, path))
+		realpath = self.__dfsToReal(path)
+		return os.rmdir(realpath)
 	
 	def open(self, path, perm):
-		return open(os.path.join(self.prefix, path), perm)
+		realpath = self.__dfsToReal(path)
+		return open(realpath, perm)
 	
 	def getLocalHandle(self, path):
-		return os.path.join(self.prefix, path)
+		realpath = self.__dfsToReal(path)
+		return realpath

Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py Wed Mar  7 22:29:24 2012
@@ -22,7 +22,6 @@ import signal
 import sys
 
 from tashi.util import instantiateImplementation, getConfig, debugConsole, signalHandler
-from tashi import ConnectionManager
 import tashi
 from tashi import boolean
 

Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py?rev=1298170&r1=1298169&r2=1298170&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py Wed Mar  7 22:29:24 2012
@@ -15,17 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.    
 
-import cPickle
 import logging
-import os
 import socket
-import sys
 import threading
 import time
 
-from tashi.rpycservices.rpyctypes import Host, HostState, InstanceState, TashiException, Errors, Instance
-from tashi.nodemanager import RPC
-from tashi import boolean, vmStates, logged, ConnectionManager, timed
+from tashi.rpycservices import rpycservices
+from tashi.rpycservices.rpyctypes import InstanceState, TashiException, Errors, Instance
+from tashi import boolean, vmStates, ConnectionManager
 import tashi
 
 
@@ -50,201 +47,316 @@ class NodeManagerService(object):
 		self.log = logging.getLogger(__file__)
 		self.convertExceptions = boolean(config.get('NodeManagerService', 'convertExceptions'))
 		self.registerFrequency = float(config.get('NodeManagerService', 'registerFrequency'))
-		self.infoFile = self.config.get('NodeManagerService', 'infoFile')
 		self.statsInterval = float(self.config.get('NodeManagerService', 'statsInterval'))
-		self.id = None
+		self.registerHost = boolean(config.get('NodeManagerService', 'registerHost'))
+		try:
+			self.cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
+		except:
+			self.log.exception("Could not connect to CM")
+			return
+
+		self.accountingHost = None
+		self.accountingPort = None
+		try:
+			self.accountingHost = self.config.get('NodeManagerService', 'accountingHost')
+			self.accountingPort = self.config.getint('NodeManagerService', 'accountingPort')
+		except:
+			pass
+
 		self.notifyCM = []
-		self.loadVmInfo()
-		vmList = self.vmm.listVms()
-		for vmId in vmList:
-			if (vmId not in self.instances):
-				self.log.warning('vmcontrol backend reports additional vmId %d' % (vmId))
-				self.instances[vmId] = Instance(d={'vmId':vmId,'id':-1})
-		for vmId in self.instances.keys():
-			if (vmId not in vmList):
-				self.log.warning('vmcontrol backend does not report %d' % (vmId))
-				self.vmStateChange(vmId, None, InstanceState.Exited)
-		self.registerHost()
-		threading.Thread(target=self.backupVmInfoAndFlushNotifyCM).start()
-		threading.Thread(target=self.registerWithClusterManager).start()
-		threading.Thread(target=self.statsThread).start()
-	
-	def loadVmInfo(self):
+
+		self.__initAccounting()
+
+		self.id = None
+		# XXXstroucki this fn could be in this level maybe?
+		self.host = self.vmm.getHostInfo(self)
+
+		# populate self.instances
+		self.__loadVmInfo()
+
+		self.__registerHost()
+
+		self.id = self.cm.registerNodeManager(self.host, self.instances.values())
+
+		# XXXstroucki cut cross check for NM/VMM state
+
+		# start service threads
+		threading.Thread(target=self.__registerWithClusterManager).start()
+		threading.Thread(target=self.__statsThread).start()
+	
+	def __initAccounting(self):
+                self.accountBuffer = []
+                self.accountLines = 0
+                self.accountingClient = None
+                try:
+                        if (self.accountingHost is not None) and \
+                                    (self.accountingPort is not None):
+                                self.accountingClient=rpycservices.client(self.accountingHost, self.accountingPort)
+                except:
+                        self.log.exception("Could not init accounting")
+			
+	def __loadVmInfo(self):
 		try:
 			self.instances = self.vmm.getInstances()
-		except Exception, e:
+		except Exception:
 			self.log.exception('Failed to obtain VM info')
 			self.instances = {}
-	
-	def saveVmInfo(self):
+
+	# send data to CM
+	# XXXstroucki adapt this for accounting?
+	def __flushNotifyCM(self):
+		start = time.time()
+		# send data to CM, adding message to buffer if
+		# it fails
 		try:
-			data = cPickle.dumps(self.instances)
-			f = open(self.infoFile, "w")
-			f.write(data)
-			f.close()
+			notifyCM = []
+			try:
+				while (len(self.notifyCM) > 0):
+					value = self.notifyCM.pop(0)
+					(instanceId, newInst, old, success) = value
+					try:
+						self.cm.vmUpdate(instanceId, newInst, old)
+					except TashiException, e:
+						notifyCM.append((instanceId, newInst, old, success))
+						if (e.errno != Errors.IncorrectVmState):
+							raise
+					except:
+						notifyCM.append((instanceId, newInst, old, success))
+						raise
+					else:
+						success()
+			finally:
+				if len(notifyCM) > 0:
+					self.notifyCM.append(notifyCM)
 		except Exception, e:
-			self.log.exception('Failed to save VM info to %s' % (self.infoFile))
-	
-	def vmStateChange(self, vmId, old, cur):
-		instance = self.getInstance(vmId)
-		if (old and instance.state != old):
-			self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
-		if (cur == InstanceState.Exited):
-			del self.instances[vmId]
-			return True
+			self.log.exception('Failed to send data to the CM')
 
-		if (instance.state == cur):
-			# Don't do anything if state is what it should be
-			return True
+		#toSleep = start - time.time() + self.registerFrequency
+		#if (toSleep > 0):
+			#time.sleep(toSleep)
 
-		instance.state = cur
-		newInst = Instance(d={'state':cur})
-		success = lambda: None
+        def __ACCOUNTFLUSH(self):
 		try:
-			cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
-			cm.vmUpdate(instance.id, newInst, old)
-		except Exception, e:
-			self.log.exception('RPC failed for vmUpdate on CM')
-			self.notifyCM.append((instance.id, newInst, old, success))
-		else:
-			success()
-		return True
-	
-	def backupVmInfoAndFlushNotifyCM(self):
-		cm = None
-		cmUseCount = 0
-		while True:
-			if cmUseCount > 10 or cm is None:
-				try:
-					# XXXstroucki hope rpyc handles destruction
-					cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
-					cmUseCount = 0
-				except Exception, e:
-					self.log.warning("Could not get a handle to the clustermanager")
-					time.sleep(60)
-					continue
+			if (self.accountingClient is not None):
+				self.accountingClient.record(self.accountBuffer)
+			self.accountLines = 0
+			self.accountBuffer = []
+		except:
+			self.log.exception("Failed to flush accounting data")
 
-			cmUseCount = cmUseCount + 1
-			start = time.time()
+
+        def __ACCOUNT(self, text, instance=None, host=None):
+                now = time.time()
+                instanceText = None
+                hostText = None
+
+                if instance is not None:
 			try:
-				self.saveVmInfo()
-			except Exception, e:
-				self.log.exception('Failed to save VM info')
+                        	instanceText = 'Instance(%s)' % (instance)
+			except:
+				self.log.exception("Invalid instance data")
+
+                if host is not None:
 			try:
-				notifyCM = []
-				try:
-					while (len(self.notifyCM) > 0):
-						(instanceId, newInst, old, success) = self.notifyCM.pop(0)
-						try:
-							cm.vmUpdate(instanceId, newInst, old)
-						except TashiException, e:
-							notifyCM.append((instanceId, newInst, old, success))
-							if (e.errno != Errors.IncorrectVmState):
-								raise
-						except:
-							notifyCM.append((instanceId, newInst, old, success))
-							raise
-						else:
-							success()
-				finally:
-					self.notifyCM = self.notifyCM + notifyCM
-			except Exception, e:
-				self.log.exception('Failed to register with the CM')
-			toSleep = start - time.time() + self.registerFrequency
-			if (toSleep > 0):
-				time.sleep(toSleep)
-	
-	def registerWithClusterManager(self):
-		cm = None
-		cmUseCount = 0
+                        	hostText = "Host(%s)" % (host)
+			except:
+				self.log.exception("Invalid host data")
+
+                secondary = ','.join(filter(None, (hostText, instanceText)))
+
+                line = "%s|%s|%s" % (now, text, secondary)
+
+                self.accountBuffer.append(line)
+                self.accountLines += 1
+
+		# XXXstroucki think about force flush every so often
+                if (self.accountLines > 0):
+                        self.__ACCOUNTFLUSH()
+
+
+	# service thread function
+	def __registerWithClusterManager(self):
 		while True:
-			if cmUseCount > 10 or cm is None:
-				try:
-					cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
-					cmUseCount = 0
-				except Exception, e:
-					self.log.warning("Could not get a handle to the clustermanager")
-					time.sleep(60)
-					continue
-			cmUseCount = cmUseCount + 1
+			#self.__ACCOUNT("TESTING")
 			start = time.time()
 			try:
-				host = self.vmm.getHostInfo(self)
 				instances = self.instances.values()
-				#import pprint
-				#self.log.warning("Instances: " + pprint.saferepr(instances))
-				self.id = cm.registerNodeManager(host, instances)
-			except Exception, e:
+				self.id = self.cm.registerNodeManager(self.host, instances)
+			except Exception:
 				self.log.exception('Failed to register with the CM')
+
 			toSleep = start - time.time() + self.registerFrequency
 			if (toSleep > 0):
 				time.sleep(toSleep)
-	
-	def getInstance(self, vmId):
+
+	# service thread function
+	def __statsThread(self):
+		if (self.statsInterval == 0):
+			return
+		while True:
+			try:
+				publishList = []
+				for vmId in self.instances.keys():
+					try:
+						instance = self.instances.get(vmId, None)
+						if (not instance):
+							continue
+						id = instance.id
+						stats = self.vmm.getStats(vmId)
+						for stat in stats:
+							publishList.append({"vm_%d_%s" % (id, stat):stats[stat]})
+					except:
+						self.log.exception('statsThread threw an exception')
+				if (len(publishList) > 0):
+					tashi.publisher.publishList(publishList)
+			except:
+				self.log.exception('statsThread threw an exception')
+			time.sleep(self.statsInterval)
+
+        def __registerHost(self):
+                hostname = socket.gethostname()
+		# populate some defaults
+		# XXXstroucki: I think it's better if the nodemanager fills these in properly when registering with the clustermanager
+		memory = 0
+		cores = 0
+		version = "empty"
+                #self.cm.registerHost(hostname, memory, cores, version)
+
+	def __getInstance(self, vmId):
 		instance = self.instances.get(vmId, None)
-		if (instance is None):
-			raise TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this host" % (vmId)})
-		return instance
+		if instance is not None:
+			return instance
+
+		# refresh self.instances if not found
+		self.__loadVmInfo()
+		instance = self.instances.get(vmId, None)
+		if instance is not None:
+			return instance
+
+
+		raise TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this host" % (vmId)})
 	
-	def instantiateVm(self, instance):
-		vmId = self.vmm.instantiateVm(instance)
-		instance.vmId = vmId
-		instance.state = InstanceState.Running
+	# remote
+	# Called from VMM to update self.instances
+	# but only changes are Exited, MigrateTrans and Running
+	# qemu.py calls this in the matchSystemPids thread
+	# xenpv.py: i have no real idea why it is called there
+	def vmStateChange(self, vmId, old, cur):
+		instance = self.__getInstance(vmId)
+
+		if (instance.state == cur):
+			# Don't do anything if state is what it should be
+			return True
+
+		if (old and instance.state != old):
+			# make a note of mismatch, but go on.
+			# the VMM should know best
+			self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
+                        
+		instance.state = cur
+
+		self.__ACCOUNT("NM VM STATE CHANGE", instance=instance)
+			      
+		newInst = Instance(d={'state':cur})
+		success = lambda: None
+		# send the state change up to the CM
+		self.notifyCM.append((instance.id, newInst, old, success))
+		self.__flushNotifyCM()
+
+		# cache change locally
+		self.instances[vmId] = instance
+
+		if (cur == InstanceState.Exited):
+			# At this point, the VMM will clean up,
+			# so forget about this instance
+			del self.instances[vmId]
+			return True
+
+		return True
+
+	# remote
+	def createInstance(self, instance):
+		vmId = instance.vmId
 		self.instances[vmId] = instance
-		return vmId
+		
+	
+	# remote
+	def instantiateVm(self, instance):
+		self.__ACCOUNT("NM VM INSTANTIATE", instance=instance)
+		try:
+			vmId = self.vmm.instantiateVm(instance)
+			#instance.vmId = vmId
+			#instance.state = InstanceState.Running
+			#self.instances[vmId] = instance
+			return vmId
+		except:
+			self.log.exception("Failed to start instance")
 	
+	# remote
 	def suspendVm(self, vmId, destination):
-		instance = self.getInstance(vmId)
+		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM SUSPEND", instance=instance)
+
 		instance.state = InstanceState.Suspending
+		self.instances[vmId] = instance
 		threading.Thread(target=self.vmm.suspendVm, args=(vmId, destination)).start()
 	
-	def resumeVmHelper(self, instance, name):
+	# called by resumeVm as thread
+	def __resumeVmHelper(self, instance, name):
 		self.vmm.resumeVmHelper(instance, name)
 		instance.state = InstanceState.Running
 		newInstance = Instance(d={'id':instance.id,'state':instance.state})
 		success = lambda: None
-		cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
-		try:
-			cm.vmUpdate(newInstance.id, newInstance, InstanceState.Resuming)
-		except Exception, e:
-			self.log.exception('vmUpdate failed in resumeVmHelper')
-			self.notifyCM.append((newInstance.id, newInstance, InstanceState.Resuming, success))
-		else:
-			success()
-	
+		self.notifyCM.append((newInstance.id, newInstance, InstanceState.Resuming, success))
+		self.__flushNotifyCM()
+
+	# remote
 	def resumeVm(self, instance, name):
+		self.__ACCOUNT("NM VM RESUME", instance=instance)
 		instance.state = InstanceState.Resuming
 		instance.hostId = self.id
 		try:
 			instance.vmId = self.vmm.resumeVm(instance, name)
 			self.instances[instance.vmId] = instance
-			threading.Thread(target=self.resumeVmHelper, args=(instance, name)).start()
+			threading.Thread(target=self.__resumeVmHelper, args=(instance, name)).start()
 		except:
 			self.log.exception('resumeVm failed')
 			raise TashiException(d={'errno':Errors.UnableToResume,'msg':"resumeVm failed on the node manager"})
 		return instance.vmId
 	
+	# remote
 	def prepReceiveVm(self, instance, source):
+		self.__ACCOUNT("NM VM MIGRATE RECEIVE PREP")
 		instance.vmId = -1
 		transportCookie = self.vmm.prepReceiveVm(instance, source.name)
 		return transportCookie
 
+	# remote
 	def prepSourceVm(self, vmId):
-		instance = self.getInstance(vmId)
+		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM MIGRATE SOURCE PREP", instance=instance)
 		instance.state = InstanceState.MigratePrep
-	
-	def migrateVmHelper(self, instance, target, transportCookie):
+		self.instances[vmId] = instance
+
+	# called by migrateVm as thread
+	# XXXstroucki migrate out?
+	def __migrateVmHelper(self, instance, target, transportCookie):
 		self.vmm.migrateVm(instance.vmId, target.name, transportCookie)
 		del self.instances[instance.vmId]
-		
+
+	# remote
+	# XXXstroucki migrate out?
 	def migrateVm(self, vmId, target, transportCookie):
-		instance = self.getInstance(vmId)
+		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM MIGRATE", instance=instance)
 		instance.state = InstanceState.MigrateTrans
-		threading.Thread(target=self.migrateVmHelper, args=(instance, target, transportCookie)).start()
+		self.instances[vmId] = instance
+		threading.Thread(target=self.__migrateVmHelper, args=(instance, target, transportCookie)).start()
 		return
 	
-	def receiveVmHelper(self, instance, transportCookie):
-		cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
+	# called by receiveVm as thread
+	# XXXstroucki migrate in?
+	def __receiveVmHelper(self, instance, transportCookie):
 		vmId = self.vmm.receiveVm(transportCookie)
 		instance.state = InstanceState.Running
 		instance.hostId = self.id
@@ -252,83 +364,69 @@ class NodeManagerService(object):
 		self.instances[vmId] = instance
 		newInstance = Instance(d={'id':instance.id,'state':instance.state,'vmId':instance.vmId,'hostId':instance.hostId})
 		success = lambda: None
-		try:
-			cm.vmUpdate(newInstance.id, newInstance, InstanceState.MigrateTrans)
-		except Exception, e:
-			self.log.exception('vmUpdate failed in receiveVmHelper')
-			self.notifyCM.append((newInstance.id, newInstance, InstanceState.MigrateTrans, success))
-		else:
-			success()
-	
+		self.notifyCM.append((newInstance.id, newInstance, InstanceState.Running, success))
+		self.__flushNotifyCM()
+
+	# remote
+	# XXXstroucki migrate in?
 	def receiveVm(self, instance, transportCookie):
 		instance.state = InstanceState.MigrateTrans
-		threading.Thread(target=self.receiveVmHelper, args=(instance, transportCookie)).start()
+		vmId = instance.vmId
+		self.instances[vmId] = instance
+		self.__ACCOUNT("NM VM MIGRATE RECEIVE", instance=instance)
+		threading.Thread(target=self.__receiveVmHelper, args=(instance, transportCookie)).start()
 		return
-	
+
+	# remote
 	def pauseVm(self, vmId):
-		instance = self.getInstance(vmId)
+		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM PAUSE", instance=instance)
 		instance.state = InstanceState.Pausing
+		self.instances[vmId] = instance
 		self.vmm.pauseVm(vmId)
 		instance.state = InstanceState.Paused
-	
+		self.instances[vmId] = instance
+
+	# remote
 	def unpauseVm(self, vmId):
-		instance = self.getInstance(vmId)
+		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM UNPAUSE", instance=instance)
 		instance.state = InstanceState.Unpausing
+		self.instances[vmId] = instance
 		self.vmm.unpauseVm(vmId)
 		instance.state = InstanceState.Running
-	
+		self.instances[vmId] = instance
+
+	# remote
 	def shutdownVm(self, vmId):
-		instance = self.getInstance(vmId)
+		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM SHUTDOWN", instance=instance)
 		instance.state = InstanceState.ShuttingDown
+		self.instances[vmId] = instance
 		self.vmm.shutdownVm(vmId)
-	
+
+	# remote
 	def destroyVm(self, vmId):
-		instance = self.getInstance(vmId)
+		instance = self.__getInstance(vmId)
+		self.__ACCOUNT("NM VM DESTROY", instance=instance)
 		instance.state = InstanceState.Destroying
+		self.instances[vmId] = instance
 		self.vmm.destroyVm(vmId)
-	
+
+	# remote
 	def getVmInfo(self, vmId):
-		instance = self.getInstance(vmId)
+		instance = self.__getInstance(vmId)
 		return instance
-	
+
+	# remote
 	def vmmSpecificCall(self, vmId, arg):
 		return self.vmm.vmmSpecificCall(vmId, arg)
-	
+
+	# remote
 	def listVms(self):
 		return self.instances.keys()
 
+	# remote
 	def liveCheck(self):
 		return "alive"
 	
-	def statsThread(self):
-		if (self.statsInterval == 0):
-			return
-		while True:
-			try:
-				publishList = []
-				for vmId in self.instances.keys():
-					try:
-						instance = self.instances.get(vmId, None)
-						if (not instance):
-							continue
-						id = instance.id
-						stats = self.vmm.getStats(vmId)
-						for stat in stats:
-							publishList.append({"vm_%d_%s" % (id, stat):stats[stat]})
-					except:
-						self.log.exception('statsThread threw an exception')
-				if (len(publishList) > 0):
-					tashi.publisher.publishList(publishList)
-			except:
-				self.log.exception('statsThread threw an exception')
-			time.sleep(self.statsInterval)
-
-        def registerHost(self):
-                cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
-                hostname = socket.gethostname()
-		# populate some defaults
-		# XXXstroucki: I think it's better if the nodemanager fills these in properly when registering with the clustermanager
-		memory = 0
-		cores = 0
-		version = "empty"
-                #cm.registerHost(hostname, memory, cores, version)



Mime
View raw message