incubator-tashi-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jim Cipar <jci...@cs.cmu.edu>
Subject Locality server
Date Tue, 14 Apr 2009 19:33:50 GMT
I've been working on the Tashi locality server.  It exports a service  
that will give the hop count between any set of nodes that it knows  
about.  This can be used by, e.g., an HDFS Client that is running in a  
VM, to determine which node to read data from.  This case requires  
modifying the HDFS client code, but it is an example of how something  
like this could be useful.

The only call exported right now is:
  list<list<double>> getHopCountMatrix(list<string> sources, list<string> destinations)
This returns the complete hop-count matrix between sources and
destinations, with positive infinity for any pair of nodes that are
unknown or not connected.  It gathers information about static
network objects from a configuration file describing all adjacent
things (nodes, switches, ...) in the network.  Each line of the file
names one node followed by any number of nodes adjacent to it.  For
instance, a simple cluster with one top-level switch and two switches,
each with two hosts, might be configured like this:

toplevel switch1 switch2
switch1 toplevel host1 host2
switch2 toplevel host3 host4

All physical objects on the network are configured via the
configuration file.  Information about currently running Tashi VMs is
automatically gathered from the Tashi Cluster Manager, and those VMs
are added to the graph.

Lastly, there is a script (src/utils/getLocality.py) that can be
used to query this service.  The script reads two lines of input from
stdin: a whitespace-separated list of sources and a whitespace-separated
list of destinations.  It then prints the distance matrix, with a row
for each source and a column for each destination, and waits for the
next two lines of input.  It continues until it receives EOF on stdin.

Here's the patch:



Index: trunk/src/utils/getLocality.py
===================================================================
--- trunk/src/utils/getLocality.py	(revision 0)
+++ trunk/src/utils/getLocality.py	(revision 0)
@@ -0,0 +1,53 @@
+#!/usr/bin/python
+
+import sys
+import os
+from os import system
+
+import tashi.services.layoutlocality.localityservice as localityservice
+
+from thrift import Thrift
+from thrift.transport import TSocket
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+
+from tashi.util import getConfig
+
def readNonBlankLine(stream):
    '''Return the next non-blank line of stream, stripped, or None at EOF.

    Lines that contain only whitespace are skipped.
    '''
    # iter(readline, '') rather than "for line in stream": on Python 2,
    # file iteration uses a hidden read-ahead buffer that would stall an
    # interactive query/response session on a pipe or terminal.
    for line in iter(stream.readline, ''):
        line = line.strip()
        if line:
            return line
    return None


def formatMatrix(mat):
    '''Render mat as text for stdout.

    Each row becomes one line in which every cell is printed as "%f"
    followed by a tab; a blank line follows the last row.
    '''
    lines = [''.join(['%f\t' % c for c in row]) + '\n' for row in mat]
    return ''.join(lines) + '\n'


def main():
    '''Answer hop-count queries read from stdin until EOF.

    Connects to the locality service named by the [LocalityService]
    host and port options, then repeatedly reads a line of sources and a
    line of destinations (whitespace-separated host names) and prints the
    resulting hop-count matrix, one row per source.  Returns 0 at EOF.
    '''
    (config, configFiles) = getConfig(["Client"])
    host = config.get('LocalityService', 'host')
    port = int(config.get('LocalityService', 'port'))

    sock = TSocket.TSocket(host, port)
    transport = TTransport.TBufferedTransport(sock)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = localityservice.Client(protocol)
    transport.open()
    try:
        while True:
            sourceLine = readNonBlankLine(sys.stdin)
            if sourceLine is None:
                break
            destLine = readNonBlankLine(sys.stdin)
            if destLine is None:
                break
            # split() with no argument splits on any run of whitespace,
            # so tabs and repeated spaces never yield empty host names.
            mat = client.getHopCountMatrix(sourceLine.split(),
                                           destLine.split())
            sys.stdout.write(formatMatrix(mat))
            # Flush so a caller driving this script through a pipe sees
            # each answer before it sends the next query.
            sys.stdout.flush()
    finally:
        transport.close()
    return 0


if __name__ == '__main__':
    sys.exit(main())

Property changes on: trunk/src/utils/getLocality.py
___________________________________________________________________
Added: svn:executable
    + *

Index: trunk/src/tashi/nodemanager/vmcontrol/xenpv.py
===================================================================
--- trunk/src/tashi/nodemanager/vmcontrol/xenpv.py	(revision 764908)
+++ trunk/src/tashi/nodemanager/vmcontrol/xenpv.py	(working copy)
@@ -156,9 +156,9 @@
  			    image, macAddr, memory, cores):
  		fn = os.path.join("/tmp", vmName)
  		cfgstr = """
-# kernel="/boot/vmlinuz-2.6.24-17-xen"
-# ramdisk="/boot/initrd.img-2.6.24-17-xen"
-bootloader="/usr/bin/pygrub"
+kernel="/boot/vmlinuz-2.6.24-17-xen"
+ramdisk="/boot/initrd.img-2.6.24-17-xen"
+# bootloader="/usr/bin/pygrub"
  disk=['tap:qcow:%s,xvda1,w']
  vif = [ 'mac=%s' ]
  # vif = ['ip=172.19.158.1']
Index: trunk/src/tashi/thrift/build.py
===================================================================
--- trunk/src/tashi/thrift/build.py	(revision 764908)
+++ trunk/src/tashi/thrift/build.py	(working copy)
@@ -35,6 +35,7 @@
  		print 'Removing \'../messaging/messagingthrift\' directory...'
  		shutil.rmtree('../messaging/messagingthrift')
  	
+	
  	print 'Generating Python code for \'services.thrift\'...'
  	os.system('thrift --gen py:new_style services.thrift')
  	
@@ -44,7 +45,12 @@
  	print 'Generatign Python code for \'messagingthrift\'...'
  	os.system('rm -rf gen-py')
  	os.system('thrift --gen py messagingthrift.thrift')
-	
+
+        print 'Generating Python code for \'layoutlocality.thrift\'...'
+        os.system('thrift --gen py:new_style layoutlocality.thrift')
+        print 'Copying generated code to \'tashi.services\' package...'
+        shutil.copytree('gen-py/layoutlocality', '../services/ 
layoutlocality')
+
  	print 'Copying generated code to \'tashi.messaging.messagingthrift 
\' package...'
  	shutil.copytree(os.path.join('gen-py', 'messagingthrift'),
  			os.path.join('..', 'messaging', 'messagingthrift'))
Index: trunk/src/tashi/thrift/layoutlocality.thrift
===================================================================
--- trunk/src/tashi/thrift/layoutlocality.thrift	(revision 0)
+++ trunk/src/tashi/thrift/layoutlocality.thrift	(revision 0)
@@ -0,0 +1,26 @@
// Types and services for the Tashi layout and locality services.
// Struct and exception fields carry explicit ids so that the wire format
// does not depend on declaration order (implicit ids are assigned by the
// compiler in declaration order).

struct BlockLocation {
  1: list<string> hosts,        // hostnames of data nodes
  2: list<i32> ports,           // ports for data nodes
  3: list<string> names,        // hostname:port of data nodes
  4: i64 blocknum,
  5: i64 offset,
  6: i64 length
}

struct Pathname {
  1: string pathname
}

exception FileNotFoundException {
  1: string message
}

service layoutservice {

  list<BlockLocation> getFileBlockLocations(1: Pathname path, 2: i64 offset, 3: i64 length)
                       throws (1: FileNotFoundException ouch),
}

service localityservice {
  // Hop counts from every source host (rows) to every destination host
  // (columns); pairs that are unknown or not connected are +infinity.
  list<list<double>> getHopCountMatrix(1: list<string> sourceHosts, 2: list<string> destHosts),
}
\ No newline at end of file
Index: trunk/src/tashi/agents/locality-server.py
===================================================================
--- trunk/src/tashi/agents/locality-server.py	(revision 0)
+++ trunk/src/tashi/agents/locality-server.py	(revision 0)
@@ -0,0 +1,213 @@
+from socket import gethostname
+import os
+import threading
+import time
+import socket
+
+from tashi.services.ttypes import *
+
+from thrift import Thrift
+from thrift.transport import TSocket
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+from thrift.server import TServer
+
+from tashi.services import clustermanagerservice
+from tashi.util import getConfig
+from tashi.parallel import *
+
+import tashi.services.layoutlocality.localityservice as localityservice
+
+from numpy import *
+from scipy import *
+
# Cache of host name -> canonical name for lookups that succeeded.
cnames = {}
def cannonicalName(hn):
    '''Return the canonical DNS name for host name hn.

    Successful lookups are memoized in cnames.  Resolution is best-effort:
    if the lookup fails, hn is returned unchanged and the failure is not
    cached, so it is retried on the next call.
    '''
    try:
        if hn in cnames:
            return cnames[hn]
        r = socket.gethostbyname_ex(hn)[0]
    except Exception:
        # Names that do not resolve are used verbatim.  Catch Exception
        # rather than everything so Ctrl-C / SystemExit still propagate.
        return hn
    cnames[hn] = r
    return r
+
+
def createClient(config):
    '''Create a Thrift client for the Tashi cluster manager.

    Reads clusterManagerHost, clusterManagerPort and clusterManagerTimeout
    from the [Client] section of config.  Opening the connection is
    best-effort: on failure a message is printed and the transport is
    returned closed (graphFromTashi reopens a closed transport).

    Returns a (client, transport) tuple.
    '''
    host = config.get('Client', 'clusterManagerHost')
    port = config.get('Client', 'clusterManagerPort')
    print('%s %s' % (host, port))
    # The configured value is multiplied by 1000, presumably converting
    # seconds to the milliseconds TSocket.setTimeout takes -- confirm.
    timeout = float(config.get('Client', 'clusterManagerTimeout')) * 1000.0

    sock = TSocket.TSocket(host, int(port))
    sock.setTimeout(timeout)
    transport = TTransport.TBufferedTransport(sock)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = clustermanagerservice.Client(protocol)
    try:
        transport.open()
    except Exception:
        # Not fatal: the cluster manager may come up later.
        print('could not connect to cluster manager at %s:%s; will retry'
              % (host, port))
    return (client, transport)
+
+
def genMul(A, B, add, mult):
    '''Generalized matrix product over an arbitrary (add, mult) pair.

    Entry (i, j) of the result is add(mult(A[i, :], B[:, j])): mult
    combines a row of A with a column of B, and add reduces that to a
    scalar.  With add=sum and an elementwise product this is ordinary
    matrix multiplication; with add=min and an elementwise sum it is the
    min-plus product.
    '''
    rows = shape(A)[0]
    cols = shape(B)[1]
    C = zeros((rows, cols))
    for row in range(rows):
        left = A[row, :]
        for col in range(cols):
            C[row, col] = add(mult(left, B[:, col]))
    return C
+
def addHost(graph, hostVals, host):
    '''Register host in graph and hostVals if it is not already present.

    graph gets an empty neighbour list for a new host.  hostVals gives a
    new host the next free index (the current size of hostVals), so the
    indices stay 0..n-1 in insertion order.  Existing entries are left
    untouched.
    '''
    # dict.has_key is gone in Python 3; setdefault / `in` work everywhere.
    graph.setdefault(host, [])
    if host not in hostVals:
        hostVals[host] = len(hostVals)
+
def graphConnect(graph, h1, h2):
    '''Record an undirected edge between h1 and h2.

    Adds h1 to h2's neighbour list and then h2 to h1's, skipping either
    append when the neighbour is already listed.  Both hosts must
    already be keys of graph (KeyError otherwise).
    '''
    for owner, neighbour in ((h2, h1), (h1, h2)):
        peers = graph[owner]
        if neighbour not in peers:
            peers.append(neighbour)
+
def graphFromFile(fn = 'serverLayout', graph = None, hostVals = None):
    '''Build (or extend) a host adjacency graph from a static layout file.

    Each non-blank line of fn names a node followed by the nodes adjacent
    to it, separated by whitespace, e.g.

        switch1 toplevel host1 host2

    Every name is canonicalized with cannonicalName, and each link is
    recorded in both directions.

    graph: dict of host -> list of adjacent hosts to extend; a new dict is
        created when omitted.
    hostVals: dict of host -> dense integer index to extend; a new dict is
        created when omitted.

    Returns the (graph, hostVals) pair.
    '''
    # Default to fresh containers: shared mutable defaults would make
    # every call without explicit containers keep accumulating hosts.
    if graph is None:
        graph = {}
    if hostVals is None:
        hostVals = {}
    f = open(fn)
    try:
        for line in f:
            fields = line.split()
            if not fields:
                continue
            server = cannonicalName(fields[0])
            addHost(graph, hostVals, server)
            for peer in fields[1:]:
                peer = cannonicalName(peer)
                addHost(graph, hostVals, peer)
                graphConnect(graph, server, peer)
    finally:
        f.close()
    return graph, hostVals
+
def graphFromTashi(client, transport, graph=None, hostVals=None):
    '''Add the running Tashi VMs to a host graph.

    Asks the cluster manager for its hosts and instances, and links each
    instance (by canonicalized instance name) to the canonicalized name of
    the host whose id matches the instance's hostId.

    client, transport: as returned by createClient; the transport is
        opened here if it is not already open.
    graph, hostVals: containers to extend (see graphFromFile); new dicts
        are created when omitted.

    Returns the (graph, hostVals) pair.  Errors from the cluster manager
    calls propagate to the caller.
    '''
    if graph is None:
        graph = {}
    if hostVals is None:
        hostVals = {}
    print('getting graph')
    if not transport.isOpen():
        transport.open()
    hosts = client.getHosts()
    instances = client.getInstances()
    # Index host names by id once instead of rescanning the host list for
    # every instance; the first host listed for a given id wins.
    hostNames = {}
    for h in hosts:
        if h.id not in hostNames:
            hostNames[h.id] = h.name
    for instance in instances:
        if instance.hostId not in hostNames:
            print('cant find vm host')
            continue
        host = cannonicalName(hostNames[instance.hostId])
        print('host is %s' % host)
        addHost(graph, hostVals, host)
        print('added host')
        vmhost = cannonicalName(instance.name)
        addHost(graph, hostVals, vmhost)
        print('added vm')
        graphConnect(graph, host, vmhost)
        print('connected')
    print('returning from graphFromTashi')
    return graph, hostVals
+
+
+
def graphToArray(graph, hostVals):
    '''Convert an adjacency-list graph to a dense one-hop distance matrix.

    Row/column k corresponds to the host h with hostVals[h] == k.  Entry
    [i, j] is 0 when i == j, 1 when graph lists host j as a neighbour of
    host i, and inf otherwise.  Hosts or neighbours that are missing from
    hostVals are ignored.
    '''
    n = len(hostVals)
    a = zeros((n, n))
    for host, peers in graph.items():
        if host not in hostVals:
            continue
        row = hostVals[host]
        for peer in peers:
            if peer in hostVals:
                a[row, hostVals[peer]] = 1
    # Everything not marked adjacent is unreachable in one hop...
    a[a == 0] = inf
    # ...except a node itself, which is zero hops away.
    for k in range(n):
        a[k, k] = 0
    return a
+
def shortestPaths(graphArray):
    '''All-pairs shortest path lengths by repeated min-plus squaring.

    graphArray is a square matrix of one-hop distances (0 on the diagonal,
    1 between adjacent nodes, inf otherwise), such as graphToArray
    returns.  Each squaring in the (min, +) semiring doubles the longest
    path length accounted for, so squaring until that length reaches n
    covers every simple path of up to n - 1 hops.

    Returns the distance matrix; unreachable pairs stay inf.  graphArray
    itself is not modified.
    '''
    a = graphArray
    n = shape(a)[0]
    # Integer doubling instead of range(ceil(log(n, 2))): no float
    # rounding, no dependency on `math`, and n == 0 needs no special case.
    reach = 1
    while reach < n:
        a = _minPlusSquare(a)
        reach *= 2
    return a


def _minPlusSquare(a):
    '''Return the min-plus product of square matrix a with itself.'''
    n = shape(a)[0]
    result = zeros((n, n))
    for i in range(n):
        # result[i, j] = min over k of a[i, k] + a[k, j], computed for a
        # whole row at once by broadcasting a[i] (as a column) against a.
        result[i] = (a[i][:, None] + a).min(axis=0)
    return result
+
def plus(A, B):
    '''Return A + B (elementwise for arrays).

    Suitable as the `mult` argument of genMul when computing a
    (min, +) product.
    '''
    total = A + B
    return total
+
+
def getHopCountMatrix(sourceHosts, destHosts, array, hostVals):
    '''Extract the source x destination sub-matrix of hop counts.

    sourceHosts, destHosts: host names; each is canonicalized with
        cannonicalName before lookup.
    array: all-pairs hop-count matrix indexed by the values of hostVals.
    hostVals: dict of canonical host name -> row/column index in array.

    Returns a len(sourceHosts) x len(destHosts) array.  Rows and columns
    for hosts missing from hostVals are left at inf.
    '''
    a = zeros((len(sourceHosts), len(destHosts)))
    a[a == 0] = inf
    # Resolve destinations once, not once per source row; this also
    # reports each unknown destination a single time.
    destCols = []
    for j, destHost in enumerate(destHosts):
        dh = cannonicalName(destHost)
        if dh in hostVals:
            destCols.append((j, hostVals[dh]))
        else:
            print('dest not found %s' % dh)
    for i, sourceHost in enumerate(sourceHosts):
        sh = cannonicalName(sourceHost)
        if sh not in hostVals:
            print('host not found %s' % sh)
            continue
        shv = hostVals[sh]
        for j, dhv in destCols:
            a[i, j] = array[shv, dhv]
    return a
+
+
class LocalityService:
    '''Thrift handler for the localityservice interface.

    Builds a hop-count graph from the static layout file
    ([LocalityService] staticLayout) plus the VM placements reported by
    the Tashi cluster manager, and answers getHopCountMatrix queries from
    it.  The constructor sets up a threaded Thrift server on
    [LocalityService] port and then enters self.server.serve().
    '''

    # Minimum number of seconds between two graph rebuilds.
    refreshInterval = 10

    def __init__(self):
        (config, configFiles) = getConfig(["Agent"])
        self.port = int(config.get('LocalityService', 'port'))
        print('Locality service on port %i' % self.port)

        # Graph state; populated by refresh() on the first query.
        self.hostVals = {}
        self.array = array([[]])
        self.rtime = 0

        self.fileName = os.path.expanduser(
            config.get("LocalityService", "staticLayout"))
        # Client connection to the cluster manager, used by refresh().
        (self.client, self.transport) = createClient(config)

        self.processor = localityservice.Processor(self)
        # The listening socket has its own name so it is not clobbered by
        # (or confused with) the cluster-manager client transport.
        self.serverTransport = TSocket.TServerSocket(self.port)
        self.tfactory = TTransport.TBufferedTransportFactory()
        self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
        self.server = TServer.TThreadedServer(self.processor,
                                              self.serverTransport,
                                              self.tfactory,
                                              self.pfactory)
        self.server.serve()

    @synchronizedmethod
    def refresh(self):
        '''Rebuild the hop-count matrix if the last rebuild is stale.

        Does nothing if the previous rebuild finished less than
        refreshInterval seconds ago.  VM placements are best-effort: if
        the cluster manager cannot be queried, the matrix is built from
        the static layout file (plus any VMs added before the failure).
        '''
        if time.time() - self.rtime < self.refreshInterval:
            return
        # Pass fresh containers so every rebuild starts from an empty
        # graph instead of accumulating hosts/VMs from earlier rebuilds.
        g, hostVals = graphFromFile(self.fileName, {}, {})
        try:
            g, hostVals = graphFromTashi(self.client, self.transport,
                                         g, hostVals)
        except Exception:
            import traceback
            traceback.print_exc()
            print('could not get instance list from cluster manager')
            # Drop the possibly broken connection; graphFromTashi reopens
            # the transport on the next refresh only if it is closed.
            self.transport.close()
        print('graph to array')
        a = graphToArray(g, hostVals)
        print('calling shortest paths %s' % (a.shape,))
        shortest = shortestPaths(a)
        print('computed shortest paths')
        # Publish the index map and matrix together, after both are
        # built, so hostVals always indexes the current array.
        self.hostVals = hostVals
        self.array = shortest
        self.rtime = time.time()
        print(self.array)
        print(self.hostVals)

    @synchronizedmethod
    def getHopCountMatrix(self, sourceHosts, destHosts):
        '''Thrift entry point: hop counts from each source to each dest.

        Rebuilds the graph first if it is stale (see refresh()).  Returns
        a len(sourceHosts) x len(destHosts) matrix in which unknown or
        unconnected pairs are inf.
        '''
        # NOTE(review): refresh() is also @synchronizedmethod and is called
        # while this method holds its lock; this relies on that lock being
        # re-entrant -- confirm in tashi.parallel.
        self.refresh()
        print('getting hop count matrix for %s %s' % (sourceHosts, destHosts))
        hcm = getHopCountMatrix(sourceHosts, destHosts, self.array,
                                self.hostVals)
        print(hcm)
        return hcm
+
+
if __name__ == '__main__':
    # Constructing the service starts the Thrift server (its __init__
    # ends in self.server.serve()), so only do it when run as a script.
    ls = LocalityService()
Index: trunk/etc/TashiDefaults.cfg
===================================================================
--- trunk/etc/TashiDefaults.cfg	(revision 764908)
+++ trunk/etc/TashiDefaults.cfg	(working copy)
@@ -82,6 +82,11 @@
  [Vfs]
  prefix = /var/tmp/

# Read by the locality server (src/tashi/agents/locality-server.py) and
# its client script (src/utils/getLocality.py).
[LocalityService]
host = layoutserver
port = 9884
staticLayout = /location/of/layout/file
+
  # Client configuration
  [Client]
  clusterManagerHost = localhost


Mime
View raw message