qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gmur...@apache.org
Subject [3/3] qpid-dispatch git commit: DISPATCH-437 - Added API around the agent
Date Tue, 02 Aug 2016 13:39:34 GMT
DISPATCH-437 - Added API around the agent


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/2bcb2f76
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/2bcb2f76
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/2bcb2f76

Branch: refs/heads/DISPATCH-437-1
Commit: 2bcb2f766a5667767b6aa8ee7b1073f0b5f542fd
Parents: e2ad8da
Author: Ganesh Murthy <gmurthy@redhat.com>
Authored: Tue Aug 2 09:37:23 2016 -0400
Committer: Ganesh Murthy <gmurthy@redhat.com>
Committed: Tue Aug 2 09:37:23 2016 -0400

----------------------------------------------------------------------
 src/CMakeLists.txt           |   6 +-
 src/agent.c                  | 375 +++++++++++++++++++++++++++++++
 src/agent.h                  |  62 ++++++
 src/agent_private.h          |  53 +++++
 src/router_core/agent.c      | 457 --------------------------------------
 src/router_core/agent_core.c | 457 ++++++++++++++++++++++++++++++++++++++
 6 files changed, 950 insertions(+), 460 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2bcb2f76/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 3a78801..7367e96 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -65,7 +65,7 @@ set(qpid_dispatch_SOURCES
   python_embedded.c
   router_agent.c
   router_config.c
-  router_core/agent.c
+  router_core/agent_core.c
   router_core/agent_address.c
   router_core/agent_config_address.c
   router_core/agent_config_auto_link.c
@@ -83,7 +83,7 @@ set(qpid_dispatch_SOURCES
   router_core/transfer.c
   router_node.c
   router_pynode.c
-  agent_adapter.c
+  agent.c
   schema_enum.c
   server.c
   timer.c
@@ -95,7 +95,7 @@ if(USE_MEMORY_POOL)
 endif()
 
 set_property(
-  SOURCE python_embedded.c router_pynode.c agent_adapter.c
+  SOURCE python_embedded.c router_pynode.c agent.c
   PROPERTY COMPILE_FLAGS -Wno-strict-aliasing
   )
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2bcb2f76/src/agent.c
----------------------------------------------------------------------
diff --git a/src/agent.c b/src/agent.c
new file mode 100644
index 0000000..d4788f6
--- /dev/null
+++ b/src/agent.c
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/python_embedded.h>
+#include <stdio.h>
+#include <string.h>
+#include <stdbool.h>
+#include <stdlib.h>
+
+#include "agent.h"
+#include "agent_private.h"
+#include "schema_enum.h"
+
+#define MANAGEMENT_MODULE "qpid_dispatch_internal.management"
+
+typedef struct {
+    PyObject_HEAD
+    qd_agent_t *agent;
+
+} AgentRequestAdapter;
+
+/**
+ * Declare all the methods in the AgentRequestAdapter.
+ * post_management_request is the name of the method that the python side would call and qd_post_management_request is the C implementation
+ * of the function.
+ */
+static PyMethodDef AgentRequestAdapter_functions[] = {
+    {"post_management_request", qd_post_management_request, METH_VARARGS, "Posts a management request to a work queue"},
+    {0, 0, 0, 0} // Sentinel entry: the all-zero row marks the end of the method table
+};
+
+static PyTypeObject AgentRequestAdapterType = {
+    PyObject_HEAD_INIT(0)
+    0,                              /* ob_size*/
+    MANAGEMENT_MODULE ".AgentRequestAdapter",  /* tp_name*/
+    sizeof(AgentRequestAdapter),    /* tp_basicsize*/
+    0,                              /* tp_itemsize*/
+    0,                              /* tp_dealloc*/
+    0,                              /* tp_print*/
+    0,                              /* tp_getattr*/
+    0,                              /* tp_setattr*/
+    0,                              /* tp_compare*/
+    0,                              /* tp_repr*/
+    0,                              /* tp_as_number*/
+    0,                              /* tp_as_sequence*/
+    0,                              /* tp_as_mapping*/
+    0,                              /* tp_hash */
+    0,                              /* tp_call*/
+    0,                              /* tp_str*/
+    0,                              /* tp_getattro*/
+    0,                              /* tp_setattro*/
+    0,                              /* tp_as_buffer*/
+    Py_TPFLAGS_DEFAULT,             /* tp_flags*/
+    "Agent request Adapter",        /* tp_doc */
+    0,                              /* tp_traverse */
+    0,                              /* tp_clear */
+    0,                              /* tp_richcompare */
+    0,                              /* tp_weaklistoffset */
+    0,                              /* tp_iter */
+    0,                              /* tp_iternext */
+    AgentRequestAdapter_functions,  /* tp_methods */
+    0,                              /* tp_members */
+    0,                              /* tp_getset */
+    0,                              /* tp_base */
+    0,                              /* tp_dict */
+    0,                              /* tp_descr_get */
+    0,                              /* tp_descr_set */
+    0,                              /* tp_dictoffset */
+    0,                              /* tp_init */
+    0,                              /* tp_alloc */
+    0,                              /* tp_new */
+    0,                              /* tp_free */
+    0,                              /* tp_is_gc */
+    0,                              /* tp_bases */
+    0,                              /* tp_mro */
+    0,                              /* tp_cache */
+    0,                              /* tp_subclasses */
+    0,                              /* tp_weaklist */
+    0,                              /* tp_del */
+    0                               /* tp_version_tag */
+};
+
+/**
+ * Adds the AgentRequestAdapter type to the management module, creates one adapter instance and
+ * passes it to the constructor of the Python class named by agentClass. Returns the adapter instance.
+ */
+PyObject* qd_agent_init(char *agentClass, char *address, PyObject *pythonManagementModule, const char *config_path)
+{
+    // Finish setting up AgentRequestAdapterType so that instances of it can be created
+    AgentRequestAdapterType.tp_new = PyType_GenericNew;
+    PyType_Ready(&AgentRequestAdapterType);
+
+    // The qpid_dispatch_internal.management Python module is supplied by the caller
+    if (!pythonManagementModule) {
+        qd_error_py();
+        qd_log(log_source, QD_LOG_CRITICAL, "Cannot load dispatch extension module '%s'", MANAGEMENT_MODULE);
+        abort();
+    }
+
+    PyTypeObject *agentRequestAdapterType = &AgentRequestAdapterType;
+    Py_INCREF(agentRequestAdapterType);
+
+    //Use the "AgentRequestAdapter" name to add the AgentRequestAdapterType to the management
+    PyModule_AddObject(pythonManagementModule, "AgentRequestAdapter", (PyObject*) &AgentRequestAdapterType);
+    // Now we have added AgentRequestAdapter to the qpid_dispatch_internal.management python module
+
+    PyObject *adapterType     = PyObject_GetAttrString(pythonManagementModule, "AgentRequestAdapter");
+    PyObject * adapterInstance = PyObject_CallObject(adapterType, 0);
+    adapter = ((AgentRequestAdapter*) adapterInstance);
+    adapter->log_source = qd_log_source("AGENT");
+    DEQ_INIT(adapter->work_queue);
+    adapter->lock = sys_mutex();
+    initialize_handlers(adapter); // takes the adapter itself, not its handlers array
+
+    //Look up the ManagementAgent class found in qpid_dispatch_internal/management/agent.py
+    PyObject* pClass = PyObject_GetAttrString(pythonManagementModule, agentClass);
+
+    // Constructor Arguments for ManagementAgent
+    PyObject* pArgs = PyTuple_New(3);
+
+   // arg 0: management address $management (local is py_address so it does not redeclare the parameter)
+   PyObject *py_address = PyString_FromString(address);
+   PyTuple_SetItem(pArgs, 0, py_address);
+
+   // arg 1: adapter instance. PyTuple_SetItem steals a reference, so take one more for the return value
+   Py_INCREF(adapterInstance);
+   PyTuple_SetItem(pArgs, 1, adapterInstance);
+
+   // arg 2: config file location
+   PyObject *config_file = PyString_FromString(config_path);
+   PyTuple_SetItem(pArgs, 2, config_file);
+
+   // Instantiate the ManagementAgent class; report the Python error if that fails
+   PyObject* pyManagementInstance = PyInstance_New(pClass, pArgs, 0);
+   if (!pyManagementInstance)
+       qd_error_py();
+   Py_DECREF(pArgs);
+   Py_DECREF(pClass);
+   Py_DECREF(adapterType);
+   Py_DECREF(pythonManagementModule); // NOTE(review): assumes the caller handed over an owned reference - confirm
+
+   //TODO - should I return an adapter or an instance of the entire management agent object?
+   return adapterInstance;
+}
+
+/**
+ * Encodes a Python value as a composed field and returns an iterator over the resulting buffers.
+ *
+ * @param value the Python object to encode
+ * @return iterator (view ITER_VIEW_ALL) over the encoded bytes
+ */
+static qd_field_iterator_t *qd_py_to_iterator(PyObject *value)
+{
+    qd_composed_field_t *field = qd_compose_subfield(0);
+    qd_py_to_composed(value, field);
+    qd_buffer_list_t *buffers = qd_compose_buffers(field);
+    // TODO - this is not correct. what if the buffer length is more than 512?
+    // NOTE(review): assumes qd_compose_buffers returns a pointer to the buffer list - confirm
+    return qd_address_iterator_buffer(DEQ_HEAD(*buffers), 0, qd_buffer_list_length(buffers), ITER_VIEW_ALL);
+}
+
+/**
+ * Python-callable as
+ *   post_management_request(operation, entity_type, count, offset,
+ *                           correlation_id, reply_to, name, identity, body)
+ *
+ * Packages the request into a qd_management_work_item_t and appends it to the adapter's work
+ * queue while holding the adapter's lock.
+ *
+ * The method table registers this function with METH_VARARGS, so it receives exactly two C
+ * arguments: self and one tuple holding all of the positional arguments.
+ *
+ * @param self the AgentRequestAdapter instance the method was called on
+ * @param args tuple of the nine positional arguments listed above
+ * @return None on success; 0 with a Python exception set if the arguments cannot be parsed,
+ *         the entity type is out of range, or no handlers are registered for it
+ */
+static PyObject *qd_post_management_request(PyObject *self, PyObject *args)
+{
+    AgentRequestAdapter *adapter = (AgentRequestAdapter*) self;
+
+    int operation;        // Is this a CREATE, READ, UPDATE, DELETE or QUERY
+    int entity_type;      // Is this a listener or connector or address.... etc.
+    int count  = 0;       // used for queries only
+    int offset = 0;       // used for queries only
+    PyObject *cid      = 0;
+    PyObject *reply_to = 0;
+    PyObject *name     = 0;
+    PyObject *identity = 0;
+    PyObject *body     = 0;
+
+    //
+    // Four ints followed by five objects. "O" (upper case) stores a borrowed reference.
+    //
+    if (!PyArg_ParseTuple(args, "iiiiOOOOO",
+                          &operation, &entity_type, &count, &offset,
+                          &cid, &reply_to, &name, &identity, &body))
+        return 0;
+
+    //
+    // entity_type indexes the handler table, so it has to be range checked first
+    //
+    if (entity_type < 0 || entity_type >= QD_SCHEMA_ENTITY_TYPE_ENUM_COUNT) {
+        PyErr_SetString(PyExc_ValueError, "Unknown entity type");
+        return 0;
+    }
+
+    qd_entity_type_handler_t *handler = qd_agent_handler_for_type(entity_type, adapter);
+    if (!handler) {
+        PyErr_SetString(PyExc_ValueError, "No handlers registered for entity type");
+        return 0;
+    }
+
+    //
+    // Create a work item (qd_management_work_item_t)
+    //
+    qd_management_work_item_t *work_item = NEW(qd_management_work_item_t);
+    if (!work_item)
+        return PyErr_NoMemory();
+
+    DEQ_ITEM_INIT(work_item);
+    work_item->count          = count;
+    work_item->offset         = offset;
+    work_item->operation      = operation;
+    work_item->entity_type    = entity_type;
+    work_item->ctx            = handler->ctx;
+
+    //
+    // Each field is encoded from its own Python object
+    //
+    work_item->reply_to       = qd_py_to_iterator(reply_to);
+    work_item->correlation_id = qd_py_to_iterator(cid);
+    work_item->identity_iter  = qd_py_to_iterator(identity);
+    work_item->name_iter      = qd_py_to_iterator(name);
+
+    //
+    // in_body is a qd_parsed_field_t, so the encoded body is parsed rather than stored as an iterator
+    // NOTE(review): assumes qd_parse() is declared by the headers this file already includes - confirm
+    //
+    work_item->in_body        = qd_parse(qd_py_to_iterator(body));
+
+    //
+    // Add work item to the work item list after locking the work item list
+    //
+    sys_mutex_lock(adapter->lock);
+    DEQ_INSERT_TAIL(adapter->work_queue, work_item);
+    sys_mutex_unlock(adapter->lock);
+
+    //
+    // TODO - Kick off processing of the work queue
+    //
+
+    Py_RETURN_NONE; // returns None with its reference count incremented
+}
+
+
+void qd_register_handlers(void *ctx,                            // Context stored alongside the handlers
+                          PyObject *pyAdapter,                  // The AgentRequestAdapter instance
+                          qd_schema_entity_type_t entity_type,  // Index into the adapter's handler table
+                          qd_agent_handler_t create_handler,
+                          qd_agent_handler_t read_handler,
+                          qd_agent_handler_t update_handler,
+                          qd_agent_handler_t delete_handler,
+                          qd_agent_handler_t query_handler)
+{
+    AgentRequestAdapter* adapter = ((AgentRequestAdapter*) pyAdapter);
+    qd_entity_type_handler_t *entity_handler = NEW(qd_entity_type_handler_t);
+    entity_handler->ctx            = ctx;          // read back as handler->ctx when a request is posted
+    entity_handler->entity_type    = entity_type;
+    entity_handler->delete_handler = delete_handler;
+    entity_handler->update_handler = update_handler;
+    entity_handler->query_handler  = query_handler;
+    entity_handler->create_handler = create_handler;
+    entity_handler->read_handler   = read_handler;
+    //Store the entity_handler in the appropriate cell of the handler array indexed by the enum qd_schema_entity_type_t
+    adapter->handlers[entity_type] = entity_handler;
+}
+
+static qd_entity_type_handler_t *qd_agent_handler_for_type(qd_schema_entity_type_t entity_type, AgentRequestAdapter* adapter)
+{   // The adapter's handler table is indexed directly by the entity-type enum value
+    return adapter->handlers[entity_type];
+}
+
+static void initialize_handlers(AgentRequestAdapter* adapter)
+{
+    // Clear every slot of the handler table, one entry per schema entity type
+    int slot = 0;
+    while (slot < QD_SCHEMA_ENTITY_TYPE_ENUM_COUNT)
+        adapter->handlers[slot++] = 0;
+}
+
+
+// Dispatches each queued work item to the handler registered for that item's own entity type.
+static void process_work_queue(qd_management_work_list_t  work_queue, AgentRequestAdapter* adapter)
+{
+    qd_management_work_item_t *work_item = DEQ_HEAD(work_queue);
+    //TODO - The following works well with core but no corresponding functions for non-core
+    while(work_item) {
+            qd_entity_type_handler_t *handler = qd_agent_handler_for_type(work_item->entity_type, adapter);
+            if (!handler) { work_item = DEQ_NEXT(work_item); continue; } // nothing registered for this type
+            switch (work_item->operation) {
+                case QD_SCHEMA_ENTITY_OPERATION_READ:
+                    handler->read_handler(work_item->ctx,
+                                          work_item->reply_to,
+                                          work_item->correlation_id,
+                                          work_item->entity_type,
+                                          work_item->operation,
+                                          work_item->identity_iter,
+                                          work_item->name_iter);
+                    break;
+                case QD_SCHEMA_ENTITY_OPERATION_DELETE:
+                    handler->delete_handler(work_item->ctx,
+                                            work_item->reply_to,
+                                            work_item->correlation_id,
+                                            work_item->entity_type,
+                                            work_item->operation,
+                                            work_item->identity_iter,
+                                            work_item->name_iter);
+                    break;
+                case QD_SCHEMA_ENTITY_OPERATION_CREATE:
+                    handler->create_handler(work_item->ctx,
+                                            work_item->reply_to,
+                                            work_item->correlation_id,
+                                            work_item->entity_type,
+                                            work_item->operation,
+                                            work_item->name_iter,
+                                            work_item->in_body);
+                    break;
+                case QD_SCHEMA_ENTITY_OPERATION_UPDATE:
+                    handler->update_handler(work_item->ctx,
+                                            work_item->reply_to,
+                                            work_item->correlation_id,
+                                            work_item->entity_type,
+                                            work_item->operation,
+                                            work_item->identity_iter,
+                                            work_item->name_iter,
+                                            work_item->in_body);
+                    break;
+                case QD_SCHEMA_ENTITY_OPERATION_QUERY:
+                    handler->query_handler(work_item->ctx,
+                                            work_item->reply_to,
+                                            work_item->correlation_id,
+                                            work_item->entity_type,
+                                            work_item->operation,
+                                            work_item->count,
+                                            work_item->offset,
+                                            work_item->in_body);
+                    break;
+                default: break; // unknown operation: leave the item alone
+            }
+            work_item = DEQ_NEXT(work_item);
+    }
+}
+
+
+PyObject* qd_agent_adapter_finalize(PyObject *adapter)
+{
+    Py_RETURN_NONE; // TODO - nothing is released yet; a PyObject*-returning function must still return a value
+}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2bcb2f76/src/agent.h
----------------------------------------------------------------------
diff --git a/src/agent.h b/src/agent.h
new file mode 100644
index 0000000..83a82d3
--- /dev/null
+++ b/src/agent.h
@@ -0,0 +1,62 @@
+#ifndef __agent_adapter_h__
+#define __agent_adapter_h__
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "schema_enum.h"
+#include "agent_private.h"
+
+typedef struct qd_agent_t qd_agent_t;
+typedef struct qd_agent_request_t qd_agent_request_t;
+
+/**
+ * Creates a new agent with the passed in address and whose configuration is located at config path.
+ * @see qd_agent_start to start the agent
+ */
+qd_agent_t* qd_agent(char *address, const char *config_path);
+
+/**
+ * Free the agent and its components
+ */
+void qd_agent_free(qd_agent_t *agent);
+
+
+typedef void (*qd_agent_handler_t) (void *context,
+                                    qd_agent_request_t *request);
+
+/**
+ * Register CRUDQ handlers for a particular entity type
+ */
+void qd_agent_register_handlers(qd_agent_t *agent,
+                                void *ctx,
+                                qd_schema_entity_type_t entity_type,
+                                qd_agent_handler_t create_handler,
+                                qd_agent_handler_t read_handler,
+                                qd_agent_handler_t update_handler,
+                                qd_agent_handler_t delete_handler,
+                                qd_agent_handler_t query_handler);
+
+/**
+ * Start the agent.
+ * Loads the contents of the config file located in config_path
+ * Agent starts listening on the provided address
+ */
+void qd_agent_start(qd_agent_t *agent);
+
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2bcb2f76/src/agent_private.h
----------------------------------------------------------------------
diff --git a/src/agent_private.h b/src/agent_private.h
new file mode 100644
index 0000000..96f0e26
--- /dev/null
+++ b/src/agent_private.h
@@ -0,0 +1,53 @@
+#ifndef __agent_private_h__
+#define __agent_private_h__
+
+#include "schema_enum.h"
+#include "ctools.h"
+
+typedef struct qd_management_work_item_t {
+    DEQ_LINKS(struct qd_management_work_item_t);
+    int                  operation;      //Is this a CREATE, READ, UPDATE, DELETE or QUERY
+    int                  entity_type;    // Is this a listener or connector or address.... etc.
+    int                  count;
+    int                  offset;
+    void                *ctx;
+    qd_field_iterator_t *reply_to;
+    qd_field_iterator_t *correlation_id;
+    qd_field_iterator_t *identity_iter;
+    qd_field_iterator_t *name_iter;
+    qd_parsed_field_t   *in_body;
+} qd_management_work_item_t;
+
+DEQ_DECLARE(qd_management_work_item_t, qd_management_work_list_t);
+
+typedef struct qd_entity_type_handler_t {
+    qd_schema_entity_type_t  entity_type;
+    void                    *ctx;
+    qd_agent_handler_t       create_handler;
+    qd_agent_handler_t       read_handler;
+    qd_agent_handler_t       update_handler;
+    qd_agent_handler_t       delete_handler;
+    qd_agent_handler_t       query_handler;
+} qd_entity_type_handler_t;
+
+struct qd_agent_t {
+    qd_management_work_list_t  work_queue;
+    sys_mutex_t               *lock;
+    qd_log_source_t           *log_source;
+    qd_entity_type_handler_t  *handlers[QD_SCHEMA_ENTITY_TYPE_ENUM_COUNT];
+};
+
+struct qd_agent_request_t {
+    qd_buffer_list_t        *list; // A buffer chain holding all the relevant information for the CRUDQ operations.
+    void                    *ctx;
+    int                      count;
+    int                      offset;
+    qd_field_iterator_t     *identity_iter;
+    qd_field_iterator_t     *name_iter;
+    qd_field_iterator_t     *reply_to;
+    qd_field_iterator_t     *correlation_id;
+    qd_router_entity_type_t  entity_type;
+    qd_parsed_field_t       *in_body;
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2bcb2f76/src/router_core/agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
deleted file mode 100644
index 272b75c..0000000
--- a/src/router_core/agent.c
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qpid/dispatch/amqp.h>
-#include "agent_config_address.h"
-#include "agent_config_link_route.h"
-#include "agent_config_auto_link.h"
-#include "agent_address.h"
-#include "agent_link.h"
-#include "router_core_private.h"
-#include <stdio.h>
-
-static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_manage_delete_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_manage_update_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-
-ALLOC_DECLARE(qdr_query_t);
-ALLOC_DEFINE(qdr_query_t);
-
-//==================================================================================
-// Internal Functions
-//==================================================================================
-
-static void qdr_agent_response_handler(void *context)
-{
-    qdr_core_t  *core = (qdr_core_t*) context;
-    qdr_query_t *query;
-    bool         done = false;
-
-    while (!done) {
-        sys_mutex_lock(core->query_lock);
-        query = DEQ_HEAD(core->outgoing_query_list);
-        if (query)
-            DEQ_REMOVE_HEAD(core->outgoing_query_list);
-        done = DEQ_SIZE(core->outgoing_query_list) == 0;
-        sys_mutex_unlock(core->query_lock);
-
-        if (query) {
-            bool more = query->more;
-            core->agent_response_handler(query->context, &query->status, more);
-            if (!more)
-                qdr_query_free(query);
-        }
-    }
-}
-
-
-void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query)
-{
-    sys_mutex_lock(core->query_lock);
-    DEQ_INSERT_TAIL(core->outgoing_query_list, query);
-    bool notify = DEQ_SIZE(core->outgoing_query_list) == 1;
-    sys_mutex_unlock(core->query_lock);
-
-    if (notify)
-        qd_timer_schedule(core->agent_timer, 0);
-}
-
-
-qdr_query_t *qdr_query(qdr_core_t              *core,
-                       void                    *context,
-                       qd_router_entity_type_t  type,
-                       qd_composed_field_t     *body)
-{
-    qdr_query_t *query = new_qdr_query_t();
-
-    DEQ_ITEM_INIT(query);
-    ZERO(query);
-    query->core        = core;
-    query->entity_type = type;
-    query->context     = context;
-    query->body        = body;
-    query->more        = false;
-
-    return query;
-}
-
-static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_agent_emit_columns(qdr_query_t *query, const char *qdr_columns[], int column_count);
-static void qdr_agent_set_columns(qdr_query_t *query, qd_parsed_field_t *attribute_names, const char *qdr_columns[], int column_count);
-
-//==================================================================================
-// Interface Functions
-//==================================================================================
-
-void qdr_manage_create(qdr_core_t              *core,
-                       void                    *context,
-                       qd_router_entity_type_t  type,
-                       qd_field_iterator_t     *name,
-                       qd_parsed_field_t       *in_body,
-                       qd_composed_field_t     *out_body)
-{
-    qdr_action_t *action = qdr_action(qdr_manage_create_CT, "manage_create");
-
-    // Create a query object here
-    action->args.agent.query = qdr_query(core, context, type, out_body);
-    action->args.agent.name = name;
-    action->args.agent.in_body = in_body;
-
-    qdr_action_enqueue(core, action);
-}
-
-
-void qdr_manage_delete(qdr_core_t *core,
-                       void  *context,
-                       qd_router_entity_type_t  type,
-                       qd_field_iterator_t     *name,
-                       qd_field_iterator_t     *identity)
-{
-    qdr_action_t *action = qdr_action(qdr_manage_delete_CT, "manage_delete");
-
-    // Create a query object here
-    action->args.agent.query = qdr_query(core, context, type, 0);
-    action->args.agent.name = name;
-    action->args.agent.identity = identity;
-
-    qdr_action_enqueue(core, action);
-}
-
-
-void qdr_manage_read(qdr_core_t *core,
-                     void  *context,
-                     qd_router_entity_type_t  entity_type,
-                     qd_field_iterator_t     *name,
-                     qd_field_iterator_t     *identity,
-                     qd_composed_field_t     *body)
-{
-    qdr_action_t *action = qdr_action(qdr_manage_read_CT, "manage_read");
-
-    // Create a query object here
-    action->args.agent.query = qdr_query(core, context, entity_type, body);
-    action->args.agent.identity  = identity;
-    action->args.agent.name = name;
-
-    qdr_action_enqueue(core, action);
-}
-
-
-void qdr_manage_update(qdr_core_t              *core,
-                       void                    *context,
-                       qd_router_entity_type_t  type,
-                       qd_field_iterator_t     *name,
-                       qd_field_iterator_t     *identity,
-                       qd_parsed_field_t       *in_body,
-                       qd_composed_field_t     *out_body)
-{
-    qdr_action_t *action = qdr_action(qdr_manage_update_CT, "manage_update");
-    action->args.agent.query = qdr_query(core, context, type, out_body);
-    action->args.agent.name = name;
-    action->args.agent.identity = identity;
-    action->args.agent.in_body = in_body;
-
-    qdr_action_enqueue(core, action);
-}
-
-
-qdr_query_t *qdr_manage_query(qdr_core_t              *core,
-                              void                    *context,
-                              qd_router_entity_type_t  type,
-                              qd_parsed_field_t       *attribute_names,
-                              qd_composed_field_t     *body)
-{
-
-    qdr_query_t* query = qdr_query(core, context, type, body);
-
-    switch (query->entity_type) {
-    case QD_ROUTER_CONFIG_ADDRESS:    qdr_agent_set_columns(query, attribute_names, qdr_config_address_columns, QDR_CONFIG_ADDRESS_COLUMN_COUNT);  break;
-    case QD_ROUTER_CONFIG_LINK_ROUTE: qdr_agent_set_columns(query, attribute_names, qdr_config_link_route_columns, QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT);  break;
-    case QD_ROUTER_CONFIG_AUTO_LINK:  qdr_agent_set_columns(query, attribute_names, qdr_config_auto_link_columns, QDR_CONFIG_AUTO_LINK_COLUMN_COUNT);  break;
-    case QD_ROUTER_CONNECTION:        break;
-    case QD_ROUTER_LINK:              qdr_agent_set_columns(query, attribute_names, qdr_link_columns, QDR_LINK_COLUMN_COUNT);  break;
-    case QD_ROUTER_ADDRESS:           qdr_agent_set_columns(query, attribute_names, qdr_address_columns, QDR_ADDRESS_COLUMN_COUNT); break;
-    case QD_ROUTER_FORBIDDEN:         break;
-    case QD_ROUTER_EXCHANGE:          break;
-    case QD_ROUTER_BINDING:           break;
-    }
-
-    return query;
-}
-
-
-void qdr_query_add_attribute_names(qdr_query_t *query)
-{
-    switch (query->entity_type) {
-    case QD_ROUTER_CONFIG_ADDRESS:    qdr_agent_emit_columns(query, qdr_config_address_columns, QDR_CONFIG_ADDRESS_COLUMN_COUNT); break;
-    case QD_ROUTER_CONFIG_LINK_ROUTE: qdr_agent_emit_columns(query, qdr_config_link_route_columns, QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT); break;
-    case QD_ROUTER_CONFIG_AUTO_LINK:  qdr_agent_emit_columns(query, qdr_config_auto_link_columns, QDR_CONFIG_AUTO_LINK_COLUMN_COUNT); break;
-    case QD_ROUTER_CONNECTION:        break;
-    case QD_ROUTER_LINK:              qdr_agent_emit_columns(query, qdr_link_columns, QDR_LINK_COLUMN_COUNT); break;
-    case QD_ROUTER_ADDRESS:           qdr_agent_emit_columns(query, qdr_address_columns, QDR_ADDRESS_COLUMN_COUNT); break;
-    case QD_ROUTER_FORBIDDEN:         qd_compose_empty_list(query->body); break;
-    case QD_ROUTER_EXCHANGE:          break;
-    case QD_ROUTER_BINDING:           break;
-    }
-}
-
-void qdr_query_get_first(qdr_query_t *query, int offset)
-{
-    qdr_action_t *action = qdr_action(qdrh_query_get_first_CT, "query_get_first");
-    action->args.agent.query  = query;
-    action->args.agent.offset = offset;
-    qdr_action_enqueue(query->core, action);
-}
-
-
-void qdr_query_get_next(qdr_query_t *query)
-{
-    qdr_action_t *action = qdr_action(qdrh_query_get_next_CT, "query_get_next");
-    action->args.agent.query = query;
-    qdr_action_enqueue(query->core, action);
-}
-
-
-void qdr_query_free(qdr_query_t *query)
-{
-    if (!query)
-        return;
-
-    if (query->next_key)
-        qdr_field_free(query->next_key);
-
-    free_qdr_query_t(query);
-}
-
-static void qdr_agent_emit_columns(qdr_query_t *query, const char *qdr_columns[], int column_count)
-{
-    qd_compose_start_list(query->body);
-    int i = 0;
-    while (query->columns[i] >= 0) {
-        assert(query->columns[i] < column_count);
-        qd_compose_insert_string(query->body, qdr_columns[query->columns[i]]);
-        i++;
-    }
-    qd_compose_end_list(query->body);
-}
-
-static void qdr_agent_set_columns(qdr_query_t *query,
-                                  qd_parsed_field_t *attribute_names,
-                                  const char *qdr_columns[],
-                                  int column_count)
-{
-    if (!attribute_names ||
-        (qd_parse_tag(attribute_names) != QD_AMQP_LIST8 &&
-         qd_parse_tag(attribute_names) != QD_AMQP_LIST32) ||
-        qd_parse_sub_count(attribute_names) == 0 ||
-        qd_parse_sub_count(attribute_names) >= QDR_AGENT_MAX_COLUMNS) {
-        //
-        // Either the attribute_names field is absent, it's not a list, or it's an empty list.
-        // In this case, we will include all available attributes.
-        //
-        int i;
-        for (i = 0; i < column_count; i++)
-            query->columns[i] = i;
-        query->columns[i] = -1;
-        assert(i < QDR_AGENT_MAX_COLUMNS);
-        return;
-    }
-
-    //
-    // We have a valid, non-empty attribute list.  Set the columns appropriately.
-    //
-    uint32_t count = qd_parse_sub_count(attribute_names);
-    uint32_t idx;
-
-    for (idx = 0; idx < count; idx++) {
-        qd_parsed_field_t *name = qd_parse_sub_value(attribute_names, idx);
-        if (!name || (qd_parse_tag(name) != QD_AMQP_STR8_UTF8 && qd_parse_tag(name) != QD_AMQP_STR32_UTF8))
-            query->columns[idx] = QDR_AGENT_COLUMN_NULL;
-        else {
-            int j = 0;
-            while (qdr_columns[j]) {
-                qd_field_iterator_t *iter = qd_parse_raw(name);
-                if (qd_field_iterator_equal(iter, (const unsigned char*) qdr_columns[j])) {
-                    query->columns[idx] = j;
-                    break;
-                }
-                j+=1;
-            }
-        }
-    }
-    query->columns[idx+1] = -1;
-}
-
-
-
-void qdr_manage_handler(qdr_core_t *core, qdr_manage_response_t response_handler)
-{
-    core->agent_response_handler = response_handler;
-}
-
-
-//==================================================================================
-// In-Thread Functions
-//==================================================================================
-
-void qdr_agent_setup_CT(qdr_core_t *core)
-{
-    DEQ_INIT(core->outgoing_query_list);
-    core->query_lock  = sys_mutex();
-    core->agent_timer = qd_timer(core->qd, qdr_agent_response_handler, core);
-}
-
-
-static void qdr_agent_forbidden(qdr_core_t *core, qdr_query_t *query, bool op_query)
-{
-    query->status = QD_AMQP_FORBIDDEN;
-    if (query->body && !op_query)
-        qd_compose_insert_null(query->body);
-    qdr_agent_enqueue_response_CT(core, query);
-}
-
-
-static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
-{
-    qd_field_iterator_t     *identity   = action->args.agent.identity;
-    qd_field_iterator_t     *name       = action->args.agent.name;
-    qdr_query_t             *query      = action->args.agent.query;
-
-    switch (query->entity_type) {
-    case QD_ROUTER_CONFIG_ADDRESS:    qdra_config_address_get_CT(core, name, identity, query, qdr_config_address_columns); break;
-    case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_get_CT(core, name, identity, query, qdr_config_link_route_columns); break;
-    case QD_ROUTER_CONFIG_AUTO_LINK:  qdra_config_auto_link_get_CT(core, name, identity, query, qdr_config_auto_link_columns); break;
-    case QD_ROUTER_CONNECTION:        break;
-    case QD_ROUTER_LINK:              break;
-    case QD_ROUTER_ADDRESS:           qdra_address_get_CT(core, name, identity, query, qdr_address_columns); break;
-    case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, false); break;
-    case QD_ROUTER_EXCHANGE:          break;
-    case QD_ROUTER_BINDING:           break;
-   }
-}
-
-
-static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
-{
-    qd_field_iterator_t     *name       = action->args.agent.name;
-    qdr_query_t             *query      = action->args.agent.query;
-    qd_parsed_field_t       *in_body    = action->args.agent.in_body;
-
-    switch (query->entity_type) {
-    case QD_ROUTER_CONFIG_ADDRESS:    qdra_config_address_create_CT(core, name, query, in_body); break;
-    case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_create_CT(core, name, query, in_body); break;
-    case QD_ROUTER_CONFIG_AUTO_LINK:  qdra_config_auto_link_create_CT(core, name, query, in_body); break;
-    case QD_ROUTER_CONNECTION:        break;
-    case QD_ROUTER_LINK:              break;
-    case QD_ROUTER_ADDRESS:           break;
-    case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, false); break;
-    case QD_ROUTER_EXCHANGE:          break;
-    case QD_ROUTER_BINDING:           break;
-
-   }
-
-   qd_parse_free(in_body);
-}
-
-
-static void qdr_manage_delete_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
-{
-    qd_field_iterator_t     *name       = action->args.agent.name;
-    qd_field_iterator_t     *identity   = action->args.agent.identity;
-    qdr_query_t             *query      = action->args.agent.query;
-
-    switch (query->entity_type) {
-    case QD_ROUTER_CONFIG_ADDRESS:    qdra_config_address_delete_CT(core, query, name, identity); break;
-    case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_delete_CT(core, query, name, identity); break;
-    case QD_ROUTER_CONFIG_AUTO_LINK:  qdra_config_auto_link_delete_CT(core, query, name, identity); break;
-    case QD_ROUTER_CONNECTION:        break;
-    case QD_ROUTER_LINK:              break;
-    case QD_ROUTER_ADDRESS:           break;
-    case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, false); break;
-    case QD_ROUTER_EXCHANGE:          break;
-    case QD_ROUTER_BINDING:           break;
-   }
-}
-
-static void qdr_manage_update_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
-{
-    qd_field_iterator_t     *identity   = action->args.agent.identity;
-    qd_field_iterator_t     *name       = action->args.agent.name;
-    qdr_query_t             *query      = action->args.agent.query;
-    qd_parsed_field_t       *in_body    = action->args.agent.in_body;
-
-    switch (query->entity_type) {
-    case QD_ROUTER_CONFIG_ADDRESS:    break;
-    case QD_ROUTER_CONFIG_LINK_ROUTE: break;
-    case QD_ROUTER_CONFIG_AUTO_LINK:  break;
-    case QD_ROUTER_CONNECTION:        break;
-    case QD_ROUTER_LINK:              qdra_link_update_CT(core, name, identity, query, in_body); break;
-    case QD_ROUTER_ADDRESS:           break;
-    case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, false); break;
-    case QD_ROUTER_EXCHANGE:          break;
-    case QD_ROUTER_BINDING:           break;
-   }
-
-    qd_parse_free(in_body);
-}
-
-
-
-
-static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
-{
-    qdr_query_t *query  = action->args.agent.query;
-    int          offset = action->args.agent.offset;
-
-    if (!discard) {
-        switch (query->entity_type) {
-        case QD_ROUTER_CONFIG_ADDRESS:    qdra_config_address_get_first_CT(core, query, offset); break;
-        case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_get_first_CT(core, query, offset); break;
-        case QD_ROUTER_CONFIG_AUTO_LINK:  qdra_config_auto_link_get_first_CT(core, query, offset); break;
-        case QD_ROUTER_CONNECTION:        break;
-        case QD_ROUTER_LINK:              qdra_link_get_first_CT(core, query, offset); break;
-        case QD_ROUTER_ADDRESS:           qdra_address_get_first_CT(core, query, offset); break;
-        case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, true); break;
-        case QD_ROUTER_EXCHANGE:          break;
-        case QD_ROUTER_BINDING:           break;
-        }
-    }
-}
-
-
-static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
-{
-    qdr_query_t *query  = action->args.agent.query;
-
-    if (!discard) {
-        switch (query->entity_type) {
-        case QD_ROUTER_CONFIG_ADDRESS:    qdra_config_address_get_next_CT(core, query); break;
-        case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_get_next_CT(core, query); break;
-        case QD_ROUTER_CONFIG_AUTO_LINK:  qdra_config_auto_link_get_next_CT(core, query); break;
-        case QD_ROUTER_CONNECTION:        break;
-        case QD_ROUTER_LINK:              qdra_link_get_next_CT(core, query); break;
-        case QD_ROUTER_ADDRESS:           qdra_address_get_next_CT(core, query); break;
-        case QD_ROUTER_FORBIDDEN:         break;
-        case QD_ROUTER_EXCHANGE:          break;
-        case QD_ROUTER_BINDING:           break;
-        }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2bcb2f76/src/router_core/agent_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_core.c b/src/router_core/agent_core.c
new file mode 100644
index 0000000..272b75c
--- /dev/null
+++ b/src/router_core/agent_core.c
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/amqp.h>
+#include "agent_config_address.h"
+#include "agent_config_link_route.h"
+#include "agent_config_auto_link.h"
+#include "agent_address.h"
+#include "agent_link.h"
+#include "router_core_private.h"
+#include <stdio.h>
+
+static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_manage_delete_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_manage_update_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+
+ALLOC_DECLARE(qdr_query_t);
+ALLOC_DEFINE(qdr_query_t);
+
+//==================================================================================
+// Internal Functions
+//==================================================================================
+
+static void qdr_agent_response_handler(void *context)
+{
+    qdr_core_t  *core = (qdr_core_t*) context;
+    qdr_query_t *query;
+    bool         done = false;
+
+    while (!done) {
+        sys_mutex_lock(core->query_lock);
+        query = DEQ_HEAD(core->outgoing_query_list);
+        if (query)
+            DEQ_REMOVE_HEAD(core->outgoing_query_list);
+        done = DEQ_SIZE(core->outgoing_query_list) == 0;
+        sys_mutex_unlock(core->query_lock);
+
+        if (query) {
+            bool more = query->more;
+            core->agent_response_handler(query->context, &query->status, more);
+            if (!more)
+                qdr_query_free(query);
+        }
+    }
+}
+
+
+void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query)
+{
+    sys_mutex_lock(core->query_lock);
+    DEQ_INSERT_TAIL(core->outgoing_query_list, query);
+    bool notify = DEQ_SIZE(core->outgoing_query_list) == 1;
+    sys_mutex_unlock(core->query_lock);
+
+    if (notify)
+        qd_timer_schedule(core->agent_timer, 0);
+}
+
+
+qdr_query_t *qdr_query(qdr_core_t              *core,
+                       void                    *context,
+                       qd_router_entity_type_t  type,
+                       qd_composed_field_t     *body)
+{
+    qdr_query_t *query = new_qdr_query_t();
+
+    DEQ_ITEM_INIT(query);
+    ZERO(query);
+    query->core        = core;
+    query->entity_type = type;
+    query->context     = context;
+    query->body        = body;
+    query->more        = false;
+
+    return query;
+}
+
+static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_agent_emit_columns(qdr_query_t *query, const char *qdr_columns[], int column_count);
+static void qdr_agent_set_columns(qdr_query_t *query, qd_parsed_field_t *attribute_names, const char *qdr_columns[], int column_count);
+
+//==================================================================================
+// Interface Functions
+//==================================================================================
+
+void qdr_manage_create(qdr_core_t              *core,
+                       void                    *context,
+                       qd_router_entity_type_t  type,
+                       qd_field_iterator_t     *name,
+                       qd_parsed_field_t       *in_body,
+                       qd_composed_field_t     *out_body)
+{
+    qdr_action_t *action = qdr_action(qdr_manage_create_CT, "manage_create");
+
+    // Create a query object here
+    action->args.agent.query = qdr_query(core, context, type, out_body);
+    action->args.agent.name = name;
+    action->args.agent.in_body = in_body;
+
+    qdr_action_enqueue(core, action);
+}
+
+
+void qdr_manage_delete(qdr_core_t *core,
+                       void  *context,
+                       qd_router_entity_type_t  type,
+                       qd_field_iterator_t     *name,
+                       qd_field_iterator_t     *identity)
+{
+    qdr_action_t *action = qdr_action(qdr_manage_delete_CT, "manage_delete");
+
+    // Create a query object here
+    action->args.agent.query = qdr_query(core, context, type, 0);
+    action->args.agent.name = name;
+    action->args.agent.identity = identity;
+
+    qdr_action_enqueue(core, action);
+}
+
+
+void qdr_manage_read(qdr_core_t *core,
+                     void  *context,
+                     qd_router_entity_type_t  entity_type,
+                     qd_field_iterator_t     *name,
+                     qd_field_iterator_t     *identity,
+                     qd_composed_field_t     *body)
+{
+    qdr_action_t *action = qdr_action(qdr_manage_read_CT, "manage_read");
+
+    // Create a query object here
+    action->args.agent.query = qdr_query(core, context, entity_type, body);
+    action->args.agent.identity  = identity;
+    action->args.agent.name = name;
+
+    qdr_action_enqueue(core, action);
+}
+
+
+void qdr_manage_update(qdr_core_t              *core,
+                       void                    *context,
+                       qd_router_entity_type_t  type,
+                       qd_field_iterator_t     *name,
+                       qd_field_iterator_t     *identity,
+                       qd_parsed_field_t       *in_body,
+                       qd_composed_field_t     *out_body)
+{
+    qdr_action_t *action = qdr_action(qdr_manage_update_CT, "manage_update");
+    action->args.agent.query = qdr_query(core, context, type, out_body);
+    action->args.agent.name = name;
+    action->args.agent.identity = identity;
+    action->args.agent.in_body = in_body;
+
+    qdr_action_enqueue(core, action);
+}
+
+
+qdr_query_t *qdr_manage_query(qdr_core_t              *core,
+                              void                    *context,
+                              qd_router_entity_type_t  type,
+                              qd_parsed_field_t       *attribute_names,
+                              qd_composed_field_t     *body)
+{
+
+    qdr_query_t* query = qdr_query(core, context, type, body);
+
+    switch (query->entity_type) {
+    case QD_ROUTER_CONFIG_ADDRESS:    qdr_agent_set_columns(query, attribute_names, qdr_config_address_columns, QDR_CONFIG_ADDRESS_COLUMN_COUNT);  break;
+    case QD_ROUTER_CONFIG_LINK_ROUTE: qdr_agent_set_columns(query, attribute_names, qdr_config_link_route_columns, QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT);  break;
+    case QD_ROUTER_CONFIG_AUTO_LINK:  qdr_agent_set_columns(query, attribute_names, qdr_config_auto_link_columns, QDR_CONFIG_AUTO_LINK_COLUMN_COUNT);  break;
+    case QD_ROUTER_CONNECTION:        break;
+    case QD_ROUTER_LINK:              qdr_agent_set_columns(query, attribute_names, qdr_link_columns, QDR_LINK_COLUMN_COUNT);  break;
+    case QD_ROUTER_ADDRESS:           qdr_agent_set_columns(query, attribute_names, qdr_address_columns, QDR_ADDRESS_COLUMN_COUNT); break;
+    case QD_ROUTER_FORBIDDEN:         break;
+    case QD_ROUTER_EXCHANGE:          break;
+    case QD_ROUTER_BINDING:           break;
+    }
+
+    return query;
+}
+
+
+void qdr_query_add_attribute_names(qdr_query_t *query)
+{
+    switch (query->entity_type) {
+    case QD_ROUTER_CONFIG_ADDRESS:    qdr_agent_emit_columns(query, qdr_config_address_columns, QDR_CONFIG_ADDRESS_COLUMN_COUNT); break;
+    case QD_ROUTER_CONFIG_LINK_ROUTE: qdr_agent_emit_columns(query, qdr_config_link_route_columns, QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT); break;
+    case QD_ROUTER_CONFIG_AUTO_LINK:  qdr_agent_emit_columns(query, qdr_config_auto_link_columns, QDR_CONFIG_AUTO_LINK_COLUMN_COUNT); break;
+    case QD_ROUTER_CONNECTION:        break;
+    case QD_ROUTER_LINK:              qdr_agent_emit_columns(query, qdr_link_columns, QDR_LINK_COLUMN_COUNT); break;
+    case QD_ROUTER_ADDRESS:           qdr_agent_emit_columns(query, qdr_address_columns, QDR_ADDRESS_COLUMN_COUNT); break;
+    case QD_ROUTER_FORBIDDEN:         qd_compose_empty_list(query->body); break;
+    case QD_ROUTER_EXCHANGE:          break;
+    case QD_ROUTER_BINDING:           break;
+    }
+}
+
+void qdr_query_get_first(qdr_query_t *query, int offset)
+{
+    qdr_action_t *action = qdr_action(qdrh_query_get_first_CT, "query_get_first");
+    action->args.agent.query  = query;
+    action->args.agent.offset = offset;
+    qdr_action_enqueue(query->core, action);
+}
+
+
+void qdr_query_get_next(qdr_query_t *query)
+{
+    qdr_action_t *action = qdr_action(qdrh_query_get_next_CT, "query_get_next");
+    action->args.agent.query = query;
+    qdr_action_enqueue(query->core, action);
+}
+
+
+void qdr_query_free(qdr_query_t *query)
+{
+    if (!query)
+        return;
+
+    if (query->next_key)
+        qdr_field_free(query->next_key);
+
+    free_qdr_query_t(query);
+}
+
+static void qdr_agent_emit_columns(qdr_query_t *query, const char *qdr_columns[], int column_count)
+{
+    qd_compose_start_list(query->body);
+    int i = 0;
+    while (query->columns[i] >= 0) {
+        assert(query->columns[i] < column_count);
+        qd_compose_insert_string(query->body, qdr_columns[query->columns[i]]);
+        i++;
+    }
+    qd_compose_end_list(query->body);
+}
+
+static void qdr_agent_set_columns(qdr_query_t *query,
+                                  qd_parsed_field_t *attribute_names,
+                                  const char *qdr_columns[],
+                                  int column_count)
+{
+    if (!attribute_names ||
+        (qd_parse_tag(attribute_names) != QD_AMQP_LIST8 &&
+         qd_parse_tag(attribute_names) != QD_AMQP_LIST32) ||
+        qd_parse_sub_count(attribute_names) == 0 ||
+        qd_parse_sub_count(attribute_names) >= QDR_AGENT_MAX_COLUMNS) {
+        //
+        // Either the attribute_names field is absent, it's not a list, or it's an empty list.
+        // In this case, we will include all available attributes.
+        //
+        int i;
+        for (i = 0; i < column_count; i++)
+            query->columns[i] = i;
+        query->columns[i] = -1;
+        assert(i < QDR_AGENT_MAX_COLUMNS);
+        return;
+    }
+
+    //
+    // We have a valid, non-empty attribute list.  Set the columns appropriately.
+    //
+    uint32_t count = qd_parse_sub_count(attribute_names);
+    uint32_t idx;
+
+    for (idx = 0; idx < count; idx++) {
+        qd_parsed_field_t *name = qd_parse_sub_value(attribute_names, idx);
+        if (!name || (qd_parse_tag(name) != QD_AMQP_STR8_UTF8 && qd_parse_tag(name) != QD_AMQP_STR32_UTF8))
+            query->columns[idx] = QDR_AGENT_COLUMN_NULL;
+        else {
+            int j = 0;
+            while (qdr_columns[j]) {
+                qd_field_iterator_t *iter = qd_parse_raw(name);
+                if (qd_field_iterator_equal(iter, (const unsigned char*) qdr_columns[j])) {
+                    query->columns[idx] = j;
+                    break;
+                }
+                j+=1;
+            }
+        }
+    }
+    query->columns[idx+1] = -1;
+}
+
+
+
+void qdr_manage_handler(qdr_core_t *core, qdr_manage_response_t response_handler)
+{
+    core->agent_response_handler = response_handler;
+}
+
+
+//==================================================================================
+// In-Thread Functions
+//==================================================================================
+
+void qdr_agent_setup_CT(qdr_core_t *core)
+{
+    DEQ_INIT(core->outgoing_query_list);
+    core->query_lock  = sys_mutex();
+    core->agent_timer = qd_timer(core->qd, qdr_agent_response_handler, core);
+}
+
+
+static void qdr_agent_forbidden(qdr_core_t *core, qdr_query_t *query, bool op_query)
+{
+    query->status = QD_AMQP_FORBIDDEN;
+    if (query->body && !op_query)
+        qd_compose_insert_null(query->body);
+    qdr_agent_enqueue_response_CT(core, query);
+}
+
+
+static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    qd_field_iterator_t     *identity   = action->args.agent.identity;
+    qd_field_iterator_t     *name       = action->args.agent.name;
+    qdr_query_t             *query      = action->args.agent.query;
+
+    switch (query->entity_type) {
+    case QD_ROUTER_CONFIG_ADDRESS:    qdra_config_address_get_CT(core, name, identity, query, qdr_config_address_columns); break;
+    case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_get_CT(core, name, identity, query, qdr_config_link_route_columns); break;
+    case QD_ROUTER_CONFIG_AUTO_LINK:  qdra_config_auto_link_get_CT(core, name, identity, query, qdr_config_auto_link_columns); break;
+    case QD_ROUTER_CONNECTION:        break;
+    case QD_ROUTER_LINK:              break;
+    case QD_ROUTER_ADDRESS:           qdra_address_get_CT(core, name, identity, query, qdr_address_columns); break;
+    case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, false); break;
+    case QD_ROUTER_EXCHANGE:          break;
+    case QD_ROUTER_BINDING:           break;
+   }
+}
+
+
+static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    qd_field_iterator_t     *name       = action->args.agent.name;
+    qdr_query_t             *query      = action->args.agent.query;
+    qd_parsed_field_t       *in_body    = action->args.agent.in_body;
+
+    switch (query->entity_type) {
+    case QD_ROUTER_CONFIG_ADDRESS:    qdra_config_address_create_CT(core, name, query, in_body); break;
+    case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_create_CT(core, name, query, in_body); break;
+    case QD_ROUTER_CONFIG_AUTO_LINK:  qdra_config_auto_link_create_CT(core, name, query, in_body); break;
+    case QD_ROUTER_CONNECTION:        break;
+    case QD_ROUTER_LINK:              break;
+    case QD_ROUTER_ADDRESS:           break;
+    case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, false); break;
+    case QD_ROUTER_EXCHANGE:          break;
+    case QD_ROUTER_BINDING:           break;
+
+   }
+
+   qd_parse_free(in_body);
+}
+
+
+static void qdr_manage_delete_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    qd_field_iterator_t     *name       = action->args.agent.name;
+    qd_field_iterator_t     *identity   = action->args.agent.identity;
+    qdr_query_t             *query      = action->args.agent.query;
+
+    switch (query->entity_type) {
+    case QD_ROUTER_CONFIG_ADDRESS:    qdra_config_address_delete_CT(core, query, name, identity); break;
+    case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_delete_CT(core, query, name, identity); break;
+    case QD_ROUTER_CONFIG_AUTO_LINK:  qdra_config_auto_link_delete_CT(core, query, name, identity); break;
+    case QD_ROUTER_CONNECTION:        break;
+    case QD_ROUTER_LINK:              break;
+    case QD_ROUTER_ADDRESS:           break;
+    case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, false); break;
+    case QD_ROUTER_EXCHANGE:          break;
+    case QD_ROUTER_BINDING:           break;
+   }
+}
+
+static void qdr_manage_update_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    qd_field_iterator_t     *identity   = action->args.agent.identity;
+    qd_field_iterator_t     *name       = action->args.agent.name;
+    qdr_query_t             *query      = action->args.agent.query;
+    qd_parsed_field_t       *in_body    = action->args.agent.in_body;
+
+    switch (query->entity_type) {
+    case QD_ROUTER_CONFIG_ADDRESS:    break;
+    case QD_ROUTER_CONFIG_LINK_ROUTE: break;
+    case QD_ROUTER_CONFIG_AUTO_LINK:  break;
+    case QD_ROUTER_CONNECTION:        break;
+    case QD_ROUTER_LINK:              qdra_link_update_CT(core, name, identity, query, in_body); break;
+    case QD_ROUTER_ADDRESS:           break;
+    case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, false); break;
+    case QD_ROUTER_EXCHANGE:          break;
+    case QD_ROUTER_BINDING:           break;
+   }
+
+    qd_parse_free(in_body);
+}
+
+
+
+
+static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    qdr_query_t *query  = action->args.agent.query;
+    int          offset = action->args.agent.offset;
+
+    if (!discard) {
+        switch (query->entity_type) {
+        case QD_ROUTER_CONFIG_ADDRESS:    qdra_config_address_get_first_CT(core, query, offset); break;
+        case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_get_first_CT(core, query, offset); break;
+        case QD_ROUTER_CONFIG_AUTO_LINK:  qdra_config_auto_link_get_first_CT(core, query, offset); break;
+        case QD_ROUTER_CONNECTION:        break;
+        case QD_ROUTER_LINK:              qdra_link_get_first_CT(core, query, offset); break;
+        case QD_ROUTER_ADDRESS:           qdra_address_get_first_CT(core, query, offset); break;
+        case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, true); break;
+        case QD_ROUTER_EXCHANGE:          break;
+        case QD_ROUTER_BINDING:           break;
+        }
+    }
+}
+
+
+static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    qdr_query_t *query  = action->args.agent.query;
+
+    if (!discard) {
+        switch (query->entity_type) {
+        case QD_ROUTER_CONFIG_ADDRESS:    qdra_config_address_get_next_CT(core, query); break;
+        case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_get_next_CT(core, query); break;
+        case QD_ROUTER_CONFIG_AUTO_LINK:  qdra_config_auto_link_get_next_CT(core, query); break;
+        case QD_ROUTER_CONNECTION:        break;
+        case QD_ROUTER_LINK:              qdra_link_get_next_CT(core, query); break;
+        case QD_ROUTER_ADDRESS:           qdra_address_get_next_CT(core, query); break;
+        case QD_ROUTER_FORBIDDEN:         break;
+        case QD_ROUTER_EXCHANGE:          break;
+        case QD_ROUTER_BINDING:           break;
+        }
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message