Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 14D70200B5C for ; Thu, 11 Aug 2016 19:42:58 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1370E160A90; Thu, 11 Aug 2016 17:42:58 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5FBB1160AB4 for ; Thu, 11 Aug 2016 19:42:56 +0200 (CEST) Received: (qmail 76891 invoked by uid 500); 11 Aug 2016 17:42:55 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 76702 invoked by uid 99); 11 Aug 2016 17:42:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Aug 2016 17:42:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3C276E03E8; Thu, 11 Aug 2016 17:42:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gmurthy@apache.org To: commits@qpid.apache.org Date: Thu, 11 Aug 2016 17:42:56 -0000 Message-Id: <6cd40a263c73452fbd9a9907e4704054@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/9] qpid-dispatch git commit: DISPATCH-437 - Work in progress - Moved some of the code over from config.py to a new class called ManagementAgent archived-at: Thu, 11 Aug 2016 17:42:58 -0000 DISPATCH-437 - Work in progress - Moved some of the code over from config.py to a new class called ManagementAgent Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/1df96814 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/1df96814 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/1df96814 Branch: refs/heads/DISPATCH-437-1 Commit: 1df96814787656de74ca956b478d78ed0025d72d Parents: 2bcb2f7 Author: Ganesh Murthy Authored: Thu Aug 4 08:45:01 2016 -0400 Committer: Ganesh Murthy Committed: Thu Aug 4 08:45:01 2016 -0400 ---------------------------------------------------------------------- doc/CMakeLists.txt | 2 +- doc/book/schema_txt.py | 2 +- include/qpid/dispatch/server.h | 11 - .../management/__init__.py | 2 - .../qpid_dispatch_internal/management/agent.py | 1008 ---------------- .../management/agent/__init__.py | 24 + .../management/agent/agent.py | 1095 ++++++++++++++++++ .../qpid_dispatch_internal/management/config.py | 12 +- .../management/qdrouter.py | 74 -- .../qpid_dispatch_internal/management/schema.py | 644 ---------- .../management/schema/__init__.py | 18 + .../management/schema/qdrouter.py | 25 + .../management/schema/schema.py | 700 +++++++++++ .../management/schema/schema_doc.py | 121 ++ .../management/schema_doc.py | 120 -- src/CMakeLists.txt | 8 +- src/agent.c | 434 +++---- src/agent.h | 9 +- src/agent_private.h | 46 +- src/dispatch.c | 9 +- src/dispatch_private.h | 6 +- src/router_core/management_agent.c | 144 +-- src/router_pynode.c | 6 +- src/server.c | 20 - 24 files changed, 2256 insertions(+), 2284 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1df96814/doc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/doc/CMakeLists.txt b/doc/CMakeLists.txt index b8c408a..62a80fd 100644 --- a/doc/CMakeLists.txt +++ b/doc/CMakeLists.txt @@ -42,7 +42,7 @@ if(BUILD_DOCS) set(tools ${CMAKE_SOURCE_DIR}/tools) set(schema ../python/qpid_dispatch/management/qdrouter.json) set(py_management ../python/qpid_dispatch_internal/management) - set(schema_depends ${schema} ${py_management}/schema_doc.py ${py_management}/schema.py) + set(schema_depends ${schema} ${py_management}/schema/schema_doc.py ${py_management}/schema/schema.py) # Flags for doc tools, based on cmake options set(ADOC_FLAGS "--conf-file=${bin}/asciidoc.conf") http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1df96814/doc/book/schema_txt.py ---------------------------------------------------------------------- diff --git a/doc/book/schema_txt.py b/doc/book/schema_txt.py index c9d5b1b..76c0246 100644 --- a/doc/book/schema_txt.py +++ b/doc/book/schema_txt.py @@ -22,7 +22,7 @@ Generate the schema.md chapter for the dispatch book from the qdrouter.json sche """ import sys, re -from qpid_dispatch_internal.management.qdrouter import QdSchema +from qpid_dispatch_internal.management.schema.schema import QdSchema from qpid_dispatch_internal.management.schema_doc import SchemaWriter class BookSchemaWriter(SchemaWriter): http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1df96814/include/qpid/dispatch/server.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h index 03d4e4b..1519df8 100644 --- a/include/qpid/dispatch/server.h +++ b/include/qpid/dispatch/server.h @@ -90,17 +90,6 @@ void qd_server_run(qd_dispatch_t *qd); /** - * Start the server threads and return immediately - The non-blocking version. - * - * Start the operation of the server, including launching all of the worker - * threads. - * - * @param qd The dispatch handle returned by qd_dispatch. - */ -void qd_server_start(qd_dispatch_t *qd); - - -/** * Stop the server * * Stop the server and join all of its worker threads. This function may be http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1df96814/python/qpid_dispatch_internal/management/__init__.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/management/__init__.py b/python/qpid_dispatch_internal/management/__init__.py index 9d49f47..9c49ec5 100644 --- a/python/qpid_dispatch_internal/management/__init__.py +++ b/python/qpid_dispatch_internal/management/__init__.py @@ -18,7 +18,5 @@ # """Qpid Dispatch internal management package.""" -# from .agent import ManagementAgent -__all__ = ["ManagementAgent"] http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1df96814/python/qpid_dispatch_internal/management/agent.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/management/agent.py b/python/qpid_dispatch_internal/management/agent.py deleted file mode 100644 index 7ada418..0000000 --- a/python/qpid_dispatch_internal/management/agent.py +++ /dev/null @@ -1,1008 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License -# - -"""Agent implementing the server side of the AMQP management protocol. - -Adapter layer between external attribute-value maps sent/received via the AMQP -management protocol and implementation objects (C or python) of the dispatch -router. Entity types are as described in qdrouter.json schema. Reading -configuration files is treated as a set of CREATE operations. - -Maintains a set of L{EntityAdapter} that hold attribute maps reflecting the last -known attribute values of the implementation objects. Delegates management -operations to the correct adapter. - -EntityAdapters are created/deleted in two ways: - -- Externally by CREATE/DELETE operations (or loading config file) -- Internally by creation or deletion of corresponding implementation object. - -Memory managment: The implementation is reponsible for informing the L{Agent} -when an implementation object is created and *before* it is deleted in the case -of a C object. - -EntityAdapters can: - -- Receive attribute maps via CREATE or UPDATE operations (reading configuration - files is treated as a set of CREATE operations) and set configuration in the - implementation objects. - -- Refresh the adapters attribute map to reflect the current state of the - implementation objects, to respond to READ or QUERY operations with up-to-date values. - -To avoid confusion the term "update" is only used for the EntityAdapter updating -the implementation object. The term "refresh" is used for the EntityAdapter -getting current information from the implementation object. - -## Threading: - - -The agent is locked to be thread safe, called in the following threads: -- Reading configuration file in initialization thread (no contention). -- Management requests arriving in multiple, concurrent connection threads. -- Implementation objects created/deleted in multiple, concurrent connection threads. - -When refreshing attributes, the agent must also read C implementation object -data that may be updated in other threads. - -# FIXME aconway 2015-02-09: -Temporary solution is to lock the entire dispatch router lock during full refresh. -Better solution coming soon... -""" - -import traceback, json, pstats -from itertools import ifilter, chain -from traceback import format_exc -from threading import Lock -from cProfile import Profile -from cStringIO import StringIO -from ctypes import c_void_p, py_object, c_long -from subprocess import Popen -from ..dispatch import IoAdapter, LogAdapter, LOG_INFO, LOG_WARNING, LOG_DEBUG, LOG_ERROR, TREATMENT_ANYCAST_CLOSEST -from qpid_dispatch.management.error import ManagementError, OK, CREATED, NO_CONTENT, STATUS_TEXT, \ - BadRequestStatus, InternalServerErrorStatus, NotImplementedStatus, NotFoundStatus, ForbiddenStatus -from qpid_dispatch.management.entity import camelcase -from .schema import ValidationError, SchemaEntity, EntityType -from .qdrouter import QdSchema -from ..router.message import Message -from ..router.address import Address -from ..policy.policy_manager import PolicyManager -from . import AgentRequestHandler - -def dictstr(d): - """Stringify a dict in the form 'k=v, k=v ...' instead of '{k:v, ...}'""" - return ", ".join("%s=%r" % (k, v) for k, v in d.iteritems()) - -def required_property(prop, request): - """Raise exception if required property is missing""" - if not request.properties or prop not in request.properties: - raise BadRequestStatus("No '%s' property: %s"%(prop, request)) - return request.properties[prop] - -def not_implemented(operation, entity_type): - """Raise NOT_IMPLEMENTED exception""" - raise NotImplementedStatus("Operation '%s' not implemented on %s" % (operation, entity_type)) - - -class AtomicCount(object): - """Simple atomic counter""" - def __init__(self, count=0): - self.count = count - self.lock = Lock() - - def next(self): - with self.lock: - n = self.count - self.count += 1 - return n - - -class Implementation(object): - """Abstract implementation wrapper""" - def __init__(self, entity_type, key): - self.entity_type, self.key = entity_type, key - - -class CImplementation(Implementation): - """Wrapper for a C implementation pointer""" - def __init__(self, qd, entity_type, pointer): - super(CImplementation, self).__init__(entity_type, pointer) - fname = "qd_entity_refresh_" + entity_type.short_name.replace('.', '_') - self.refreshfn = qd.function(fname, c_long, [py_object, c_void_p]) - - def refresh_entity(self, attributes): - return self.refreshfn(attributes, self.key) or True - - -class PythonImplementation(Implementation): - """Wrapper for a Python implementation object""" - def __init__(self, entity_type, impl): - """impl.refresh_entity(attributes) must be a valid function call""" - super(PythonImplementation, self).__init__(entity_type, id(impl)) - self.refresh_entity = impl.refresh_entity - - -class EntityAdapter(SchemaEntity): - """ - Base class for agent entities with operations as well as attributes. - """ - - def __init__(self, agent, entity_type, attributes=None, validate=True): - """ - @para agent: Containing L{Agent} - @param entity_type: L{EntityType} - @param attributes: Attribute name:value map - @param validate: If true, validate the entity. - """ - super(EntityAdapter, self).__init__(entity_type, attributes or {}, validate=validate) - # Direct __dict__ access to avoid validation as schema attributes - self.__dict__['_agent'] = agent - self.__dict__['_log'] = agent.log - self.__dict__['_qd'] = agent.qd - self.__dict__['_dispatch'] = agent.dispatch - self.__dict__['_policy'] = agent.policy - self.__dict__['_implementations'] = [] - - def validate(self, **kwargs): - """Set default identity and name if not already set, then do schema validation""" - identity = self.attributes.get("identity") - name = self.attributes.get("name") - if identity: - if not name: - self.attributes[u"name"] = "%s/%s" % (self.entity_type.short_name, self._identifier()) - else: - self.attributes[u"identity"] = "%s/%s" % (self.entity_type.short_name, self._identifier()) - if not name: - self.attributes.setdefault(u'name', self.attributes[u'identity']) - - super(EntityAdapter, self).validate(**kwargs) - - def _identifier(self): - """ - Generate identifier. identity=type/identifier. - Default is per-type counter, derived classes can override. - """ - try: counter = type(self)._identifier_count - except AttributeError: counter = type(self)._identifier_count = AtomicCount() - return str(counter.next()) - - def _refresh(self): - """Refresh self.attributes from implementation object(s).""" - for impl in self._implementations: - impl.refresh_entity(self.attributes) - return bool(self._implementations) - - def _add_implementation(self, impl): - """Add an implementaiton object to use to refresh our attributes""" - self._implementations.append(impl) - - def create(self): - """Subclasses can add extra create actions here""" - pass - - def read(self, request): - """Handle read request, default is to return attributes.""" - request_type = self.entity_type.schema.long_name(request.properties.get('type')) - if request_type and self.type != request_type: - raise NotFoundStatus("Entity type '%s' does match requested type '%s'" % - (self.type, request_type)) - return (OK, self.attributes) - - def update(self, request): - """Handle update request with new attributes from management client""" - self.entity_type.update_check(request.body, self.attributes) - newattrs = dict(self.attributes, **request.body) - self.entity_type.validate(newattrs, update=True) - self.attributes = newattrs - self._update() - return (OK, self.attributes) - - def _update(self): - """Subclasses implement update logic here""" - pass - - def delete(self, request): - """Handle delete request from client""" - self._delete() - self._agent.remove(self) - return (NO_CONTENT, {}) - - def _delete(self): - """Subclasses implement delete logic here""" - pass - - def __str__(self): - keys = sorted(self.attributes.keys()) - # If the attribute is hidden the attribute value will show up as stars ('*******'). - return "Entity(%s)" % ", ".join("%s=%s" % (k, '*******' if self.entity_type.attribute(k).hidden else self.attributes[k]) for k in keys) - - -class ContainerEntity(EntityAdapter): - """ - The ContainerEntity has been deprecated. Use the the RouterEntity instead - """ - - def create(self): - self._qd.qd_dispatch_configure_container(self._dispatch, self) - - def _identifier(self): - self.attributes.setdefault("containerName", "00000000-0000-0000-0000-000000000000") - return self.attributes["containerName"] - - def __str__(self): - return super(ContainerEntity, self).__str__().replace("Entity(", "ContainerEntity(") - - -class RouterEntity(EntityAdapter): - def __init__(self, agent, entity_type, attributes=None): - super(RouterEntity, self).__init__(agent, entity_type, attributes, validate=False) - # Router is a mix of configuration and operational entity. - # The statistics attributes are operational not configured. - self._add_implementation( - CImplementation(agent.qd, entity_type, self._dispatch)) - - def _identifier(self): return self.attributes.get('id') - - def create(self): - try: - if self.routerId: - self._agent.log(LOG_WARNING, "routerId is deprecated, use id instead") - except: - pass - - self._qd.qd_dispatch_configure_router(self._dispatch, self) - - def __str__(self): - return super(RouterEntity, self).__str__().replace("Entity(", "RouterEntity(") - - -class LogEntity(EntityAdapter): - - def __init__(self, agent, entity_type, attributes=None, validate=True): - # Special defaults for DEFAULT module. - if attributes.get("module") == "DEFAULT": - defaults = dict(enable="info+", timestamp=True, source=False, output="stderr") - attributes = dict(defaults, **attributes) - super(LogEntity, self).__init__(agent, entity_type, attributes, validate=True) - - def _identifier(self): return self.attributes.get('module') - - def create(self): - self._qd.qd_log_entity(self) - - def _update(self): - self._qd.qd_log_entity(self) - - def _delete(self): - """Can't actually delete a log source but return it to the default state""" - self._qd.qd_log_source_reset(self.attributes['module']) - - def __str__(self): - return super(LogEntity, self).__str__().replace("Entity(", "LogEntity(") - - -class PolicyEntity(EntityAdapter): - def __init__(self, agent, entity_type, attributes=None): - super(PolicyEntity, self).__init__(agent, entity_type, attributes, validate=False) - # Policy is a mix of configuration and operational entity. - # The statistics attributes are operational not configured. - self._add_implementation( - CImplementation(agent.qd, entity_type, self._dispatch)) - - def create(self): - self._qd.qd_dispatch_configure_policy(self._dispatch, self) - self._qd.qd_dispatch_register_policy_manager(self._dispatch, self._policy) - - def _identifier(self): - return self.attributes.get('module') - - def __str__(self): - return super(PolicyEntity, self).__str__().replace("Entity(", "PolicyEntity(") - - -class VhostEntity(EntityAdapter): - def create(self): - self._policy.create_ruleset(self.attributes) - - def _identifier(self): - return self.attributes.get('id') - - def __str__(self): - return super(VhostEntity, self).__str__().replace("Entity(", "VhostEntity(") - - -class VhostStatsEntity(EntityAdapter): - def _identifier(self): - return self.attributes.get('id') - - def __str__(self): - return super(VhostStatsEntity, self).__str__().replace("Entity(", "VhostStatsEntity(") - - def __str__(self): - return super(PolicyStatsEntity, self).__str__().replace("Entity(", "PolicyStatsEntity(") - - -def _host_port_name_identifier(entity): - for attr in ['host', 'port', 'name']: # Set default values if need be - entity.attributes.setdefault( - attr, entity.entity_type.attribute(attr).missing_value()) - - if entity.attributes.get('name'): - return "%s:%s:%s" % (entity.attributes['host'], entity.attributes['port'], entity.attributes['name']) - else: - return "%s:%s" % (entity.attributes['host'], entity.attributes['port']) - - -class SslProfileEntity(EntityAdapter): - def create(self): - return self._qd.qd_dispatch_configure_ssl_profile(self._dispatch, self) - - def _delete(self): - deleted = self._qd.qd_connection_manager_delete_ssl_profile(self._dispatch, self._implementations[0].key) - # SSL Profiles cannot be deleted if they are referenced by a connector/listener. - if not deleted: - raise ForbiddenStatus("SSL Profile is referenced by other listeners/connectors. Delete the associated " - "listeners/connectors before deleting the SSL Profile") - - def _identifier(self): - return self.name - - def __str__(self): - return super(SslProfileEntity, self).__str__().replace("Entity(", "SslProfileEntity(") - -class ListenerEntity(EntityAdapter): - def create(self): - config_listener = self._qd.qd_dispatch_configure_listener(self._dispatch, self) - self._qd.qd_connection_manager_start(self._dispatch) - return config_listener - - def _identifier(self): - return _host_port_name_identifier(self) - - def __str__(self): - return super(ListenerEntity, self).__str__().replace("Entity(", "ListenerEntity(") - - def _delete(self): - self._qd.qd_connection_manager_delete_listener(self._dispatch, self._implementations[0].key) - -class ConnectorEntity(EntityAdapter): - def create(self): - config_connector = self._qd.qd_dispatch_configure_connector(self._dispatch, self) - self._qd.qd_connection_manager_start(self._dispatch) - return config_connector - - def _delete(self): - self._qd.qd_connection_manager_delete_connector(self._dispatch, self._implementations[0].key) - - def _identifier(self): - return _host_port_name_identifier(self) - - def __str__(self): - return super(ConnectorEntity, self).__str__().replace("Entity(", "ConnectorEntity(") - -class FixedAddressEntity(EntityAdapter): - def create(self): - self._qd.qd_dispatch_configure_fixed_address(self._dispatch, self) - - def __str__(self): - return super(FixedAddressEntity, self).__str__().replace("Entity(", "FixedAddressEntity(") - - -class WaypointEntity(EntityAdapter): - def create(self): - self._qd.qd_dispatch_configure_waypoint(self._dispatch, self) - #self._qd.qd_waypoint_activate_all(self._dispatch) - - def __str__(self): - return super(WaypointEntity, self).__str__().replace("Entity(", "WaypointEntity(") - -class LinkRoutePatternEntity(EntityAdapter): - def create(self): - self._qd.qd_dispatch_configure_lrp(self._dispatch, self) - - def __str__(self): - return super(LinkRoutePatternEntity, self).__str__().replace("Entity(", "LinkRoutePatternEntity(") - -class AddressEntity(EntityAdapter): - def create(self): - self._qd.qd_dispatch_configure_address(self._dispatch, self) - - def __str__(self): - return super(AddressEntity, self).__str__().replace("Entity(", "AddressEntity(") - -class LinkRouteEntity(EntityAdapter): - def create(self): - self._qd.qd_dispatch_configure_link_route(self._dispatch, self) - - def __str__(self): - return super(LinkRouteEntity, self).__str__().replace("Entity(", "LinkRouteEntity(") - -class AutoLinkEntity(EntityAdapter): - def create(self): - self._qd.qd_dispatch_configure_auto_link(self._dispatch, self) - - def __str__(self): - return super(AutoLinkEntity, self).__str__().replace("Entity(", "AutoLinkEntity(") - -class ConsoleEntity(EntityAdapter): - def __str__(self): - return super(ConsoleEntity, self).__str__().replace("Entity(", "ConsoleEntity(") - - def create(self): - # if a named listener is present, use its host:port - name = self.attributes.get('listener') - if name: - listeners = self._agent.find_entity_by_type("listener") - for listener in listeners: - if listener.name == name: - try: - #required - host = listener.attributes['host'] - port = listener.attributes['port'] - #optional - wsport = self.attributes.get('wsport') - home = self.attributes.get('home') - args = self.attributes.get('args') - - pargs = [] - pargs.append(self.attributes['proxy']) - if args: - # Replace any $port|$host|$wsport|$home - dargs = {'$port': port, '$host': host} - if wsport: - dargs['$wsport'] = wsport - if home: - dargs['$home'] = home - for k,v in dargs.iteritems(): - args = args.replace(k,str(v)) - pargs += args.split() - - #run the external program - Popen(pargs) - except: - self._agent.log(LOG_ERROR, "Can't parse console entity: %s" % (format_exc())) - break - -class DummyEntity(EntityAdapter): - def callme(self, request): - return (OK, dict(**request.properties)) - - -class RouterLinkEntity(EntityAdapter): - def __str__(self): - return super(RouterLinkEntity, self).__str__().replace("Entity(", "RouterLinkEntity(") - - -class RouterNodeEntity(EntityAdapter): - def _identifier(self): - return self.attributes.get('id') - - def __str__(self): - return super(RouterNodeEntity, self).__str__().replace("Entity(", "RouterNodeEntity(") - - -class RouterAddressEntity(EntityAdapter): - def _identifier(self): - return self.attributes.get('key') - - def __str__(self): - return super(RouterAddressEntity, self).__str__().replace("Entity(", "RouterAddressEntity(") - - -class ConnectionEntity(EntityAdapter): - def _identifier(self): - return self.attributes.get('host') + ":" + str(self.attributes.get('identity')) - - def __str__(self): - return super(ConnectionEntity, self).__str__().replace("Entity(", "ConnectionEntity(") - - -class AllocatorEntity(EntityAdapter): - def _identifier(self): - return self.attributes.get('typeName') - - def __str__(self): - return super(AllocatorEntity, self).__str__().replace("Entity(", "AllocatorEntity(") - - -class EntityCache(object): - """ - Searchable cache of entities, can be refreshed from implementation objects. - """ - - def __init__(self, agent): - self.entities = [] - self.implementations = {} - self.agent = agent - self.qd = self.agent.qd - self.schema = agent.schema - self.log = self.agent.log - - def map_filter(self, function, test): - """Filter with test then apply function.""" - return map(function, ifilter(test, self.entities)) - - def map_type(self, function, type): - """Apply function to all entities of type, if type is None do all entities""" - if type is None: - return map(function, self.entities) - else: - if not isinstance(type, EntityType): type = self.schema.entity_type(type) - return map(function, ifilter(lambda e: e.entity_type.is_a(type), self.entities)) - - def add(self, entity): - """Add an entity to the agent""" - self.log(LOG_DEBUG, "Add entity: %s" % entity) - entity.validate() # Fill in defaults etc. - # Validate in the context of the existing entities for uniqueness - self.schema.validate_full(chain(iter([entity]), iter(self.entities))) - self.entities.append(entity) - - def _add_implementation(self, implementation, adapter=None): - """Create an adapter to wrap the implementation object and add it""" - cls = self.agent.entity_class(implementation.entity_type) - if not adapter: - adapter = cls(self.agent, implementation.entity_type, validate=False) - self.implementations[implementation.key] = adapter - adapter._add_implementation(implementation) - adapter._refresh() - self.add(adapter) - - def add_implementation(self, implementation, adapter=None): - self._add_implementation(implementation, adapter=adapter) - - def _remove(self, entity): - try: - self.entities.remove(entity) - self.log(LOG_DEBUG, "Remove %s entity: %s" % - (entity.entity_type.short_name, entity.attributes['identity'])) - except ValueError: pass - - def remove(self, entity): - self._remove(entity) - - def _remove_implementation(self, key): - if key in self.implementations: - entity = self.implementations[key] - del self.implementations[key] - self._remove(entity) - - def remove_implementation(self, key): - self._remove_implementation(key) - - def refresh_from_c(self): - """Refresh entities from the C dispatch runtime""" - REMOVE, ADD = 0, 1 - - def remove_redundant(events): - """Remove redundant add/remove pairs of events.""" - add = {} # add[pointer] = index of add event. - redundant = [] # List of redundant event indexes. - for i in xrange(len(events)): - action, type, pointer = events[i] - if action == ADD: - add[pointer] = i - elif pointer in add: # action == REMOVE and there's an ADD - redundant.append(add[pointer]) - redundant.append(i) - del add[pointer] - for i in sorted(redundant, reverse=True): - events.pop(i) - - # FIXME aconway 2014-10-23: locking is ugly, push it down into C code. - self.qd.qd_dispatch_router_lock(self.agent.dispatch) - try: - events = [] - self.qd.qd_entity_refresh_begin(events) - remove_redundant(events) - for action, type, pointer in events: - if action == REMOVE: - self._remove_implementation(pointer) - elif action == ADD: - entity_type = self.schema.entity_type(type) - self._add_implementation(CImplementation(self.qd, entity_type, pointer)) - # Refresh the entity values while the lock is still held. - for e in self.entities: e._refresh() - finally: - self.qd.qd_entity_refresh_end() - self.qd.qd_dispatch_router_unlock(self.agent.dispatch) - -class ManagementEntity(EntityAdapter): - """An entity representing the agent itself. It is a singleton created by the agent.""" - - def __init__(self, agent, entity_type, attributes, validate=True): - attributes = {"identity": "self", "name": "self"} - super(ManagementEntity, self).__init__(agent, entity_type, attributes, validate=validate) - self.__dict__["_schema"] = entity_type.schema - - def requested_type(self, request): - type = request.properties.get('entityType') - if type: return self._schema.entity_type(type) - else: return None - - def query(self, request): - """Management node query operation""" - entity_type = self.requested_type(request) - if entity_type: - all_attrs = set(entity_type.attributes.keys()) - else: - all_attrs = self._schema.all_attributes - - names = set(request.body.get('attributeNames')) - if names: - unknown = names - all_attrs - if unknown: - if entity_type: - for_type = " for type %s" % entity_type.name - else: - for_type = "" - raise NotFoundStatus("Unknown attributes %s%s." % (list(unknown), for_type)) - else: - names = all_attrs - - results = [] - def add_result(entity): - result = [] - non_empty = False - for name in names: - result.append(entity.attributes.get(name)) - if result[-1] is not None: non_empty = True - if non_empty: results.append(result) - - self._agent.entities.map_type(add_result, entity_type) - return (OK, {'attributeNames': list(names), 'results': results}) - - def get_types(self, request): - type = self.requested_type(request) - return (OK, dict((t.name, [b.name for b in t.all_bases]) - for t in self._schema.by_type(type))) - - def get_annotations(self, request): - """ - We are not supporting any annotations at the moment. - """ - return (OK, {}) - - def get_operations(self, request): - type = self.requested_type(request) - return (OK, dict((t, et.operations) - for t, et in self._schema.entity_types.iteritems() - if not type or type.name == t)) - - def get_attributes(self, request): - type = self.requested_type(request) - return (OK, dict((t, [a for a in et.attributes]) - for t, et in self._schema.entity_types.iteritems() - if not type or type.name == t)) - - def get_mgmt_nodes(self, request): - router = self._agent.entities.map_type(None, 'router')[0] - area = router.attributes['area'] - def node_address(node): - return str(Address.topological(node.attributes['id'], "$management", area)) - return (OK, self._agent.entities.map_type(node_address, 'router.node')) - - def get_schema(self, request): - return (OK, self._schema.dump()) - - def _intprop(self, request, prop): - value = request.properties.get(prop) - if value is not None: value = int(value) - return value - - def get_json_schema(self, request): - return (OK, json.dumps(self._schema.dump(), indent=self._intprop(request, "indent"))) - - def get_log(self, request): - logs = self._qd.qd_log_recent_py(self._intprop(request, "limit") or -1) - return (OK, logs) - - def profile(self, request): - """Start/stop the python profiler, returns profile results""" - profile = self.__dict__.get("_profile") - if "start" in request.properties: - if not profile: - profile = self.__dict__["_profile"] = Profile() - profile.enable() - self._log(LOG_INFO, "Started python profiler") - return (OK, None) - if not profile: - raise BadRequestStatus("Profiler not started") - if "stop" in request.properties: - profile.create_stats() - self._log(LOG_INFO, "Stopped python profiler") - out = StringIO() - stats = pstats.Stats(profile, stream=out) - try: - stop = request.properties["stop"] - if stop == "kgrind": # Generate kcachegrind output using pyprof2calltree - from pyprof2calltree import convert - convert(stats, out) - elif stop == "visualize": # Start kcachegrind using pyprof2calltree - from pyprof2calltree import visualize - visualize(stats) - else: - stats.print_stats() # Plain python profile stats - return (OK, out.getvalue()) - finally: - out.close() - raise BadRequestStatus("Bad profile request %s" % (request)) - -class ManagementAgent(object): - """ - AMQP managment agent. Manages entities, directs requests to the correct entity. - """ - - def __init__(self, address, agent_adapter, dispatch, qd, schema=QdSchema()): - self.qd = qd - self.address = address - self.agent_adapter = agent_adapter - self.dispatch = dispatch - self.schema = schema - self.entities = EntityCache(self) - self.request_lock = Lock() - self.log_adapter = LogAdapter("AGENT") - self.policy = PolicyManager(self) - self.management = self.create_entity({"type": "management"}) - self.add_entity(self.management) - self.io = IoAdapter(self.receive, address, 'L', '0', TREATMENT_ANYCAST_CLOSEST) - - def log(self, level, text): - info = traceback.extract_stack(limit=2)[0] # Caller frame info - self.log_adapter.log(level, text, info[0], info[1]) - - def entity_class(self, entity_type): - """Return the class that implements entity_type""" - class_name = camelcase(entity_type.short_name, capital=True) + 'Entity' - entity_class = globals().get(class_name) - if not entity_class: - raise InternalServerErrorStatus( - "Can't find implementation '%s' for '%s'" % (class_name, entity_type.name)) - return entity_class - - def create_entity(self, attributes): - """Create an instance of the implementation class for an entity""" - - if attributes.get('identity') is not None: - raise BadRequestStatus("'identity' attribute cannot be specified %s" % attributes) - if attributes.get('type') is None: - raise BadRequestStatus("No 'type' attribute in %s" % attributes) - entity_type = self.schema.entity_type(attributes['type']) - return self.entity_class(entity_type)(self, entity_type, attributes) - - def entity_type(self, type): - try: - return self.schema.entity_type(type) - except ValidationError, e: - raise NotFoundStatus(str(e)) - - def _create(self, attributes): - """Create an entity, called externally or from configuration file.""" - entity = self.create_entity(attributes) - pointer = entity.create() - if pointer: - cimplementation = CImplementation(self.qd, entity.entity_type, pointer) - self.entities.add_implementation(cimplementation, entity) - else: - self.add_entity(entity) - return entity - - def create(self, request): - """ - Create operation called from an external client. - Create is special: it is directed at an entity but the entity - does not yet exist so it is handled initially by the agent and - then delegated to the new entity. - """ - attributes = request.body - for a in ['type', 'name']: - prop = request.properties.get(a) - if prop: - old = attributes.setdefault(a, prop) - if old is not None and old != prop: - raise BadRequestStatus("Conflicting values for '%s'" % a) - attributes[a] = prop - if attributes.get('type') is None: - raise BadRequestStatus("No 'type' attribute in %s" % attributes) - et = self.schema.entity_type(attributes['type']) - et.allowed("CREATE", attributes) - et.create_check(attributes) - return (CREATED, self._create(attributes).attributes) - - def configure(self, attributes): - """Created via configuration file""" - self._create(attributes) - - def add_entity(self, entity): - """Add an entity adapter""" - self.entities.add(entity) - - def remove(self, entity): - self.entities.remove(entity) - - def add_implementation(self, implementation, entity_type_name): - """Add an internal python implementation object, it will be wrapped with an entity adapter""" - self.entities.add_implementation( - PythonImplementation(self.entity_type(entity_type_name), implementation)) - - def remove_implementation(self, implementation): - """Remove and internal python implementation object.""" - self.entities.remove_implementation(id(implementation)) - - def requested_entity_type(self, request): - entity_type = request.properties.get('entityType') - if entity_type: - return self.schema.entity_type(entity_type) - return None - - def find_entity(self, request): - """Find the entity addressed by request""" - - requested_type = request.properties.get('type') - if requested_type: - requested_type = self.schema.entity_type(requested_type) - - # ids is a map of identifying attribute values - ids = dict((k, request.properties.get(k)) - for k in ['name', 'identity'] if k in request.properties) - - # Special case for management object: if no name/id and no conflicting type - # then assume this is for "self" - if not ids: - if not requested_type or self.management.entity_type.is_a(requested_type): - return self.management - else: - raise BadRequestStatus("No name or identity provided") - - def attrvals(): - """String form of the id attribute values for error messages""" - return " ".join(["%s=%r" % (k, v) for k, v in ids.iteritems()]) - - k, v = ids.iteritems().next() # Get the first id attribute - found = self.entities.map_filter(None, lambda e: e.attributes.get(k) == v) - if len(found) == 1: - entity = found[0] - elif len(found) > 1: - raise InternalServerErrorStatus( - "Duplicate (%s) entities with %s=%r" % (len(found), k, v)) - else: - raise NotFoundStatus("No entity with %s" % attrvals()) - - for k, v in ids.iteritems(): - if entity[k] != v: raise BadRequestStatus("Conflicting %s" % attrvals()) - - if requested_type: - if not entity.entity_type.is_a(requested_type): - raise BadRequestStatus("Entity type '%s' does not extend requested type '%s'" % - (entity.entity_type.name, requested_type)) - - return entity - - def find_entity_by_type(self, type): - return self.entities.map_type(None, type) - - def handle(self, request): - """ - Handle a request. - Dispatch management node requests to self, entity requests to the entity. - @return: (response-code, body) - """ - # Get the operation from the request. This could be CREATE or READ or UPDATE or DELETE or QUERY - operation = required_property('operation', request) - - # Get the entity type from the request. For e.g. entity_type could be a connector entity or a listener entity - entity_type = self.requested_entity_type(request) - - # Get the type from the request - requested_type = request.properties.get('type') # Should be org.amqp.management - if requested_type: - requested_type = self.schema.entity_type(requested_type) - if operation not in requested_type.operations: - # Is the operation allowed on the entity_type, if not throw an exception - entity_type.allowed(operation) - - # target = self.find_entity(request) - - # Parameters required to post the request to the work queue - reply_to = request.reply_to - correlation_id = request.correlation_id - entity_type_ordinality = entity_type.ordinality - #operation_ordinality = operation.ordinality - - print 'reply_to, correlation_id, entity_type_ordinality ', reply_to, correlation_id, entity_type_ordinality - - if operation: - operation = operation.lower() - if operation == 'create': - - # request.body is already a map with the create parameters - # request.properties has the operation, name, type - request_handler.qd_post_management_request(operation, entity_type_ordinality, correlation_id, reply_to, - request.body) - - request_type = request.properties.get('type') - if self.schema.is_long_name(request_type): - enum_type = self.schema.type_map.get(request_type) - else: - # Get the corresponding long name from short name - long_type = self.schema.short_long_type_map.get(request_type) - enum_type = self.schema.type_map.get(long_type) - - # post an entry into the work queue - #qd_dispatch_post_management_request(self.dispatch, oper, enum_type) - - else: - raise not_implemented(operation, target.type) - """ - if operation.lower() == 'create': - # Create requests are entity requests but must be handled by the agent since - # the entity does not yet exist. - return self.create(request) - else: - target = self.find_entity(request) - target.entity_type.allowed(operation, request.body) - try: - method = getattr(target, operation.lower().replace("-", "_")) - except AttributeError: - not_implemented(operation, target.type) - return method(request) - """ - - def respond(self, request, status=OK, description=None, body=None): - """Send a response to the client""" - if body is None: body = {} - description = description or STATUS_TEXT[status] - response = Message( - address=request.reply_to, - correlation_id=request.correlation_id, - properties={'statusCode': status, 'statusDescription': description}, - body=body) - self.log(LOG_DEBUG, "Agent response:\n %s\n Responding to: \n %s"%(response, request)) - try: - self.io.send(response) - except: - self.log(LOG_ERROR, "Can't respond to %s: %s"%(request, format_exc())) - - def receive(self, request, unused_link_id, unused_cost): - """ - Called when a management request is received. - The init function registers this function as the one to be called by creating an IOAdapter. - """ - # If there's no reply_to, don't bother to process the request. - if not request.reply_to: - return - - def error(e, trace): - """Raise an error""" - self.log(LOG_ERROR, "Error dispatching %s: %s\n%s"%(request, e, trace)) - self.respond(request, e.status, e.description) - - # Coarse locking, handle one request at a time. - with self.request_lock: - try: - self.entities.refresh_from_c() - self.log(LOG_DEBUG, "Agent request %s"% request) - status, body = self.handle(request) - self.respond(request, status=status, body=body) - except ManagementError, e: - error(e, format_exc()) - except ValidationError, e: - error(BadRequestStatus(str(e)), format_exc()) - except Exception, e: - error(InternalServerErrorStatus("%s: %s"%(type(e).__name__, e)), format_exc()) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1df96814/python/qpid_dispatch_internal/management/agent/__init__.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/management/agent/__init__.py b/python/qpid_dispatch_internal/management/agent/__init__.py new file mode 100644 index 0000000..84cb695 --- /dev/null +++ b/python/qpid_dispatch_internal/management/agent/__init__.py @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +"""Qpid Dispatch internal management package.""" + + +from .agent import ManagementAgent + +__all__ = ["ManagementAgent"] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org