incubator-tashi-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jim Cipar <jci...@cs.cmu.edu>
Subject Re: Locality server
Date Tue, 21 Apr 2009 19:04:37 GMT
Everything looks good.  Those changes to xenpv were in response to a  
config change on my machines, so they shouldn't be in there anyway.   
With the new xen patch, it should be able to switch between pygrub,  
external kernels, HVM (testing that now), and pvgrub (not yet tested)  
just by using hints.


On Apr 21, 2009, at 2:48 PM, Ryan, Michael P wrote:

> Changes to Jim's patch:
>        * removed seemingly unrelated change to xenpv.py
>        * indentation fix (thanks "make tidy")
>        * tweaked changes to build.py (maintain similar ordering to  
> previous statements)
>        * made src/tashi/agents/locality-server.py executable
>        * used util.createClient instead of duplicating code in  
> locality-server.py
>        * added a main() method to locality-server.py (common for  
> executable files)
>        * changed the section called "LayoutService" to  
> "LocalityService" (it didn't run without doing this)
>
> Is there anything I wrongly changed?
>
> Index: src/utils/getLocality.py
> ===================================================================
> --- src/utils/getLocality.py    (revision 0)
> +++ src/utils/getLocality.py    (revision 0)
> @@ -0,0 +1,52 @@
> +#!/usr/bin/python
> +
> +import sys
> +import os
> +from os import system
> +
> +import tashi.services.layoutlocality.localityservice as  
> localityservice
> +
> +from thrift import Thrift
> +from thrift.transport import TSocket
> +from thrift.transport import TTransport
> +from thrift.protocol import TBinaryProtocol
> +
> +from tashi.util import getConfig
> +
> +(config, configFiles) = getConfig(["Client"])
> +host = config.get('LocalityService', 'host')
> +port = int(config.get('LocalityService', 'port'))
> +
> +socket = TSocket.TSocket(host, port)
> +transport = TTransport.TBufferedTransport(socket)
> +protocol = TBinaryProtocol.TBinaryProtocol(transport)
> +client = localityservice.Client(protocol)
> +transport.open()
> +
> +while True:
> +       line1 = "\n"
> +       line2 = "\n"
> +       while line1 != "":
> +               line1 = sys.stdin.readline()
> +               if line1 == "":
> +                       sys.exit(0)
> +               if line1 != "\n":
> +                       break
> +       line1 = line1.strip()
> +       while line2 != "":
> +               line2 = sys.stdin.readline()
> +               if line2 == "":
> +                       sys.exit(0)
> +               if line2 != "\n":
> +                       break
> +       line2 = line2.strip()
> +
> +       sources = line1.split(" ")
> +       destinations = line2.split(" ")
> +
> +       mat = client.getHopCountMatrix(sources, destinations)
> +       for r in mat:
> +               for c in r:
> +                       print '%f\t'%c,
> +               print '\n',
> +       print '\n',
>
> Property changes on: src/utils/getLocality.py
> ___________________________________________________________________
> Added: svn:executable
>   + *
>
> Index: src/tashi/thrift/build.py
> ===================================================================
> --- src/tashi/thrift/build.py   (revision 767156)
> +++ src/tashi/thrift/build.py   (working copy)
> @@ -48,3 +48,9 @@
>        print 'Copying generated code to  
> \'tashi.messaging.messagingthrift\' package...'
>        shutil.copytree(os.path.join('gen-py', 'messagingthrift'),
>                        os.path.join('..', 'messaging',  
> 'messagingthrift'))
> +
> +       print 'Generating Python code for \'layoutlocality.thrift 
> \'...'
> +       os.system('thrift --gen py:new_style layoutlocality.thrift')
> +
> +       print 'Copying generated code to \'tashi.services\'  
> package...'
> +       shutil.copytree('gen-py/layoutlocality', '../services/ 
> layoutlocality')
> Index: src/tashi/thrift/layoutlocality.thrift
> ===================================================================
> --- src/tashi/thrift/layoutlocality.thrift      (revision 0)
> +++ src/tashi/thrift/layoutlocality.thrift      (revision 0)
> @@ -0,0 +1,25 @@
> +struct BlockLocation {
> +       list<string> hosts,           // hostnames of data nodes
> +       list<i32> ports,              // ports for data nodes
> +       list<string> names,           // hostname:port of data nodes
> +       i64 blocknum,
> +       i64 offset,
> +       i64 length
> +}
> +
> +struct Pathname {
> +       string pathname
> +}
> +
> +exception FileNotFoundException {
> +       string message
> +}
> +
> +service layoutservice {
> +       list <BlockLocation> getFileBlockLocations(1:Pathname path,  
> 2:i64 offset, 3:i64 length)
> +                            throws (1:FileNotFoundException ouch),
> +}
> +
> +service localityservice {
> +       list <list<double>> getHopCountMatrix(1:list<string>  
> sourceHosts, 2:list<string> destHosts),
> +}
> Index: src/tashi/agents/locality-server.py
> ===================================================================
> --- src/tashi/agents/locality-server.py (revision 0)
> +++ src/tashi/agents/locality-server.py (revision 0)
> @@ -0,0 +1,200 @@
> +#! /usr/bin/env python
> +
> +from socket import gethostname
> +import os
> +import threading
> +import time
> +import socket
> +
> +from tashi.services.ttypes import *
> +
> +from thrift import Thrift
> +from thrift.transport import TSocket
> +from thrift.transport import TTransport
> +from thrift.protocol import TBinaryProtocol
> +from thrift.server import TServer
> +
> +from tashi.services import clustermanagerservice
> +from tashi.util import getConfig, createClient
> +from tashi.parallel import *
> +
> +import tashi.services.layoutlocality.localityservice as  
> localityservice
> +
> +from numpy import *
> +from scipy import *
> +
> +cnames = {}
> +def cannonicalName(hn):
> +       try:
> +               if cnames.has_key(hn):
> +                       return cnames[hn]
> +               r = socket.gethostbyname_ex(hn)[0]
> +               cnames[hn] = r
> +               return r
> +       except:
> +               return hn
> +
> +def genMul(A, B, add, mult):
> +       '''generalized matrix multiplication'''
> +       C = zeros((shape(A)[0], shape(B)[1]))
> +       for i in range(shape(C)[0]):
> +               for j in range(shape(C)[1]):
> +                       C[i,j] = add(mult(A[i,:], B[:,j]))
> +       return C
> +
> +def addHost(graph, hostVals, host):
> +       if not graph.has_key(host):
> +               graph[host] = []
> +       if not hostVals.has_key(host):
> +               hostVals[host] = len(hostVals)
> +
> +def graphConnect(graph, h1, h2):
> +       if not h1 in graph[h2]:
> +               graph[h2].append(h1)
> +       if not h2 in graph[h1]:
> +               graph[h1].append(h2)
> +
> +def graphFromFile(fn = 'serverLayout', graph = {}, hostVals = {}):
> +       f = open(fn)
> +       for line in f.readlines():
> +               line = line.split()
> +               if len(line) < 1:
> +                       continue
> +               server = cannonicalName(line[0].strip())
> +
> +               addHost(graph, hostVals, server)
> +               for peer in line[1:]:
> +                       peer = cannonicalName(peer.strip())
> +                       addHost(graph, hostVals, peer)
> +                       graphConnect(graph, server, peer)
> +       return graph, hostVals
> +
> +def graphFromTashi(client, transport, graph={}, hostVals={}):
> +       print 'getting graph'
> +       if not transport.isOpen():
> +               transport.open()
> +       hosts = client.getHosts()
> +       instances = client.getInstances()
> +       for instance in instances:
> +               host = [cannonicalName(h.name) for h in hosts if  
> h.id == instance.hostId]
> +               if len(host) <1 :
> +                       print 'cant find vm host'
> +                       continue
> +               host = host[0]
> +               print 'host is ', host
> +               addHost(graph, hostVals, host)
> +               print 'added host'
> +               vmhost = cannonicalName(instance.name)
> +               addHost(graph, hostVals, vmhost)
> +               print 'added vm'
> +               graphConnect(graph, host, vmhost)
> +               print 'connected'
> +       print 'returning from graphFromTashi'
> +       return graph, hostVals
> +
> +
> +
> +def graphToArray(graph, hostVals):
> +       a = zeros((len(hostVals), len(hostVals)))
> +       for host in graph.keys():
> +               if not hostVals.has_key(host):
> +                       continue
> +               a[hostVals[host], hostVals[host]] = 1
> +               for peer in graph[host]:
> +                       if not hostVals.has_key(peer):
> +                               continue
> +                       a[hostVals[host], hostVals[peer]] = 1
> +       a[a==0] = inf
> +       for i in range(shape(a)[0]):
> +               a[i,i]=0
> +       return a
> +
> +def shortestPaths(graphArray):
> +       a = graphArray
> +       for i in range(math.ceil(math.log(shape(a)[0],2))):
> +               a = genMul(a,a,min,plus)
> +       return a
> +
> +def plus(A, B):
> +       return A + B
> +
> +
> +def getHopCountMatrix(sourceHosts, destHosts, array, hostVals):
> +       a = zeros((len(sourceHosts), len(destHosts)))
> +       a[a==0] = inf
> +       for i in range(len(sourceHosts)):
> +               sh = cannonicalName(sourceHosts[i])
> +               shv = None
> +               if hostVals.has_key(sh):
> +                       shv = hostVals[sh]
> +               else:
> +                       print 'host not found', sh
> +                       continue
> +               for j in range(len(destHosts)):
> +                       dh = cannonicalName(destHosts[j])
> +                       dhv = None
> +                       if hostVals.has_key(dh):
> +                               dhv = hostVals[dh]
> +                       else:
> +                               print 'dest not found', dh
> +                               continue
> +                       print sh, dh, i,j, shv, dhv, array[shv, dhv]
> +                       a[i,j] = array[shv, dhv]
> +       return a
> +
> +
> +class LocalityService:
> +       def __init__(self):
> +               (config, configFiles) = getConfig(["Agent"])
> +               self.port = int(config.get('LocalityService', 'port'))
> +               print 'Locality service on port %i' % self.port
> +               self.processor = localityservice.Processor(self)
> +               self.transport = TSocket.TServerSocket(self.port)
> +               self.tfactory = TTransport.TBufferedTransportFactory()
> +               self.pfactory =  
> TBinaryProtocol.TBinaryProtocolFactory()
> +               self.server = TServer.TThreadedServer(self.processor,
> + 
>                                                                                     
    self 
> .transport,
> + 
>                                                                                     
    self 
> .tfactory,
> + 
>                                                                                     
    self 
> .pfactory)
> +
> +               self.hostVals =[]
> +               self.array = array([[]])
> +               self.rtime = 0
> +
> +
> +               self.fileName =  
> os.path.expanduser(config.get("LocalityService", "staticLayout"))
> +               (self.client, self.transport) = createClient(config)
> +
> +               self.server.serve()
> +
> +       @synchronizedmethod
> +       def refresh(self):
> +               if time.time() - self.rtime < 10:
> +                       return
> +               g, self.hostVals = graphFromFile(self.fileName)
> +               try:
> +                       g, self.hostVals =  
> graphFromTashi(self.client, self.transport, g, self.hostVals)
> +               except e:
> +                       print e
> +                       print 'could not get instance list from  
> cluster manager'
> +               print 'graph to array'
> +               a = graphToArray(g, self.hostVals)
> +               print 'calling shortest paths ', a.shape
> +               self.array = shortestPaths(a)
> +               print 'computed shortest paths'
> +               print self.array
> +               print self.hostVals
> +       @synchronizedmethod
> +       def getHopCountMatrix(self, sourceHosts, destHosts):
> +               self.refresh()
> +               print 'getting hop count matrix for', sourceHosts,  
> destHosts
> +               hcm =  getHopCountMatrix(sourceHosts, destHosts,  
> self.array, self.hostVals)
> +               print hcm
> +               return hcm
> +
> +
> +def main():
> +       ls = LocalityService()
> +
> +if __name__ == "__main__":
> +       main()
>
> Property changes on: src/tashi/agents/locality-server.py
> ___________________________________________________________________
> Added: svn:executable
>   + *
>
> Index: etc/TashiDefaults.cfg
> ===================================================================
> --- etc/TashiDefaults.cfg       (revision 767156)
> +++ etc/TashiDefaults.cfg       (working copy)
> @@ -82,6 +82,11 @@
> [Vfs]
> prefix = /var/tmp/
>
> +[LocalityService]
> +host = localityserver
> +port = 9884
> +staticLayout = /location/of/layout/file
> +
> # Client configuration
> [Client]
> clusterManagerHost = localhost
>
> -----Original Message-----
> From: James Cipar [mailto:jcipar@andrew.cmu.edu]
> Sent: Tuesday, April 14, 2009 3:37 PM
> To: tashi-dev@incubator.apache.org
> Subject: Re: Locality server
>
>
> Forgot to mention: requires numpy and scipy.  I'll work on getting rid
> of those requirements.
>
> On Apr 14, 2009, at 3:33 PM, Jim Cipar wrote:
>
>> I've been working on the Tashi locality server.  It exports a
>> service that will give the hop count between any set of nodes that
>> it knows about.  This can be used by, e.g., an HDFS Client that is
>> running in a VM, to determine which node to read data from.  This
>> case requires modifying the HDFS client code, but it is an example
>> of how something like this could be useful.
>>
>> The only call exported right now is:
>> list <list <float>> getHopCountMatrix(list <string>sources, list
>> <string> destinations)
>> This returns the complete hop count matrix between sources and
>> destinations, with positive infinity for any nodes that are either
>> unknown, or not connected.  It gathers information about static
>> network objects from a configuration file describing all adjacent
>> things (nodes, switches ...) in the network.  Each line of the file
>> has one source, and any number of destinations.  For instance, a
>> simple cluster with one top level switch, and two switches each with
>> two hosts might be configured like this:
>>
>> toplevel switch1 switch2
>> switch1 toplevel host1 host2
>> switch2 toplevel host3 host4
>>
>> All physical objects on the network are configured via the
>> configuration file.  Information about currently running Tashi VMs,
>> is automatically gathered from the Tashi Cluster Manager, and these
>> are added to the graph.
>>
>> Lastly, there is a script ( src/utils/getLocality.py ) that can be
>> used to contact this service.  The script takes two lines of input
>> on stdin: a whitespace separated list of sources, and a whitespace
>> separated list of destinations. It then prints the distance matrix
>> with a row for each source, and a column for each destination.  It
>> then waits for two more lines of input on stdin.  It will continue
>> until it receives an EOF on stdin.
>>
>> Here's the patch:
>>
>>
>>
>> Index: trunk/src/utils/getLocality.py
>> ===================================================================
>> --- trunk/src/utils/getLocality.py    (revision 0)
>> +++ trunk/src/utils/getLocality.py    (revision 0)
>> @@ -0,0 +1,53 @@
>> +#!/usr/bin/python
>> +
>> +import sys
>> +import os
>> +from os import system
>> +
>> +import tashi.services.layoutlocality.localityservice as
>> localityservice
>> +
>> +from thrift import Thrift
>> +from thrift.transport import TSocket
>> +from thrift.transport import TTransport
>> +from thrift.protocol import TBinaryProtocol
>> +
>> +from tashi.util import getConfig
>> +
>> +(config, configFiles) = getConfig(["Client"])
>> +host = config.get('LocalityService', 'host')
>> +port = int(config.get('LocalityService', 'port'))
>> +
>> +socket = TSocket.TSocket(host, port)
>> +transport = TTransport.TBufferedTransport(socket)
>> +protocol = TBinaryProtocol.TBinaryProtocol(transport)
>> +client = localityservice.Client(protocol)
>> +transport.open()
>> +
>> +
>> +while True:
>> +    line1 = "\n"
>> +    line2 = "\n"
>> +    while line1 != "":
>> +        line1 = sys.stdin.readline()
>> +        if line1 == "":
>> +            sys.exit(0)
>> +        if line1 != "\n":
>> +            break
>> +    line1 = line1.strip()
>> +    while line2 != "":
>> +        line2 = sys.stdin.readline()
>> +        if line2 == "":
>> +            sys.exit(0)
>> +        if line2 != "\n":
>> +            break
>> +    line2 = line2.strip()
>> +
>> +    sources = line1.split(" ")
>> +    destinations = line2.split(" ")
>> +
>> +    mat = client.getHopCountMatrix(sources, destinations)
>> +    for r in mat:
>> +        for c in r:
>> +            print '%f\t'%c,
>> +        print '\n',
>> +    print '\n',
>>
>> Property changes on: trunk/src/utils/getLocality.py
>> ___________________________________________________________________
>> Added: svn:executable
>>  + *
>>
>> Index: trunk/src/tashi/nodemanager/vmcontrol/xenpv.py
>> ===================================================================
>> --- trunk/src/tashi/nodemanager/vmcontrol/xenpv.py    (revision  
>> 764908)
>> +++ trunk/src/tashi/nodemanager/vmcontrol/xenpv.py    (working copy)
>> @@ -156,9 +156,9 @@
>>                          image, macAddr, memory, cores):
>>              fn = os.path.join("/tmp", vmName)
>>              cfgstr = """
>> -# kernel="/boot/vmlinuz-2.6.24-17-xen"
>> -# ramdisk="/boot/initrd.img-2.6.24-17-xen"
>> -bootloader="/usr/bin/pygrub"
>> +kernel="/boot/vmlinuz-2.6.24-17-xen"
>> +ramdisk="/boot/initrd.img-2.6.24-17-xen"
>> +# bootloader="/usr/bin/pygrub"
>> disk=['tap:qcow:%s,xvda1,w']
>> vif = [ 'mac=%s' ]
>> # vif = ['ip=172.19.158.1']
>> Index: trunk/src/tashi/thrift/build.py
>> ===================================================================
>> --- trunk/src/tashi/thrift/build.py   (revision 764908)
>> +++ trunk/src/tashi/thrift/build.py   (working copy)
>> @@ -35,6 +35,7 @@
>>              print 'Removing \'../messaging/messagingthrift\'  
>> directory...'
>>              shutil.rmtree('../messaging/messagingthrift')
>>
>> +
>>      print 'Generating Python code for \'services.thrift\'...'
>>      os.system('thrift --gen py:new_style services.thrift')
>>
>> @@ -44,7 +45,12 @@
>>      print 'Generatign Python code for \'messagingthrift\'...'
>>      os.system('rm -rf gen-py')
>>      os.system('thrift --gen py messagingthrift.thrift')
>> -
>> +
>> +        print 'Generating Python code for \'layoutlocality.thrift
>> \'...'
>> +        os.system('thrift --gen py:new_style layoutlocality.thrift')
>> +        print 'Copying generated code to \'tashi.services\'
>> package...'
>> +        shutil.copytree('gen-py/layoutlocality', '../services/
>> layoutlocality')
>> +
>>      print 'Copying generated code to  
>> \'tashi.messaging.messagingthrift
>> \' package...'
>>      shutil.copytree(os.path.join('gen-py', 'messagingthrift'),
>>                      os.path.join('..', 'messaging',  
>> 'messagingthrift'))
>> Index: trunk/src/tashi/thrift/layoutlocality.thrift
>> ===================================================================
>> --- trunk/src/tashi/thrift/layoutlocality.thrift      (revision 0)
>> +++ trunk/src/tashi/thrift/layoutlocality.thrift      (revision 0)
>> @@ -0,0 +1,26 @@
>> +struct BlockLocation {
>> +  list<string> hosts,           // hostnames of data nodes
>> +  list<i32> ports,              // ports for data nodes
>> +  list<string> names,           // hostname:port of data nodes
>> +  i64 blocknum,
>> +  i64 offset,
>> +  i64 length
>> +}
>> +
>> +struct Pathname {
>> +  string pathname
>> +}
>> +
>> +exception FileNotFoundException {
>> +  string message
>> +}
>> +
>> +service layoutservice {
>> +
>> +  list <BlockLocation> getFileBlockLocations(1:Pathname path, 2:i64
>> offset, 3:i64 length)
>> +                       throws (1:FileNotFoundException ouch),
>> +}
>> +
>> +service localityservice {
>> +  list <list<double>> getHopCountMatrix(1:list<string> sourceHosts,
>> 2:list<string> destHosts),
>> +}
>> \ No newline at end of file
>> Index: trunk/src/tashi/agents/locality-server.py
>> ===================================================================
>> --- trunk/src/tashi/agents/locality-server.py (revision 0)
>> +++ trunk/src/tashi/agents/locality-server.py (revision 0)
>> @@ -0,0 +1,213 @@
>> +from socket import gethostname
>> +import os
>> +import threading
>> +import time
>> +import socket
>> +
>> +from tashi.services.ttypes import *
>> +
>> +from thrift import Thrift
>> +from thrift.transport import TSocket
>> +from thrift.transport import TTransport
>> +from thrift.protocol import TBinaryProtocol
>> +from thrift.server import TServer
>> +
>> +from tashi.services import clustermanagerservice
>> +from tashi.util import getConfig
>> +from tashi.parallel import *
>> +
>> +import tashi.services.layoutlocality.localityservice as
>> localityservice
>> +
>> +from numpy import *
>> +from scipy import *
>> +
>> +cnames = {}
>> +def cannonicalName(hn):
>> +    try:
>> +        if cnames.has_key(hn):
>> +            return cnames[hn]
>> +        r = socket.gethostbyname_ex(hn)[0]
>> +        cnames[hn] = r
>> +        return r
>> +    except:
>> +        return hn
>> +
>> +
>> +def createClient(config):
>> +    host = config.get('Client', 'clusterManagerHost')
>> +    port = config.get('Client', 'clusterManagerPort')
>> +    print host, port
>> +    timeout = float(config.get('Client', 'clusterManagerTimeout'))
>> * 1000.0
>> +
>> +    socket = TSocket.TSocket(host, int(port))
>> +    socket.setTimeout(timeout)
>> +    transport = TTransport.TBufferedTransport(socket)
>> +    protocol = TBinaryProtocol.TBinaryProtocol(transport)
>> +    client = clustermanagerservice.Client(protocol)
>> +    try:
>> +        transport.open()
>> +    except:
>> +        pass
>> +    return (client, transport)
>> +
>> +
>> +def genMul(A, B, add, mult):
>> +    '''generalized matrix multiplication'''
>> +    C = zeros((shape(A)[0], shape(B)[1]))
>> +    for i in range(shape(C)[0]):
>> +        for j in range(shape(C)[1]):
>> +            C[i,j] = add(mult(A[i,:], B[:,j]))
>> +    return C
>> +
>> +def addHost(graph, hostVals, host):
>> +    if not graph.has_key(host):
>> +        graph[host] = []
>> +    if not hostVals.has_key(host):
>> +        hostVals[host] = len(hostVals)
>> +
>> +def graphConnect(graph, h1, h2):
>> +    if not h1 in graph[h2]:
>> +        graph[h2].append(h1)
>> +    if not h2 in graph[h1]:
>> +        graph[h1].append(h2)
>> +
>> +def graphFromFile(fn = 'serverLayout', graph = {}, hostVals = {}):
>> +    f = open(fn)
>> +    for line in f.readlines():
>> +        line = line.split()
>> +        if len(line) < 1:
>> +            continue
>> +        server = cannonicalName(line[0].strip())
>> +
>> +        addHost(graph, hostVals, server)
>> +        for peer in line[1:]:
>> +            peer = cannonicalName(peer.strip())
>> +            addHost(graph, hostVals, peer)
>> +            graphConnect(graph, server, peer)
>> +    return graph, hostVals
>> +
>> +def graphFromTashi(client, transport, graph={}, hostVals={}):
>> +    print 'getting graph'
>> +    if not transport.isOpen():
>> +        transport.open()
>> +    hosts = client.getHosts()
>> +    instances = client.getInstances()
>> +    for instance in instances:
>> +        host = [cannonicalName(h.name) for h in hosts if h.id ==
>> instance.hostId]
>> +        if len(host) <1 :
>> +            print 'cant find vm host'
>> +            continue
>> +        host = host[0]
>> +        print 'host is ', host
>> +        addHost(graph, hostVals, host)
>> +        print 'added host'
>> +        vmhost = cannonicalName(instance.name)
>> +        addHost(graph, hostVals, vmhost)
>> +        print 'added vm'
>> +        graphConnect(graph, host, vmhost)
>> +        print 'connected'
>> +    print 'returning from graphFromTashi'
>> +    return graph, hostVals
>> +
>> +
>> +
>> +def graphToArray(graph, hostVals):
>> +    a = zeros((len(hostVals), len(hostVals)))
>> +    for host in graph.keys():
>> +        if not hostVals.has_key(host):
>> +            continue
>> +        a[hostVals[host], hostVals[host]] = 1
>> +        for peer in graph[host]:
>> +            if not hostVals.has_key(peer):
>> +                continue
>> +            a[hostVals[host], hostVals[peer]] = 1
>> +    a[a==0] = inf
>> +    for i in range(shape(a)[0]):
>> +        a[i,i]=0
>> +    return a
>> +
>> +def shortestPaths(graphArray):
>> +    a = graphArray
>> +    for i in range(math.ceil(math.log(shape(a)[0],2))):
>> +        a = genMul(a,a,min,plus)
>> +    return a
>> +
>> +def plus(A, B):
>> +    return A + B
>> +
>> +
>> +def getHopCountMatrix(sourceHosts, destHosts, array, hostVals):
>> +    a = zeros((len(sourceHosts), len(destHosts)))
>> +    a[a==0] = inf
>> +    for i in range(len(sourceHosts)):
>> +        sh = cannonicalName(sourceHosts[i])
>> +        shv = None
>> +        if hostVals.has_key(sh):
>> +            shv = hostVals[sh]
>> +        else:
>> +            print 'host not found', sh
>> +            continue
>> +        for j in range(len(destHosts)):
>> +            dh = cannonicalName(destHosts[j])
>> +            dhv = None
>> +            if hostVals.has_key(dh):
>> +                dhv = hostVals[dh]
>> +            else:
>> +                print 'dest not found', dh
>> +                continue
>> +            print sh, dh, i,j, shv, dhv, array[shv, dhv]
>> +            a[i,j] = array[shv, dhv]
>> +    return a
>> +
>> +
>> +class LocalityService:
>> +    def __init__(self):
>> +        (config, configFiles) = getConfig(["Agent"])
>> +        self.port = int(config.get('LocalityService', 'port'))
>> +        print 'Locality service on port %i' % self.port
>> +        self.processor = localityservice.Processor(self)
>> +        self.transport = TSocket.TServerSocket(self.port)
>> +        self.tfactory = TTransport.TBufferedTransportFactory()
>> +        self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
>> +        self.server = TServer.TThreadedServer(self.processor,
>> +                                              self.transport,
>> +                                              self.tfactory,
>> +                                              self.pfactory)
>> +
>> +        self.hostVals =[]
>> +        self.array = array([[]])
>> +        self.rtime = 0
>> +
>> +
>> +        self.fileName =
>> os.path.expanduser(config.get("LocalityService", "staticLayout"))
>> +        (self.client, self.transport) = createClient(config)
>> +
>> +        self.server.serve()
>> +
>> +    @synchronizedmethod
>> +    def refresh(self):
>> +        if time.time() - self.rtime < 10:
>> +            return
>> +        g, self.hostVals = graphFromFile(self.fileName)
>> +        try:
>> +            g, self.hostVals = graphFromTashi(self.client,
>> self.transport, g, self.hostVals)
>> +        except e:
>> +            print e
>> +            print 'could not get instance list from cluster manager'
>> +        print 'graph to array'
>> +        a = graphToArray(g, self.hostVals)
>> +        print 'calling shortest paths ', a.shape
>> +        self.array = shortestPaths(a)
>> +        print 'computed shortest paths'
>> +        print self.array
>> +        print self.hostVals
>> +    @synchronizedmethod
>> +    def getHopCountMatrix(self, sourceHosts, destHosts):
>> +        self.refresh()
>> +        print 'getting hop count matrix for', sourceHosts, destHosts
>> +        hcm =  getHopCountMatrix(sourceHosts, destHosts,
>> self.array, self.hostVals)
>> +        print hcm
>> +        return hcm
>> +
>> +
>> +ls = LocalityService()
>> Index: trunk/etc/TashiDefaults.cfg
>> ===================================================================
>> --- trunk/etc/TashiDefaults.cfg       (revision 764908)
>> +++ trunk/etc/TashiDefaults.cfg       (working copy)
>> @@ -82,6 +82,11 @@
>> [Vfs]
>> prefix = /var/tmp/
>>
>> +[LayoutService]
>> +host = layoutserver
>> +port = 9884
>> +staticLayout = /location/of/layout/file
>> +
>> # Client configuration
>> [Client]
>> clusterManagerHost = localhost
>>
>
>


Mime
View raw message