activemq-users mailing list archives

From Tim Bain <tb...@alumni.duke.edu>
Subject Re: How to avoid blocking of queue browsing after ActiveMQ checkpoint call
Date Fri, 15 Jan 2016 15:59:00 GMT
First, I think you're right that it's problematic that KahaDB checkpoint
operations and "real" usage of the memory store are able to block one
another, though my reasons for saying that may not be quite the same as
yours.  Fundamentally, the memory store and the persistence store are
intended to be two independent storage areas, used for different purposes
(one stores persistent messages, one stores non-persistent messages).  One
should be able to fill (and invoke flow control on the destinations that
use it) without affecting the other.  Yet what's actually happened is that
we needed the ability to read messages from the persistent store into
memory, and instead of recognizing the abstraction that the memory store is
for non-persistent messages (and therefore not for the temporarily-in-memory
copies of persistent messages as they are read into cursors), we treated
the memory store as a catch-all for any copy of a message that wasn't in a
persistent store.  Obviously they have to go somewhere, but there's already
a catch-all place for storing stuff that's being used for current
processing, and it's called the heap, and we should have used it for copies
of messages read in by cursors (or any other mechanism for reading messages
from the persistent store) instead of placing them into the memory store.

One argument for putting the messages into the memory store might have been
to ensure that lots of cursor-intensive operations don't cause the broker
to OOM.  I don't agree with that argument.  Overloading a piece of software
beyond the memory capacity that you've chosen to configure it with will
always cause the software to OOM; if you connect to the broker with a
billion separate connections, it's going to OOM, so I don't see why trying
to do too much simultaneous browsing of the persistent store should be any
different.  The argument will always be, "if the way you're using the
broker causes it to run out of memory, then either give it more memory so
you don't run out, or stop doing whatever you're doing that's causing it to
use so much memory;" there's no need to use the memory store as an attempt
to protect us from that problem.

So I roughly agree with the general premise that KahaDB operations should
be unaffected by a full memory store and should not cause the memory store
to fill, though my proposal for how to solve the problem differs from
yours, and I don't think that AMQ-6115 really captures what I'm proposing.
I'll submit a separate JIRA to propose that when I get some time this
weekend.

But putting that aside, it's obvious from your description that how you've
currently configured the broker is insufficient for the usage patterns
you're looking to support.  (If it were sufficient, the checkpoint would
succeed and everything would be fine, as you've proven by your experiment
of increasing the memory store size and the heap size.)  So the same
argument I described halfway through the paragraph about OOMs applies to
you here: you're trying to do too much for the memory resources you've
allocated to your broker, so you either need to increase your resource
allocation or decrease your workload.  (Or we need to decrease the amount
of memory used by the checkpointing algorithm; more on that in a minute.)

Increasing your resource allocation is simple, but you say it's not
possible; I'm very skeptical that it's truly not possible, especially when
you say the host has 16GB total but you're only allowed 1GB, and would
recommend you question your assumptions (and those of your bosses, systems
engineers, customers, etc.) to see if it's actually not possible or just
not easy, but ultimately you know your scenario and if it's not possible
then it's not possible.

Decreasing your workload in this case means reducing the amount of work
required to checkpoint your KahaDB instance.  I don't have a lot of
expertise on the KahaDB checkpoint algorithm, but I'd assume that you'll
use less memory store if you have fewer messages in your KahaDB instance,
and that you might also use less if you have the same number of messages in
fewer destinations (since I assume that the cursors only have one page
worth of messages loaded at a time, so fewer cursors would mean fewer pages
loaded and therefore less memory usage at any one time).  So one option
might be to rearchitect your use of destinations to push messages into
fewer destinations and use selectors to ensure that the right consumers get
the right ones.  Another solution that could work would be to keep messages
for less time, either by discarding them if they're unconsumed after a
certain amount of time, by changing your consumers to reduce the max
possible time before they consume, or by consuming them yourself and
storing them into a database, to be retrieved later.  ActiveMQ is not a
database and is not optimized to act as one; it's optimized for delivery of
messages shortly after they are produced, and by keeping large amounts of
data available for days at a time, you're using it in ways it's not
optimized for (as you've discovered), but other products are optimized to
be used as a database and you could consume content yourself and write it
into one of those products if it's unconsumed after a certain amount of
time.  A third solution would be to provide dedicated brokers for each
consumer with a hub-and-spoke topology, so that there are fewer messages in
any given broker (so you've got a better chance of not running out of
memory store space while checkpointing) and one consumer going offline
doesn't affect the other brokers since the other brokers won't see the
offline consumer's messages.  A fourth option could be to shard your
destinations across multiple central brokers (standalone, not part of a
network of brokers), where each one is responsible for only certain
destinations and consumers connect to all brokers and know which
destinations are found on which broker.  There might be other ways to do
this, but those are the four I can think of: reduce your number of
destinations, reduce your message volume by not keeping them on the broker
as long, reduce your messages per broker by using a network of brokers, or
reduce your messages per broker by sharding across multiple standalone
brokers.
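
To make the fourth option (sharding across standalone brokers) a little more
concrete, here is a minimal sketch of how a client could decide which broker
owns a given destination. The class name DestinationSharder, the broker URLs,
and the queue names are all hypothetical; the only idea taken from the text
above is that every client must compute the same destination-to-broker mapping.
Hashing the destination name is one simple way to get that, under the
assumption that the broker list is identical and identically ordered on every
client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: maps each destination name to exactly one standalone broker. */
public final class DestinationSharder {
    private final List<String> brokerUrls;

    public DestinationSharder(List<String> brokerUrls) {
        if (brokerUrls.isEmpty()) {
            throw new IllegalArgumentException("at least one broker URL is required");
        }
        // Defensive copy so the mapping cannot change underneath us.
        this.brokerUrls = new ArrayList<>(brokerUrls);
    }

    /** Returns the broker responsible for a destination; stable for a fixed broker list. */
    public String brokerFor(String destinationName) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        int shard = Math.floorMod(destinationName.hashCode(), brokerUrls.size());
        return brokerUrls.get(shard);
    }

    public static void main(String[] args) {
        // Placeholder URLs and queue names, for illustration only.
        DestinationSharder sharder = new DestinationSharder(Arrays.asList(
                "tcp://broker-a:61616", "tcp://broker-b:61616", "tcp://broker-c:61616"));
        for (String queue : Arrays.asList("orders.store1", "orders.store2", "prices.store1")) {
            System.out.println(queue + " -> " + sharder.brokerFor(queue));
        }
    }
}
```

Note that a plain modulo mapping reshuffles most destinations whenever a broker
is added or removed, so with messages that sit for days an explicit lookup
table (destination to broker) may be the safer choice.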

I promised to come back to whether the ActiveMQ code can be changed to
reduce the memory footprint used by checkpointing, so here it is.  I
haven't looked at the code that does the checkpointing, but I would assume
that it would be possible to configure it in ways that don't use as much of
the memory store, possibly at the cost of having checkpoints take longer.
One possibility might be to reduce the page size of the cursors that are
used for checkpointing each destination, so that each one puts fewer
messages at a time into the memory store.  Another option might be to limit
the number of destinations that can be checkpointed in parallel, so there
are at most N * Page Size messages in the memory store at a time due to
checkpointing.  Maybe there are other options as well, but those are the
two that jumped to mind without having read the code.  Both changes might
increase the time required to perform a checkpoint, so they'd need to be
configuration options that were exposed rather than changing default
behavior.  I don't want to submit a JIRA enhancement request without having
read the code to confirm that it works the way I've assumed it does, so I'm
not going to submit a JIRA entry about these ideas at the moment.  If you
have time to find and read the source code and evaluate whether these
proposed solutions make sense (and whether there's a better option), then
please do, otherwise I'll try to get to it when I have some time.  Either
way, this won't give you an immediate fix and there's no guarantee that any
enhancement you request will be implemented, so in the meantime I'd
encourage you to consider some of the other things I've recommended to you,
so you have options between now and whenever a version containing one of
these enhancements is released.
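
As a rough illustration of the "N * Page Size" bound mentioned above, the
following sketch multiplies the number of destinations paged in parallel by
the page size and an average message size. It relies on the same unverified
assumption as the paragraph above (that each destination holds at most one page
of messages in the memory store at a time). The figures of 250 queues and 10 KB
messages come from this thread; the page size of 200 is an assumed value for
the per-destination maxPageSize, and the class and method names are
hypothetical. Only payload bytes are counted, so real usage would be somewhat
higher.

```java
/** Hypothetical back-of-the-envelope estimate; not part of ActiveMQ. */
public final class CheckpointMemoryEstimate {
    private CheckpointMemoryEstimate() {}

    /** Upper bound on payload bytes held at once: N destinations * one page each * message size. */
    public static long upperBoundBytes(int parallelDestinations, int pageSize, long avgMessageBytes) {
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        long messages = Math.multiplyExact((long) parallelDestinations, (long) pageSize);
        return Math.multiplyExact(messages, avgMessageBytes);
    }

    public static void main(String[] args) {
        long memoryLimit = 500L * 1024 * 1024; // 500 MB memoryLimit from this thread
        long avgMessage = 10L * 1024;          // 10 KB TextMessages from this thread
        int pageSize = 200;                    // assumed maxPageSize; check your policyEntry

        // Compare the unrestricted case (all 250 queues) with smaller parallelism limits.
        for (int n : new int[] {250, 50, 10}) {
            long bound = upperBoundBytes(n, pageSize, avgMessage);
            System.out.printf("N=%d -> %d bytes (%.1f%% of memoryLimit)%n",
                    n, bound, 100.0 * bound / memoryLimit);
        }
    }
}
```

With these inputs the unrestricted case comes to 512,000,000 bytes, which is
close to the 500 MB limit on its own, so either a smaller page size or a cap
on N would have to be quite aggressive to leave room for browsing.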

Tim

On Wed, Jan 13, 2016 at 8:43 AM, Klaus Pittig <
klaus.pittig@futura4retail.com> wrote:

> a.) Regarding your last answer (thanks for your effort by the way):
>
> I'm aware of the relation between the heap and the systemUsage
> memoryLimit and we make sure that there are no illogical settings.
> The primary requirement is to have a stable system running 'forever'
> w/o any memory issues at any time independent from the load/throughput.
> No one really wants to deal with memory settings on the edge of limits.
>
> You're right: the memory is completely consumed. And I can't guarantee
> the checkpoint/cleanup to be finished completely, so the system can be
> stalled without giving GC a chance to release some memory.
>
> It's the expiry check causing this. The persistent stores themselves
> seem to be managed as expected (no issues, no inconsistency, no loss);
> our situation is independent of the storage (reproducible for leveldb
> and kahadb). For KahaDB we have used 16mb journal files for years
> (helps to save a huge amount of space required for pending messages
> not consumed for some days due to offline situations on client side).
> Anyway, here is our current configuration you requested:
>
> <persistenceAdapter>
>   <kahaDB directory="${activemq.base}/data/kahadb"
> enableIndexWriteAsync="true" journalMaxFileLength="16mb"
> indexWriteBatchSize="10000" indexCacheSize="10000" />
> <!--
>   <levelDB directory="${activemq.base}/data/leveldb" logSize="33554432" />
> -->
> </persistenceAdapter>
>
>
> b.) Some proposal concerning AMQ-6115:
>
> In my view, it's worth discussing the one and only
> memoryLimit parameter used for both the regular browse/consume threads
> and the checkpoint/cleanup threads.
> There should always be enough space to browse/consume any queue at
> least with prefetch 1 resp. one of the next pending messages.
> Maybe - in this case - 2 well-balanced memoryLimit parameters with
> priority on consumption instead of checkpoint/cleanup would help with
> better regulation. Or something similar.
>
>
> c.) Our results and an acceptable solution so far:
>
> After a thorough investigation (w/o changing ActiveMQ source code) the
> result is for now that we need to accept the limitations defined by
> the single memoryLimit parameter used both for the #checkpoint/cleanup
> process and browsing/consuming queues.
>
> **1.) Memory**
>
> There is not a problem, if we use a much higher memoryLimit (together
> with a higher max-heap) to support both the message caching per
> destination during the #checkpoint/cleanup workflow and our
> requirements to browse/consume messages.
>
> But more memory is not an option in our scenario, we need to deal with
> 1024m max-heap and 500m memoryLimit.
>
> Besides this, constantly setting higher memoryLimits just because of
> more persistent queues containing hundreds/thousands of pending
> messages together with certain offline/inactive consumer scenarios
> should be discussed in detail (IMHO).
>
>
> **2.) Persistent Adapters**
>
> We ruled out persistent adapters as the cause of the problem, because
> the behaviour doesn't change, if we switch different types of
> persistent stores (KahaDB, LevelDB, JDBC-PostgreSQL).
>
> During the debugging sessions with KahaDB we also see regular
> checkpoint handling, the storage is managed as expected.
>
>
> **3.) Destination Policy / Expiration Check**
>
> Our problem completely disappears, if we disable caching and the
> expiration check, which is the actual cause of the problem.
>
> The corresponding properties are documented and there is a nice blog
> article about Message Priorities with a description quite suitable for
> our scenario:
>
> - http://activemq.apache.org/how-can-i-support-priority-queues.html
> - http://blog.christianposta.com/activemq/activemq-message-priorities-how-it-works/
>
> We simply added useCache="false" and expireMessagesPeriod="0" to the
> policyEntry:
>
> <destinationPolicy>
>   <policyMap>
>     <policyEntries>
>       <policyEntry queue=">" producerFlowControl="false"
> optimizedDispatch="true" memoryLimit="128mb"
> timeBeforeDispatchStarts="1000"
>                              useCache="false" expireMessagesPeriod="0">
>         <dispatchPolicy>
>           <strictOrderDispatchPolicy />
>         </dispatchPolicy>
>         <pendingQueuePolicy>
>           <storeCursor />
>         </pendingQueuePolicy>
>       </policyEntry>
>     </policyEntries>
>   </policyMap>
> </destinationPolicy>
>
>
> The consequences are clear, if we don't use in-mem caching anymore and
> never check for message expiration.
>
> Since we neither use message expiration nor message priorities and the
> current message dispatching is fast enough for us, this trade-off is
> acceptable regarding given system limitations.
>
> One should also think about well-defined prefetch limits for memory
> consumption during specific workflows. Message sizes in our scenario
> can be 2 Bytes up to approx. 100 KB, so more individual policyEntries
> and client consumer configurations could be helpful to optimize system
> behaviour concerning performance and memory usage (see
> http://activemq.apache.org/per-destination-policies.html).
>
>
> Cheers
> Klaus
>
>
> On 11.01.16 at 15:35, Tim Bain wrote:
> > I believe you are correct: browsing a persistent queue uses bytes
> > from the memory store, because those bytes must be read from the
> > persistence store into the memory store before they can be handed
> > off to browsers or consumers.  If all available bytes in the memory
> > store are already in use, the messages can't be paged into the
> > memory store, and so the operation that required them to be paged
> > in will hang/fail.
> >
> > You can work around the problem by increasing your memory store
> > size via trial-and-error until the problem goes away.  Note that
> > the broker itself needs some amount of memory, so you can't give
> > the whole heap over to the memory store or you'll risk getting
> > OOMs, which means you may need to increase the heap size as well.
> > You can estimate how much memory the broker needs aside from the
> > memory store by subtracting the bytes used for the memory store
> > (539 MB) from the total heap bytes used as measured via JConsole or
> > similar tools.  I'd double (or more) that number to be safe, if it
> > was me; the last thing I want to deal with in a production
> > application (ActiveMQ or anything else) is running out of memory
> > because I tried to cut the memory limits too close just to save a
> > little RAM.
> >
> > All of that is how to work around the fact that before you try to
> > browse your queue, something else has already consumed all
> > available bytes in the memory store.  If you want to dig into why
> > that's happening, we'd need to try to figure out what those bytes
> > are being used for and whether it's possible to change
> > configuration values to reduce the usage so it fits into your
> > current limit.  There will definitely be more effort required than
> > simply increasing the memory limit (and max heap size), but we can
> > try if you're not able to increase the limits enough to fix the
> > problem.
> >
> > If you want to go down that path, one thread to pull on is your
> > observation that you "can browse/consume some Queues  _until_ the
> > #checkpoint call after 30 seconds."  I assume from your reference
> > to checkpointing that you're using KahaDB as your persistence
> > store.  Can you post the KahaDB portion of your config?
> >
> > Your statements here and in your StackOverflow post
> > (http://stackoverflow.com/questions/34679854/how-to-avoid-blocking-of-queue-browsing-after-activemq-checkpoint-call)
> > indicate that you think that the problem is that memory isn't getting
> > garbage collected after the operation that needed it (i.e. the
> > checkpoint) completes, but it's also possible that the checkpoint
> > operation isn't completing because it can't get enough messages
> > read into the memory store.  Have you confirmed via the thread dump
> > that there is not a checkpoint operation still in progress?  Also,
> > how large are your journal files that are getting checkpointed?  If
> > they're large enough that all messages for one file won't fit into
> > the memory store, you might be able to prevent the problem by using
> > smaller files.
> >
> > Tim
> >
> > On Jan 8, 2016 9:32 AM, "Klaus Pittig"
> > <klaus.pittig@futura4retail.com> wrote:
> >
> >> If I increase the JVM max heap size (4GB), the behavior does not
> >> change. In my point of view, the configured memoryLimit (500 MB)
> >> works as expected (heapdump shows same max. size for the
> >> TextMessage content, i.e. 55002 byte[] instances containing 539
> >> MB total).
> >>
> >> However, trying to browse a queue shows no content, even if there
> >> is enough heap memory available.
> >>
> >> As far as I understand the source code, this is also due to the
> >> configured memoryLimit, because - I hope this is the answer you
> >> expect - the calculation of available space causes hasSpace = false.
> >>
> >> I found this here:
> >>
> >> AbstractPendingMessageCursor {
> >>     public boolean hasSpace() {
> >>         return systemUsage != null
> >>             ? (!systemUsage.getMemoryUsage().isFull(memoryUsageHighWaterMark))
> >>             : true;
> >>     }
> >>     public boolean isFull() {
> >>         return systemUsage != null
> >>             ? systemUsage.getMemoryUsage().isFull()
> >>             : false;
> >>     }
> >> }
> >>
> >>
> >> #hasSpace is in this case called during a click on a queue in
> >> the Webconsole; see the 2 stacks during this workflow:
> >>
> >> Daemon Thread [Queue:aaa114] (Suspended (breakpoint at line 107 in QueueStorePrefetch))
> >>   owns: QueueStorePrefetch (id=6036)
> >>   owns: StoreQueueCursor (id=6037)
> >>   owns: Object (id=6038)
> >>   QueueStorePrefetch.doFillBatch() line: 107
> >>   QueueStorePrefetch(AbstractStoreCursor).fillBatch() line: 381
> >>   QueueStorePrefetch(AbstractStoreCursor).reset() line: 142
> >>   StoreQueueCursor.reset() line: 159
> >>   Queue.doPageInForDispatch(boolean, boolean) line: 1897
> >>   Queue.pageInMessages(boolean) line: 2119
> >>   Queue.iterate() line: 1596
> >>   DedicatedTaskRunner.runTask() line: 112
> >>   DedicatedTaskRunner$1.run() line: 42
> >>
> >> Daemon Thread [ActiveMQ VMTransport: vm://localhost#1] (Suspended (breakpoint at line 107 in QueueStorePrefetch))
> >>   owns: QueueStorePrefetch (id=5974)
> >>   owns: StoreQueueCursor (id=5975)
> >>   owns: Object (id=5976)
> >>   owns: Object (id=5977)
> >>   QueueStorePrefetch.doFillBatch() line: 107
> >>   QueueStorePrefetch(AbstractStoreCursor).fillBatch() line: 381
> >>   QueueStorePrefetch(AbstractStoreCursor).reset() line: 142
> >>   StoreQueueCursor.reset() line: 159
> >>   Queue.doPageInForDispatch(boolean, boolean) line: 1897
> >>   Queue.pageInMessages(boolean) line: 2119
> >>   Queue.iterate() line: 1596
> >>   Queue.wakeup() line: 1822
> >>   Queue.addSubscription(ConnectionContext, Subscription) line: 491
> >>   ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext, ConsumerInfo) line: 399
> >>   ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext, ConsumerInfo) line: 427
> >>   ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 244
> >>   AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 102
> >>   AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 104
> >>   CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 102
> >>   TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 102
> >>   StatisticsBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 102
> >>   BrokerService$5(MutableBrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 107
> >>   TransportConnection.processAddConsumer(ConsumerInfo) line: 663
> >>   ConsumerInfo.visit(CommandVisitor) line: 348
> >>   TransportConnection.service(Command) line: 334
> >>   TransportConnection$1.onCommand(Object) line: 188
> >>   ResponseCorrelator.onCommand(Object) line: 116
> >>   MutexTransport.onCommand(Object) line: 50
> >>   VMTransport.iterate() line: 248
> >>   DedicatedTaskRunner.runTask() line: 112
> >>   DedicatedTaskRunner$1.run() line: 42
> >>
> >>
> >>
> >> Setting queueBrowsePrefetch="1" and queuePrefetch="1" in the
> >> PolicyEntry for queue=">" also has no effect.
> >>
> >>
> >> On 08.01.16 at 16:32, Tim Bain wrote:
> >>> If you increase your JVM size (4GB, 8GB, etc., the biggest your
> >>> OS and hardware will support), does the behavior change?  Does
> >>> it truly take all available memory, or just all the memory that
> >>> you've made available to it (which isn't tiny but really isn't
> >>> all that big)?
> >>>
> >>> Also, how do you know that the MessageCursor seems to decide
> >>> that there is not enough memory and stops delivery of queue
> >>> content to browsers/consumers?  What symptom tells you that?
> >>>
> >>> On Jan 8, 2016 8:25 AM, "Klaus Pittig"
> >>> <klaus.pittig@futura4retail.com> wrote:
> >>>
> >>>> (related issue:
> >>>> https://issues.apache.org/jira/browse/AMQ-6115)
> >>>>
> >>>> There's a problem when using ActiveMQ with a large number of
> >>>> persistent queues (250), each holding 1000 persistent
> >>>> TextMessages of 10 KB each.
> >>>>
> >>>> Our scenario requires these messages to remain in the storage
> >>>> over a long time (days), until they are consumed (large
> >>>> amounts of data are staged for distribution for many
> >>>> consumer, that could be offline for some days).
> >>>>
> >>>>
> >>>> After the Persistence Store is filled with these Messages and
> >>>> after a broker restart we can browse/consume some Queues
> >>>> _until_ the #checkpoint call after 30 seconds.
> >>>>
> >>>> This call causes the broker to use all available memory and
> >>>> never releases it for other tasks such as Queue
> >>>> browse/consume. Internally the MessageCursor seems to decide,
> >>>> that there is not enough memory and stops delivery of queue
> >>>> content to browsers/consumers.
> >>>>
> >>>>
> >>>> => Is there a way to avoid this behaviour by configuration or
> >>>> is this a bug?
> >>>>
> >>>> The expectation is, that we can consume/browse any queue
> >>>> under all circumstances.
> >>>>
> >>>> The settings below have been in production for some time now,
> >>>> and several recommendations found in the ActiveMQ
> >>>> documentation are applied (destination policies, systemUsage,
> >>>> persistence store options etc.)
> >>>>
> >>>> - Behaviour is tested with ActiveMQ: 5.11.2, 5.13.0 and 5.5.1.
> >>>> - Memory Settings: Xmx=1024m
> >>>> - Java: 1.8 or 1.7
> >>>> - OS: Windows, MacOS, Linux
> >>>> - PersistenceAdapter: KahaDB or LevelDB
> >>>> - Disc: enough free space (200 GB) and physical memory (16 GB max).
> >>>>
> >>>> Besides the above mentioned settings we use the following
> >>>> settings for the broker (btw: changing the memoryLimit to a
> >>>> lower value like 1mb does not change the situation):
> >>>>
> >>>> <destinationPolicy>
> >>>>   <policyMap>
> >>>>     <policyEntries>
> >>>>       <policyEntry queue=">" producerFlowControl="false"
> >>>>                    optimizedDispatch="true" memoryLimit="128mb"
> >>>>                    timeBeforeDispatchStarts="1000">
> >>>>         <dispatchPolicy>
> >>>>           <strictOrderDispatchPolicy />
> >>>>         </dispatchPolicy>
> >>>>         <pendingQueuePolicy>
> >>>>           <storeCursor />
> >>>>         </pendingQueuePolicy>
> >>>>       </policyEntry>
> >>>>     </policyEntries>
> >>>>   </policyMap>
> >>>> </destinationPolicy>
> >>>> <systemUsage>
> >>>>   <systemUsage sendFailIfNoSpace="true">
> >>>>     <memoryUsage>
> >>>>       <memoryUsage limit="50 mb" />
> >>>>     </memoryUsage>
> >>>>     <storeUsage>
> >>>>       <storeUsage limit="80000 mb" />
> >>>>     </storeUsage>
> >>>>     <tempUsage>
> >>>>       <tempUsage limit="1000 mb" />
> >>>>     </tempUsage>
> >>>>   </systemUsage>
> >>>> </systemUsage>
> >>>>
> >>>> Setting the **cursorMemoryHighWaterMark** in the
> >>>> destinationPolicy to a higher value like **150** or **600**
> >>>> (depending on the difference between memoryUsage and the
> >>>> available heap space) relieves the situation a bit as a
> >>>> workaround, but this is not really an option for production
> >>>> systems in my view.
> >>>>
> >>>> Screenie with information from Oracle Mission Control showing
> >>>> those ActiveMQTextMessage instances that are never released
> >>>> from memory:
> >>>>
> >>>> http://goo.gl/EjEixV
> >>>>
> >>>>
> >>>> Cheers
> >>>> Klaus
> >>>>
> >>>
> >>
> >
>
