qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject svn commit: r745832 - in /qpid/trunk/qpid/python: commands/qpid-cluster qmf/console.py
Date Thu, 19 Feb 2009 12:00:46 GMT
Author: tross
Date: Thu Feb 19 12:00:46 2009
New Revision: 745832

URL: http://svn.apache.org/viewvc?rev=745832&view=rev
Log:
QPID-1669 - Added client connection management to qpid-cluster.
Also added better session-id discrimination in the qmf.console library.

Modified:
    qpid/trunk/qpid/python/commands/qpid-cluster
    qpid/trunk/qpid/python/qmf/console.py

Modified: qpid/trunk/qpid/python/commands/qpid-cluster
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-cluster?rev=745832&r1=745831&r2=745832&view=diff
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-cluster (original)
+++ qpid/trunk/qpid/python/commands/qpid-cluster Thu Feb 19 12:00:46 2009
@@ -23,12 +23,17 @@
 import getopt
 import sys
 import locale
+import socket
+import re
 from qmf.console import Session
 
 _host = "localhost"
 _stopId = None
 _stopAll = False
 _force = False
+_numeric = False
+_showConn = False
+_delConn = None
 
 def Usage ():
     print "Usage:  qpid-cluster [OPTIONS] [broker-addr]"
@@ -37,12 +42,40 @@
     print "             ex:  localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
     print
     print "Options:"
-    print "          -s [--stop] ID   Stop one member of the cluster by its ID"
-    print "          -k [--all-stop]  Shut down the whole cluster"
-    print "          -f [--force]     Suppress the 'are-you-sure?' prompt"
+    print "          -C [--all-connections]  View client connections to all cluster members"
+    print "          -c [--connections] ID   View client connections to specified member"
+    print "          -d [--del-connection] HOST:PORT"
+    print "                                  Disconnect a client connection"
+    print "          -s [--stop] ID          Stop one member of the cluster by its ID"
+    print "          -k [--all-stop]         Shut down the whole cluster"
+    print "          -f [--force]            Suppress the 'are-you-sure?' prompt"
+    print "          -n [--numeric]          Don't resolve names"
     print
     sys.exit (1)
 
+
+class IpAddr:
+    def __init__(self, text):
+        if text.find(":") != -1:
+            tokens = text.split(":")
+            text = tokens[0]
+            self.port = int(tokens[1])
+        else:
+            self.port = 5672
+        self.dottedQuad = socket.gethostbyname(text)
+        nums = self.dottedQuad.split(".")
+        self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
+
+    def bestAddr(self, addrPortList):
+        bestDiff = 0xFFFFFFFF
+        bestAddr = None
+        for addrPort in addrPortList:
+            diff = IpAddr(addrPort[0]).addr ^ self.addr
+            if diff < bestDiff:
+                bestDiff = diff
+                bestAddr = addrPort
+        return bestAddr
+
 class BrokerManager:
     def __init__(self):
         self.brokerName = None
@@ -62,7 +95,7 @@
         if self.broker:
             self.qmf.delBroker(self.broker)
 
-    def overview(self):
+    def _getClusters(self):
         packages = self.qmf.getPackages()
         if "org.apache.qpid.cluster" not in packages:
             print "Clustering is not installed on the broker."
@@ -73,8 +106,35 @@
             print "Clustering is installed but not enabled on the broker."
             sys.exit(0)
 
+        return clusters
+
+    def _getHostList(self, urlList):
+        hosts = []
+        hostAddr = IpAddr(_host)
+        for url in urlList:
+            if url.find("amqp:") != 0:
+                raise Exception("Invalid URL 1")
+            url = url[5:]
+            addrs = str(url).split(",")
+            addrList = []
+            for addr in addrs:
+                tokens = addr.split(":")
+                if len(tokens) != 3:
+                    raise Exception("Invalid URL 2")
+                addrList.append((tokens[1], tokens[2]))
+
+            # Find the address in the list that is most likely to be in the same subnet as the address
+            # with which we made the original QMF connection.  This increases the probability that we will
+            # be able to reach the cluster member.
+
+            best = hostAddr.bestAddr(addrList)
+            bestUrl = best[0] + ":" + best[1]
+            hosts.append(bestUrl)
+        return hosts
+
+    def overview(self):
+        clusters = self._getClusters()
         cluster = clusters[0]
-        myUrl = cluster.publishedURL
         memberList = cluster.members.split(";")
         idList = cluster.memberIDs.split(";")
 
@@ -86,11 +146,7 @@
             print "              : ID=%s URL=%s" % (idList[idx], memberList[idx])
 
     def stopMember(self, id):
-        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
-        if len(clusters) == 0:
-            print "Clustering is installed but not enabled on the broker."
-            sys.exit(0)
-
+        clusters = self._getClusters()
         cluster = clusters[0]
         idList = cluster.memberIDs.split(";")
         if id not in idList:
@@ -113,11 +169,7 @@
         cluster.stopClusterNode(id)
 
     def stopAll(self):
-        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
-        if len(clusters) == 0:
-            print "Clustering is installed but not enabled on the broker."
-            sys.exit(0)
-
+        clusters = self._getClusters()
         if not _force:
             prompt = "Warning: This command will shut down the entire cluster."
             prompt += " Are you sure? [N]: "
@@ -130,15 +182,72 @@
         cluster = clusters[0]
         cluster.stopFullCluster()
 
+    def showConnections(self):
+        clusters = self._getClusters()
+        cluster = clusters[0]
+        memberList = cluster.members.split(";")
+        idList = cluster.memberIDs.split(";")
+        displayList = []
+        hostList = self._getHostList(memberList)
+        self.qmf.delBroker(self.broker)
+        self.broker = None
+        self.brokers = []
+        pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
+
+        idx = 0
+        for host in hostList:
+            if _showConn == "all" or _showConn == idList[idx] or _delConn:
+                self.brokers.append(self.qmf.addBroker(host))
+                displayList.append(idList[idx])
+            idx += 1
+
+        idx = 0
+        found = False
+        for broker in self.brokers:
+            if not _delConn:
+                print "Clients on Member: ID=%s:" % displayList[idx]
+            connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker)
+            for conn in connList:
+                if pattern.match(conn.address):
+                    if _numeric or _delConn:
+                        a = conn.address
+                    else:
+                        tokens = conn.address.split(":")
+                        try:
+                            hostList = socket.gethostbyaddr(tokens[0])
+                            host = hostList[0]
+                        except:
+                            host = tokens[0]
+                        a = host + ":" + tokens[1]
+                    if _delConn:
+                        tokens = _delConn.split(":")
+                        ip = socket.gethostbyname(tokens[0])
+                        toDelete = ip + ":" + tokens[1]
+                        if a == toDelete:
+                            print "Closing connection from client: %s" % a
+                            conn.close()
+                            found = True
+                    else:
+                        print "    %s" % a
+            idx += 1
+            if not _delConn:
+                print
+        if _delConn and not found:
+            print "Client connection '%s' not found" % _delConn
+
+        for broker in self.brokers:
+            self.qmf.delBroker(broker)
+
+
 ##
 ## Main Program
 ##
 
 try:
-    longOpts = ("stop=", "all-stop", "force")
-    (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "s:kf", longOpts)
+    longOpts = ("stop=", "all-stop", "force", "connections=", "all-connections" "del-connection=", "numeric")
+    (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "s:kfCc:d:n", longOpts)
 except:
-    Usage ()
+    Usage()
 
 try:
     encoding = locale.getpreferredencoding()
@@ -146,13 +255,41 @@
 except:
     cargs = encArgs
 
+count = 0
 for opt in optlist:
     if opt[0] == "-s" or opt[0] == "--stop":
         _stopId = opt[1]
+        if len(_stopId.split(":")) != 2:
+            print "Member ID must be of form: <host or ip>:<number>"
+            sys.exit(1)
+        count += 1
     if opt[0] == "-k" or opt[0] == "--all-stop":
         _stopAll = True
+        count += 1
     if opt[0] == "-f" or opt[0] == "--force":
         _force = True
+    if opt[0] == "-n" or opt[0] == "--numeric":
+        _numeric = True
+    if opt[0] == "-C" or opt[0] == "--all-connections":
+        _showConn = "all"
+        count += 1
+    if opt[0] == "-c" or opt[0] == "--connections":
+        _showConn = opt[1]
+        if len(_showConn.split(":")) != 2:
+            print "Member ID must be of form: <host or ip>:<number>"
+            sys.exit(1)
+        count += 1
+    if opt[0] == "-d" or opt[0] == "--del-connection":
+        _delConn = opt[1]
+        if len(_delConn.split(":")) != 2:
+            print "Connection must be of form: <host or ip>:<port>"
+            sys.exit(1)
+        count += 1
+
+if count > 1:
+    print "Only one command option may be supplied"
+    print
+    Usage()
 
 nargs = len(cargs)
 bm    = BrokerManager()
@@ -166,6 +303,8 @@
         bm.stopMember(_stopId)
     elif _stopAll:
         bm.stopAll()
+    elif _showConn or _delConn:
+        bm.showConnections()
     else:
         bm.overview()
 except KeyboardInterrupt:

Modified: qpid/trunk/qpid/python/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf/console.py?rev=745832&r1=745831&r2=745832&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf/console.py (original)
+++ qpid/trunk/qpid/python/qmf/console.py Thu Feb 19 12:00:46 2009
@@ -1230,6 +1230,7 @@
 class Broker:
   """ This object represents a connection (or potential connection) to a QMF broker. """
   SYNC_TIME = 60
+  nextSeq = 1
 
   def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False):
     self.session  = session
@@ -1242,7 +1243,8 @@
     self.error = None
     self.brokerId = None
     self.connected = False
-    self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid())
+    self.amqpSessionId = "%s.%d.%d" % (os.uname()[1], os.getpid(), Broker.nextSeq)
+    Broker.nextSeq += 1
     if self.session.manageConnections:
       self.thread = ManagedConnection(self)
       self.thread.start()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message