From tashi-commits-return-101-apmail-incubator-tashi-commits-archive=incubator.apache.org@incubator.apache.org Wed Jul 08 20:15:56 2009 Return-Path: Delivered-To: apmail-incubator-tashi-commits-archive@minotaur.apache.org Received: (qmail 76617 invoked from network); 8 Jul 2009 20:15:56 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 8 Jul 2009 20:15:56 -0000 Received: (qmail 88699 invoked by uid 500); 8 Jul 2009 20:16:06 -0000 Delivered-To: apmail-incubator-tashi-commits-archive@incubator.apache.org Received: (qmail 88678 invoked by uid 500); 8 Jul 2009 20:16:06 -0000 Mailing-List: contact tashi-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: tashi-dev@incubator.apache.org Delivered-To: mailing list tashi-commits@incubator.apache.org Received: (qmail 88668 invoked by uid 99); 8 Jul 2009 20:16:06 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Jul 2009 20:16:06 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Jul 2009 20:15:58 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 80C6B2388857; Wed, 8 Jul 2009 20:15:37 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r792287 - in /incubator/tashi/trunk: etc/TashiDefaults.cfg src/tashi/agents/mauipacket.py src/tashi/agents/mauiwiki.py src/tashi/agents/primitive.py Date: Wed, 08 Jul 2009 20:15:37 -0000 To: tashi-commits@incubator.apache.org From: mryan3@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090708201537.80C6B2388857@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: mryan3 Date: Wed Jul 8 20:15:36 2009 New Revision: 792287 URL: http://svn.apache.org/viewvc?rev=792287&view=rev Log: Jim Cipar's Maui patches with modifications: 1. The tashi.publisher object is initialized in mauiwiki.py 2. The "publisher" config option was moved into the "Agent" section of the file to avoid duplication 3. Indentation cleanups This patch adds a new agent: mauiwiki, that allows the maui cluster scheduler to use Tashi as a resource manager. It basically translates between the Tashi cluster manager interface, and the maui wiki interface ( http://www.clusterresources.com/products/maui/docs/wiki/ ). There is a bug in the way checksums are converted to strings: leading 0s are truncated. Here is the fixed patch for mauipacket.py Added: incubator/tashi/trunk/src/tashi/agents/mauipacket.py incubator/tashi/trunk/src/tashi/agents/mauiwiki.py (with props) Modified: incubator/tashi/trunk/etc/TashiDefaults.cfg incubator/tashi/trunk/src/tashi/agents/primitive.py Modified: incubator/tashi/trunk/etc/TashiDefaults.cfg URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/etc/TashiDefaults.cfg?rev=792287&r1=792286&r2=792287&view=diff ============================================================================== --- incubator/tashi/trunk/etc/TashiDefaults.cfg (original) +++ incubator/tashi/trunk/etc/TashiDefaults.cfg Wed Jul 8 20:15:36 2009 @@ -113,11 +113,20 @@ clusterManagerTimeout = 5.0 # Agent portion +[Agent] +publisher = tashi.messaging.GangliaPublisher + [Primitive] hook1 = tashi.agents.DhcpDns scheduleDelay = 2.0 densePack = False -publisher = tashi.messaging.GangliaPublisher + +[MauiWiki] +hook1 = tashi.agents.DhcpDns +refreshTime = 5 +authuser = changeme +authkey = 1111 +defaultJobTime = 8640000000 [DhcpDns] dnsKeyFile = /location/of/private/key/for/dns Added: incubator/tashi/trunk/src/tashi/agents/mauipacket.py URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/mauipacket.py?rev=792287&view=auto ============================================================================== --- incubator/tashi/trunk/src/tashi/agents/mauipacket.py (added) +++ incubator/tashi/trunk/src/tashi/agents/mauipacket.py Wed Jul 8 20:15:36 2009 @@ -0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import subprocess +import time +import pseudoDes + +class MauiPacket: + def __init__(self, key=0): + self.size = 0 + self.char = '\n' + self.chksum = '0'*16 + self.timestamp = int(time.time()) + self.auth = '' + self.data = [] + self.msg = '' + self.key=key + def readPacket(self, istream): + self.msg = '' + + size = istream.read(8) + self.msg = self.msg+size + self.size = int(size) + + self.char = istream.read(1) + self.msg = self.msg + self.char + + packet = istream.read(self.size) + self.msg = self.msg + packet + + packet = packet.split() + + for i in range(len(packet)): + item = packet[i].split('=') + if item[0] == 'CK': + self.chksum = item[1] + if item[0] == 'TS': + self.timestamp = int(item[1]) + if item[0] == 'AUTH': + self.auth = item[1] + if item[0] == 'DT': + self.data = packet[i:] + self.data=self.data[0].split('=',1)[1:] + self.data[1:] + + def checksumMessage(self, message, key=None): + if key == None: + key = self.key + if type(key) == type(''): + key = int(key) + chksum = pseudoDes.generateKey(message, key) + chksum = '%016x' % chksum + return chksum + def getChecksum(self): + cs = self.msg.partition('TS=') + cs = cs[1]+cs[2] + chksum = self.checksumMessage(cs) + return chksum + def verifyChecksum(self): + chksum = self.getChecksum() + if chksum != self.chksum: + print 'verifyChecksum: "%s"\t"%s"'%(chksum, self.chksum) + print 'verifyChecksum (types): %s\t%s' %(type(chksum), type(self.chksum)) + return False + return True + def set(self, data, auth=None, key=None, timestamp=None): + if timestamp==None: + timestamp = int(time.time()) + self.data = data + if auth !=None: + self.auth = auth + if key != None: + self.key = key + self.timstamp=timestamp + self.fixup() + def fixup(self): + datastring = "TS=%i AUTH=%s DT=%s"%(self.timestamp, self.auth, (' '.join(self.data))) + self.chksum = self.checksumMessage(datastring) + + pktstring = 'CK=%s %s'%(self.chksum, datastring) + self.size = len(pktstring) + def __str__(self): + datastring = "TS=%i AUTH=%s DT=%s"%(self.timestamp, self.auth, (' '.join(self.data))) + self.chksum = self.checksumMessage(datastring) + + pktstring = 'CK=%s %s'%(self.chksum, datastring) + self.msg = '' + self.msg = self.msg + '%08i'%len(pktstring) + self.msg = self.msg + self.char + self.msg = self.msg + pktstring + + return self.msg + def prettyString(self): + s = '''Maui Packet +----------- +size:\t\t%i +checksum:\t%s +timestamp:\t%s +auth:\t\t%s +data: +%s +-----------''' + s = s%(self.size, self.chksum, self.timestamp, self.auth, self.data) + return s Added: incubator/tashi/trunk/src/tashi/agents/mauiwiki.py URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/mauiwiki.py?rev=792287&view=auto ============================================================================== --- incubator/tashi/trunk/src/tashi/agents/mauiwiki.py (added) +++ incubator/tashi/trunk/src/tashi/agents/mauiwiki.py Wed Jul 8 20:15:36 2009 @@ -0,0 +1,467 @@ +#! /usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import time +import hashlib +import sys +import subprocess +import socket, SocketServer +from socket import gethostname +import os +import threading +import logging.config + +from tashi.parallel import synchronizedmethod +from tashi.services.ttypes import * +from tashi.util import getConfig, createClient, instantiateImplementation, boolean +from tashi.agents.mauipacket import MauiPacket +import tashi.util + +def jobnameToId(jobname): + return int(jobname.split('.')[-1]) + +class InstanceHooks(): + def __init__(self, config): + self.log = logging.getLogger(__file__) + self.hooks=[] + items = config.items("MauiWiki") + items.sort() + for item in items: + (name, value) = item + name = name.lower() + if (name.startswith("hook")): + try: + self.hooks.append(instantiateImplementation(value, config, client, transport, False)) + except: + self.log.exception("Failed to load hook %s" % (value)) + (self.client, self.transport) = createClient(config) + def preCreate(self, inst): + for hook in self.hooks: + hook.preCreate(inst) + def postDestroy(self, inst): + for hook in self.hooks: + hook.postDestroy(inst) + def idToInst(self, id): + instances = self.client.getInstances() + print 'instances ', instances + insts = [i for i in instances if str(i.id)==str(id)] + if len(insts) == 0: + raise "No instance with ID %s"%id + if len(insts) > 1: + raise "Multiple instances with ID %s"%id + inst = insts[0] + return inst + def destroyById(self, id): + inst = self.idToInst(id) + self.client.destroyVm(int(id)) + self.postDestroy(inst) + def activateById(self, id, host): + inst = self.idToInst(id) + self.preCreate(inst) + self.client.activateVm(int(id), host) + +def cmplists(a, b): + for i in range(len(a)): + if a[i] < b[i]: + return -1 + if a[i] > b[i]: + return 1 + return 0 + +class TashiConnection(threading.Thread): + def __init__(self, config, client, transport): + (self.client, self.transport) = createClient(config) + + self.hosts={} + self.instances={} + self.users={} + + self.config = config + self.ihooks = InstanceHooks(config) + self.log = logging.getLogger(__file__) + self.refreshTime = float(self.config.get('MauiWiki', 'refreshTime')) + self.defaultJobTime = str(self.config.get('MauiWiki', 'defaultJobTime')) + threading.Thread.__init__(self) + self.daemon = True + def run(self): + while True: + print 'TashiConnection:run updating hosts ...' + self.updateHosts() + print 'TashiConnection:run updating instances ...' + self.updateInstances() + print 'TashiConnection:run updating users ...' + self.updateUsers() + time.sleep(self.refreshTime) + def wikiHostState(self, host): + '''Returns a string representing the host state in a form compatable + with the maui-wiki protocol. This code simply chooses between + "Down" and "Unknown": + + - Busy: Node is running some jobs and will not accept additional jobs + - Down: Resource Manager problems have been detected. Node is + incapable of running jobs. + - Draining: Node is responding but will not accept new jobs + - Idle: Node is ready to run jobs but currently is not running any. + Running: Node is running some jobs and will accept additional jobs + - Unknown: Node is capable of running jobs but the scheduler + will need to determine if the node state is actually Idle, + Running, or Busy.''' + if host.up == False or host.state == HostState.VersionMismatch: + return "Down" + if host.state == HostState.Drained: + return "Draining" + return "Unknown" + + def wikiInstanceState(self, instance): + '''Returns a string representing the instance stat in a form compatable + with the maui-wiki protocol. + + Completed: Job has completed + Hold: Job is in the queue but is not allowed to run + Idle: Job is ready to run + Removed: Job has been canceled or otherwise terminated externally + Running: Job is currently executing + Suspended: job has started but execution has temporarily been suspended''' + tashiToWiki = {InstanceState.Pending:'Idle', + InstanceState.Held:'Hold', + InstanceState.Exited:'Removed'} + if tashiToWiki.has_key(instance.state): + return tashiToWiki[instance.state] + else: + return 'Running' + + # Host handling + def compareHosts(self, host1, host2): + def ii(host): + try: + state = tashi.util.hostStates[host.state] + except: + state = 'Unknown' + return [host.id, host.name, host.up, state, host.memory, host.cores] + return cmplists(ii(host1), ii(host2)) +# @synchronizedmethod + def updateHost(self, host): + self.hosts[host.id] = host + self.hosts[host.id].updateTime = time.time() +# @synchronizedmethod + def addHost(self, host): + self.hosts[host.id] = host + self.hosts[host.id].updateTime = time.time() +# @synchronizedmethod + def removeHost(self, host): + self.hosts.pop(host.id) +# @synchronizedmethod + def updateHosts(self): + if (not self.transport.isOpen): + self.transport.open() + hosts = self.client.getHosts() + for host in hosts: + if not self.hosts.has_key(host.id): + self.addHost(host) + elif self.compareHosts(self.hosts[host.id], host) != 0: + self.updateHost(host) + hhosts = {} + for host in hosts: + hhosts[host.id] = host + for host in self.hosts.values(): + if not hhosts.has_key(host.id): + self.removeHost(host) + # Instance handling + def compareInstances(self, instance1, instance2): + def ii(inst): + return [inst.id, + inst.vmId, + inst.hostId, + tashi.util.vmStates[inst.state], + inst.userId, + inst.name, + inst.cores, + inst.memory, + len(inst.disks), # FIXME: this isn't a good way to compare + len(inst.nics), # FIXME: this isn't a good way to compare + len(inst.hints)] + return cmplists(ii(instance1), ii(instance2)) + return 0 + @synchronizedmethod + def updateInstance(self, instance): + qt = self.instances[instance.id].queueTime + self.instances[instance.id] = instance + self.instances[instance.id].updateTime = time.time() + self.instances[instance.id].queueTime = qt + @synchronizedmethod + def addInstance(self, instance): + self.instances[instance.id] = instance + self.instances[instance.id].updateTime = time.time() + self.instances[instance.id].queueTime = time.time() + @synchronizedmethod + def removeInstance(self, instance): + self.instances[instance.id].state = InstanceState.Exited + self.ihooks.postDestroy(instance) + @synchronizedmethod + def updateInstances(self): + if (not self.transport.isOpen): + self.transport.open() + instances = self.client.getInstances() + for instance in instances: + print 'found instance', instance.id + if not self.instances.has_key(instance.id): + print "it's a new instance" + self.addInstance(instance) + elif self.compareInstances(self.instances[instance.id], instance) != 0: + self.updateInstance(instance) + iinsts = {} + for instance in instances: + iinsts[instance.id] = instance + for instance in self.instances.values(): + if instance.state == InstanceState.Exited: + continue + if not iinsts.has_key(instance.id): + print 'removing instance ', instance.id + self.removeInstance(instance) + # User handling + def compareUsers(self, user1, user2): + if user1.id < user2.id: + return -1 + elif user1.id > user2.id: + return 1 + if user1.name < user2.name: + return -1 + if user1.name > user2.name: + return 1 + return 0 + @synchronizedmethod + def updateUser(self, user): + self.users[user.id] = user + self.users[user.id].updatetime = time.time() + @synchronizedmethod + def addUser(self, user): + self.users[user.id] = user + self.users[user.id].updatetime = time.time() + @synchronizedmethod + def removeUser(self, user): + self.users.pop(user.id) + @synchronizedmethod + def updateUsers(self): + if (not self.transport.isOpen): + self.transport.open() + users = self.client.getUsers() + for user in users: + if not self.users.has_key(user.id): + self.addUser(user) + elif self.compareUsers(self.users[user.id], user) != 0: + self.updateUser(user) + uusers = {} + for user in users: + uusers[user.id] = user + for user in self.users.values(): + if not uusers.has_key(user.id): + self.removeUser(user) + # Get data structures for maui + # Format is {id:{field:value}} + @synchronizedmethod + def getNodes(self, updatetime=0, nodelist=['ALL']): + if len(nodelist) == 0: + return {} + if nodelist[0]=='ALL': + nodes = [n for n in self.hosts.values() if n.updateTime >= updatetime] + else: + nodes = [n for n in self.hosts.values() + if n.updateTime >= updatetime and n.name in nodelist] + nl = {} + for node in nodes: + nl[node.name] = {'STATE':self.wikiHostState(node), + 'UPDATETIME':str(int(node.updateTime)), + 'CPROC':str(node.cores), + 'CMEMORY':str(node.memory)} + return nl + @synchronizedmethod + def getJobs(self, updatetime=0, joblist=['ALL']): + if len(joblist) == 0: + return {} + if joblist[0] == 'ALL': + jobs = [j for j in self.instances.values() if j.updateTime >= updatetime] + else: + jobs = [j for j in self.instances.values() + if j.updateTime >= updatetime and j.id in joblist] + jl = {} + for job in jobs: + id = "%s.%i"%(job.name, job.id) + jl[id] = {'STATE':self.wikiInstanceState(job), + 'UNAME':self.users[job.userId].name, + 'GNAME':self.users[job.userId].name, + 'UPDATETIME':int(job.updateTime), + 'QUEUETIME':job.queueTime, + 'TASKS':'1', + 'DPROCS':str(job.cores), + 'DMEM':str(job.memory), + 'RMEM':str(job.memory), + 'WCLIMIT':str(self.defaultJobTime)} + if job.hostId != None: + jl[id]['TASKLIST'] = self.hosts[job.hostId].name + return jl + @synchronizedmethod + def activateById(self, id, host): + if not self.instances.has_key(id): + raise "no such instance" + self.ihooks.activateById(id, host) + self.instances[id].state=InstanceState.Activating + +class MauiListener(SocketServer.StreamRequestHandler): + def setup(self): + global config + self.log = logging.getLogger(__file__) + SocketServer.StreamRequestHandler.setup(self) + self.ihooks = InstanceHooks(config) + (self.client, self.transport) = createClient(config) + self.tashiconnection=tashiconnection + self.auth = config.get('MauiWiki', 'authuser') + self.key = config.get('MauiWiki', 'authkey') + + def handle(self): + p = MauiPacket(key=self.key) + self.istream = self.ostream = self.rfile + p.readPacket(self.istream) + self.processPacket(p) + + def processGetNodes(self, p): + arg = p.data[1] + arg = arg.split('=') + arg = arg[1].split(':') + updatetime = int(arg[0]) + nodelist = arg[1:] + print 'got GETNODES packet "%s" "%s"'%(updatetime, nodelist) + r = MauiPacket() + nodes = tashiconnection.getNodes(updatetime, nodelist) + numNodes = len(nodes) + dat = 'ARG=%i#'%numNodes + first = True + for node, attributes in nodes.iteritems(): + if first: + dat = dat + '%s:'%node + first = False + else: + dat = dat + '#%s:'%node + attrs = ['%s=%s'%(a,v) for a,v in attributes.iteritems()] + dat = dat + ';'.join(attrs)+';' + r.set(['SC=0', dat], auth=self.auth, key=self.key) + return r + + def processGetJobs(self, p): + arg = p.data[1] + arg = arg.split('=') + arg = arg[1].split(':') + updatetime = int(arg[0]) + joblist = arg[1:] + print 'got GETJOBS packet "%s" "%s"'%(updatetime, joblist) + r = MauiPacket(); + jobs = tashiconnection.getJobs(updatetime, joblist) + numJobs = len(jobs) + dat = 'ARG=%i#'%numJobs + first = True + for job, attributes in jobs.iteritems(): + if first: + dat = dat +'%s:'%job + first = False + else: + dat = dat +'#%s:'%job + # FIXME: support limits + attributes['WCLIMIT']=str(10000) + attrs = ['%s=%s'%(a,v) for a,v in attributes.iteritems()] + dat = dat + ';'.join(attrs) + ';' + r.set(['SC=0', dat], auth=self.auth, key=self.key) + return r + + def processStartJob(self, p): + job = p.data[1] + job = job.split('=')[1].strip() + job = job.split('.')[-1] + tasklist = p.data[2].split('=')[1].split(':') + print 'STARTJOB ', job, tasklist + try: + hosts = self.client.getHosts() + print 'hosts ', hosts + host = [h for h in hosts if h.name == tasklist[0]][0] + self.tashiconnection.activateById(jobnameToId(job), host) + print '\tactivated VM!' + r = MauiPacket() + r.set(['SC=0','RESPONSE=VM %s started on host %s'%(job, tasklist[0])]) + return r + except Exception, e: + # FIXME: make this a real failure response + print 'Oh noes! ', e + r = MauiPacket() + r.set(['SC=-1', 'RESPONSE=%s'%str(e)]) + return r + + def processCancelJob(self, p): + job = p.data[1] + job = job.split('=')[1].strip() + print 'CANCELJOB ', job + try: + self.client.destroyVm(jobnameToId(job)) + print '\tdestroyed VM!' + r = MauiPacket() + r.set(['SC=0', 'RESPONSE=VM %s destroyed'%job]) + return r + except Exception, e: + # FIXME: make this a real failure response + print 'Oh noes! ', e + r = MauiPacket() + r.set(['SC=-1', 'RESPONSE=%s'%str(e)]) + return r + + def processPacket(self,p): + dat = p.data + if not p.verifyChecksum(): + print p + print 'bad checksum' + return + r = None + if dat[0] == 'CMD=GETNODES': + r = self.processGetNodes(p) + elif dat[0] == 'CMD=GETJOBS': + r = self.processGetJobs(p) + elif dat[0] == 'CMD=STARTJOB': + r = self.processStartJob(p) + elif dat[0] == 'CMD=CANCELJOB': + r = self.processCancelJob(p) + else: + print 'got unknown packet' + print p.prettyString() + r = MauiPacket() + r.set(['SC=-810', 'RESPONSE=command not supported']) + print r.prettyString() + self.ostream.write(str(r)) + self.ostream.flush() + self.ostream.close() + self.ostream.close() + self.istream.close() + +if __name__ == '__main__': + (config, configFiles) = getConfig(["Agent"]) + publisher = instantiateImplementation(config.get("Agent", "publisher"), config) + tashi.publisher = publisher + (client, transport) = createClient(config) + logging.config.fileConfig(configFiles) + tashiconnection = TashiConnection(config, client, transport) + tashiconnection.start() + + HOST, PORT = '', 1717 + server = SocketServer.TCPServer((HOST,PORT), MauiListener) + server.serve_forever() Propchange: incubator/tashi/trunk/src/tashi/agents/mauiwiki.py ------------------------------------------------------------------------------ svn:executable = * Modified: incubator/tashi/trunk/src/tashi/agents/primitive.py URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/primitive.py?rev=792287&r1=792286&r2=792287&view=diff ============================================================================== --- incubator/tashi/trunk/src/tashi/agents/primitive.py (original) +++ incubator/tashi/trunk/src/tashi/agents/primitive.py Wed Jul 8 20:15:36 2009 @@ -135,7 +135,7 @@ def main(): (config, configFiles) = getConfig(["Agent"]) - publisher = instantiateImplementation(config.get("Primitive", "publisher"), config) + publisher = instantiateImplementation(config.get("Agent", "publisher"), config) tashi.publisher = publisher (client, transport) = createClient(config) logging.config.fileConfig(configFiles)