activemq-commits mailing list archives

Subject svn commit: r781882 - /activemq/sandbox/activemq-flow/webgen/src/
Date Fri, 05 Jun 2009 01:18:41 GMT
Author: cmacnaug
Date: Fri Jun  5 01:18:41 2009
New Revision: 781882

Updated architecture notes.


Modified: activemq/sandbox/activemq-flow/webgen/src/
--- activemq/sandbox/activemq-flow/webgen/src/ (original)
+++ activemq/sandbox/activemq-flow/webgen/src/ Fri Jun  5 01:18:41 2009
@@ -1,11 +1,136 @@
 title: Architectural Overview
 --- pipeline:textile
-h2. Overview
+h1. Overview
 This page explores some of the details around core components. 
-h2. Flow Control
-TODO: I've got some more background notes that I need to 
+h1. Getting Started:
+Some pointers to get you going with the prototype:
+It is located in the activemq sandbox at: "":}
+Running some testcases:
+When running these testcases you should use the -server jvm argument:
+# *activemq-queue module org.apache.activemq.flow.MockBrokerTest*. This is a good starting
point for looking at some of the flow control, dispatcher and queue modules. It is a very
basic MockBroker which uses a lightweight proto-buf based protocol. You should feel free to
play around with some of the hardcoded options in the test (e.g. ptp=true, tcp=true, threadsPerDispatcher
+# *activemq-openwire module* This test does
performance testing against the SharedQueue implementation and is a good place to start looking
at queue persistence and the CursoredQueue. It employs openwire messages over network
connections. Check out ** which currently hardcodes
the queue sizes; you can change DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD and the associated persistence
policy to play around with paging.
+# *activemq-all module*. This test
uses the full-blown broker, adds in a tcp transport layer and provides options for persistence
and durable subscriptions (albeit asynchronous at the moment). It extends **
in the activemq-broker module which has some additional hard coded options to play around
with. Again refer to ** to mess around with queue
+h1. Flow Control
+The activemq-flow package is meant to be a standalone module that deals generically with
Resources and Flows of elements that flow through and between them. Each resource creates
a FlowController for each of its Flows which is assigned a corresponding FlowLimiter. As
elements (e.g. messages) pass from one resource to another they are passed through the downstream
resource's FlowController which updates its Limiter. If propagation of an element from one
resource to another causes the downstream limiter to become throttled the associated FlowController
will block the source of the element. The flow module is used heavily by the rest of the core
for memory and disk management.
+* _Memory Management:_ Memory is managed based on the resources in play \-\- the usage is computed
by summing the space allocated to each of the resources' limiters. This strategy intentionally
avoids a centralized memory limit, which leads to complicated logic to track when a centralized
limiter needs to be decremented. It also avoids contention between multiple resources/threads
accessing the limiter and reduces the potential for memory limiter related deadlocks. However,
it should be noted that this approach doesn't preclude implementing centralized limiters in
the future.
+* _Flow Control:_ As messages propagate from one resource A to another B, if A overflows
B's limit, B will block A and A can't release its limiter space until B unblocks it. This
allowance for overflow into downstream resources is a key concept in flow control performance
and ease of use. Provided that the upstream resource has already accounted for the message's
memory, it can freely overflow any downstream limiter, as long as it keeps its own space reserved
for the elements that caused the overflow.
+* _Threading Model:_ Note that as a message propagates from A to B, the general contract
is that A won't release its memory if B blocks it during the course of dispatch. This means
that it is not safe to perform a thread handoff during dispatch between two resources, since
the thread dispatching A relies on the message making it to B (so that B can block it) prior
to A completing dispatch.
+* _Management/Visibility:_ Another intended use of the activemq-flow module is to assist
in visibility, e.g. provide an underlying map of resources that can be exposed via tooling
to see the relationships between sources and sinks of messages and to find bottlenecks.
This aspect has been downplayed for now as we have been focusing more on the queueing/memory
management model in the prototype, but eventually the flow package itself will provide a handy
way of providing visibility in the system, particularly in terms of finding performance bottlenecks.
+*FlowResource (FlowSink and FlowSource):* A container for FlowControllers providing some
lifecycle related logic. The base resource class handles interaction/registration with the
FlowManager (below).
+*FlowManager:* Registry for Flows and FlowResources. The manager will provide some hooks
into system visibility. As mentioned above this aspect has been downplayed somewhat for the
present time.
+*FlowController*: Wraps a FlowLimiter and actually implements block/resume logic
+*FlowLimiter*: Defines the limits enforced by a FlowController. Currently the package has
size based limiter implementations, but eventually should also support other common limiter
types such as rate based limiters. The limiters are also extended at other points in the
broker (for example implementing a protocol based WindowLimiter). It is also likely that we
would want to introduce CompositeLimiters to combine various limiter types.
+*Flow*: The concept of a flow is not used very heavily right now, but a Flow defines the
stream of elements that can be blocked. In general the prototype creates a single flow per
resource, but in the future a source may break its elements down into more granular flows
on which downstream sinks may block it. One case where this is anticipated as being useful
is in networks of brokers, where it may be desirable to partition messages into more granular
flows (e.g. based on producer or destination) to avoid blocking the broker-broker connection.
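+The overflow-then-block contract described above can be sketched roughly as follows. This is a minimal, single-threaded illustration and not the prototype's code: the names @FlowSketch@, @SketchLimiter@, @SketchController@, @offer@ and @elementDispatched@ are hypothetical, and the real FlowController/FlowLimiter interfaces may look quite different.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch; not thread-safe and not the activemq-flow API.
final class FlowSketch {

    /** Size-based limiter: throttled once the accounted size reaches capacity. */
    static final class SketchLimiter {
        private final int capacity;
        private int size;

        SketchLimiter(int capacity) { this.capacity = capacity; }

        /** Always accounts for the element (overflow is allowed); returns true if now throttled. */
        boolean add(int elementSize) { size += elementSize; return isThrottled(); }

        /** Releases space; returns true if the limiter is no longer throttled. */
        boolean remove(int elementSize) { size -= elementSize; return !isThrottled(); }

        boolean isThrottled() { return size >= capacity; }
    }

    /** Wraps a limiter and implements the block/resume logic for upstream sources. */
    static final class SketchController {
        private final SketchLimiter limiter;
        private final Queue<Runnable> blockedSources = new ArrayDeque<>();

        SketchController(SketchLimiter limiter) { this.limiter = limiter; }

        /**
         * Accepts the element even if it overflows the limit. Returns false when the
         * source is now blocked; onUnblock runs once space has been released.
         */
        boolean offer(int elementSize, Runnable onUnblock) {
            if (limiter.add(elementSize)) {
                blockedSources.add(onUnblock);
                return false;
            }
            return true;
        }

        /** Called when an element leaves this resource; resumes blocked sources if possible. */
        void elementDispatched(int elementSize) {
            if (limiter.remove(elementSize)) {
                Runnable resume;
                while ((resume = blockedSources.poll()) != null) {
                    resume.run();
                }
            }
        }
    }
}
```

+In this sketch the element that causes the overflow is still accepted; only the source is blocked, which mirrors the allowance for overflow into downstream resources described above.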
+h1. Dispatcher / Threading Model
+The dispatcher package is basically a smart thread pool \-\- in the new broker core almost
everything should be driven by a dispatcher to keep thread count down and scalability up.
+    * Components in the core create DispatchContexts which they use to request dispatch via
a Dispatcher. The dispatcher attempts to be smart about grouping together dispatch contexts
that interact with one another to reduce contended synchronization. For example if we have
a producer->queue->consumer scenario each with their own dispatch context, the dispatcher
attempts to group them all on the same thread to avoid contention.
+    * The Dispatcher is priority based which in conjunction with partitioned queues assists
in dispatching higher priority messages
+    * The Dispatcher also supports creation of simple Executors that operate on the dispatch
+    * Supports timer callbacks.
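+As a rough illustration of the priority-based part only, the sketch below runs pending dispatch requests on the calling thread, highest priority first. It deliberately leaves out the thread pool, the grouping of related contexts and timers; @PriorityDispatchSketch@, @requestDispatch@ and @runPending@ are hypothetical names and not the dispatcher package's API.

```java
import java.util.PriorityQueue;

// Hypothetical sketch of priority-ordered dispatch on a single thread.
final class PriorityDispatchSketch {

    /** A pending request: higher priority runs first, ties run in FIFO order. */
    private static final class Request implements Comparable<Request> {
        final int priority;
        final long sequence;
        final Runnable task;

        Request(int priority, long sequence, Runnable task) {
            this.priority = priority;
            this.sequence = sequence;
            this.task = task;
        }

        @Override
        public int compareTo(Request other) {
            int byPriority = Integer.compare(other.priority, priority); // descending
            return byPriority != 0 ? byPriority : Long.compare(sequence, other.sequence);
        }
    }

    private final PriorityQueue<Request> ready = new PriorityQueue<>();
    private long nextSequence;

    /** Registers a task to be dispatched at the given priority. */
    void requestDispatch(int priority, Runnable task) {
        ready.add(new Request(priority, nextSequence++, task));
    }

    /** Runs every pending request on the calling thread; returns how many ran. */
    int runPending() {
        int count = 0;
        Request next;
        while ((next = ready.poll()) != null) {
            next.task.run();
            count++;
        }
        return count;
    }
}
```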
+h1. Persistence Store
+KahaDB based. As it stands now there is a single worker thread / queue that drives the store
impl (BrokerDatabase).
+    * Like other resources, the BrokerDatabase employs a flow controller so as elements are
added to the queue they may block the source (e.g. a producer) if the configured memory limit
is exceeded.
+    * The BrokerDatabase allows events to be delayed. This is a key optimization for persistent
delivery (in conjunction with the CursoredQueue implementation). When a persistent message
is placed on a queue with consumers that are keeping up, it is placed on the DB queue with
a flush delay. If the consumer acknowledges the message prior to the delay expiring, it is
removed from the queue and doesn't have to hit the disk.
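+The delayed-flush optimization can be sketched as below. This is only an illustration under simplifying assumptions: time is passed in explicitly, there is no worker thread, and "writing to disk" is modelled as returning the flushed ids. @DelayedFlushSketch@ and its methods are hypothetical names, not the BrokerDatabase API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a store queue whose saves can be cancelled by an early ack.
final class DelayedFlushSketch {
    // message id -> time (in millis) at which the save must be flushed
    private final Map<String, Long> pending = new LinkedHashMap<>();

    /** Queues a save that should not be flushed before nowMillis + delayMillis. */
    void addDelayed(String messageId, long nowMillis, long delayMillis) {
        pending.put(messageId, nowMillis + delayMillis);
    }

    /** Returns true if the ack arrived in time and the save never has to hit the disk. */
    boolean acknowledge(String messageId) {
        return pending.remove(messageId) != null;
    }

    /** Flushes every save whose delay has expired; returns the ids written in this batch. */
    List<String> flush(long nowMillis) {
        List<String> batch = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMillis) {
                batch.add(entry.getKey());
                it.remove();
            }
        }
        return batch;
    }
}
```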
+The approach of a single worker thread driving the store is still under debate. Its pros
are that it reduces contention at the store layer, allows batching of several operations into
a single disk sync, and simplifies ordering of interrelated db operations. The major downside
is that it doesn't allow the store to leverage all cpus for encoding/decoding work. The store
implementation itself is not yet complete and needs more work.
+h1. Queues
+The activemq-queue module contains basic queue implementations and a Subscription interface.
It relies heavily on activemq-flow and activemq-dispatcher modules. The intent is to allow
the queue module to be used by both the broker and the client. The queue module has 3 basic
queue types:
+* Exclusive Queue: Only one subscription is attached to the queue (e.g. topic subscriber
or durable)
+* Shared Queue: A load balanced queue where messages are load balanced across multiple consumers.
+* Relay: Like a queue but simply a pass through (useful in cases where it doesn't make sense
to buffer up messages but where you still want a FlowController to throttle sources).
+In addition the queue module defines partitioned queues to allow segmenting messages based
on arbitrary criteria (more below). It also has priority based implementations built in.
+h2. CursoredQueue
+The hope is to use the CursoredQueue as the single persistence mechanism for messages, and
to have it serve as the basis for other queue types that require persistence. It supports browsing and
+It covers the cases of SharedQueues, Durables, Transactions and even non durables that wish to offload
messages to the store when the consumer is too slow.
+* Transactions: usecase is to hold messages in memory until commit time, spooling to disk
if transaction memory limit is reached.
+* SharedQueue: stores queue messages.
+* DurableQueue: stores durable messages.
+* TopicSubscription when configured to
+It can operate in a couple of different modes:
+* Keep all elements in memory and throttle sources to the memory limit \-\- persisting only
elements whose QoS level dictates it.
+* Two paging modes otherwise. Offload the message itself but keep a placeholder in memory
(increases performance), or page out place holders to allow for very large queue backlogs
(where the placeholders themselves become a memory burden).
+There is still a fair amount of policy work/customization to be done here with respect to
paging queues, particularly with respect to slow or disconnected subscribers:
+* Expunge Policy: When to discard messages (if ever)
+* Handling of temporary bursts: In general if subscribers can keep up with the publishers
it is best to keep messages in memory instead of paging them out, however, for the case of
a temporary burst paging to disk can be appropriate
+* When paging out, page out at a reduced rate or as fast as possible?
+* Disconnected Subscriber (no subs on queue or disconnected durable): throttle the rate at
which producers send messages to the store to improve recovery time?
+* Other usecases/scenarios?
+h2. Partitioning of queues
+We've also introduced the concept of a partitioned queue in which a single queue can be partitioned
to increase parallelization of queue processing. Additionally:
+* Priority based partitions allow dispatch of higher priority messages first.
+* Can be used for JMSXGroupID (not implemented yet)
+* For clustering connections, partitioning can be used to create parallel independently limited
message flows between brokers (so if there is a slow consumer at the other side only
the streams of messages being sent to it will be blocked).
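+The idea of independently limited partitions can be sketched as follows. This is a simplified, hypothetical illustration: partitions are keyed by an arbitrary string, each has its own fixed limit, and a full partition simply refuses new messages (the prototype's flow controllers allow overflow and block the source instead). @PartitionedQueueSketch@ is not a class from the queue module.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one logical queue split into independently limited partitions.
final class PartitionedQueueSketch {
    private final int perPartitionLimit;
    private final Map<String, Deque<String>> partitions = new HashMap<>();

    PartitionedQueueSketch(int perPartitionLimit) {
        this.perPartitionLimit = perPartitionLimit;
    }

    /** Enqueues into the partition for the key; returns false only if that partition is full. */
    boolean offer(String partitionKey, String message) {
        Deque<String> partition = partitions.computeIfAbsent(partitionKey, k -> new ArrayDeque<>());
        if (partition.size() >= perPartitionLimit) {
            return false; // only this partition's producers are held back
        }
        partition.addLast(message);
        return true;
    }

    /** Removes the oldest message from the given partition, or returns null if it is empty. */
    String poll(String partitionKey) {
        Deque<String> partition = partitions.get(partitionKey);
        return partition == null ? null : partition.pollFirst();
    }
}
```

+A slow consumer draining one partition fills only that partition, so messages for other partition keys keep flowing.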
+h1. Work In Progress
+h2. Transactions:
+* Should be backed by cursored queue
+h2. Large Message Support:
+* There are some stubs in the Store interface for this, but basically for large messages
we should be able to stage them to disk, and when given to a consumer chunk the payload out
to keep memory low. The cursored queue will need to be updated to handle this.
+h2. Protocol Support
+Lots of work still to be done here\!
+* Protocol Handlers are there for OpenWire and STOMP, and we'll want to add proto buf as
+* Do we convert things to a common message format on arrival or keep them in their native
format doing transforms on the fly?
+* Common interfaces for Connection/Session/*Consumer for the broker to interact with?
+* Threading model ... We should probably come up with a good abstract base class for the
protocol handlers that helps to provide a threading model that will simplify synchronization between
I/O events (exceptions etc.), protocol events and message dispatch.
+h2. Replication:
+* This should be handled by KahaDB replication
+* Still need to think about replication of non persistent messages, though, e.g. messages
that are never saved to disk but only replicated to peers.
+* Recovery time: to keep state fairly hot and achieve fast failover, we'll want to keep
optimizations in this area in mind (basically messages that are in memory on the active broker
are kept in memory on the standby as well).
+h2. General Interface cleanliness
+A fair amount of work needs to be done to clean up and harden interfaces in the
+h2. Testing
+As much of the work to date has been largely experimental in nature, testing up to now has
focussed mainly on end to end type performance tests, but more unit testing and functional
testing of edge cases such as slow subscribers etc. is needed.
\ No newline at end of file
