incubator-tashi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From strou...@apache.org
Subject svn commit: r1295368 - in /incubator/tashi/trunk/src/tashi: ./ clustermanager/ clustermanager/data/ nodemanager/ nodemanager/vmcontrol/
Date Thu, 01 Mar 2012 01:11:41 GMT
Author: stroucki
Date: Thu Mar  1 01:11:38 2012
New Revision: 1295368

URL: http://svn.apache.org/viewvc?rev=1295368&view=rev
Log:
bring in resilient RPC client object improvement
bring in data type guards form CM database debugging

Modified:
    incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
    incubator/tashi/trunk/src/tashi/connectionmanager.py
    incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
    incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
    incubator/tashi/trunk/src/tashi/util.py

Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py?rev=1295368&r1=1295367&r2=1295368&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py Thu Mar  1 01:11:38
2012
@@ -36,7 +36,7 @@ class ClusterManagerService(object):
 		else:
 			self.username = None
 			self.password = None
-		self.proxy = ConnectionManager(self.username, self.password, int(self.config.get('ClusterManager',
'nodeManagerPort')))
+		self.proxy = ConnectionManager(self.username, self.password, int(self.config.get('ClusterManager',
'nodeManagerPort')), authAndEncrypt=self.authAndEncrypt)
 		self.dfs = dfs
 		self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
 		self.log = logging.getLogger(__name__)
@@ -71,7 +71,7 @@ class ClusterManagerService(object):
 		try:
 			if (self.accountingHost is not None) and \
 				    (self.accountingPort is not None):
-				self.accountingClient=rpycservices.client(self.accountingHost, self.accountingPort)
+				self.accountingClient = ConnectionManager(self.username, self.password, self.accountingPort)[self.accountingHost]
 		except:
 			self.log.exception("Could not init accounting")
 

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py?rev=1295368&r1=1295367&r2=1295368&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py Thu Mar  1 01:11:38
2012
@@ -78,6 +78,10 @@ class FromConfig(DataInterface):
 		return instanceId
 	
 	def registerInstance(self, instance):
+		if type(instance) is not Instance:
+                        self.log.exception("Argument is not of type Instance, but of type
%s" % (type(instance)))
+                        raise TypeError
+
 		self.acquireLock(self.instanceLock)
 		try:
 			if (instance.id is not None and instance.id not in self.instances):
@@ -107,6 +111,10 @@ class FromConfig(DataInterface):
 		return instance
 	
 	def releaseInstance(self, instance):
+		if type(instance) is not Instance:
+                        self.log.exception("Argument is not of type Instance, but of type
%s" % (type(instance)))
+                        raise TypeError
+
 		try:
 			if (instance.id not in self.instances): # MPR: should never be true, but good to check
 				raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d"
% (instance.id)})
@@ -114,6 +122,10 @@ class FromConfig(DataInterface):
 			self.releaseLock(instance._lock)
 	
 	def removeInstance(self, instance):
+		if type(instance) is not Instance:
+                        self.log.exception("Argument is not of type Instance, but of type
%s" % (type(instance)))
+                        raise TypeError
+
 		self.acquireLock(self.instanceLock)
 		try:
 			del self.instances[instance.id]
@@ -122,6 +134,10 @@ class FromConfig(DataInterface):
 			self.releaseLock(self.instanceLock)
 	
 	def acquireHost(self, hostId):
+		if type(hostId) is not int:
+                        self.log.exception("Argument is not of type int, but of type %s"
% (type(hostId)))
+                        raise TypeError
+
 		self.hostLock.acquire()
 		host = self.hosts.get(hostId, None)
 		if (host is None):
@@ -134,6 +150,10 @@ class FromConfig(DataInterface):
 
 	
 	def releaseHost(self, host):
+		if type(host) is not Instance:
+                        self.log.exception("Argument is not of type Host, but of type %s"
% (type(host)))
+                        raise TypeError
+
 		try:
 			if (host.id not in self.hosts): # MPR: should never be true, but good to check
 				raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (host.id)})

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py?rev=1295368&r1=1295367&r2=1295368&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py Thu Mar  1 01:11:38
2012
@@ -33,21 +33,41 @@ class GetentOverride(DataInterface):
 		self.fetchThreshold = float(config.get("GetentOverride", "fetchThreshold"))
 	
 	def registerInstance(self, instance):
+		if type(instance) is not Instance:
+                        self.log.exception("Argument is not of type Instance, but of type
%s" % (type(instance)))
+                        raise TypeError
+
 		return self.baseDataObject.registerInstance(instance)
 	
 	def acquireInstance(self, instanceId):
 		return self.baseDataObject.acquireInstance(instanceId)
 	
 	def releaseInstance(self, instance):
+		if type(instance) is not Instance:
+                        self.log.exception("Argument is not of type Instance, but of type
%s" % (type(instance)))
+                        raise TypeError
+
 		return self.baseDataObject.releaseInstance(instance)
 	
 	def removeInstance(self, instance):
+		if type(instance) is not Instance:
+                        self.log.exception("Argument is not of type Instance, but of type
%s" % (type(instance)))
+                        raise TypeError
+
 		return self.baseDataObject.removeInstance(instance)
 	
 	def acquireHost(self, hostId):
+	if type(hostId) is not int:
+                        self.log.exception("Argument is not of type int, but of type %s"
% (type(hostId)))
+                        raise TypeError
+
 		return self.baseDataObject.acquireHost(hostId)
 	
 	def releaseHost(self, host):
+		if type(host) is not Instance:
+                        self.log.exception("Argument is not of type Host, but of type %s"
% (type(host)))
+                        raise TypeError
+
 		return self.baseDataObject.releaseHost(host)
 	
 	def getHosts(self):

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py?rev=1295368&r1=1295367&r2=1295368&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py Thu Mar  1 01:11:38 2012
@@ -130,6 +130,10 @@ class SQL(DataInterface):
 		return h
 	
 	def registerInstance(self, instance):
+		if type(instance) is not Instance:
+			self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+			raise TypeError
+
 		self.instanceLock.acquire()
 		try:
 			if (instance.id is not None and instance.id not in self.getInstances()):
@@ -173,6 +177,10 @@ class SQL(DataInterface):
 		return instance
 	
 	def releaseInstance(self, instance):
+		if type(instance) is not Instance:
+			self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+			raise TypeError
+
 		self.instanceLock.acquire()
 		try:
 			l = self.makeInstanceList(instance)
@@ -191,6 +199,10 @@ class SQL(DataInterface):
 			self.instanceLock.release()
 	
 	def removeInstance(self, instance):
+		if type(instance) is not Instance:
+			self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+			raise TypeError
+
 		self.instanceLock.acquire()
 		try:
 			self.executeStatement("DELETE FROM instances WHERE id = %d" % (instance.id))
@@ -205,6 +217,10 @@ class SQL(DataInterface):
 			self.instanceLock.release()
 	
 	def acquireHost(self, hostId):
+		if type(hostId) is not int:
+			self.log.exception("Argument is not of type int, but of type %s" % (type(hostId)))
+			raise TypeError
+
 		host = self.getHost(hostId)
 		self.hostLock.acquire()
 		self.hostLocks[host.id] = self.hostLocks.get(host.id, threading.Lock())
@@ -214,6 +230,10 @@ class SQL(DataInterface):
 		return host
 	
 	def releaseHost(self, host):
+		if type(host) is not Host:
+			self.log.exception("Argument is not of type Host, but of type %s" % (type(host)))
+			raise TypeError
+
 		l = self.makeHostList(host)
 		s = ""
 		for e in range(0, len(self.hostOrder)):

Modified: incubator/tashi/trunk/src/tashi/connectionmanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/connectionmanager.py?rev=1295368&r1=1295367&r2=1295368&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/connectionmanager.py (original)
+++ incubator/tashi/trunk/src/tashi/connectionmanager.py Thu Mar  1 01:11:38 2012
@@ -16,14 +16,16 @@
 # under the License.    
 
 from tashi.rpycservices import rpycservices
+from tashi import Connection
 #from tashi.rpycservices.rpyctypes import *
 
 class ConnectionManager(object):
-	def __init__(self, username, password, port, timeout=10000.0):
+	def __init__(self, username, password, port, timeout=10000.0, authAndEncrypt=False):
 		self.username = username
 		self.password = password
 		self.timeout = timeout
 		self.port = port
+		self.authAndEncrypt = authAndEncrypt
 	
 	def __getitem__(self, hostname):
 		port = self.port
@@ -31,4 +33,4 @@ class ConnectionManager(object):
 			port = hostname[1]
 			hostname = hostname[0]
 
-		return rpycservices.client(hostname, port, username=self.username, password=self.password)
+		return Connection(hostname, port, credentials=(self.username, self.password), authAndEncrypt=self.authAndEncrypt)

Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py?rev=1295368&r1=1295367&r2=1295368&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py Thu Mar  1 01:11:38
2012
@@ -112,6 +112,8 @@ class NodeManagerService(object):
 			notifyCM = []
 			try:
 				while (len(self.notifyCM) > 0):
+					# XXXstroucki ValueError: need more than 1 value to unpack
+					# observed here. How?
 					value = self.notifyCM.pop(0)
 					(instanceId, newInst, old, success) = value
 					try:

Modified: incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py?rev=1295368&r1=1295367&r2=1295368&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py Thu Mar  1 01:11:38 2012
@@ -852,6 +852,54 @@ class Qemu(VmControlInterface):
 	def listVms(self):
 		return self.controlledVMs.keys()
 
+	def __processVmStats(self, vmId):
+		try:
+			f = open("/proc/%d/stat" % (vmId))
+			procData = f.read()
+			f.close()
+		except:
+			log.warning("Unable to get data for instance %d" % vmId)
+			return
+
+		ws = procData.strip().split()
+		userTicks = float(ws[13])
+		sysTicks = float(ws[14])
+		myTicks = userTicks + sysTicks
+		vsize = (int(ws[22]))/1024.0/1024.0
+		rss = (int(ws[23])*4096)/1024.0/1024.0
+		cpuSeconds = myTicks/ticksPerSecond
+		lastCpuSeconds = cpuStats.get(vmId, cpuSeconds)
+		cpuLoad = (cpuSeconds - lastCpuSeconds)/(now - last)
+		cpuStats[vmId] = cpuSeconds
+		try:
+			child = self.controlledVMs[vmId]
+		except:
+			log.warning("Unable to obtain information on instance %d" % vmId)
+			return
+
+		(recvMBs, sendMBs, recvBytes, sendBytes) = (0.0, 0.0, 0.0, 0.0)
+		for i in range(0, len(child.instance.nics)):
+			netDev = "%s%d.%d" % (self.ifPrefix, child.instance.id, i)
+			(tmpRecvMBs, tmpSendMBs, tmpRecvBytes, tmpSendBytes) = netStats.get(netDev, (0.0, 0.0,
0.0, 0.0))
+			(recvMBs, sendMBs, recvBytes, sendBytes) = (recvMBs + tmpRecvMBs, sendMBs + tmpSendMBs,
recvBytes + tmpRecvBytes, sendBytes + tmpSendBytes)
+		self.stats[vmId] = self.stats.get(vmId, {})
+		child = self.controlledVMs.get(vmId, None)
+		if (child):
+			res = self.__enterCommand(child, "info blockstats")
+			for l in res.split("\n"):
+				(device, sep, data) = stringPartition(l, ": ")
+				if (data != ""):
+					for field in data.split(" "):
+						(label, sep, val) = stringPartition(field, "=")
+						if (val != ""):
+							self.stats[vmId]['%s_%s_per_s' % (device, label)] = (float(val) - float(self.stats[vmId].get('%s_%s'
% (device, label), 0)))/self.statsInterval
+							self.stats[vmId]['%s_%s' % (device, label)] = int(val)
+		self.stats[vmId]['cpuLoad'] = cpuLoad
+		self.stats[vmId]['rss'] = rss
+		self.stats[vmId]['vsize'] = vsize
+		self.stats[vmId]['recvMBs'] = sendMBs
+		self.stats[vmId]['sendMBs'] = recvMBs
+
 	# thread
 	def statsThread(self):
 		ticksPerSecond = float(os.sysconf('SC_CLK_TCK'))
@@ -889,43 +937,11 @@ class Qemu(VmControlInterface):
 						recvMBs = (recvBytes-lastRecvBytes)/(now-last)/1024.0/1024.0
 						sendMBs = (sendBytes-lastSendBytes)/(now-last)/1024.0/1024.0
 						netStats[dev] = (recvMBs, sendMBs, recvBytes, sendBytes)
+
+
 				for vmId in self.controlledVMs:
-					f = open("/proc/%d/stat" % (vmId))
-					procData = f.read()
-					f.close()
-					ws = procData.strip().split()
-					userTicks = float(ws[13])
-					sysTicks = float(ws[14])
-					myTicks = userTicks + sysTicks
-					vsize = (int(ws[22]))/1024.0/1024.0
-					rss = (int(ws[23])*4096)/1024.0/1024.0
-					cpuSeconds = myTicks/ticksPerSecond
-					lastCpuSeconds = cpuStats.get(vmId, cpuSeconds)
-					cpuLoad = (cpuSeconds - lastCpuSeconds)/(now - last)
-					cpuStats[vmId] = cpuSeconds
-					child = self.controlledVMs[vmId]
-					(recvMBs, sendMBs, recvBytes, sendBytes) = (0.0, 0.0, 0.0, 0.0)
-					for i in range(0, len(child.instance.nics)):
-						netDev = "%s%d.%d" % (self.ifPrefix, child.instance.id, i)
-						(tmpRecvMBs, tmpSendMBs, tmpRecvBytes, tmpSendBytes) = netStats.get(netDev, (0.0, 0.0,
0.0, 0.0))
-						(recvMBs, sendMBs, recvBytes, sendBytes) = (recvMBs + tmpRecvMBs, sendMBs + tmpSendMBs,
recvBytes + tmpRecvBytes, sendBytes + tmpSendBytes)
-					self.stats[vmId] = self.stats.get(vmId, {})
-					child = self.controlledVMs.get(vmId, None)
-					if (child):
-						res = self.__enterCommand(child, "info blockstats")
-						for l in res.split("\n"):
-							(device, sep, data) = stringPartition(l, ": ")
-							if (data != ""):
-								for field in data.split(" "):
-									(label, sep, val) = stringPartition(field, "=")
-									if (val != ""):
-										self.stats[vmId]['%s_%s_per_s' % (device, label)] = (float(val) - float(self.stats[vmId].get('%s_%s'
% (device, label), 0)))/self.statsInterval
-										self.stats[vmId]['%s_%s' % (device, label)] = int(val)
-					self.stats[vmId]['cpuLoad'] = cpuLoad
-					self.stats[vmId]['rss'] = rss
-					self.stats[vmId]['vsize'] = vsize
-					self.stats[vmId]['recvMBs'] = sendMBs
-					self.stats[vmId]['sendMBs'] = recvMBs
+					self.__processVmStats(vmId)
+
 			except:
 				log.exception("statsThread threw an exception")
 			last = now

Modified: incubator/tashi/trunk/src/tashi/util.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/util.py?rev=1295368&r1=1295367&r2=1295368&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/util.py (original)
+++ incubator/tashi/trunk/src/tashi/util.py Thu Mar  1 01:11:38 2012
@@ -27,6 +27,7 @@ import time
 import traceback
 import types
 import getpass
+import functools
 
 from tashi.rpycservices import rpycservices
 from tashi.rpycservices.rpyctypes import TashiException, Errors, InstanceState, HostState
@@ -268,6 +269,68 @@ def scrubString(s, allowed="ABCDEFGHIJKL
 			ns = ns + c
 	return ns
 
+class Connection:
+	def __init__(self, host, port, authAndEncrypt=False, credentials=None):
+		self.host = host
+		self.port = port
+		self.credentials = credentials
+		self.authAndEncrypt = authAndEncrypt
+		self.connection = None
+		# XXXstroucki some thing may still depend on this (client)
+		self.username = None
+		if credentials is not None:
+			self.username = credentials[0]
+
+	def __connect(self):
+		# create new connection
+
+		username = None
+		password = None
+
+		if self.credentials is not None:
+			username = self.credentials[0]
+			password = self.credentials[1]
+
+		if self.authAndEncrypt:
+			if username is None:
+				username = raw_input("Enter Username:")
+
+			if password is None:
+				password = raw_input("Enter Password:")
+
+			if self.credentials != (username, password):
+				self.credentials = (username, password)
+
+			client = rpycservices.client(self.host, self.port, username=username, password=password)
+		else:
+			client = rpycservices.client(self.host, self.port)
+
+		self.connection = client
+
+
+	def __do(self, name, *args, **kwargs):
+		if self.connection is None:
+			self.__connect()
+
+		remotefn = getattr(self.connection, name, None)
+
+		try:
+			if callable(remotefn):
+				returns = remotefn(*args, **kwargs)
+
+			else:
+				raise TashiException({'msg':'%s not callable' % name})
+
+		except:
+			self.connection = None
+			raise
+
+		return returns
+
+	def __getattr__(self, name):
+		return functools.partial(self.__do, name)
+
+
 def createClient(config):
 	cfgHost = config.get('Client', 'clusterManagerHost')
 	cfgPort = config.get('Client', 'clusterManagerPort')
@@ -281,14 +344,12 @@ def createClient(config):
 	authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
 	if authAndEncrypt:
 		username = config.get('AccessClusterManager', 'username')
-		if username == '':
-			username = raw_input('Enter Username:')
 		password = config.get('AccessClusterManager', 'password')
-		if password == '':
-			password = getpass.getpass('Enter Password:')
-		client = rpycservices.client(host, port, username=username, password=password)
+		client = Connection(host, port, authAndEncrypt, (username, password))
+
 	else:
-		client = rpycservices.client(host, port)
+		client = Connection(host, port)
+
 	return client
 
 def enumToStringDict(cls):



Mime
View raw message