qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r595465 - in /incubator/qpid/trunk/qpid/python/qpid: codec.py management.py
Date Thu, 15 Nov 2007 21:36:00 GMT
Author: aconway
Date: Thu Nov 15 13:36:00 2007
New Revision: 595465

URL: http://svn.apache.org/viewvc?rev=595465&view=rev
Log:
QPID-687: committed qpid-patch7-python.diff for real this time.

Modified:
    incubator/qpid/trunk/qpid/python/qpid/codec.py
    incubator/qpid/trunk/qpid/python/qpid/management.py

Modified: incubator/qpid/trunk/qpid/python/qpid/codec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/codec.py?rev=595465&r1=595464&r2=595465&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/codec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/codec.py Thu Nov 15 13:36:00 2007
@@ -247,6 +247,12 @@
   def decode_signed_long(self):
     return self.unpack("!q")
 
+  def encode_signed_int(self, o):
+    self.pack("!l", o)
+
+  def decode_signed_int(self):
+    return self.unpack("!l")
+
   def encode_longlong(self, o):
     """
     encodes long long (64 bits) data 'o' in network byte order

Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=595465&r1=595464&r2=595465&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Thu Nov 15 13:36:00 2007
@@ -18,7 +18,7 @@
 #
 
 """
-Management classes for AMQP
+Management API for Qpid
 """
 
 import qpid
@@ -42,50 +42,91 @@
 #===================================================================
 class ManagementMetadata:
 
-  def parseSchema (self, cls, oid, len, codec):
-    #print "Schema Record: objId=", oid
+  def parseSchema (self, cls, codec):
+    className   = codec.decode_shortstr ()
+    configCount = codec.decode_short ()
+    instCount   = codec.decode_short ()
+    methodCount = codec.decode_short ()
+    eventCount  = codec.decode_short ()
+
+    configs = []
+    insts   = []
+    methods = []
+    events  = []
+
+    configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None))
+    insts.append   (("id", 4, None, None))
+
+    for idx in range (configCount):
+      ft = codec.decode_table ()
+      name   = ft["name"]
+      type   = ft["type"]
+      access = ft["access"]
+      index  = ft["index"]
+      unit   = None
+      min    = None
+      max    = None
+      maxlen = None
+      desc   = None
+
+      for key, value in ft.items ():
+        if   key == "unit":
+          unit = value
+        elif key == "min":
+          min = value
+        elif key == "max":
+          max = value
+        elif key == "maxlen":
+          maxlen = value
+        elif key == "desc":
+          desc = value
+
+      config = (name, type, unit, desc, access, index, min, max, maxlen)
+      configs.append (config)
+
+    for idx in range (instCount):
+      ft = codec.decode_table ()
+      name   = ft["name"]
+      type   = ft["type"]
+      unit   = None
+      desc   = None
+
+      for key, value in ft.items ():
+        if   key == "unit":
+          unit = value
+        elif key == "desc":
+          desc = value
 
-    config = []
-    inst   = []
-    while 1:
-      flags = codec.decode_octet ()
-      if flags == 0x80:
-        break
-
-      tc   = codec.decode_octet ()
-      name = codec.decode_shortstr ()
-      desc = codec.decode_shortstr ()
-
-      if flags & 1: # TODO: Define constants for these
-        config.append ((tc, name, desc))
-      if (flags & 1) == 0 or (flags & 2) == 2:
-        inst.append   ((tc, name, desc))
+      inst = (name, type, unit, desc)
+      insts.append (inst)
 
     # TODO: Handle notification of schema change outbound
-    self.schema[(oid,'C')] = config
-    self.schema[(oid,'I')] = inst
-
-  def parseContent (self, cls, oid, len, codec):
-    #print "Content Record: Class=", cls, ", objId=", oid
+    self.schema[(className,'C')] = configs
+    self.schema[(className,'I')] = insts
+    self.schema[(className,'M')] = methods
+    self.schema[(className,'E')] = events
 
+  def parseContent (self, cls, codec):
     if cls == 'C' and self.broker.config_cb == None:
       return
     if cls == 'I' and self.broker.inst_cb == None:
       return
 
-    if (oid,cls) not in self.schema:
+    className = codec.decode_shortstr ()
+
+    if (className,cls) not in self.schema:
       return
 
     row        = []
     timestamps = []
 
-    timestamps.append (codec.decode_longlong ()); # Current Time
-    timestamps.append (codec.decode_longlong ()); # Create Time
-    timestamps.append (codec.decode_longlong ()); # Delete Time
-
-    for element in self.schema[(oid,cls)][:]:
-      tc   = element[0]
-      name = element[1]
+    timestamps.append (codec.decode_longlong ())  # Current Time
+    timestamps.append (codec.decode_longlong ())  # Create Time
+    timestamps.append (codec.decode_longlong ())  # Delete Time
+
+    for element in self.schema[(className,cls)][:]:
+      tc   = element[1]
+      name = element[0]
       if   tc == 1: # TODO: Define constants for these
         data = codec.decode_octet ()
       elif tc == 2:
@@ -98,33 +139,29 @@
         data = codec.decode_octet ()
       elif tc == 6:
         data = codec.decode_shortstr ()
+      elif tc == 7:
+        data = codec.decode_longstr ()
+      else:
+        raise ValueError ("Invalid type code: %d" % tc)
       row.append ((name, data))
 
     if cls == 'C':
-      self.broker.config_cb[1] (self.broker.config_cb[0], oid, row, timestamps)
-    if cls == 'I':
-      self.broker.inst_cb[1]   (self.broker.inst_cb[0], oid, row, timestamps)
+      self.broker.config_cb[1] (self.broker.config_cb[0], className, row, timestamps)
+    elif cls == 'I':
+      self.broker.inst_cb[1]   (self.broker.inst_cb[0], className, row, timestamps)
 
   def parse (self, codec):
-    try:
-      opcode = chr (codec.decode_octet ())
-    except EOF:
-      return 0
-
-    cls = chr (codec.decode_octet ())
-    oid = codec.decode_short ()
-    len = codec.decode_long  ()
-
-    if len < 8:
-      raise ValueError ("parse error: value of length field too small")
+    opcode = chr (codec.decode_octet ())
+    cls    = chr (codec.decode_octet ())
 
     if opcode == 'S':
-      self.parseSchema (cls, oid, len, codec)
+      self.parseSchema (cls, codec)
 
-    if opcode == 'C':
-      self.parseContent (cls, oid, len, codec)
+    elif opcode == 'C':
+      self.parseContent (cls, codec)
 
-    return 1
+    else:
+      raise ValueError ("Unknown opcode: %c" % opcode);
 
   def __init__ (self, broker):
     self.broker = broker
@@ -140,7 +177,8 @@
 #===================================================================
 class ManagedBroker:
 
-  exchange = "qpid.management"
+  mExchange = "qpid.management"
+  dExchange = "amq.direct"
 
   def checkHeader (self, codec):
     octet = chr (codec.decode_octet ())
@@ -157,69 +195,141 @@
       return 0
     return 1
 
-  def receive_cb (self, msg):
+  def publish_cb (self, msg):
     codec = Codec (StringIO (msg.content.body), self.spec)
 
     if self.checkHeader (codec) == 0:
       raise ValueError ("outer header invalid");
 
-    while self.metadata.parse (codec):
-      pass
-
+    self.metadata.parse (codec)
     msg.complete ()
 
-  def __init__ (self, host = "localhost", port = 5672,
-                username = "guest", password = "guest"):
+  def reply_cb (self, msg):
+    codec = Codec (StringIO (msg.content.body), self.spec)
+    methodId = codec.decode_long ()
+    status   = codec.decode_long ()
+    sText    = codec.decode_shortstr ()
+
+    args = {}
+    if status == 0:
+      args["sequence"] = codec.decode_long ()
+      args["body"]     = codec.decode_longstr ()
 
-    self.spec = qpid.spec.load ("../specs/amqp.0-10-preview.xml")
-    self.client   = None
-    self.channel  = None
-    self.queue    = None
-    self.qname    = None
-    self.metadata = ManagementMetadata (self)
+    if self.method_cb != None:
+      self.method_cb[1] (self.method_cb[0], methodId, status, sText, args)
+
+    msg.complete ()
+
+  def __init__ (self,
+                host     = "localhost",
+                port     = 5672,
+                username = "guest",
+                password = "guest",
+                specfile = "../specs/amqp.0-10-preview.xml"):
+
+    self.spec = qpid.spec.load (specfile)
+    self.client    = None
+    self.channel   = None
+    self.queue     = None
+    self.rqueue    = None
+    self.qname     = None
+    self.rqname    = None
+    self.metadata  = ManagementMetadata (self)
+    self.connected = 0
+    self.lastConnectError = None
 
     #  Initialize the callback records
+    self.status_cb = None
     self.schema_cb = None
     self.config_cb = None
     self.inst_cb   = None
+    self.method_cb = None
 
     self.host     = host
     self.port     = port
     self.username = username
     self.password = password
 
+  def statusListener (self, context, callback):
+    self.status_cb = (context, callback)
+
   def schemaListener (self, context, callback):
     self.schema_cb = (context, callback)
 
   def configListener (self, context, callback):
     self.config_cb = (context, callback)
 
+  def methodListener (self, context, callback):
+    self.method_cb = (context, callback)
+
   def instrumentationListener (self, context, callback):
     self.inst_cb = (context, callback)
 
+  def method (self, methodId, objId, className,
+              methodName, args=None, packageName="qpid"):
+    codec = Codec (StringIO (), self.spec);
+    codec.encode_long     (methodId)
+    codec.encode_longlong (objId)
+    codec.encode_shortstr (self.rqname)
+
+    # TODO: Encode args according to schema
+    if methodName == "echo":
+      codec.encode_long (args["sequence"])
+      codec.encode_longstr (args["body"])
+
+    msg = Content (codec.stream.getvalue ())
+    msg["content_type"] = "application/octet-stream"
+    msg["routing_key"]  = "method." + packageName + "." + className + "." + methodName
+    msg["reply_to"]     = self.spec.struct ("reply_to")
+    self.channel.message_transfer (destination="qpid.management", content=msg)
+
+  def isConnected (self):
+    return connected
+
   def start (self):
-    print "Connecting to broker", self.host
+    print "Connecting to broker %s:%d" % (self.host, self.port)
 
     try:
       self.client = Client (self.host, self.port, self.spec)
       self.client.start ({"LOGIN": self.username, "PASSWORD": self.password})
       self.channel = self.client.channel (1)
-      response = self.channel.session_open (detached_lifetime=300)
-      self.qname = "mgmt-" + base64.urlsafe_b64encode(response.session_id)
+      response = self.channel.session_open (detached_lifetime=10)
+      self.qname  = "mgmt-"  + base64.urlsafe_b64encode (response.session_id)
+      self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id)
+
+      self.channel.queue_declare (queue=self.qname,  exclusive=1, auto_delete=1)
+      self.channel.queue_declare (queue=self.rqname, exclusive=1, auto_delete=1)
+      
+      self.channel.queue_bind (exchange=ManagedBroker.mExchange, queue=self.qname,
+                               routing_key="mgmt.#")
+      self.channel.queue_bind (exchange=ManagedBroker.dExchange, queue=self.rqname,
+                               routing_key=self.rqname)
+
+      self.channel.message_subscribe (queue=self.qname,  destination="mdest")
+      self.channel.message_subscribe (queue=self.rqname, destination="rdest")
+
+      self.queue = self.client.queue ("mdest")
+      self.queue.listen (self.publish_cb)
+
+      self.channel.message_flow_mode (destination="mdest", mode=1)
+      self.channel.message_flow (destination="mdest", unit=0, value=0xFFFFFFFF)
+      self.channel.message_flow (destination="mdest", unit=1, value=0xFFFFFFFF)
+
+      self.rqueue = self.client.queue ("rdest")
+      self.rqueue.listen (self.reply_cb)
+
+      self.channel.message_flow_mode (destination="rdest", mode=1)
+      self.channel.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF)
+      self.channel.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF)
 
-      self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1)
-      self.channel.queue_bind (exchange=ManagedBroker.exchange, queue=self.qname,
-                               routing_key="mgmt")
-      self.channel.message_subscribe (queue=self.qname, destination="dest")
-      self.queue = self.client.queue ("dest")
-      self.queue.listen (self.receive_cb)
-
-      self.channel.message_flow_mode (destination="dest", mode=1)
-      self.channel.message_flow (destination="dest", unit=0, value=0xFFFFFFFF)
-      self.channel.message_flow (destination="dest", unit=1, value=0xFFFFFFFF)
+      self.connected = 1
 
     except socket.error, e:
       print "Socket Error Detected:", e[1]
+      self.lastConnectError = e
       raise
     except:
       raise
+
+  def stop (self):
+    pass



Mime
View raw message