incubator-tashi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From strou...@apache.org
Subject svn commit: r1043823 - in /incubator/tashi/trunk/src/tashi: client/ clustermanager/ clustermanager/data/ nodemanager/
Date Thu, 09 Dec 2010 02:41:36 GMT
Author: stroucki
Date: Thu Dec  9 02:41:36 2010
New Revision: 1043823

URL: http://svn.apache.org/viewvc?rev=1043823&view=rev
Log:
=== Changes from Michael Stroucken
- nodemanager.py
	- get fresh instance data (per Miha Stopar)

=== Changes from Andrew Edmonds and Miha Stopar
- sql.py:
	- self.idLock = threading.Lock() added into __init__
	- registerHost, unregisterHost, getNewId functions added

- datainterface.py:
	- registerHost, unregisterHost added

- fromconfig.py:
	- import os, import ConfigParser
	- the following section added into __init__:
		self.hostLocks = {}
		self.hostLock = threading.Lock()
		self.idLock = threading.Lock()
		if not self.config.has_section("FromConfig"):
			return
	- the following line fixed - instance.id instead of instanceId:
		raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d"
% (instance.id)})
	- acquireHost was changed:
		at the beginning the following line was added:
			self.hostLock.acquire()
		before acquireLock call the following section was added:
			# hostLocks dict added when registerHost was implemented, otherwise newly added hosts don't
have _lock
			self.hostLocks[hostId] = self.hostLocks.get(hostId, threading.Lock())
			host._lock = self.hostLocks[host.id]
	- releaseHost was changed:
		- hostId into host.id (see raise TashiException)
		- self.save() and self.hostLock.release() calls added

	- getHosts was changed:
		- now return self.hosts() instead of return self.cleanHosts() (FromConfig does not have
cleanHosts method, it is available only when Pickled overrides it)
	- registerHost, unregisterHost, getNewId, save functions added

- pickled.py:
	- the following section added to the __init__:
		self.hostLock = threading.Lock()
		self.hostLocks = {}
		self.idLock = threading.Lock()

- getentoverride.py: registerHost, unregisterHost added
- ldapoverride.py: registerHost, unregisterHost added

- clustermanagerservice.py: registerHost, unregisterHost added

- nodemanagerservice.py:
 	- self.registerHost() call added into __init__ (before threads)
	- registerHost added

- tashi-client.py: unregisterHost added to the argLists dict, convertArgs dict, description
dict and to the examples dict

- rpycservices.py: added registerHost and unregisterHost to the clusterManagerRPCs list

Modified:
    incubator/tashi/trunk/src/tashi/client/tashi-client.py
    incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/datainterface.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
    incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py

Modified: incubator/tashi/trunk/src/tashi/client/tashi-client.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/client/tashi-client.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/client/tashi-client.py (original)
+++ incubator/tashi/trunk/src/tashi/client/tashi-client.py Thu Dec  9 02:41:36 2010
@@ -214,6 +214,7 @@ argLists = {
 'getMyInstances': [],
 'getVmLayout': [],
 'vmmSpecificCall': [('instance', checkIid, lambda: requiredArg('instance'), True), ('arg',
str, lambda: requiredArg('arg'), True)],
+'unregisterHost': [('hostId', int, lambda: requiredArg('hostId'), True)],
 }
 
 # Used to convert the dictionary built from the arguments into an object that can be used
by thrift
@@ -229,6 +230,7 @@ convertArgs = {
 'pauseVm': '[instance]',
 'unpauseVm': '[instance]',
 'vmmSpecificCall': '[instance, arg]',
+'unregisterHost' : '[hostId]',
 }
 
 # Descriptions
@@ -249,7 +251,8 @@ description = {
 'getInstances': 'Gets a list of all VMs in Tashi',
 'getMyInstances': 'Utility function that only lists VMs owned by the current user',
 'getVmLayout': 'Utility function that displays what VMs are placed on what hosts',
-'vmmSpecificCall': 'Direct access to VMM-specific functionality'
+'vmmSpecificCall': 'Direct access to VMM-specific functionality',
+'unregisterHost' : 'Unregisters host. Registration happens when starting node manager',
 }
 
 # Example use strings
@@ -270,7 +273,8 @@ examples = {
 'getInstances': [''],
 'getMyInstances': [''],
 'getVmLayout': [''],
-'vmmSpecificCall': ['--instance 12345 --arg startVnc', '--instance foobar --arg stopVnc']
+'vmmSpecificCall': ['--instance 12345 --arg startVnc', '--instance foobar --arg stopVnc'],
+'unregisterHost' : ['--hostId 2'],
 }
 
 show_hide = []

Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py Thu Dec  9 02:41:36
2010
@@ -254,7 +254,7 @@ class ClusterManagerService(object):
 		try:
 			# Prepare the target
 			self.log.info("migrateVm: Calling prepSourceVm on source host %s" % sourceHost.name)
-			self.proxy[sourceHost.name].prepSourceVm(instance)
+			self.proxy[sourceHost.name].prepSourceVm(instance.vmId)
 			self.log.info("migrateVm: Calling prepReceiveVm on target host %s" % targetHost.name)
 			cookie = self.proxy[targetHost.name].prepReceiveVm(instance, sourceHost)
 		except Exception, e:
@@ -488,3 +488,16 @@ class ClusterManagerService(object):
 				self.stateTransition(instance, InstanceState.Activating, InstanceState.Running)
 			self.data.releaseInstance(instance)
 		return
+
+        def registerHost(self, hostname, memory, cores, version):
+                hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores,
version)
+                if alreadyRegistered:
+                        self.log.info("Host %s is already registered, it was updated now"
% hostname)
+                else:
+                        self.log.info("A host was registered - hostname: %s, version: %s,
memory: %s, cores: %s" % (hostname, version, memory, cores))
+                return hostId
+
+        def unregisterHost(self, hostId):
+                self.data.unregisterHost(hostId)
+                self.log.info("Host %s was unregistered" % hostId)
+                return

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/datainterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/datainterface.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/datainterface.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/datainterface.py Thu Dec  9 02:41:36
2010
@@ -63,3 +63,9 @@ class DataInterface(object):
 	
 	def getUser(self, id):
 		raise NotImplementedError
+		
+	def registerHost(self, hostname, memory, cores, version):
+		raise NotImplementedError
+	
+	def unregisterHost(self, hostId):
+		raise NotImplementedError

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py Thu Dec  9 02:41:36
2010
@@ -16,6 +16,8 @@
 # under the License.    
 
 import threading
+import os
+import ConfigParser
 
 from tashi.rpycservices.rpyctypes import *
 from tashi.clustermanager.data import DataInterface
@@ -34,6 +36,11 @@ class FromConfig(DataInterface):
 		self.instanceIdLock = threading.Lock()
 		self.lockNames[self.instanceIdLock] = "instanceIdLock"
 		self.maxInstanceId = 1
+		self.hostLocks = {}
+		self.hostLock = threading.Lock()
+		self.idLock = threading.Lock()
+		if not self.config.has_section("FromConfig"):
+			return
 		for (name, value) in self.config.items("FromConfig"):
 			name = name.lower()
 			if (name.startswith("host")):
@@ -114,21 +121,28 @@ class FromConfig(DataInterface):
 			self.releaseLock(self.instanceLock)
 	
 	def acquireHost(self, hostId):
+		self.hostLock.acquire()
 		host = self.hosts.get(hostId, None)
 		if (host is None):
 			raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (hostId)})
+		# hostLocks dict added when registerHost was implemented, otherwise newly added hosts don't
have _lock 
+		self.hostLocks[hostId] = self.hostLocks.get(hostId, threading.Lock())
+		host._lock = self.hostLocks[host.id]
 		self.acquireLock(host._lock)
 		return host
+
 	
 	def releaseHost(self, host):
 		try:
 			if (host.id not in self.hosts): # MPR: should never be true, but good to check
-				raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (hostId)})
+				raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (host.id)})
 		finally:
+			self.save()
 			self.releaseLock(host._lock)
+			self.hostLock.release()
 	
 	def getHosts(self):
-		return self.cleanHosts()
+		return self.hosts
 	
 	def getHost(self, id):
 		host = self.hosts.get(id, None)
@@ -156,3 +170,81 @@ class FromConfig(DataInterface):
 	
 	def getUser(self, id):
 		return self.users[id]
+		
+	def registerHost(self, hostname, memory, cores, version):
+		self.hostLock.acquire()
+		for id in self.hosts.keys():
+			if self.hosts[id].name == hostname:
+				host = Host(d={'id':id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
+				self.hosts[id] = host
+				self.save()
+				self.hostLock.release()
+				return id, True
+		id = self.getNewId("hosts")
+		self.hosts[id] = Host(d={'id':id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
+		self.save()
+		self.hostLock.release()
+		return id, False
+		
+	def unregisterHost(self, hostId):
+		self.hostLock.acquire()
+		del self.hosts[hostId]
+		self.save()
+		self.hostLock.release()
+
+	def getNewId(self, table):
+		""" Generates id for a new object. For example for hosts and users.  
+		"""
+		self.idLock.acquire()
+		maxId = 0
+		l = []
+		if(table == "hosts"):
+			for id in self.hosts.keys():
+				l.append(id)
+				if id >= maxId:
+					maxId = id
+		l.sort() # sort to enable comparing with range output
+		# check if some id is released:
+		t = range(maxId + 1)
+		t.remove(0)
+		if l != t and l != []:
+			releasedIds = filter(lambda x : x not in l, t)
+			self.idLock.release()
+			return releasedIds[0]
+		else:
+			self.idLock.release()
+			return maxId + 1
+		
+	def save(self):
+		# XXXstroucki: a relative path? Where does it go
+		# and in what order does it get loaded
+		fileName = "./etc/Tashi.cfg"
+		if not os.path.exists(fileName):
+			file = open(fileName, "w")
+			file.write("[FromConfig]")
+			file.close()	
+		parser = ConfigParser.ConfigParser()
+		parser.read(fileName)
+		
+		if not parser.has_section("FromConfig"):
+			parser.add_section("FromConfig")
+		
+		hostsInFile = []
+		for (name, value) in parser.items("FromConfig"):
+			name = name.lower()
+			if (name.startswith("host")):
+				hostsInFile.append(name)
+				
+		for h in hostsInFile:
+			parser.remove_option("FromConfig", h)
+			
+		for hId in self.hosts.keys():
+			host = self.hosts[hId]
+			hostPresentation = "Host(d={'id':%s,'name':'%s','state':HostState.Normal,'memory':%s,'cores':%s,'version':'%s'})"
% (hId, host.name, host.memory, host.cores, host.version)
+			parser.set("FromConfig", "host%s" % hId, hostPresentation)
+		
+		with open(fileName, 'wb') as configfile:
+			parser.write(configfile)
+		
+		
+

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py Thu Dec  9 02:41:36
2010
@@ -91,3 +91,10 @@ class GetentOverride(DataInterface):
 	def getUser(self, id):
 		self.fetchFromGetent()
 		return self.users[id]
+		
+	def registerHost(self, hostname, memory, cores, version):
+		return self.baseDataObject.registerHost(hostname, memory, cores, version)
+	
+	def unregisterHost(self, hostId):
+		return self.baseDataObject.unregisterHost(hostId)
+

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py Thu Dec  9 02:41:36
2010
@@ -104,3 +104,10 @@ class LdapOverride(DataInterface):
 	def getUser(self, id):
 		self.fetchFromLdap()
 		return self.users[id]
+		
+	def registerHost(self, hostname, memory, cores, version):
+		return self.baseDataObject.registerHost(hostname, memory, cores, version)
+	
+	def unregisterHost(self, hostId):
+		return self.baseDataObject.unregisterHost(hostId)
+

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py Thu Dec  9 02:41:36 2010
@@ -32,6 +32,9 @@ class Pickled(FromConfig):
 		self.instanceIdLock = threading.Lock()
 		self.lockNames[self.instanceIdLock] = "instanceIdLock"
 		self.maxInstanceId = 1
+		self.hostLock = threading.Lock()
+		self.hostLocks = {}
+		self.idLock = threading.Lock()
 		self.load()
 	
 	def cleanInstances(self):

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py Thu Dec  9 02:41:36 2010
@@ -50,6 +50,7 @@ class SQL(DataInterface):
 		self.hostLock = threading.Lock()
 		self.hostLocks = {}
 		self.maxInstanceId = 1
+		self.idLock = threading.Lock()
 		self.sqlLock = threading.Lock()
 		self.verifyStructure()
 
@@ -258,3 +259,62 @@ class SQL(DataInterface):
 		r = cur.fetchone()
 		user = User(d={'id':r[0], 'name':r[1], 'passwd':r[2]})
 		return user
+		
+	def registerHost(self, hostname, memory, cores, version):
+		self.hostLock.acquire()
+		cur = self.executeStatement("SELECT * from hosts")
+		res = cur.fetchall()
+		for r in res:
+			if r[1] == hostname:
+				id = r[0]
+				print "Host %s already registered, update will be done" % id
+				s = ""
+				host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory,
'cores': cores, 'version':version})
+				l = self.makeHostList(host)
+				for e in range(0, len(self.hostOrder)):
+					s = s + self.hostOrder[e] + "=" + l[e]
+					if (e < len(self.hostOrder)-1):
+						s = s + ", "
+				self.executeStatement("UPDATE hosts SET %s WHERE id = %d" % (s, id))
+				self.hostLock.release()
+				return r[0], True
+		id = self.getNewId("hosts")
+		host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory,
'cores': cores, 'version':version})
+		l = self.makeHostList(host)
+		self.executeStatement("INSERT INTO hosts VALUES (%s, %s, %s, %s, %s, %s, %s, %s)" % tuple(l))
+		self.hostLock.release()
+		return id, False
+	
+	def unregisterHost(self, hostId):
+		self.hostLock.acquire()
+		cur = self.executeStatement("SELECT * from hosts")
+		res = cur.fetchall()
+		for r in res:
+			if r[0] == hostId:
+				self.executeStatement("DELETE FROM hosts WHERE id = %d" % hostId)
+		self.hostLock.release()
+
+	def getNewId(self, table):
+		""" Generates id for a new object. For example for hosts and users.  
+		"""
+		self.idLock.acquire()
+		cur = self.executeStatement("SELECT * from %s" % table)
+		res = cur.fetchall()
+		maxId = 0 # the first id would be 1
+		l = []
+		for r in res:
+			id = r[0]
+			l.append(id)
+			if id >= maxId:
+				maxId = id
+		l.sort() # sort to enable comparing with range output
+		# check if some id is released:
+		t = range(maxId + 1)
+		t.remove(0)
+		if l != t and l != []:
+			releasedIds = filter(lambda x : x not in l, t)
+			self.idLock.release()
+			return releasedIds[0]
+		else:
+			self.idLock.release()
+			return maxId + 1

Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py Thu Dec  9 02:41:36
2010
@@ -64,6 +64,7 @@ class NodeManagerService(object):
 			if (vmId not in vmList):
 				self.log.warning('vmcontrol backend does not report %d' % (vmId))
 				self.vmStateChange(vmId, None, InstanceState.Exited)
+		self.registerHost()
 		threading.Thread(target=self.backupVmInfoAndFlushNotifyCM).start()
 		threading.Thread(target=self.registerWithClusterManager).start()
 		threading.Thread(target=self.statsThread).start()
@@ -202,7 +203,8 @@ class NodeManagerService(object):
 		transportCookie = self.vmm.prepReceiveVm(instance, source.name)
 		return transportCookie
 
-	def prepSourceVm(self, instance):
+	def prepSourceVm(self, vmId):
+		instance = self.getInstance(vmId)
 		instance.state = InstanceState.MigratePrep
 	
 	def migrateVmHelper(self, instance, target, transportCookie):
@@ -291,3 +293,13 @@ class NodeManagerService(object):
 			except:
 				self.log.exception('statsThread threw an exception')
 			time.sleep(self.statsInterval)
+
+        def registerHost(self):
+                cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
+                hostname = socket.gethostname()
+		# populate some defaults
+		# XXXstroucki: I think it's better if the nodemanager fills these in properly when registering
with the clustermanager
+		memory = 0
+		cores = 0
+		version = "empty"
+                cm.registerHost(hostname, memory, cores, version)



Mime
View raw message