incubator-tashi-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From James Cipar <jci...@andrew.cmu.edu>
Subject Re: Locality server
Date Tue, 14 Apr 2009 19:36:38 GMT

Forgot to mention: requires numpy and scipy.  I'll work on getting rid  
of those requirements.

On Apr 14, 2009, at 3:33 PM, Jim Cipar wrote:

> I've been working on the Tashi locality server.  It exports a  
> service that will give the hop count between any set of nodes that  
> it knows about.  This can be used by, e.g., an HDFS Client that is  
> running in a VM, to determine which node to read data from.  This  
> case requires modifying the HDFS client code, but it is an example  
> of how something like this could be useful.
>
> The only call exported right now is:
> list <list <double>> getHopCountMatrix(list <string> sources, list  
> <string> destinations)
> This returns the complete hop count matrix between sources and  
> destinations, with positive infinity for any nodes that are either  
> unknown, or not connected.  It gathers information about static  
> network objects from a configuration file describing all adjacent  
> things (nodes, switches ...) in the network.  Each line of the file  
> has one source, and any number of destinations.  For instance, a  
> simple cluster with one top level switch, and two switches each with  
> two hosts might be configured like this:
>
> toplevel switch1 switch2
> switch1 toplevel host1 host2
> switch2 toplevel host3 host4
>
> All physical objects on the network are configured via the  
> configuration file.  Information about currently running Tashi VMs,  
> is automatically gathered from the Tashi Cluster Manager, and these  
> are added to the graph.
>
> Lastly, there is a script ( src/utils/getLocality.py ) that can be  
> used to contact this service.  The script takes two lines of input  
> on stdin: a whitespace separated list of sources, and a whitespace  
> separated list of destinations. It then prints the distance matrix  
> with a row for each source, and a column for each destination.  It  
> then waits for two more lines of input on stdin.  It will continue  
> until it receives an EOF on stdin.
>
> Here's the patch:
>
>
>
> Index: trunk/src/utils/getLocality.py
> ===================================================================
> --- trunk/src/utils/getLocality.py	(revision 0)
> +++ trunk/src/utils/getLocality.py	(revision 0)
> @@ -0,0 +1,53 @@
> +#!/usr/bin/python
> +
> +import sys
> +import os
> +from os import system
> +
> +import tashi.services.layoutlocality.localityservice as  
> localityservice
> +
> +from thrift import Thrift
> +from thrift.transport import TSocket
> +from thrift.transport import TTransport
> +from thrift.protocol import TBinaryProtocol
> +
> +from tashi.util import getConfig
> +
> +(config, configFiles) = getConfig(["Client"])
> +host = config.get('LocalityService', 'host')
> +port = int(config.get('LocalityService', 'port'))
> +
> +socket = TSocket.TSocket(host, port)
> +transport = TTransport.TBufferedTransport(socket)
> +protocol = TBinaryProtocol.TBinaryProtocol(transport)
> +client = localityservice.Client(protocol)
> +transport.open()
> +
> +
> +while True:
> +    line1 = "\n"
> +    line2 = "\n"
> +    while line1 != "":
> +        line1 = sys.stdin.readline()
> +        if line1 == "":
> +            sys.exit(0)
> +        if line1 != "\n":
> +            break
> +    line1 = line1.strip()
> +    while line2 != "":
> +        line2 = sys.stdin.readline()
> +        if line2 == "":
> +            sys.exit(0)
> +        if line2 != "\n":
> +            break
> +    line2 = line2.strip()
> +
> +    sources = line1.split(" ")
> +    destinations = line2.split(" ")
> +
> +    mat = client.getHopCountMatrix(sources, destinations)
> +    for r in mat:
> +        for c in r:
> +            print '%f\t'%c,
> +        print '\n',
> +    print '\n',
>
> Property changes on: trunk/src/utils/getLocality.py
> ___________________________________________________________________
> Added: svn:executable
>   + *
>
> Index: trunk/src/tashi/nodemanager/vmcontrol/xenpv.py
> ===================================================================
> --- trunk/src/tashi/nodemanager/vmcontrol/xenpv.py	(revision 764908)
> +++ trunk/src/tashi/nodemanager/vmcontrol/xenpv.py	(working copy)
> @@ -156,9 +156,9 @@
> 			    image, macAddr, memory, cores):
> 		fn = os.path.join("/tmp", vmName)
> 		cfgstr = """
> -# kernel="/boot/vmlinuz-2.6.24-17-xen"
> -# ramdisk="/boot/initrd.img-2.6.24-17-xen"
> -bootloader="/usr/bin/pygrub"
> +kernel="/boot/vmlinuz-2.6.24-17-xen"
> +ramdisk="/boot/initrd.img-2.6.24-17-xen"
> +# bootloader="/usr/bin/pygrub"
> disk=['tap:qcow:%s,xvda1,w']
> vif = [ 'mac=%s' ]
> # vif = ['ip=172.19.158.1']
> Index: trunk/src/tashi/thrift/build.py
> ===================================================================
> --- trunk/src/tashi/thrift/build.py	(revision 764908)
> +++ trunk/src/tashi/thrift/build.py	(working copy)
> @@ -35,6 +35,7 @@
> 		print 'Removing \'../messaging/messagingthrift\' directory...'
> 		shutil.rmtree('../messaging/messagingthrift')
> 	
> +	
> 	print 'Generating Python code for \'services.thrift\'...'
> 	os.system('thrift --gen py:new_style services.thrift')
> 	
> @@ -44,7 +45,12 @@
> 	print 'Generatign Python code for \'messagingthrift\'...'
> 	os.system('rm -rf gen-py')
> 	os.system('thrift --gen py messagingthrift.thrift')
> -	
> +
> +        print 'Generating Python code for \'layoutlocality.thrift 
> \'...'
> +        os.system('thrift --gen py:new_style layoutlocality.thrift')
> +        print 'Copying generated code to \'tashi.services\'  
> package...'
> +        shutil.copytree('gen-py/layoutlocality', '../services/ 
> layoutlocality')
> +
> 	print 'Copying generated code to \'tashi.messaging.messagingthrift 
> \' package...'
> 	shutil.copytree(os.path.join('gen-py', 'messagingthrift'),
> 			os.path.join('..', 'messaging', 'messagingthrift'))
> Index: trunk/src/tashi/thrift/layoutlocality.thrift
> ===================================================================
> --- trunk/src/tashi/thrift/layoutlocality.thrift	(revision 0)
> +++ trunk/src/tashi/thrift/layoutlocality.thrift	(revision 0)
> @@ -0,0 +1,26 @@
// Thrift IDL for the Tashi layout and locality services.

// Location of one block of a file, as reported by the layout service.
struct BlockLocation {
  list<string> hosts,           // hostnames of data nodes
  list<i32> ports,              // ports for data nodes
  list<string> names,           // hostname:port of data nodes
  i64 blocknum,                 // presumably the block's index within the file -- confirm with server impl
  i64 offset,                   // presumably byte offset of the block -- confirm with server impl
  i64 length                    // presumably block length in bytes -- confirm with server impl
}

// Wrapper for a file path argument.
struct Pathname {
  string pathname
}

// Raised when a requested path does not exist.
exception FileNotFoundException {
  string message
}

service layoutservice {

  // Block locations for the given path / byte range.
  list <BlockLocation> getFileBlockLocations(1:Pathname path, 2:i64 offset, 3:i64 length)
                       throws (1:FileNotFoundException ouch),
}

service localityservice {
  // Hop-count matrix between sourceHosts (rows) and destHosts (columns);
  // unknown or unconnected pairs are reported as positive infinity.
  list <list<double>> getHopCountMatrix(1:list<string> sourceHosts, 2:list<string> destHosts),
}
> \ No newline at end of file
> Index: trunk/src/tashi/agents/locality-server.py
> ===================================================================
> --- trunk/src/tashi/agents/locality-server.py	(revision 0)
> +++ trunk/src/tashi/agents/locality-server.py	(revision 0)
> @@ -0,0 +1,213 @@
> +from socket import gethostname
> +import os
> +import threading
> +import time
> +import socket
> +
> +from tashi.services.ttypes import *
> +
> +from thrift import Thrift
> +from thrift.transport import TSocket
> +from thrift.transport import TTransport
> +from thrift.protocol import TBinaryProtocol
> +from thrift.server import TServer
> +
> +from tashi.services import clustermanagerservice
> +from tashi.util import getConfig
> +from tashi.parallel import *
> +
> +import tashi.services.layoutlocality.localityservice as  
> localityservice
> +
> +from numpy import *
> +from scipy import *
> +
# cache of hostname -> canonical DNS name lookups
cnames = {}


def cannonicalName(hn):
    """Return the canonical DNS name for host `hn`, caching results.

    Falls back to returning `hn` unchanged when resolution fails, so the
    caller always gets a usable string.  (Function name kept as-is --
    "canonical" is misspelled -- because other code calls it by this name.)
    """
    if hn in cnames:
        return cnames[hn]
    try:
        r = socket.gethostbyname_ex(hn)[0]
    except socket.error:
        # Narrowed from a bare `except:` which also swallowed
        # KeyboardInterrupt/SystemExit; only resolver failures are expected.
        return hn
    cnames[hn] = r
    return r
> +
> +
> +def createClient(config):
> +    host = config.get('Client', 'clusterManagerHost')
> +    port = config.get('Client', 'clusterManagerPort')
> +    print host, port
> +    timeout = float(config.get('Client', 'clusterManagerTimeout'))  
> * 1000.0
> +
> +    socket = TSocket.TSocket(host, int(port))
> +    socket.setTimeout(timeout)
> +    transport = TTransport.TBufferedTransport(socket)
> +    protocol = TBinaryProtocol.TBinaryProtocol(transport)
> +    client = clustermanagerservice.Client(protocol)
> +    try:
> +        transport.open()
> +    except:
> +        pass
> +    return (client, transport)
> +
> +
> +def genMul(A, B, add, mult):
> +    '''generalized matrix multiplication'''
> +    C = zeros((shape(A)[0], shape(B)[1]))
> +    for i in range(shape(C)[0]):
> +        for j in range(shape(C)[1]):
> +            C[i,j] = add(mult(A[i,:], B[:,j]))
> +    return C
> +
def addHost(graph, hostVals, host):
    """Ensure `host` exists in the adjacency map and the host->index map.

    A new host gets an empty adjacency list and the next free matrix index
    (len(hostVals) grows by one per new host).  Idempotent.
    """
    # dict.has_key() is deprecated; use the `in` operator instead
    if host not in graph:
        graph[host] = []
    if host not in hostVals:
        hostVals[host] = len(hostVals)
> +
def graphConnect(graph, h1, h2):
    """Record an undirected edge between h1 and h2 (idempotent).

    Both hosts must already be present in `graph` (see addHost).
    """
    for node, neighbor in ((h1, h2), (h2, h1)):
        if node not in graph[neighbor]:
            graph[neighbor].append(node)
> +
def graphFromFile(fn='serverLayout', graph=None, hostVals=None):
    """Populate graph/hostVals from a static network layout file.

    Each line of the file names one host followed by all of its adjacent
    network objects (switches, hosts, ...).

    Bug fix: the original signature used mutable default arguments
    (graph={}, hostVals={}), so state silently leaked between calls that
    relied on the defaults; fresh dicts are now created per call.

    Returns (graph, hostVals).
    """
    if graph is None:
        graph = {}
    if hostVals is None:
        hostVals = {}
    f = open(fn)
    try:
        for line in f:
            parts = line.split()
            if len(parts) < 1:
                continue
            server = cannonicalName(parts[0].strip())

            addHost(graph, hostVals, server)
            for peer in parts[1:]:
                peer = cannonicalName(peer.strip())
                addHost(graph, hostVals, peer)
                graphConnect(graph, server, peer)
    finally:
        # the original leaked the file handle
        f.close()
    return graph, hostVals
> +
> +def graphFromTashi(client, transport, graph={}, hostVals={}):
> +    print 'getting graph'
> +    if not transport.isOpen():
> +        transport.open()
> +    hosts = client.getHosts()
> +    instances = client.getInstances()
> +    for instance in instances:
> +        host = [cannonicalName(h.name) for h in hosts if h.id ==  
> instance.hostId]
> +        if len(host) <1 :
> +            print 'cant find vm host'
> +            continue
> +        host = host[0]
> +        print 'host is ', host
> +        addHost(graph, hostVals, host)
> +        print 'added host'
> +        vmhost = cannonicalName(instance.name)
> +        addHost(graph, hostVals, vmhost)
> +        print 'added vm'
> +        graphConnect(graph, host, vmhost)
> +        print 'connected'
> +    print 'returning from graphFromTashi'
> +    return graph, hostVals
> +
> +
> +
def graphToArray(graph, hostVals):
    """Convert the adjacency map into a hop-count matrix seed.

    Returns an NxN array (N = len(hostVals)) with 0 on the diagonal, 1
    between adjacent hosts, and +inf everywhere else -- the expected input
    for shortestPaths().  Hosts missing from hostVals are skipped.
    """
    a = zeros((len(hostVals), len(hostVals)))
    for host in graph.keys():
        # `in` instead of deprecated dict.has_key()
        if host not in hostVals:
            continue
        a[hostVals[host], hostVals[host]] = 1
        for peer in graph[host]:
            if peer not in hostVals:
                continue
            a[hostVals[host], hostVals[peer]] = 1
    # unconnected pairs are infinitely far apart...
    a[a==0] = inf
    # ...but every host is 0 hops from itself
    for i in range(shape(a)[0]):
        a[i,i] = 0
    return a
> +
def shortestPaths(graphArray):
    """All-pairs shortest paths by repeated (min, +) matrix squaring.

    Squaring the seeded hop matrix ceil(log2(N)) times in the (min, +)
    semiring converges to the shortest-path matrix.
    """
    # `math` is not imported at module level (the numpy/scipy star imports
    # do not reliably provide it), so import it locally
    import math
    a = graphArray
    # int() is required: math.ceil returns a float and range() needs an int
    for i in range(int(math.ceil(math.log(shape(a)[0], 2)))):
        a = genMul(a, a, min, plus)
    return a
> +
def plus(A, B):
    """Addition helper: the 'mult' step for the (min, +) semiring product."""
    total = A + B
    return total
> +
> +
> +def getHopCountMatrix(sourceHosts, destHosts, array, hostVals):
> +    a = zeros((len(sourceHosts), len(destHosts)))
> +    a[a==0] = inf
> +    for i in range(len(sourceHosts)):
> +        sh = cannonicalName(sourceHosts[i])
> +        shv = None
> +        if hostVals.has_key(sh):
> +            shv = hostVals[sh]
> +        else:
> +            print 'host not found', sh
> +            continue
> +        for j in range(len(destHosts)):
> +            dh = cannonicalName(destHosts[j])
> +            dhv = None
> +            if hostVals.has_key(dh):
> +                dhv = hostVals[dh]
> +            else:
> +                print 'dest not found', dh
> +                continue
> +            print sh, dh, i,j, shv, dhv, array[shv, dhv]
> +            a[i,j] = array[shv, dhv]
> +    return a
> +
> +
> +class LocalityService:
> +    def __init__(self):
> +        (config, configFiles) = getConfig(["Agent"])
> +        self.port = int(config.get('LocalityService', 'port'))
> +        print 'Locality service on port %i' % self.port
> +        self.processor = localityservice.Processor(self)
> +        self.transport = TSocket.TServerSocket(self.port)
> +        self.tfactory = TTransport.TBufferedTransportFactory()
> +        self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
> +        self.server = TServer.TThreadedServer(self.processor,
> +                                              self.transport,
> +                                              self.tfactory,
> +                                              self.pfactory)
> +
> +        self.hostVals =[]
> +        self.array = array([[]])
> +        self.rtime = 0
> +
> +
> +        self.fileName =  
> os.path.expanduser(config.get("LocalityService", "staticLayout"))
> +        (self.client, self.transport) = createClient(config)
> +
> +        self.server.serve()
> +
> +    @synchronizedmethod
> +    def refresh(self):
> +        if time.time() - self.rtime < 10:
> +            return
> +        g, self.hostVals = graphFromFile(self.fileName)
> +        try:
> +            g, self.hostVals = graphFromTashi(self.client,  
> self.transport, g, self.hostVals)
> +        except e:
> +            print e
> +            print 'could not get instance list from cluster manager'
> +        print 'graph to array'
> +        a = graphToArray(g, self.hostVals)
> +        print 'calling shortest paths ', a.shape
> +        self.array = shortestPaths(a)
> +        print 'computed shortest paths'
> +        print self.array
> +        print self.hostVals
> +    @synchronizedmethod
> +    def getHopCountMatrix(self, sourceHosts, destHosts):
> +        self.refresh()
> +        print 'getting hop count matrix for', sourceHosts, destHosts
> +        hcm =  getHopCountMatrix(sourceHosts, destHosts,  
> self.array, self.hostVals)
> +        print hcm
> +        return hcm
> +
> +
# Instantiate and run the service; the constructor calls serve() and blocks.
ls = LocalityService()
> Index: trunk/etc/TashiDefaults.cfg
> ===================================================================
> --- trunk/etc/TashiDefaults.cfg	(revision 764908)
> +++ trunk/etc/TashiDefaults.cfg	(working copy)
> @@ -82,6 +82,11 @@
> [Vfs]
> prefix = /var/tmp/
>
> +[LocalityService]
> +host = layoutserver
> +port = 9884
> +staticLayout = /location/of/layout/file
> +
> # Client configuration
> [Client]
> clusterManagerHost = localhost
>


Mime
View raw message