incubator-tashi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From strou...@apache.org
Subject svn commit: r1227907 - in /incubator/tashi/branches/stroucki-accounting/src/tashi: accounting/__init__.py accounting/accounting.py clustermanager/clustermanagerservice.py nodemanager/nodemanagerservice.py rpycservices/rpycservices.py
Date Fri, 06 Jan 2012 00:26:26 GMT
Author: stroucki
Date: Fri Jan  6 00:26:26 2012
New Revision: 1227907

URL: http://svn.apache.org/viewvc?rev=1227907&view=rev
Log:
clustermanagerservice: fix deadlock in logging code by adding exception handlers
nodemanagerservice: send accounting records to the server via RPC
accounting: run as an rpyc service
rpycservices: add one accounting RPC ("record")

Added:
    incubator/tashi/branches/stroucki-accounting/src/tashi/accounting/__init__.py
Modified:
    incubator/tashi/branches/stroucki-accounting/src/tashi/accounting/accounting.py
    incubator/tashi/branches/stroucki-accounting/src/tashi/clustermanager/clustermanagerservice.py
    incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/nodemanagerservice.py
    incubator/tashi/branches/stroucki-accounting/src/tashi/rpycservices/rpycservices.py

Added: incubator/tashi/branches/stroucki-accounting/src/tashi/accounting/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-accounting/src/tashi/accounting/__init__.py?rev=1227907&view=auto
==============================================================================
--- incubator/tashi/branches/stroucki-accounting/src/tashi/accounting/__init__.py (added)
+++ incubator/tashi/branches/stroucki-accounting/src/tashi/accounting/__init__.py Fri Jan
 6 00:26:26 2012
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.    
+
+from accountingservice import AccountingService

Modified: incubator/tashi/branches/stroucki-accounting/src/tashi/accounting/accounting.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-accounting/src/tashi/accounting/accounting.py?rev=1227907&r1=1227906&r2=1227907&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-accounting/src/tashi/accounting/accounting.py (original)
+++ incubator/tashi/branches/stroucki-accounting/src/tashi/accounting/accounting.py Fri Jan
 6 00:26:26 2012
@@ -22,12 +22,17 @@ import os
 import socket
 import sys
 import threading
+import signal
 import time
 import random
 import logging.config
 
+from tashi.rpycservices import rpycservices
+from rpyc.utils.server import ThreadedServer
+from rpyc.utils.authenticators import TlsliteVdbAuthenticator
+
 from tashi.rpycservices.rpyctypes import *
-from tashi.util import getConfig, createClient, instantiateImplementation, boolean
+from tashi.util import getConfig, createClient, instantiateImplementation, boolean, debugConsole,
signalHandler
 import tashi
 
 class Accounting(object):
@@ -48,20 +53,41 @@ class Accounting(object):
 				except:
 					self.log.exception("Failed to load hook %s" % (value))
 					
-	def start(self):
-		while True:
-			try:
-				instances = self.cm.getInstances()
-				for instance in instances:
-					# XXXstroucki this currently duplicates what the CM was doing.
-					# perhaps implement a diff-like log?
-					self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id,
instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory))
-			except:
-				self.log.warning("Accounting iteration failed")
-
-			# wait to do the next iteration
-			# XXXstroucki make this configurable?
-			time.sleep(60)
+	def initAccountingServer(self):
+		service = instantiateImplementation(self.config.get("Accounting", "service"), self.config)
+
+		if boolean(self.config.get("Security", "authAndEncrypt")):
+			users = {}
+			userDatabase = data.getUsers()
+			for user in userDatabase.values():
+				if user.passwd != None:
+					users[user.name] = user.passwd
+			users[self.config.get('AllowedUsers', 'clusterManagerUser')] = self.config.get('AllowedUsers',
'clusterManagerPassword')
+			users[self.config.get('AllowedUsers', 'nodeManagerUser')] = self.config.get('AllowedUsers',
'nodeManagerPassword')
+			users[self.config.get('AllowedUsers', 'agentUser')] = self.config.get('AllowedUsers',
'agentPassword')
+			authenticator = TlsliteVdbAuthenticator.from_dict(users)
+			t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(self.config.get('AccountingService',
'port')), auto_register=False, authenticator=authenticator)
+		else:
+			t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(self.config.get('AccountingService',
'port')), auto_register=False)
+			print "ah"
+
+		t.logger.setLevel(logging.ERROR)
+		t.service.service = service
+		t.service._type = 'AccountingService'
+
+		debugConsole(globals())
+
+		try:
+			t.start()
+		except KeyboardInterrupt:
+			handleSIGTERM(signal.SIGTERM, None)
+
+	@signalHandler(signal.SIGTERM)
+	def handleSIGTERM(signalNumber, stackFrame):
+		global log
+		
+		log.info('Exiting cluster manager after receiving a SIGINT signal')
+		sys.exit(0)
 
 def main():
 	(config, configFiles) = getConfig(["Accounting"])
@@ -70,7 +96,8 @@ def main():
 	cmclient = createClient(config)
 	logging.config.fileConfig(configFiles)
 	accounting = Accounting(config, cmclient)
-	accounting.start()
+
+	accounting.initAccountingServer()
 
 if __name__ == "__main__":
 	main()

Modified: incubator/tashi/branches/stroucki-accounting/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-accounting/src/tashi/clustermanager/clustermanagerservice.py?rev=1227907&r1=1227906&r2=1227907&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-accounting/src/tashi/clustermanager/clustermanagerservice.py
(original)
+++ incubator/tashi/branches/stroucki-accounting/src/tashi/clustermanager/clustermanagerservice.py
Fri Jan  6 00:26:26 2012
@@ -99,10 +99,17 @@ class ClusterManagerService(object):
 		hostText = None
 
 		if instance is not None:
-			instanceText = 'Instance(id %d host %d vmId %d user %d cores %d memory %d)' % (instance.id,
instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory)
+			try:
+				instanceText = 'Instance(id %d host %d vmId %d user %d cores %d memory %d)' % (instance.id,
instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory)
+			except:
+				self.log.exception("Invalid instance data")
+				raise
 
 		if host is not None:
-			hostText = "Host(id %d memory %d cores %d)" % (host.id, host.memory, host.cores)
+			try:
+				hostText = "Host(id %d memory %d cores %d)" % (host.id, host.memory, host.cores)
+			except:
+				self.log.exception("Invalid host data")
 
                 secondary = ','.join(filter(None, (hostText, instanceText)))
 
@@ -111,7 +118,8 @@ class ClusterManagerService(object):
 		self.accountBuffer.append(line)
 		self.accountLines += 1
 
-		if (self.accountLines > 5):
+		# XXXstroucki think about autoflush by time
+		if (self.accountLines > 0):
 			self.__ACCOUNTFLUSH()
 
 
@@ -546,7 +554,7 @@ class ClusterManagerService(object):
 
 		self.instanceLastContactTime[instanceId] = self.__now()
 		oldInstance.decayed = False
-		self.__ACCOUNT("CM VM UPDATE", instance=instance)
+		self.__ACCOUNT("CM VM UPDATE", instance=oldInstance)
 
 		if (instance.state == InstanceState.Exited):
 			# determine why a VM has exited

Modified: incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/nodemanagerservice.py?rev=1227907&r1=1227906&r2=1227907&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/nodemanagerservice.py
(original)
+++ incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/nodemanagerservice.py
Fri Jan  6 00:26:26 2012
@@ -113,16 +113,21 @@ class NodeManagerService(object):
 			finally:
 				self.notifyCM.append(notifyCM)
 		except Exception, e:
-			self.log.exception('Failed to register with the CM')
+			self.log.exception('Failed to send data to the CM')
 
 		toSleep = start - time.time() + self.registerFrequency
 		if (toSleep > 0):
 			time.sleep(toSleep)
 
         def __ACCOUNTFLUSH(self):
-                print "Called account flush"
-                self.accountLines = 0
-                self.accountBuffer = []
+		try:
+			from tashi.rpycservices import rpycservices
+			client=rpycservices.client("clustermanager", 31337)
+			client.record(self.accountBuffer)
+			self.accountLines = 0
+			self.accountBuffer = []
+		except:
+			self.log.exception("Failed to flush accounting data")
 
 
         def __ACCOUNT(self, text, instance=None, host=None):
@@ -143,19 +148,22 @@ class NodeManagerService(object):
                 self.accountBuffer.append(line)
                 self.accountLines += 1
 
-                if (self.accountLines > 5):
+		# XXXstroucki think about force flush every so often
+                if (self.accountLines > 0):
                         self.__ACCOUNTFLUSH()
 
 
 	# service thread function
 	def __registerWithClusterManager(self):
 		while True:
+			#self.__ACCOUNT("TESTING")
 			start = time.time()
 			try:
 				instances = self.instances.values()
 				self.id = self.cm.registerNodeManager(self.host, instances)
 			except Exception, e:
 				self.log.exception('Failed to register with the CM')
+
 			toSleep = start - time.time() + self.registerFrequency
 			if (toSleep > 0):
 				time.sleep(toSleep)

Modified: incubator/tashi/branches/stroucki-accounting/src/tashi/rpycservices/rpycservices.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-accounting/src/tashi/rpycservices/rpycservices.py?rev=1227907&r1=1227906&r2=1227907&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-accounting/src/tashi/rpycservices/rpycservices.py (original)
+++ incubator/tashi/branches/stroucki-accounting/src/tashi/rpycservices/rpycservices.py Fri
Jan  6 00:26:26 2012
@@ -21,6 +21,7 @@ import cPickle
 
 clusterManagerRPCs = ['createVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'migrateVm',
'pauseVm', 'unpauseVm', 'getHosts', 'getNetworks', 'getUsers', 'getInstances', 'vmmSpecificCall',
'registerNodeManager', 'vmUpdate', 'activateVm', 'registerHost', 'getImages', 'copyImage']
 nodeManagerRPCs = ['instantiateVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'prepReceiveVm',
'prepSourceVm', 'migrateVm', 'receiveVm', 'pauseVm', 'unpauseVm', 'getVmInfo', 'listVms',
'vmmSpecificCall', 'getHostInfo', 'liveCheck']
+accountingRPCs = ['record']
 
 def clean(args):
 	"""Cleans the object so cPickle can be used."""
@@ -61,7 +62,7 @@ class client:
 		"""Returns a function that makes the RPC call. No keyword arguments allowed when calling
this function."""
 		if self.conn.closed == True:
 			self.conn = self.createConn()
-		if name not in clusterManagerRPCs and name not in nodeManagerRPCs:
+		if name not in clusterManagerRPCs and name not in nodeManagerRPCs and name not in accountingRPCs:
 			return None
 		def connectWrap(*args):
 			args = cPickle.dumps(clean(args))
@@ -81,6 +82,8 @@ class ManagerService(rpyc.Service):
 	# Note: self.service and self._type are set before rpyc.utils.server.ThreadedServer is started.
 	def checkValidUser(self, functionName, clientUsername, args):
 		"""Checks whether the operation requested by the user is valid based on clientUsername.
An exception will be thrown if not valid."""
+		if self._type == 'AccountingService':
+			return
 		if self._type == 'NodeManagerService':
 			return
 		if clientUsername in ['nodeManager', 'agent', 'root']:
@@ -114,4 +117,7 @@ class ManagerService(rpyc.Service):
 			return makeCall
 		if self._type == 'NodeManagerService' and name in nodeManagerRPCs:
 			return makeCall
+		if self._type == 'AccountingService' and name in accountingRPCs:
+			return makeCall
+
 		raise AttributeError('RPC does not exist')



Mime
View raw message