incubator-tashi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mry...@apache.org
Subject svn commit: r750133 - in /incubator/tashi/trunk: ./ etc/ src/tashi/agents/
Date Wed, 04 Mar 2009 20:06:18 GMT
Author: mryan3
Date: Wed Mar  4 20:06:17 2009
New Revision: 750133

URL: http://svn.apache.org/viewvc?rev=750133&view=rev
Log:
Reorganization of the agent code:
The DHCP and DNS code is now a hook in the primitive scheduler and is turned on and off via
config options instead of being mostly duplicated code
The relevant options were added to TashiDefaults.cfg
The primitive scheduler is also placed in bin/ by the Makefile now


Added:
    incubator/tashi/trunk/src/tashi/agents/dhcpdns.py
    incubator/tashi/trunk/src/tashi/agents/instancehook.py
    incubator/tashi/trunk/src/tashi/agents/primitive.py
Removed:
    incubator/tashi/trunk/src/tashi/agents/dhcpdnsscheduler.py
    incubator/tashi/trunk/src/tashi/agents/examplepolicy.py
Modified:
    incubator/tashi/trunk/Makefile
    incubator/tashi/trunk/etc/TashiDefaults.cfg
    incubator/tashi/trunk/src/tashi/agents/__init__.py

Modified: incubator/tashi/trunk/Makefile
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/Makefile?rev=750133&r1=750132&r2=750133&view=diff
==============================================================================
--- incubator/tashi/trunk/Makefile (original)
+++ incubator/tashi/trunk/Makefile Wed Mar  4 20:06:17 2009
@@ -55,10 +55,10 @@
 	if test -d src/tashi/thrift/gen-py; then echo Removing tashi.thrift.gen-py...; rm -rf src/tashi/thrift/gen-py;
fi
 	if test -d src/tashi/messaging/messagingthrift; then echo Removing tashi.messaging.messagingthrift;
rm -rf src/tashi/messaging/messagingthrift; fi
 
-bin: bindir bin/clustermanager.py bin/nodemanager.py bin/tashi-client.py
+bin: bindir bin/clustermanager.py bin/nodemanager.py bin/tashi-client.py bin/primitive.py
 bindir:
 	if test ! -d bin; then mkdir bin; fi
-rmbin: rmclustermanager rmnodemanager rmtashi-client
+rmbin: rmclustermanager rmnodemanager rmtashi-client rmprimitive
 	if test -d bin; then rmdir bin; fi
 bin/getInstances: src/tashi/services
 	if test ! -e bin/getInstances; then (echo "Generating client symlinks..."; cd bin; PYTHONPATH=../src
../src/tashi/client/client.py --makesyms); fi
@@ -74,6 +74,11 @@
 	(cd bin; ln -s ../src/tashi/nodemanager/nodemanager.py .)
 rmnodemanager:
 	if test -e bin/nodemanager.py; then echo Removing nodemanager symlink...; rm bin/nodemanager.py;
fi
+bin/primitive.py: src/tashi/agents/primitive.py
+	@echo Symlinking in primitive...
+	(cd bin; ln -s ../src/tashi/agents/primitive.py .)
+rmprimitive:
+	if test -e bin/primitive.py; then echo Removing primitve-agent symlink...; rm bin/primitive.py;
fi
 bin/tashi-client.py:
 	@echo Symlinking in tashi-client...
 	(cd bin; ln -s ../src/tashi/client/tashi-client.py .)

Modified: incubator/tashi/trunk/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/etc/TashiDefaults.cfg?rev=750133&r1=750132&r2=750133&view=diff
==============================================================================
--- incubator/tashi/trunk/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/trunk/etc/TashiDefaults.cfg Wed Mar  4 20:06:17 2009
@@ -88,7 +88,11 @@
 clusterManagerTimeout = 5.0
 
 # Agent portion
-[DhcpDnsScheduler]
+[Primitive]
+hook1 = tashi.agents.DhcpDns
+scheduleDelay = 2.0
+
+[DhcpDns]
 dnsKeyFile = /location/of/private/key/for/dns
 dnsServer = 1.2.3.4 53
 dnsDomain = your.domain.com

Modified: incubator/tashi/trunk/src/tashi/agents/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/__init__.py?rev=750133&r1=750132&r2=750133&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/__init__.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/__init__.py Wed Mar  4 20:06:17 2009
@@ -15,3 +15,5 @@
 # specific language governing permissions and limitations
 # under the License.    
 
+from instancehook import InstanceHook
+from dhcpdns import DhcpDns

Added: incubator/tashi/trunk/src/tashi/agents/dhcpdns.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/dhcpdns.py?rev=750133&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/dhcpdns.py (added)
+++ incubator/tashi/trunk/src/tashi/agents/dhcpdns.py Wed Mar  4 20:06:17 2009
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.    
+
+import logging
+import os
+import socket
+from instancehook import InstanceHook
+from tashi import boolean
+
+class DhcpDns(InstanceHook):
+	def __init__(self, config, client, transport, post=False):
+		InstanceHook.__init__(self, config, client, post)
+		self.dnsKeyFile = self.config.get('DhcpDns', 'dnsKeyFile')
+		self.dnsServer = self.config.get('DhcpDns', 'dnsServer')
+		self.dnsDomain = self.config.get('DhcpDns', 'dnsDomain')
+		self.dnsExpire = int(self.config.get('DhcpDns', 'dnsExpire'))
+		self.dhcpServer = self.config.get('DhcpDns', 'dhcpServer')
+		self.dhcpKeyName = self.config.get('DhcpDns', 'dhcpKeyName')
+		self.dhcpSecretKey = self.config.get('DhcpDns', 'dhcpSecretKey')
+		self.ipRange = self.config.get('DhcpDns', 'ipRange')
+		self.reverseDns = boolean(self.config.get('DhcpDns', 'reverseDns'))
+		self.log = logging.getLogger(__file__)
+		(ip, bits) = self.ipRange.split("/")
+		bits = int(bits)
+		ipNum = self.strToIp(ip)
+		self.ipMin = ((ipNum>>(32-bits))<<(32-bits)) + 2
+		self.ipMax = self.ipMin + (1<<(32-bits)) - 3
+		self.usedIPs = {}
+		self.currentIP = self.ipMin
+		instances = self.client.getInstances()
+		for i in instances:
+			try:
+				ip = socket.gethostbyname(i.name)
+				ipNum = self.strToIp(ip)
+				self.log.info('Added %s->%s during reinitialization' % (i.name, ip))
+				self.usedIPs[ipNum] = ip
+			except Exception, e:
+				pass
+		
+	def strToIp(self, s):
+		ipNum = reduce(lambda x, y: x*256+y, map(int, s.split(".")))
+		return ipNum
+	
+	def ipToStr(self, ip):
+		return "%d.%d.%d.%d" % (ip>>24, (ip>>16)%256, (ip>>8)%256, ip%256)
+	
+	def allocateIP(self):
+		self.currentIP = self.currentIP + 1
+		while (self.currentIP in self.usedIPs or self.currentIP > self.ipMax):
+			if (self.currentIP > self.ipMax):
+				self.currentIP = self.ipMin
+			else:
+				self.currentIP = self.currentIP + 1
+		ipString = self.ipToStr(self.currentIP)
+		self.usedIPs[self.currentIP] = ipString
+		return ipString
+	
+	def addDhcp(self, name, ipaddr, hwaddr):
+		cmd = "omshell"
+		(stdin, stdout) = os.popen2(cmd)
+		stdin.write("server %s\n" % (self.dhcpServer))
+		if (self.dhcpSecretKey != ""):
+			stdin.write("key %s %s\n" % (self.dhcpKeyName, self.dhcpSecretKey))
+		stdin.write("connect\n")
+		stdin.write("new \"host\"\n")
+		stdin.write("set name = \"%s\"\n" % (name))
+		stdin.write("set ip-address = %s\n" % (ipaddr))
+		stdin.write("set hardware-address = %s\n" % (hwaddr))
+		stdin.write("set hardware-type = 00:00:00:01\n") # Ethernet
+		stdin.write("create\n")
+		stdin.close()
+		output = stdout.read()
+		stdout.close()
+
+	def removeDhcp(self, name):
+		cmd = "omshell"
+		(stdin, stdout) = os.popen2(cmd)
+		stdin.write("server %s\n" % (self.dhcpServer))
+		if (self.dhcpSecretKey != ""):
+			stdin.write("key %s %s\n" % (self.dhcpKeyName, self.dhcpSecretKey))
+		stdin.write("connect\n")
+		stdin.write("new \"host\"\n")
+		stdin.write("set name = \"%s\"\n" % (name))
+		stdin.write("open\n")
+		stdin.write("remove\n")
+		stdin.close()
+		output = stdout.read()
+		stdout.close()
+
+	def addDns(self, name, ip):
+		if (self.dnsKeyFile != ""):
+			cmd = "nsupdate -k %s" % (self.dnsKeyFile)
+		else:
+			cmd = "nsupdate"
+		(stdin, stdout) = os.popen2(cmd)
+		stdin.write("server %s\n" % (self.dnsServer))
+		stdin.write("update add %s.%s %d A %s\n" % (name, self.dnsDomain, self.dnsExpire, ip))
+		stdin.write("\n")
+		if (self.reverseDns):
+			ipSegments = map(int, ip.split("."))
+			ipSegments.reverse()
+			reverseIpStr = ("%d.%d.%d.%d.in-addr.arpa" % (ipSegments[0], ipSegments[1], ipSegments[2],
ipSegments[3]))
+			stdin.write("update add %s %d IN PTR %s.%s.\n" % (reverseIpStr, self.dnsExpire, name,
self.dnsDomain))
+			stdin.write("\n")
+		stdin.close()
+		output = stdout.read()
+		stdout.close()
+
+	def removeDns(self, name):
+		if (self.dnsKeyFile != ""):
+			cmd = "nsupdate -k %s" % (self.dnsKeyFile)
+		else:
+			cmd = "nsupdate"
+		(stdin, stdout) = os.popen2(cmd)
+		stdin.write("server %s\n" % (self.dnsServer))
+		if (self.reverseDns):
+			ip = socket.gethostbyname(name)
+			ipSegments = map(int, ip.split("."))
+			ipSegments.reverse()
+			reverseIpStr = ("%d.%d.%d.%d.in-addr.arpa" % (ipSegments[0], ipSegments[1], ipSegments[2],
ipSegments[3]))
+			stdin.write("update delete %s IN PTR\n" % (reverseIpStr))
+			stdin.write("\n")
+		stdin.write("update delete %s.%s A\n" % (name, self.dnsDomain))
+		stdin.write("\n")
+		stdin.close()
+		output = stdout.read()
+		stdout.close()
+	
+	def preCreate(self, instance):
+		ip = self.allocateIP()
+		self.log.info("Adding %s:{%s->%s, %s->%s} to DHCP/DNS" % (instance.name, instance.nics[0].mac,
ip, instance.name, ip))
+		try:
+			self.addDhcp(instance.name, ip, instance.nics[0].mac)
+			self.addDns(instance.name, ip)
+		except Exception, e:
+			self.log.exception("Failed to add host %s to DHCP/DNS" % (instance.name))
+
+	def postDestroy(self, instance):
+		try:
+			ip = socket.gethostbyname(instance.name)
+			ipNum = self.strToIp(ip)
+			del self.usedIPs[ipNum]
+		except Exception, e:
+			self.log.exception("Failed to remove host %s from pool of usedIPs" % (instance.name))
+		self.log.info("Removing %s from DHCP/DNS" % (instance.name))
+		try:
+			self.removeDns(instance.name)
+			self.removeDhcp(instance.name)
+		except Exception, e:
+			self.log.exception("Failed to remove host %s from DHCP/DNS" % (instance.name))
+

Added: incubator/tashi/trunk/src/tashi/agents/instancehook.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/instancehook.py?rev=750133&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/instancehook.py (added)
+++ incubator/tashi/trunk/src/tashi/agents/instancehook.py Wed Mar  4 20:06:17 2009
@@ -0,0 +1,33 @@
+#! /usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.    
+
+class InstanceHook(object):
+	def __init__(self, config, client, transport, post=False):
+		if (self.__class__ is InstanceHook):
+			raise NotImplementedError
+		self.config = config
+		self.client = client
+		self.transport = transport
+		self.post = post
+	
+	def preCreate(self, instance):
+		raise NotImplementedError
+	
+	def postDestroy(self, instance):
+		raise NotImplementedError

Added: incubator/tashi/trunk/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/primitive.py?rev=750133&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/primitive.py (added)
+++ incubator/tashi/trunk/src/tashi/agents/primitive.py Wed Mar  4 20:06:17 2009
@@ -0,0 +1,121 @@
+#! /usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.    
+
+from socket import gethostname
+import os
+import socket
+import sys
+import threading
+import time
+import logging.config
+
+from tashi.services.ttypes import *
+from tashi.util import getConfig, createClient, instantiateImplementation
+
+class Primitive(object):
+	def __init__(self, config, client, transport):
+		self.config = config
+		self.client = client
+		self.transport = transport
+		self.hooks = []
+		self.log = logging.getLogger(__file__)
+		self.scheduleDelay = float(self.config.get("Primitive", "scheduleDelay"))
+		items = self.config.items("Primitive")
+		items.sort()
+		for item in items:
+			(name, value) = item
+			name = name.lower()
+			if (name.startswith("hook")):
+				self.hooks.append(instantiateImplementation(value, config, client, transport, False))
+	
+	def start(self):
+		oldInstances = {}
+		while True:
+			try:
+				# Make sure transport is open
+				if (not self.transport.isOpen()):
+					self.transport.open()
+				# Generate a list of VMs/host
+				hosts = {}
+				load = {}
+				for h in self.client.getHosts():
+					hosts[h.id] = h
+					load[h.id] = []
+				load[None] = []
+				_instances = self.client.getInstances()
+				instances = {}
+				for i in _instances:
+					instances[i.id] = i
+				for i in instances.itervalues():
+					if (i.hostId or i.state == InstanceState.Pending):
+						load[i.hostId] = load[i.hostId] + [i.id]
+				# Check for VMs that have exited
+				for i in oldInstances:
+					if (i not in instances):
+						for hook in self.hooks:
+							hook.postDestroy(oldInstances[i])
+				# Schedule new VMs
+				oldInstances = instances
+				if (len(load.get(None, [])) > 0):
+					for i in load[None]:
+						inst = instances[i]
+						try:
+							min = None
+							minHost = None
+							for h in hosts.values():
+								if ((min is None or len(load[h.id]) < min) and h.up == True and h.state == HostState.Normal):
+									memUsage = reduce(lambda x, y: x + instances[y].memory, load[h.id], inst.memory)
+									coreUsage = reduce(lambda x, y: x + instances[y].cores, load[h.id], inst.cores)
+									if (memUsage <= h.memory and coreUsage <= h.cores):
+										min = len(load[h.id])
+										minHost = h
+							if (minHost):
+								for hook in self.hooks:
+									hook.preCreate(inst)
+								self.log.info("Scheduling instance %s on host %s" % (inst.name, minHost.name))	
+								self.client.activateVm(i, minHost)
+							else:
+								self.log.info("Failed to find a suitable place to schedule %s" % (inst.name))
+						except Exception, e:
+							self.log.exception("Failed to schedule or activate %s" % (inst.name))
+				time.sleep(self.scheduleDelay)
+			except TashiException, e:
+				self.log.exception("Tashi exception")
+				try:
+					self.transport.close()
+				except Exception, e:
+					self.log.exception("Failed to close the transport")
+				time.sleep(self.scheduleDelay)
+			except Exception, e:
+				self.log.exception("General exception")
+				try:
+					self.transport.close()
+				except Exception, e:
+					self.log.exception("Failed to close the transport")
+				time.sleep(self.scheduleDelay)
+
+def main():
+	(config, configFiles) = getConfig(["Agent"])
+	(client, transport) = createClient(config)
+	logging.config.fileConfig(configFiles)
+	agent = Primitive(config, client, transport)
+	agent.start()
+
+if __name__ == "__main__":
+	main()



Mime
View raw message