qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject svn commit: r691700 [1/2] - in /incubator/qpid/trunk/qpid: cpp/examples/qmf-agent/ cpp/managementgen/ cpp/managementgen/templates/ cpp/src/qpid/agent/ cpp/src/qpid/broker/ cpp/src/qpid/framing/ cpp/src/qpid/management/ python/qpid/ specs/
Date Wed, 03 Sep 2008 18:01:45 GMT
Author: tross
Date: Wed Sep  3 11:01:44 2008
New Revision: 691700

URL: http://svn.apache.org/viewvc?rev=691700&view=rev
Log:
QPID-1174 Updates to the management framework

Modified:
    incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp
    incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/schema.xml
    incubator/qpid/trunk/qpid/cpp/managementgen/schema.py
    incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp
    incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgent.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AclModule.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/System.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
    incubator/qpid/trunk/qpid/python/qpid/management.py
    incubator/qpid/trunk/qpid/python/qpid/managementdata.py
    incubator/qpid/trunk/qpid/specs/management-schema.xml
    incubator/qpid/trunk/qpid/specs/management-types.xml

Modified: incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp Wed Sep  3 11:01:44 2008
@@ -89,9 +89,10 @@
 
 CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent(_agent)
 {
+    static uint64_t persistId = 0x111222333444555LL;
     mgmtObject = new Parent(agent, this, name);
 
-    agent->addObject(mgmtObject);
+    agent->addObject(mgmtObject, persistId++);
     mgmtObject->set_state("IDLE");
 }
 
@@ -128,6 +129,8 @@
 
         children.push_back(child);
 
+        mgmtObject->event_childCreated(ioArgs.i_name);
+
         return STATUS_OK;
     }
 
@@ -145,7 +148,8 @@
 //==============================================================
 // Main program
 //==============================================================
-int main(int argc, char** argv) {
+int main_int(int argc, char** argv)
+{
     ManagementAgent::Singleton singleton;
     const char* host = argc>1 ? argv[1] : "127.0.0.1";
     int port = argc>2 ? atoi(argv[2]) : 5672;
@@ -158,7 +162,7 @@
 
     // Start the agent.  It will attempt to make a connection to the
     // management broker
-    agent->init(string(host), port);
+    agent->init(string(host), port, 5, false, ".magentdata");
 
     // Allocate some core objects
     CoreClass core1(agent, "Example Core Object #1");
@@ -168,4 +172,12 @@
     core1.doLoop();
 }
 
+int main(int argc, char** argv)
+{
+    try {
+        return main_int(argc, argv);
+    } catch(std::exception& e) {
+        cout << "Top Level Exception: " << e.what() << endl;
+    }
+}
 

Modified: incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/schema.xml?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/schema.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/schema.xml Wed Sep  3 11:01:44 2008
@@ -37,6 +37,14 @@
       <arg name="name"     dir="I" type="sstr"/>
       <arg name="childRef" dir="O" type="objId"/>
     </method>
+
+    <event name="childCreated">
+      <arg name="name" type="sstr"/>
+    </event>
+
+    <event name="childDestroyed">
+      <arg name="name" type="sstr"/>
+    </event>
   </class>
 
 

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/schema.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/schema.py?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/schema.py (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/schema.py Wed Sep  3 11:01:44 2008
@@ -79,7 +79,7 @@
   def getName (self):
     return self.name
 
-  def genAccessor (self, stream, varName, changeFlag = None):
+  def genAccessor (self, stream, varName, changeFlag = None, optional = False):
     if self.perThread:
       prefix = "getThreadStats()->"
       if self.style == "wm":
@@ -87,11 +87,13 @@
     else:
       prefix = ""
     if self.accessor == "direct":
-      stream.write ("    inline void set_" + varName + " (" + self.cpp + " val){\n");
+      stream.write ("    inline void set_" + varName + " (" + self.cpp + " val) {\n");
       if not self.perThread:
         stream.write ("        sys::Mutex::ScopedLock mutex(accessLock);\n")
       if self.style != "mma":
-        stream.write ("        " + prefix + varName + " = val;\n");
+        stream.write ("        " + prefix + varName + " = val;\n")
+        if optional:
+          stream.write ("        presenceMask[presenceByte_%s] |= presenceMask_%s;\n" % (varName, varName))
       if self.style == "wm":
         stream.write ("        if (" + varName + "Low  > val)\n")
         stream.write ("            " + varName + "Low  = val;\n")
@@ -106,9 +108,24 @@
         stream.write ("            " + prefix + varName + "Max = val;\n")
       if changeFlag != None:
         stream.write ("        " + changeFlag + " = true;\n")
-      stream.write ("    }\n");
+      stream.write ("    }\n")
+      if self.style != "mma":
+        stream.write ("    inline " + self.cpp + "& get_" + varName + "() {\n");
+        if not self.perThread:
+          stream.write ("        sys::Mutex::ScopedLock mutex(accessLock);\n")
+        stream.write ("        return " + prefix + varName + ";\n")
+        stream.write ("    }\n")
+      if optional:
+        stream.write ("    inline void clr_" + varName + "() {\n")
+        stream.write ("        presenceMask[presenceByte_%s] &= ~presenceMask_%s;\n" % (varName, varName))
+        if changeFlag != None:
+          stream.write ("        " + changeFlag + " = true;\n")
+        stream.write ("    }\n")
+        stream.write ("    inline bool isSet_" + varName + "() {\n")
+        stream.write ("        return presenceMask[presenceByte_%s] & presenceMask_%s != 0;\n" % (varName, varName))
+        stream.write ("    }\n")
     elif self.accessor == "counter":
-      stream.write ("    inline void inc_" + varName + " (" + self.cpp + " by = 1){\n");
+      stream.write ("    inline void inc_" + varName + " (" + self.cpp + " by = 1) {\n");
       if not self.perThread:
         stream.write ("        sys::Mutex::ScopedLock mutex(accessLock);\n")
       stream.write ("        " + prefix + varName + " += by;\n")
@@ -118,7 +135,7 @@
       if changeFlag != None:
         stream.write ("        " + changeFlag + " = true;\n")
       stream.write ("    }\n");
-      stream.write ("    inline void dec_" + varName + " (" + self.cpp + " by = 1){\n");
+      stream.write ("    inline void dec_" + varName + " (" + self.cpp + " by = 1) {\n");
       if not self.perThread:
         stream.write ("        sys::Mutex::ScopedLock mutex(accessLock);\n")
       stream.write ("        " + prefix + varName + " -= by;\n")
@@ -146,22 +163,22 @@
       stream.write ("        threadStats->" + varName + "Min   = std::numeric_limits<" + cpptype + ">::max();\n")
       stream.write ("        threadStats->" + varName + "Max   = std::numeric_limits<" + cpptype + ">::min();\n")
 
-  def genWrite (self, stream, varName):
+  def genWrite (self, stream, varName, indent="    "):
     if self.style != "mma":
-      stream.write ("    " + self.encode.replace ("@", "buf").replace ("#", varName) + ";\n")
+      stream.write (indent + self.encode.replace ("@", "buf").replace ("#", varName) + ";\n")
     if self.style == "wm":
-      stream.write ("    " + self.encode.replace ("@", "buf") \
+      stream.write (indent + self.encode.replace ("@", "buf") \
                     .replace ("#", varName + "High") + ";\n")
-      stream.write ("    " + self.encode.replace ("@", "buf") \
+      stream.write (indent + self.encode.replace ("@", "buf") \
                     .replace ("#", varName + "Low") + ";\n")
     if self.style == "mma":
-      stream.write ("    " + self.encode.replace ("@", "buf") \
+      stream.write (indent + self.encode.replace ("@", "buf") \
                     .replace ("#", varName + "Count") + ";\n")
-      stream.write ("    " + self.encode.replace ("@", "buf") \
+      stream.write (indent + self.encode.replace ("@", "buf") \
                     .replace ("#", varName + "Count ? " + varName + "Min : 0") + ";\n")
-      stream.write ("    " + self.encode.replace ("@", "buf") \
+      stream.write (indent + self.encode.replace ("@", "buf") \
                     .replace ("#", varName + "Max") + ";\n")
-      stream.write ("    " + self.encode.replace ("@", "buf") \
+      stream.write (indent + self.encode.replace ("@", "buf") \
                     .replace ("#", varName + "Count ? " + varName + "Total / " +
                               varName + "Count : 0") + ";\n")
 
@@ -207,7 +224,7 @@
 #=====================================================================================
 #
 #=====================================================================================
-class SchemaConfig:
+class SchemaProperty:
   def __init__ (self, node, typespec):
     self.name         = None
     self.type         = None
@@ -216,6 +233,7 @@
     self.isIndex      = 0
     self.isParentRef  = 0
     self.isGeneralRef = 0
+    self.isOptional   = 0
     self.unit         = None
     self.min          = None
     self.max          = None
@@ -255,6 +273,11 @@
           raise ValueError ("Expected 'y' in isGeneralReference attribute")
         self.isGeneralRef = 1
         
+      elif key == 'optional':
+        if val != 'y':
+          raise ValueError ("Expected 'y' in optional attribute")
+        self.isOptional = 1
+        
       elif key == 'unit':
         self.unit = val
         
@@ -273,6 +296,9 @@
       else:
         raise ValueError ("Unknown attribute in property '%s'" % key)
 
+    if self.access == "RC" and self.isOptional == 1:
+      raise ValueError ("Properties with ReadCreate access must not be optional (%s)" % self.name)
+
     if self.name == None:
       raise ValueError ("Missing 'name' attribute in property")
     if self.type == None:
@@ -289,18 +315,19 @@
   def genDeclaration (self, stream, prefix="    "):
     stream.write (prefix + self.type.type.cpp + " " + self.name + ";\n")
 
-  def genFormalParam (self, stream):
+  def genFormalParam (self, stream, variables):
     stream.write (self.type.type.cpp + " _" + self.name)
 
   def genAccessor (self, stream):
-    self.type.type.genAccessor (stream, self.name, "configChanged")
+    self.type.type.genAccessor (stream, self.name, "configChanged", self.isOptional == 1)
 
   def genSchema (self, stream):
     stream.write ("    ft = FieldTable ();\n")
-    stream.write ("    ft.setString (NAME,   \"" + self.name + "\");\n")
-    stream.write ("    ft.setInt    (TYPE,   TYPE_" + self.type.type.base +");\n")
+    stream.write ("    ft.setString (NAME, \"" + self.name + "\");\n")
+    stream.write ("    ft.setInt    (TYPE, TYPE_" + self.type.type.base +");\n")
     stream.write ("    ft.setInt    (ACCESS, ACCESS_" + self.access + ");\n")
-    stream.write ("    ft.setInt    (INDEX,  " + str (self.isIndex) + ");\n")
+    stream.write ("    ft.setInt    (INDEX, " + str (self.isIndex) + ");\n")
+    stream.write ("    ft.setInt    (OPTIONAL, " + str (self.isOptional) + ");\n")
     if self.unit != None:
       stream.write ("    ft.setString (UNIT,   \"" + self.unit   + "\");\n")
     if self.min != None:
@@ -314,13 +341,19 @@
     stream.write ("    buf.put (ft);\n\n")
 
   def genWrite (self, stream):
-    self.type.type.genWrite (stream, self.name)
+    indent = "    "
+    if self.isOptional:
+      stream.write("    if (presenceMask[presenceByte_%s] & presenceMask_%s) {\n" % (self.name, self.name))
+      indent = "        "
+    self.type.type.genWrite (stream, self.name, indent)
+    if self.isOptional:
+      stream.write("    }\n")
 
 
 #=====================================================================================
 #
 #=====================================================================================
-class SchemaInst:
+class SchemaStatistic:
   def __init__ (self, node, typespec):
     self.name   = None
     self.type   = None
@@ -523,25 +556,30 @@
   def getDir (self):
     return self.dir
 
-  def genSchema (self, stream):
+  def genSchema (self, stream, event=False):
     stream.write ("    ft = FieldTable ();\n")
     stream.write ("    ft.setString (NAME,    \"" + self.name + "\");\n")
     stream.write ("    ft.setInt    (TYPE,    TYPE_" + self.type.type.base +");\n")
-    stream.write ("    ft.setString (DIR,     \"" + self.dir + "\");\n")
+    if (not event):
+      stream.write ("    ft.setString (DIR,     \"" + self.dir + "\");\n")
     if self.unit != None:
       stream.write ("    ft.setString (UNIT,    \"" + self.unit   + "\");\n")
-    if self.min != None:
-      stream.write ("    ft.setInt    (MIN,     " + self.min    + ");\n")
-    if self.max != None:
-      stream.write ("    ft.setInt    (MAX,     " + self.max    + ");\n")
-    if self.maxLen != None:
-      stream.write ("    ft.setInt    (MAXLEN,  " + self.maxLen + ");\n")
+    if not event:
+      if self.min != None:
+        stream.write ("    ft.setInt    (MIN,     " + self.min    + ");\n")
+      if self.max != None:
+        stream.write ("    ft.setInt    (MAX,     " + self.max    + ");\n")
+      if self.maxLen != None:
+        stream.write ("    ft.setInt    (MAXLEN,  " + self.maxLen + ");\n")
+      if self.default != None:
+        stream.write ("    ft.setString (DEFAULT, \"" + self.default + "\");\n")
     if self.desc != None:
       stream.write ("    ft.setString (DESC,    \"" + self.desc + "\");\n")
-    if self.default != None:
-      stream.write ("    ft.setString (DEFAULT, \"" + self.default + "\");\n")
     stream.write ("    buf.put (ft);\n\n")
 
+  def genFormalParam (self, stream, variables):
+    stream.write ("%s _%s" % (self.type.type.cpp, self.name))
+
 #=====================================================================================
 #
 #=====================================================================================
@@ -649,6 +687,50 @@
   def getArgCount (self):
     return len (self.args)
 
+  def genMethodBody (self, stream, variables, classObject):
+    stream.write("void ")
+    classObject.genNameCap(stream, variables)
+    stream.write("::event_%s(" % self.name)
+    count = 0
+    for arg in self.args:
+      arg.genFormalParam(stream, variables)
+      count += 1
+      if count < len(self.args):
+        stream.write(", ")
+    stream.write(") {\n")
+    stream.write("    sys::Mutex::ScopedLock mutex(getMutex());\n")
+    stream.write("    Buffer* buf = startEventLH();\n")
+    stream.write("    objectId.encode(*buf);\n")
+    stream.write("    buf->putShortString(packageName);\n")
+    stream.write("    buf->putShortString(className);\n")
+    stream.write("    buf->putBin128(md5Sum);\n")
+    stream.write("    buf->putShortString(\"%s\");\n" % self.name)
+    for arg in self.args:
+      stream.write("    %s;\n" % arg.type.type.encode.replace("@", "(*buf)").replace("#", "_" + arg.name))
+    stream.write("    finishEventLH(buf);\n")
+    stream.write("}\n\n")
+
+  def genMethodDecl (self, stream, variables):
+    stream.write("    void event_%s(" % self.name)
+    count = 0
+    for arg in self.args:
+      arg.genFormalParam(stream, variables)
+      count += 1
+      if count < len(self.args):
+        stream.write(", ")
+    stream.write(");\n")
+
+  def genSchema(self, stream, variables):
+    stream.write ("    ft = FieldTable ();\n")
+    stream.write ("    ft.setString (NAME,     \"" + self.name + "\");\n")
+    stream.write ("    ft.setInt    (ARGCOUNT, " + str (len (self.args)) + ");\n")
+    if self.desc != None:
+      stream.write ("    ft.setString (DESC,     \"" + self.desc + "\");\n")
+    stream.write ("    buf.put (ft);\n\n")
+    for arg in self.args:
+      arg.genSchema (stream, True)
+
+
 
 class SchemaClass:
   def __init__ (self, package, node, typespec, fragments, options):
@@ -669,11 +751,11 @@
     for child in children:
       if child.nodeType == Node.ELEMENT_NODE:
         if   child.nodeName == 'property':
-          sub = SchemaConfig (child, typespec)
+          sub = SchemaProperty (child, typespec)
           self.properties.append (sub)
 
         elif child.nodeName == 'statistic':
-          sub = SchemaInst (child, typespec)
+          sub = SchemaStatistic (child, typespec)
           self.statistics.append (sub)
 
         elif child.nodeName == 'method':
@@ -758,6 +840,12 @@
   # Code Generation Functions.  The names of these functions (minus the leading "gen")
   # match the substitution keywords in the template files.
   #===================================================================================
+  def testExistOptionals (self, variables):
+    for prop in self.properties:
+      if prop.isOptional == 1:
+        return True
+    return False
+
   def testExistPerThreadStats (self, variables):
     for inst in self.statistics:
       if inst.type.type.perThread:
@@ -794,17 +882,13 @@
     for element in self.properties:
       element.genDeclaration (stream)
 
-  def genConfigElementSchema (self, stream, variables):
-    for config in self.properties:
-      config.genSchema (stream)
-
   def genConstructorArgs (self, stream, variables):
     # Constructor args are config elements with read-create access
     result = ""
     for element in self.properties:
       if element.isConstructorArg ():
         stream.write (", ")
-        element.genFormalParam (stream)
+        element.genFormalParam (stream, variables)
 
   def genConstructorInits (self, stream, variables):
     for element in self.properties:
@@ -831,8 +915,17 @@
   def genEventCount (self, stream, variables):
     stream.write ("%d" % len (self.events))
 
+  def genEventMethodBodies (self, stream, variables):
+    for event in self.events:
+      event.genMethodBody (stream, variables, self)
+
+  def genEventMethodDecls (self, stream, variables):
+    for event in self.events:
+      event.genMethodDecl (stream, variables)
+
   def genEventSchema (self, stream, variables):
-    pass ###########################################################################
+    for event in self.events:
+      event.genSchema (stream, variables)
 
   def genHiLoStatResets (self, stream, variables):
     for inst in self.statistics:
@@ -884,10 +977,6 @@
       if element.type.type.perThread:
         element.genDeclaration (stream, "        ")
 
-  def genInstElementSchema (self, stream, variables):
-    for inst in self.statistics:
-      inst.genSchema (stream)
-
   def genMethodArgIncludes (self, stream, variables):
     for method in self.methods:
       if method.getArgCount () > 0:
@@ -898,7 +987,7 @@
 
   def genMethodHandlers (self, stream, variables):
     for method in self.methods:
-      stream.write ("\n    if (methodName == \"" + method.getName () + "\")\n    {\n")
+      stream.write ("\n    if (methodName == \"" + method.getName () + "\") {\n")
       if method.getArgCount () == 0:
         stream.write ("        ArgsNone ioArgs;\n")
       else:
@@ -922,10 +1011,36 @@
                                                     arg.name, "outBuf") + ";\n")
       stream.write ("        return;\n    }\n")
 
+  def genPresenceMaskBytes (self, stream, variables):
+    count = 0
+    for prop in self.properties:
+      if prop.isOptional == 1:
+        count += 1
+    if count == 0:
+      stream.write("0")
+    else:
+      stream.write (str(((count - 1) / 8) + 1))
+
+  def genPresenceMaskConstants (self, stream, variables):
+    count = 0
+    for prop in self.properties:
+      if prop.isOptional == 1:
+        stream.write("    static const uint8_t presenceByte_%s = %d;\n" % (prop.name, count / 8))
+        stream.write("    static const uint8_t presenceMask_%s = %d;\n" % (prop.name, 1 << (count % 8)))
+        count += 1
+
+  def genPropertySchema (self, stream, variables):
+    for prop in self.properties:
+      prop.genSchema (stream)
+
   def genSetGeneralReferenceDeclaration (self, stream, variables):
     for prop in self.properties:
       if prop.isGeneralRef:
-        stream.write ("void setReference(uint64_t objectId) { " + prop.name + " = objectId; }\n")
+        stream.write ("void setReference(ObjectId objectId) { " + prop.name + " = objectId; }\n")
+
+  def genStatisticSchema (self, stream, variables):
+    for stat in self.statistics:
+      stat.genSchema (stream)
 
   def genMethodIdDeclarations (self, stream, variables):
     number = 1
@@ -983,13 +1098,13 @@
       if inst.type.type.perThread:
         inst.genAssign (stream)
 
-  def genWriteConfig (self, stream, variables):
-    for config in self.properties:
-      config.genWrite (stream)
+  def genWriteProperties (self, stream, variables):
+    for prop in self.properties:
+      prop.genWrite (stream)
 
-  def genWriteInst (self, stream, variables):
-    for inst in self.statistics:
-      inst.genWrite (stream)
+  def genWriteStatistics (self, stream, variables):
+    for stat in self.statistics:
+      stat.genWrite (stream)
 
 
 
@@ -1046,15 +1161,9 @@
 
   def genClassRegisters (self, stream, variables):
     for _class in self.classes:
-      stream.write ("agent->RegisterClass (")
-      _class.genNameCap (stream, variables)
-      stream.write ("::packageName, ")
-      _class.genNameCap (stream, variables)
-      stream.write ("::className, ")
-      _class.genNameCap (stream, variables)
-      stream.write ("::md5Sum, ")
+      stream.write ("    ")
       _class.genNameCap (stream, variables)
-      stream.write ("::writeSchema);\n")
+      stream.write ("::registerClass(agent);\n")
 
 
 #=====================================================================================

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp Wed Sep  3 11:01:44 2008
@@ -42,6 +42,11 @@
 {
     /*MGEN:Class.ParentRefAssignment*/
 /*MGEN:Class.InitializeElements*/
+/*MGEN:IF(Class.ExistOptionals)*/
+    // Optional properties start out not-present
+    for (uint8_t idx = 0; idx < /*MGEN:Class.PresenceMaskBytes*/; idx++)
+        presenceMask[idx] = 0;
+/*MGEN:ENDIF*/
 /*MGEN:IF(Class.ExistPerThreadStats)*/
     maxThreads = agent->getMaxThreads();
     perThreadStatsArray = new struct PerThreadStats*[maxThreads];
@@ -65,6 +70,7 @@
     const string TYPE("type");
     const string ACCESS("access");
     const string INDEX("index");
+    const string OPTIONAL("optional");
     const string UNIT("unit");
     const string MIN("min");
     const string MAX("max");
@@ -76,6 +82,11 @@
     const string DEFAULT("default");
 }
 
+void /*MGEN:Class.NameCap*/::registerClass(ManagementAgent* agent)
+{
+    agent->RegisterClass(packageName, className, md5Sum, writeSchema);
+}
+
 void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf)
 {
     FieldTable ft;
@@ -90,9 +101,9 @@
     buf.putShort       (/*MGEN:Class.EventCount*/); // Event Count
 
     // Properties
-/*MGEN:Class.ConfigElementSchema*/
+/*MGEN:Class.PropertySchema*/
     // Statistics
-/*MGEN:Class.InstElementSchema*/
+/*MGEN:Class.StatisticSchema*/
     // Methods
 /*MGEN:Class.MethodSchema*/
     // Events
@@ -118,7 +129,11 @@
     configChanged = false;
 
     writeTimestamps (buf);
-/*MGEN:Class.WriteConfig*/
+/*MGEN:IF(Class.ExistOptionals)*/
+    for (uint8_t idx = 0; idx < /*MGEN:Class.PresenceMaskBytes*/; idx++)
+        buf.putOctet(presenceMask[idx]);
+/*MGEN:ENDIF*/
+/*MGEN:Class.WriteProperties*/
 }
 
 void /*MGEN:Class.NameCap*/::writeStatistics (Buffer& buf, bool skipHeaders)
@@ -140,7 +155,7 @@
 /*MGEN:Class.Assign*/
     if (!skipHeaders)
         writeTimestamps (buf);
-/*MGEN:Class.WriteInst*/
+/*MGEN:Class.WriteStatistics*/
 
     // Maintenance of hi-lo statistics
 /*MGEN:Class.HiLoStatResets*/
@@ -162,3 +177,4 @@
     outBuf.putShortString (Manageable::StatusText (status));
 }
 
+/*MGEN:Class.EventMethodBodies*/

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h Wed Sep  3 11:01:44 2008
@@ -37,6 +37,10 @@
     static std::string packageName;
     static std::string className;
     static uint8_t     md5Sum[16];
+/*MGEN:IF(Class.ExistOptionals)*/
+    uint8_t presenceMask[/*MGEN:Class.PresenceMaskBytes*/];
+/*MGEN:Class.PresenceMaskConstants*/
+/*MGEN:ENDIF*/
 
     // Properties
 /*MGEN:Class.ConfigDeclarations*/
@@ -78,14 +82,13 @@
 /*MGEN:ENDIF*/
   public:
 
-    friend class Package/*MGEN:Class.NamePackageCap*/;
-
     /*MGEN:Class.NameCap*/ (ManagementAgent* agent,
                             Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/);
     ~/*MGEN:Class.NameCap*/ (void);
 
     /*MGEN:Class.SetGeneralReferenceDeclaration*/
 
+    static void  registerClass  (ManagementAgent* agent);
     std::string& getPackageName (void) { return packageName; }
     std::string& getClassName   (void) { return className; }
     uint8_t*     getMd5Sum      (void) { return md5Sum; }
@@ -94,6 +97,8 @@
 /*MGEN:Class.MethodIdDeclarations*/
     // Accessor Methods
 /*MGEN:Class.AccessorMethods*/
+    // Event Methods
+/*MGEN:Class.EventMethodDecls*/
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgent.h?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgent.h Wed Sep  3 11:01:44 2008
@@ -65,10 +65,14 @@
     //                       agent's thread.  In this case, the callback implementations
     //                       MUST be thread safe.
     //
+    //   storeFile         - File where this process has read and write access.  This
+    //                       file shall be used to store persistent state.
+    //
     virtual void init (std::string brokerHost        = "localhost",
                        uint16_t    brokerPort        = 5672,
                        uint16_t    intervalSeconds   = 10,
-                       bool        useExternalThread = false) = 0;
+                       bool        useExternalThread = false,
+                       std::string storeFile         = "") = 0;
 
     // Register a schema with the management agent.  This is normally called by the
     // package initializer generated by the management code generator.
@@ -93,9 +97,8 @@
     // pointer.  This allows the management agent to report the deletion of the object
     // in an orderly way.
     //
-    virtual uint64_t addObject (ManagementObject* objectPtr,
-                                uint32_t          persistId   = 0,
-                                uint32_t          persistBank = 4) = 0;
+    virtual ObjectId addObject (ManagementObject* objectPtr,
+                                uint64_t          persistId = 0) = 0;
 
     // If "useExternalThread" was set to true in init, this method must
     // be called to provide a thread for any pending method calls that have arrived.
@@ -120,6 +123,12 @@
     //
     virtual int getSignalFd (void) = 0;
 
+protected:
+    friend class ManagementObject;
+    virtual sys::Mutex& getMutex() = 0;
+    virtual framing::Buffer* startEventLH() = 0;
+    virtual void finishEventLH(framing::Buffer* buf) = 0;
+    
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Wed Sep  3 11:01:44 2008
@@ -24,12 +24,21 @@
 #include <list>
 #include <unistd.h>
 #include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <iostream>
+#include <fstream>
+
 
 using namespace qpid::client;
 using namespace qpid::framing;
 using namespace qpid::management;
 using namespace qpid::sys;
 using std::stringstream;
+using std::ofstream;
+using std::ifstream;
 using std::string;
 using std::cout;
 using std::endl;
@@ -66,128 +75,186 @@
     return agent;
 }
 
+const string ManagementAgentImpl::storeMagicNumber("MA01");
+
 ManagementAgentImpl::ManagementAgentImpl() :
-    clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread), startupWait(false)
+    extThread(false), writeFd(-1), readFd(-1),
+    clientWasAdded(true), requestedBank(0),
+    assignedBank(0), brokerBank(0), bootSequence(0),
+    connThreadBody(*this), connThread(connThreadBody),
+    pubThreadBody(*this), pubThread(pubThreadBody)
 {
     // TODO: Establish system ID
 }
 
-void ManagementAgentImpl::init(std::string brokerHost,
-                               uint16_t    brokerPort,
-                               uint16_t    intervalSeconds,
-                               bool        useExternalThread)
+void ManagementAgentImpl::init(string    brokerHost,
+                               uint16_t  brokerPort,
+                               uint16_t  intervalSeconds,
+                               bool      useExternalThread,
+                               string    _storeFile)
 {
-    {
-        Mutex::ScopedLock lock(agentLock);
-        startupWait = true;
-    }
-
     interval     = intervalSeconds;
     extThread    = useExternalThread;
+    storeFile    = _storeFile;
     nextObjectId = 1;
+    host         = brokerHost;
+    port         = brokerPort;
 
-    sessionId.generate();
-    queueName << "qmfagent-" << sessionId;
-    string dest = "qmfagent";
-
-    connection.open(brokerHost.c_str(), brokerPort);
-    session = connection.newSession (queueName.str());
-    dispatcher = new client::Dispatcher(session);
-
-
-    session.queueDeclare (arg::queue=queueName.str());
-    session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(),
-                          arg::bindingKey=queueName.str());
-    session.messageSubscribe (arg::queue=queueName.str(),
-                              arg::destination=dest);
-    session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);
-    session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);
-
-    Message attachRequest;
-    char    rawbuffer[512];
-    Buffer  buffer (rawbuffer, 512);
-
-    attachRequest.getDeliveryProperties().setRoutingKey("broker");
-    attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
-
-    EncodeHeader (buffer, 'A');
-    buffer.putShortString ("RemoteAgent [C++]");
-    systemId.encode  (buffer);
-    buffer.putLong (11);
-
-    size_t length = 512 - buffer.available ();
-    string stringBuffer (rawbuffer, length);
-    attachRequest.setData (stringBuffer);
-
-    session.messageTransfer(arg::content=attachRequest, arg::destination="qpid.management");
-
-    dispatcher->listen(dest, this);
-    dispatcher->start();
+    // TODO: Abstract the socket calls for portability
+    if (extThread) {
+        int pair[2];
+        int result = socketpair(PF_LOCAL, SOCK_STREAM, 0, pair);
+        if (result == -1) {
+            return;
+        }
+        writeFd = pair[0];
+        readFd  = pair[1];
 
-    {
-        Mutex::ScopedLock lock(agentLock);
-        if (startupWait)
-            startupCond.wait(agentLock);
+        // Set the readFd to non-blocking
+        int flags = fcntl(readFd, F_GETFL);
+        fcntl(readFd, F_SETFL, flags | O_NONBLOCK);
     }
+
+    retrieveData();
+    bootSequence++;
+    if ((bootSequence & 0xF000) != 0)
+        bootSequence = 1;
+    storeData(true);
 }
 
 ManagementAgentImpl::~ManagementAgentImpl()
 {
-    dispatcher->stop();
-    session.close();
-    delete dispatcher;
 }
 
-void ManagementAgentImpl::RegisterClass (std::string packageName,
-                                         std::string className,
-                                         uint8_t*    md5Sum,
-                                         management::ManagementObject::writeSchemaCall_t schemaCall)
+void ManagementAgentImpl::RegisterClass(std::string packageName,
+                                        std::string className,
+                                        uint8_t*    md5Sum,
+                                        management::ManagementObject::writeSchemaCall_t schemaCall)
 { 
     Mutex::ScopedLock lock(agentLock);
-    PackageMap::iterator pIter = FindOrAddPackage (packageName);
-    AddClassLocal (pIter, className, md5Sum, schemaCall);
+    PackageMap::iterator pIter = FindOrAddPackage(packageName);
+    AddClassLocal(pIter, className, md5Sum, schemaCall);
 }
 
-uint64_t ManagementAgentImpl::addObject (ManagementObject* object,
-                                         uint32_t          /*persistId*/,
-                                         uint32_t          /*persistBank*/)
+ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
+                                        uint64_t          persistId)
 {
     Mutex::ScopedLock lock(addLock);
-    uint64_t objectId;
+    uint16_t sequence  = persistId ? 0 : bootSequence;
+    uint64_t objectNum = persistId ? persistId : nextObjectId++;
+
+    ObjectId objectId(&attachment, 0, sequence, objectNum);
 
     // TODO: fix object-id handling
-    objectId = objIdPrefix | ((nextObjectId++) & 0x00FFFFFF);
-    object->setObjectId (objectId);
+    object->setObjectId(objectId);
     newManagementObjects[objectId] = object;
     return objectId;
 }
 
-uint32_t ManagementAgentImpl::pollCallbacks(uint32_t /*callLimit*/)
+uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
 {
-    return 0;
+    Mutex::ScopedLock lock(agentLock);
+
+    for (uint32_t idx = 0; callLimit == 0 || idx < callLimit; idx++) {
+        if (methodQueue.empty())
+            break;
+
+        QueuedMethod* item = methodQueue.front();
+        methodQueue.pop_front();
+        {
+            Mutex::ScopedUnlock unlock(agentLock);
+            Buffer inBuffer(const_cast<char*>(item->body.c_str()), item->body.size());
+            invokeMethodRequest(inBuffer, item->sequence, item->replyTo);
+            delete item;
+        }
+    }
+
+    uint8_t rbuf[100];
+    while (read(readFd, rbuf, 100) > 0); // Consume all signaling bytes
+    return methodQueue.size();
 }
 
 int ManagementAgentImpl::getSignalFd(void)
 {
-    return -1;
+    return readFd;
+}
+
+void ManagementAgentImpl::startProtocol()
+{
+    char    rawbuffer[512];
+    Buffer  buffer(rawbuffer, 512);
+
+    EncodeHeader(buffer, 'A');
+    buffer.putShortString("RemoteAgent [C++]");
+    systemId.encode (buffer);
+    buffer.putLong(requestedBank);
+    uint32_t length = 512 - buffer.available();
+    buffer.reset();
+    connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker");
+}
+
+void ManagementAgentImpl::storeData(bool requested)
+{
+    if (!storeFile.empty()) {
+        ofstream outFile(storeFile.c_str());
+        uint32_t bankToWrite = requested ? requestedBank : assignedBank;
+
+        if (outFile.good()) {
+            outFile << storeMagicNumber << " " << bankToWrite << " " << bootSequence << endl;
+            outFile.close();
+        }
+    }
+}
+
+void ManagementAgentImpl::retrieveData()
+{
+    if (!storeFile.empty()) {
+        ifstream inFile(storeFile.c_str());
+        string   mn;
+
+        if (inFile.good()) {
+            inFile >> mn;
+            if (mn == storeMagicNumber) {
+                inFile >> requestedBank;
+                inFile >> bootSequence;
+            }
+            inFile.close();
+        }
+    }
+}
+
+void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence,
+                                              uint32_t code, string text)
+{
+    Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
+    uint32_t outLen;
+
+    EncodeHeader(outBuffer, 'z', sequence);
+    outBuffer.putLong(code);
+    outBuffer.putShortString(text);
+    outLen = MA_BUFFER_SIZE - outBuffer.available();
+    outBuffer.reset();
+    connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey);
 }
 
 void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
 {
     Mutex::ScopedLock lock(agentLock);
-    uint32_t assigned;
-    stringstream key;
 
-    assigned = inBuffer.getLong();
-    objIdPrefix = ((uint64_t) assigned) << 24;
+    brokerBank   = inBuffer.getLong();
+    assignedBank = inBuffer.getLong();
+    if (assignedBank != requestedBank) {
+        if (requestedBank == 0)
+            cout << "Initial object-id bank assigned: " << assignedBank << endl;
+        else
+            cout << "Collision in object-id! New bank assigned: " << assignedBank << endl;
+        storeData();
+    }
 
-    startupWait = false;
-    startupCond.notify();
+    attachment.setBanks(brokerBank, assignedBank);
 
     // Bind to qpid.management to receive commands
-    key << "agent." << assigned;
-    session.exchangeBind (arg::exchange="qpid.management", arg::queue=queueName.str(),
-                          arg::bindingKey=key.str());
+    connThreadBody.bindToBank(assignedBank);
 
     // Send package indications for all local packages
     for (PackageMap::iterator pIter = packages.begin();
@@ -198,9 +265,9 @@
 
         EncodeHeader(outBuffer, 'p');
         EncodePackageIndication(outBuffer, pIter);
-        outLen = MA_BUFFER_SIZE - outBuffer.available ();
+        outLen = MA_BUFFER_SIZE - outBuffer.available();
         outBuffer.reset();
-        SendBuffer(outBuffer, outLen, "qpid.management", "broker");
+        connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
 
         // Send class indications for all local classes
         ClassMap cMap = pIter->second;
@@ -208,9 +275,9 @@
             outBuffer.reset();
             EncodeHeader(outBuffer, 'q');
             EncodeClassIndication(outBuffer, pIter, cIter);
-            outLen = MA_BUFFER_SIZE - outBuffer.available ();
+            outLen = MA_BUFFER_SIZE - outBuffer.available();
             outBuffer.reset();
-            SendBuffer(outBuffer, outLen, "qpid.management", "broker");
+            connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
         }
     }
 }
@@ -236,9 +303,9 @@
 
              EncodeHeader(outBuffer, 's', sequence);
              schema.writeSchemaCall(outBuffer);
-             outLen = MA_BUFFER_SIZE - outBuffer.available ();
+             outLen = MA_BUFFER_SIZE - outBuffer.available();
              outBuffer.reset();
-             SendBuffer(outBuffer, outLen, "qpid.management", "broker");
+             connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
         }
     }
 }
@@ -249,28 +316,93 @@
     clientWasAdded = true;
 }
 
-void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
 {
     string   methodName;
-    Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
+    string   packageName;
+    string   className;
+    uint8_t  hash[16];
+    Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
 
-    uint64_t objId = inBuffer.getLongLong();
+    ObjectId objId(inBuffer);
+    inBuffer.getShortString(packageName);
+    inBuffer.getShortString(className);
+    inBuffer.getBin128(hash);
     inBuffer.getShortString(methodName);
 
     EncodeHeader(outBuffer, 'm', sequence);
 
     ManagementObjectMap::iterator iter = managementObjects.find(objId);
     if (iter == managementObjects.end() || iter->second->isDeleted()) {
-        outBuffer.putLong        (Manageable::STATUS_UNKNOWN_OBJECT);
-        outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
+        outBuffer.putLong       (Manageable::STATUS_UNKNOWN_OBJECT);
+        outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT));
     } else {
-        iter->second->doMethod(methodName, inBuffer, outBuffer);
+        if ((iter->second->getPackageName() != packageName) ||
+            (iter->second->getClassName()   != className)) {
+            outBuffer.putLong        (Manageable::STATUS_INVALID_PARAMETER);
+            outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER));
+        }
+        else
+            iter->second->doMethod(methodName, inBuffer, outBuffer);
     }
 
     outLen = MA_BUFFER_SIZE - outBuffer.available();
     outBuffer.reset();
-    SendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+    connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+}
+
+void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo)
+{
+    FieldTable           ft;
+    FieldTable::ValuePtr value;
+
+    moveNewObjectsLH();
+
+    ft.decode(inBuffer);
+    value = ft.get("_class");
+    if (value.get() == 0 || !value->convertsTo<string>())
+    {
+        // TODO: Send completion with an error code
+        return;
+    }
+
+    string className(value->get<string>());
+
+    for (ManagementObjectMap::iterator iter = managementObjects.begin();
+         iter != managementObjects.end();
+         iter++)
+    {
+        ManagementObject* object = iter->second;
+        if (object->getClassName() == className)
+        {
+            Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
+            uint32_t outLen;
+
+            EncodeHeader(outBuffer, 'g', sequence);
+            object->writeProperties(outBuffer);
+            object->writeStatistics(outBuffer, true);
+            outLen = MA_BUFFER_SIZE - outBuffer.available();
+            outBuffer.reset();
+            connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+        }
+    }
+
+    sendCommandComplete(replyTo, sequence);
+}
+
+void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+{
+    if (extThread) {
+        Mutex::ScopedLock lock(agentLock);
+        string body;
+
+        inBuffer.getRawData(body, inBuffer.available());
+        methodQueue.push_back(new QueuedMethod(sequence, replyTo, body));
+        write(writeFd, "X", 1);
+    } else {
+        invokeMethodRequest(inBuffer, sequence, replyTo);
+    }
 }
 
 void ManagementAgentImpl::received(Message& msg)
@@ -287,103 +419,86 @@
         replyToKey = rt.getRoutingKey();
     }
 
-    if (CheckHeader (inBuffer, &opcode, &sequence))
+    if (CheckHeader(inBuffer, &opcode, &sequence))
     {
         if      (opcode == 'a') handleAttachResponse(inBuffer);
         else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
         else if (opcode == 'x') handleConsoleAddedIndication();
+        else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey);
         else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey);
     }
 }
 
-void ManagementAgentImpl::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
+void ManagementAgentImpl::EncodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
 {
-    buf.putOctet ('A');
-    buf.putOctet ('M');
-    buf.putOctet ('1');
-    buf.putOctet (opcode);
-    buf.putLong  (seq);
+    buf.putOctet('A');
+    buf.putOctet('M');
+    buf.putOctet('1');
+    buf.putOctet(opcode);
+    buf.putLong (seq);
 }
 
-bool ManagementAgentImpl::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
+bool ManagementAgentImpl::CheckHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
 {
     if (buf.getSize() < 8)
         return false;
 
-    uint8_t h1 = buf.getOctet ();
-    uint8_t h2 = buf.getOctet ();
-    uint8_t h3 = buf.getOctet ();
+    uint8_t h1 = buf.getOctet();
+    uint8_t h2 = buf.getOctet();
+    uint8_t h3 = buf.getOctet();
 
-    *opcode = buf.getOctet ();
-    *seq    = buf.getLong ();
+    *opcode = buf.getOctet();
+    *seq    = buf.getLong();
 
     return h1 == 'A' && h2 == 'M' && h3 == '1';
 }
 
-void ManagementAgentImpl::SendBuffer (Buffer&  buf,
-                                      uint32_t length,
-                                      string   exchange,
-                                      string   routingKey)
+ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage(std::string name)
 {
-    Message msg;
-    string  data;
-
-    if (objIdPrefix == 0)
-        return;
-
-    buf.getRawData(data, length);
-    msg.getDeliveryProperties().setRoutingKey(routingKey);
-    msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
-    msg.setData (data);
-    session.messageTransfer (arg::content=msg, arg::destination=exchange);
-}
-
-ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage (std::string name)
-{
-    PackageMap::iterator pIter = packages.find (name);
-    if (pIter != packages.end ())
+    PackageMap::iterator pIter = packages.find(name);
+    if (pIter != packages.end())
         return pIter;
 
     // No such package found, create a new map entry.
     std::pair<PackageMap::iterator, bool> result =
-        packages.insert (std::pair<string, ClassMap> (name, ClassMap ()));
+        packages.insert(std::pair<string, ClassMap>(name, ClassMap()));
 
     // Publish a package-indication message
-    Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
+    Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
 
-    EncodeHeader (outBuffer, 'p');
-    EncodePackageIndication (outBuffer, result.first);
-    outLen = MA_BUFFER_SIZE - outBuffer.available ();
-    outBuffer.reset ();
-    SendBuffer (outBuffer, outLen, "qpid.management", "mgmt.schema.package");
+    EncodeHeader(outBuffer, 'p');
+    EncodePackageIndication(outBuffer, result.first);
+    outLen = MA_BUFFER_SIZE - outBuffer.available();
+    outBuffer.reset();
+    connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "mgmt.schema.package");
 
     return result.first;
 }
 
 void ManagementAgentImpl::moveNewObjectsLH()
 {
-    Mutex::ScopedLock lock (addLock);
-    for (ManagementObjectMap::iterator iter = newManagementObjects.begin ();
-         iter != newManagementObjects.end ();
+    Mutex::ScopedLock lock(addLock);
+    for (ManagementObjectMap::iterator iter = newManagementObjects.begin();
+         iter != newManagementObjects.end();
          iter++)
         managementObjects[iter->first] = iter->second;
     newManagementObjects.clear();
 }
 
-void ManagementAgentImpl::AddClassLocal (PackageMap::iterator  pIter,
-                                         string                className,
-                                         uint8_t*              md5Sum,
-                                         management::ManagementObject::writeSchemaCall_t schemaCall)
+void ManagementAgentImpl::AddClassLocal(PackageMap::iterator  pIter,
+                                        string                className,
+                                        uint8_t*              md5Sum,
+                                        management::ManagementObject::writeSchemaCall_t schemaCall)
 {
     SchemaClassKey key;
     ClassMap&      cMap = pIter->second;
 
     key.name = className;
-    memcpy (&key.hash, md5Sum, 16);
+    memcpy(&key.hash, md5Sum, 16);
 
-    ClassMap::iterator cIter = cMap.find (key);
-    if (cIter != cMap.end ())
+    ClassMap::iterator cIter = cMap.find(key);
+    if (cIter != cMap.end())
         return;
 
     // No such class found, create a new class with local information.
@@ -395,21 +510,21 @@
     // TODO: Publish a class-indication message
 }
 
-void ManagementAgentImpl::EncodePackageIndication (Buffer&              buf,
-                                                   PackageMap::iterator pIter)
+void ManagementAgentImpl::EncodePackageIndication(Buffer&              buf,
+                                                  PackageMap::iterator pIter)
 {
-    buf.putShortString ((*pIter).first);
+    buf.putShortString((*pIter).first);
 }
 
-void ManagementAgentImpl::EncodeClassIndication (Buffer&              buf,
-                                                 PackageMap::iterator pIter,
-                                                 ClassMap::iterator   cIter)
+void ManagementAgentImpl::EncodeClassIndication(Buffer&              buf,
+                                                PackageMap::iterator pIter,
+                                                ClassMap::iterator   cIter)
 {
     SchemaClassKey key = (*cIter).first;
 
-    buf.putShortString ((*pIter).first);
-    buf.putShortString (key.name);
-    buf.putBin128      (key.hash);
+    buf.putShortString((*pIter).first);
+    buf.putShortString(key.name);
+    buf.putBin128     (key.hash);
 }
 
 void ManagementAgentImpl::PeriodicProcessing()
@@ -419,17 +534,17 @@
     char                msgChars[BUFSIZE];
     uint32_t            contentSize;
     string              routingKey;
-    std::list<uint64_t> deleteList;
+    std::list<ObjectId> deleteList;
 
     {
         Buffer msgBuffer(msgChars, BUFSIZE);
         EncodeHeader(msgBuffer, 'h');
         msgBuffer.putLongLong(uint64_t(Duration(now())));
 
-        contentSize = BUFSIZE - msgBuffer.available ();
-        msgBuffer.reset ();
+        contentSize = BUFSIZE - msgBuffer.available();
+        msgBuffer.reset();
         routingKey = "mgmt." + systemId.str() + ".heartbeat";
-        SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+        connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
     }
 
     moveNewObjectsLH();
@@ -437,65 +552,171 @@
     if (clientWasAdded)
     {
         clientWasAdded = false;
-        for (ManagementObjectMap::iterator iter = managementObjects.begin ();
-             iter != managementObjects.end ();
+        for (ManagementObjectMap::iterator iter = managementObjects.begin();
+             iter != managementObjects.end();
              iter++)
         {
             ManagementObject* object = iter->second;
-            object->setAllChanged ();
+            object->setAllChanged();
         }
     }
 
-    if (managementObjects.empty ())
+    if (managementObjects.empty())
         return;
         
-    for (ManagementObjectMap::iterator iter = managementObjects.begin ();
-         iter != managementObjects.end ();
+    for (ManagementObjectMap::iterator iter = managementObjects.begin();
+         iter != managementObjects.end();
          iter++)
     {
         ManagementObject* object = iter->second;
 
-        if (object->getConfigChanged () || object->isDeleted ())
+        if (object->getConfigChanged() || object->isDeleted())
         {
-            Buffer msgBuffer (msgChars, BUFSIZE);
-            EncodeHeader (msgBuffer, 'c');
+            Buffer msgBuffer(msgChars, BUFSIZE);
+            EncodeHeader(msgBuffer, 'c');
             object->writeProperties(msgBuffer);
 
-            contentSize = BUFSIZE - msgBuffer.available ();
-            msgBuffer.reset ();
-            routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName ();
-            SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+            contentSize = BUFSIZE - msgBuffer.available();
+            msgBuffer.reset();
+            routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName();
+            connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
         }
         
-        if (object->getInstChanged ())
+        if (object->getInstChanged())
         {
-            Buffer msgBuffer (msgChars, BUFSIZE);
-            EncodeHeader (msgBuffer, 'i');
+            Buffer msgBuffer(msgChars, BUFSIZE);
+            EncodeHeader(msgBuffer, 'i');
             object->writeStatistics(msgBuffer);
 
-            contentSize = BUFSIZE - msgBuffer.available ();
-            msgBuffer.reset ();
-            routingKey = "mgmt." + systemId.str () + ".stat." + object->getClassName ();
-            SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+            contentSize = BUFSIZE - msgBuffer.available();
+            msgBuffer.reset();
+            routingKey = "mgmt." + systemId.str() + ".stat." + object->getClassName();
+            connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
         }
 
-        if (object->isDeleted ())
-            deleteList.push_back (iter->first);
+        if (object->isDeleted())
+            deleteList.push_back(iter->first);
     }
 
     // Delete flagged objects
-    for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
-         iter != deleteList.rend ();
+    for (std::list<ObjectId>::reverse_iterator iter = deleteList.rbegin();
+         iter != deleteList.rend();
          iter++)
-        managementObjects.erase (*iter);
+        managementObjects.erase(*iter);
 
-    deleteList.clear ();
+    deleteList.clear();
 }
 
-void ManagementAgentImpl::BackgroundThread::run()
+void ManagementAgentImpl::ConnectionThread::run()
+{
+    static const int delayMin(1);
+    static const int delayMax(128);
+    static const int delayFactor(2);
+    int delay(delayMin);
+    string dest("qmfagent");
+
+    sessionId.generate();
+    queueName << "qmfagent-" << sessionId;
+
+    while (true) {
+        try {
+            if (!agent.host.empty()) {
+                connection.open(agent.host.c_str(), agent.port);
+                session = connection.newSession(queueName.str());
+                subscriptions = new client::SubscriptionManager(session);
+
+                session.queueDeclare(arg::queue=queueName.str());
+                session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(),
+                                     arg::bindingKey=queueName.str());
+
+                subscriptions->subscribe(agent, queueName.str(), dest);
+                {
+                    Mutex::ScopedLock _lock(connLock);
+                    operational = true;
+                    agent.startProtocol();
+                    try {
+                        Mutex::ScopedUnlock _unlock(connLock);
+                        subscriptions->run();
+                    } catch (std::exception) {}
+
+                    operational = false;
+                }
+                delay = delayMin;
+                delete subscriptions;
+                subscriptions = 0;
+                session.close();
+            }
+        } catch (std::exception &e) {
+            if (delay < delayMax)
+                delay *= delayFactor;
+        }
+
+        ::sleep(delay);
+    }
+}
+
+ManagementAgentImpl::ConnectionThread::~ConnectionThread()
+{
+    if (subscriptions != 0) {
+        delete subscriptions;
+    }
+}
+
+void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer&  buf,
+                                                       uint32_t length,
+                                                       string   exchange,
+                                                       string   routingKey)
+{
+    {
+        Mutex::ScopedLock _lock(connLock);
+        if (!operational)
+            return;
+    }
+
+    Message msg;
+    string  data;
+
+    buf.getRawData(data, length);
+    msg.getDeliveryProperties().setRoutingKey(routingKey);
+    msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+    msg.setData(data);
+    session.messageTransfer(arg::content=msg, arg::destination=exchange);
+}
+
+void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t agentBank)
+{
+    stringstream key;
+    key << "agent." << agentBank;
+    session.exchangeBind(arg::exchange="qpid.management", arg::queue=queueName.str(),
+                          arg::bindingKey=key.str());
+}
+
+
+void ManagementAgentImpl::PublishThread::run()
 {
     while (true) {
         ::sleep(5);
         agent.PeriodicProcessing();
     }
 }
+
+Mutex& ManagementAgentImpl::getMutex()
+{
+    return agentLock;
+}
+
+Buffer* ManagementAgentImpl::startEventLH()
+{
+    Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE));
+    EncodeHeader(*outBuffer, 'e');
+    outBuffer->putLongLong(uint64_t(Duration(now())));
+    return outBuffer;
+}
+
+void ManagementAgentImpl::finishEventLH(Buffer* outBuffer)
+{
+    uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available();
+    outBuffer->reset();
+    connThreadBody.sendBuffer(*outBuffer, outLen, "qpid.management", "mgmt.event");
+    delete outBuffer;
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Wed Sep  3 11:01:44 2008
@@ -22,7 +22,7 @@
 
 #include "ManagementAgent.h"
 #include "qpid/client/Connection.h"
-#include "qpid/client/Dispatcher.h"
+#include "qpid/client/SubscriptionManager.h"
 #include "qpid/client/Session.h"
 #include "qpid/client/AsyncSession.h"
 #include "qpid/client/Message.h"
@@ -30,10 +30,10 @@
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Mutex.h"
-#include "qpid/sys/Condition.h"
 #include "qpid/framing/Uuid.h"
 #include <iostream>
 #include <sstream>
+#include <deque>
 
 namespace qpid { 
 namespace management {
@@ -49,14 +49,14 @@
     void init(std::string brokerHost        = "localhost",
               uint16_t    brokerPort        = 5672,
               uint16_t    intervalSeconds   = 10,
-              bool        useExternalThread = false);
+              bool        useExternalThread = false,
+              std::string storeFile         = "");
     void RegisterClass(std::string packageName,
                        std::string className,
                        uint8_t*    md5Sum,
                        management::ManagementObject::writeSchemaCall_t schemaCall);
-    uint64_t addObject     (management::ManagementObject* objectPtr,
-                            uint32_t          persistId   = 0,
-                            uint32_t          persistBank = 4);
+    ObjectId addObject     (management::ManagementObject* objectPtr,
+                            uint64_t          persistId = 0);
     uint32_t pollCallbacks (uint32_t callLimit = 0);
     int      getSignalFd   (void);
 
@@ -64,14 +64,12 @@
 
   private:
 
-    struct SchemaClassKey
-    {
+    struct SchemaClassKey {
         std::string name;
         uint8_t     hash[16];
     };
 
-    struct SchemaClassKeyComp
-    {
+    struct SchemaClassKeyComp {
         bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
         {
             if (lhs.name != rhs.name)
@@ -84,53 +82,95 @@
         }
     };
 
-    struct SchemaClass
-    {
+    struct SchemaClass {
         management::ManagementObject::writeSchemaCall_t writeSchemaCall;
 
         SchemaClass () : writeSchemaCall(0) {}
     };
 
+    struct QueuedMethod {
+        QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) :
+            sequence(_seq), replyTo(_reply), body(_body) {}
+
+        uint32_t    sequence;
+        std::string replyTo;
+        std::string body;
+    };
+
+    typedef std::deque<QueuedMethod*> MethodQueue;
     typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
     typedef std::map<std::string, ClassMap> PackageMap;
 
     PackageMap                       packages;
+    AgentAttachment                  attachment;
     management::ManagementObjectMap  managementObjects;
     management::ManagementObjectMap  newManagementObjects;
+    MethodQueue                      methodQueue;
 
     void received (client::Message& msg);
 
     uint16_t          interval;
     bool              extThread;
+    int               writeFd;
+    int               readFd;
     uint64_t          nextObjectId;
+    std::string       storeFile;
     sys::Mutex        agentLock;
     sys::Mutex        addLock;
-    framing::Uuid     sessionId;
     framing::Uuid     systemId;
+    std::string       host;
+    uint16_t          port;
 
-    int signalFdIn, signalFdOut;
-    client::Connection   connection;
-    client::Session      session;
-    client::Dispatcher*  dispatcher;
     bool                 clientWasAdded;
-    uint64_t    objIdPrefix;
-    std::stringstream queueName;
+    uint32_t             requestedBank;
+    uint32_t             assignedBank;
+    uint32_t             brokerBank;
+    uint16_t             bootSequence;
 #   define MA_BUFFER_SIZE 65536
     char outputBuffer[MA_BUFFER_SIZE];
+    char eventBuffer[MA_BUFFER_SIZE];
 
-    class BackgroundThread : public sys::Runnable
+    friend class ConnectionThread;
+    class ConnectionThread : public sys::Runnable
     {
+        bool operational;
         ManagementAgentImpl& agent;
+        framing::Uuid        sessionId;
+        client::Connection   connection;
+        client::Session      session;
+        client::SubscriptionManager* subscriptions;
+        std::stringstream queueName;
+        sys::Mutex        connLock;
         void run();
     public:
-        BackgroundThread(ManagementAgentImpl& _agent) : agent(_agent) {}
+        ConnectionThread(ManagementAgentImpl& _agent) :
+            operational(false), agent(_agent), subscriptions(0) {}
+        ~ConnectionThread();
+        void sendBuffer(qpid::framing::Buffer& buf,
+                        uint32_t               length,
+                        std::string            exchange,
+                        std::string            routingKey);
+        void bindToBank(uint32_t agentBank);
     };
 
-    BackgroundThread bgThread;
-    sys::Thread      thread;
-    sys::Condition   startupCond;
-    bool             startupWait;
+    class PublishThread : public sys::Runnable
+    {
+        ManagementAgentImpl& agent;
+        void run();
+    public:
+        PublishThread(ManagementAgentImpl& _agent) : agent(_agent) {}
+    };
 
+    ConnectionThread connThreadBody;
+    sys::Thread      connThread;
+    PublishThread    pubThreadBody;
+    sys::Thread      pubThread;
+
+    static const std::string storeMagicNumber;
+
+    void startProtocol();
+    void storeData(bool requested=false);
+    void retrieveData();
     PackageMap::iterator FindOrAddPackage (std::string name);
     void moveNewObjectsLH();
     void AddClassLocal (PackageMap::iterator  pIter,
@@ -144,16 +184,19 @@
                                 ClassMap::iterator     cIter);
     void EncodeHeader (qpid::framing::Buffer& buf, uint8_t  opcode, uint32_t  seq = 0);
     bool CheckHeader  (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
-    void SendBuffer         (qpid::framing::Buffer&  buf,
-                             uint32_t                length,
-                             std::string             exchange,
-                             std::string             routingKey);
+    void sendCommandComplete  (std::string replyToKey, uint32_t sequence,
+                               uint32_t code = 0, std::string text = std::string("OK"));
     void handleAttachResponse (qpid::framing::Buffer& inBuffer);
     void handlePackageRequest (qpid::framing::Buffer& inBuffer);
     void handleClassQuery     (qpid::framing::Buffer& inBuffer);
     void handleSchemaRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence);
+    void invokeMethodRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
+    void handleGetQuery       (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
     void handleMethodRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
     void handleConsoleAddedIndication();
+    sys::Mutex& getMutex();
+    framing::Buffer* startEventLH();
+    void finishEventLH(framing::Buffer* outBuffer);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AclModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AclModule.h?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AclModule.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AclModule.h Wed Sep  3 11:01:44 2008
@@ -33,9 +33,11 @@
 
 namespace acl {
 
-enum ObjectType {QUEUE, EXCHANGE, BROKER, LINK, ROUTE,OBJECTSIZE};
-enum Action {CONSUME, PUBLISH, CREATE, ACCESS, BIND, UNBIND, DELETE, PURGE, UPDATE, ACTIONSIZE};
-enum Property {NAME, DURABLE, OWNER, ROUTINGKEY, PASSIVE, AUTODELETE, EXCLUSIVE, TYPE, ALTERNATE, QUEUENAME};
+enum ObjectType {QUEUE, EXCHANGE, BROKER, LINK, ROUTE, METHOD, OBJECTSIZE}; // OBJECTSIZE must be last in list
+enum Action {CONSUME, PUBLISH, CREATE, ACCESS, BIND, UNBIND, DELETE, PURGE,
+             UPDATE, ACTIONSIZE}; // ACTIONSIZE must be last in list
+enum Property {NAME, DURABLE, OWNER, ROUTINGKEY, PASSIVE, AUTODELETE, EXCLUSIVE, TYPE, ALTERNATE,
+               QUEUENAME, SCHEMAPACKAGE, SCHEMACLASS};
 enum AclResult {ALLOW, ALLOWLOG, DENY, DENYLOG};	
 
 } // namespace acl
@@ -74,6 +76,7 @@
         if (str.compare("broker") == 0) return BROKER;
         if (str.compare("link") == 0) return LINK;
         if (str.compare("route") == 0) return ROUTE;
+        if (str.compare("method") == 0) return METHOD;
         throw str;
     }
     static inline std::string getObjectTypeStr(const ObjectType o) {
@@ -83,6 +86,7 @@
           case BROKER: return "broker";
           case LINK: return "link";
           case ROUTE: return "route";
+          case METHOD: return "method";
           default: assert(false); // should never get here
         }
     }
@@ -123,6 +127,8 @@
         if (str.compare("type") == 0) return TYPE;
         if (str.compare("alternate") == 0) return ALTERNATE;
         if (str.compare("queuename") == 0) return QUEUENAME;
+        if (str.compare("schemapackage") == 0) return SCHEMAPACKAGE;
+        if (str.compare("schemaclass") == 0) return SCHEMACLASS;
         throw str;
     }
     static inline std::string getPropertyStr(const Property p) {
@@ -137,6 +143,8 @@
           case TYPE: return "type";
           case ALTERNATE: return "alternate";
           case QUEUENAME: return "queuename";
+          case SCHEMAPACKAGE: return "schemapackage";
+          case SCHEMACLASS: return "schemaclass";
           default: assert(false); // should never get here
         }
     }
@@ -231,6 +239,17 @@
         a3->insert(actionPair(DELETE,  p0));
         
         map->insert(objectPair(ROUTE, a3));
+
+        // == Method ==
+
+        propSetPtr p5(new propSet);
+        p5->insert(SCHEMAPACKAGE);
+        p5->insert(SCHEMACLASS);
+
+        actionMapPtr a4(new actionMap);
+        a4->insert(actionPair(ACCESS, p5));
+
+        map->insert(objectPair(METHOD, a4));
     }
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Sep  3 11:01:44 2008
@@ -158,10 +158,12 @@
         mgmtObject->set_stagingThreshold (conf.stagingThreshold);
         mgmtObject->set_mgmtPubInterval  (conf.mgmtPubInterval);
         mgmtObject->set_version          (PACKAGE_VERSION);
-        mgmtObject->set_dataDirEnabled   (dataDir.isEnabled ());
-        mgmtObject->set_dataDir          (dataDir.getPath ());
+        if (dataDir.isEnabled())
+            mgmtObject->set_dataDir(dataDir.getPath());
+        else
+            mgmtObject->clr_dataDir();
         
-        managementAgent->addObject (mgmtObject, 2, 1);
+        managementAgent->addObject (mgmtObject, 0x1000000000000002LL);
 
         // Since there is currently no support for virtual hosts, a placeholder object
         // representing the implied single virtual host is added here to keep the

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Wed Sep  3 11:01:44 2008
@@ -57,9 +57,9 @@
             mgmtExchange = new management::Exchange (agent, this, parent, _name, durable);
             if (!durable) {
                 if (name == "")
-                    agent->addObject (mgmtExchange, 4, 1);  // Special default exchange ID
+                    agent->addObject (mgmtExchange, 0x1000000000000004LL);  // Special default exchange ID
                 else if (name == "qpid.management")
-                    agent->addObject (mgmtExchange, 5, 1);  // Special management exchange ID
+                    agent->addObject (mgmtExchange, 0x1000000000000005LL);  // Special management exchange ID
                 else
                     agent->addObject (mgmtExchange);
             }
@@ -78,7 +78,7 @@
     if (mgmtExchange != 0 && persistenceId == 0)
     {
         ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
-        agent->addObject (mgmtExchange, id, 2);
+        agent->addObject (mgmtExchange, 0x2000000000000000LL + id);
     }
     persistenceId = id;
 }
@@ -130,7 +130,7 @@
             ManagementObject* mo = queue->GetManagementObject();
             if (mo != 0)
             {
-                uint64_t queueId = mo->getObjectId();
+                management::ObjectId queueId = mo->getObjectId();
                 mgmtBinding = new management::Binding (agent, this, (Manageable*) parent, queueId, key, args);
                 agent->addObject (mgmtBinding);
             }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Sep  3 11:01:44 2008
@@ -618,7 +618,7 @@
     if (mgmtObject != 0 && persistenceId == 0)
     {
         ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
-        agent->addObject (mgmtObject, _persistenceId, 3);
+        agent->addObject (mgmtObject, 0x3000000000000000LL + _persistenceId);
 
         if (externalQueueStore) {
             ManagementObject* childObj = externalQueueStore->GetManagementObject();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/System.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/System.cpp?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/System.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/System.cpp Wed Sep  3 11:01:44 2008
@@ -73,7 +73,7 @@
             mgmtObject->set_machine  (std::string (_uname.machine));
         }
 
-        agent->addObject (mgmtObject, 1, 1);
+        agent->addObject (mgmtObject, 0x1000000000000001LL);
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp Wed Sep  3 11:01:44 2008
@@ -32,7 +32,7 @@
         if (agent != 0)
         {
             mgmtObject = new management::Vhost (agent, this, parentBroker, "/");
-            agent->addObject (mgmtObject, 3, 1);
+            agent->addObject (mgmtObject, 0x1000000000000003LL);
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h Wed Sep  3 11:01:44 2008
@@ -58,6 +58,7 @@
     int count() const;
     void set(const std::string& name, const ValuePtr& value);
     ValuePtr get(const std::string& name) const;
+    bool isSet(const std::string& name) const { return get(name).get() != 0; }
 
     void setString(const std::string& name, const std::string& value);
     void setInt(const std::string& name, int value);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.cpp?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.cpp Wed Sep  3 11:01:44 2008
@@ -31,6 +31,7 @@
     case STATUS_NOT_IMPLEMENTED         : return "NotImplemented";
     case STATUS_INVALID_PARAMETER       : return "InvalidParameter";
     case STATUS_FEATURE_NOT_IMPLEMENTED : return "FeatureNotImplemented";
+    case STATUS_FORBIDDEN               : return "Forbidden";
     }
 
     return "??";

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h Wed Sep  3 11:01:44 2008
@@ -44,6 +44,7 @@
     static const status_t STATUS_NOT_IMPLEMENTED         = 3;
     static const status_t STATUS_INVALID_PARAMETER       = 4;
     static const status_t STATUS_FEATURE_NOT_IMPLEMENTED = 5;
+    static const status_t STATUS_FORBIDDEN               = 6;
 
     //  Every "Manageable" object must hold a reference to exactly one
     //  management object.  This object is always of a class derived from



Mime
View raw message