activemq-users mailing list archives

From Christian Posta <christian.po...@gmail.com>
Subject Re: redelivery policy & system usage limits not being applied correctly (Activemq 5.8)
Date Tue, 29 Oct 2013 16:52:40 GMT
It's not clear from your settings whether producer flow control is on/off.

But either way, as far as I remember, the store positions (and thus usage)
are updated at checkpoints (default every 5s)...

If you could put together a quick unit test that shows your issues (I see
you already have some Java code), it would help us diagnose this more quickly :)
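
For such a test it helps to know which delays to assert on. The sketch below is a simplified model of a capped exponential backoff schedule, not ActiveMQ's RedeliveryPolicy code. The 30 second delay comes from the quoted message; the multiplier of 2, the 300 second cap, and the limit of 6 redeliveries are assumptions chosen only for illustration, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ExpectedRedeliveryDelays {

    // Simplified model: start at the initial delay, multiply after each
    // attempt when backoff is enabled, and never exceed the maximum delay.
    // This is an illustration, NOT a copy of ActiveMQ's implementation.
    public static List<Long> expectedDelaysMillis(long initialDelayMs,
                                                  double multiplier,
                                                  long maxDelayMs,
                                                  boolean exponential,
                                                  int maxRedeliveries) {
        List<Long> delays = new ArrayList<>();
        long delay = initialDelayMs;
        for (int attempt = 0; attempt < maxRedeliveries; attempt++) {
            delays.add(delay);
            if (exponential) {
                delay = Math.min((long) (delay * multiplier), maxDelayMs);
            }
        }
        return delays;
    }

    public static void main(String[] args) {
        // Same seconds-to-milliseconds conversion the quoted code uses.
        long initial = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS);
        long max = TimeUnit.MILLISECONDS.convert(300, TimeUnit.SECONDS); // assumed cap
        System.out.println(expectedDelaysMillis(initial, 2.0, max, true, 6));
    }
}
```

A test could record the timestamps at which the consumer sees each redelivery and compare the gaps with this list, allowing some tolerance. If every gap is close to zero, the policy is not reaching the consumer.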


On Tue, Oct 29, 2013 at 2:59 AM, Jamie <jamie@mailarchiva.com> wrote:

> Greetings
>
> I am experiencing a couple of issues while using ActiveMQ 5.8. They are:
>
> 1) Redelivery policy is not being applied correctly.
>
> 2) The ActiveMQ broker is not adhering to the configured usage constraints.
> The queue fills up until it runs out of disk space, despite the fact that
> store limits are set, etc.
>
> The queues are otherwise working fine.
>
> What am I missing here?
>
> Much appreciated
>
> Jamie
>
>
> Redelivery Policy
> ==========
>
> Despite redelivery delay being set to 30 seconds, redelivery is always
> instant. The policy is not being adhered to. Here's my code:
>
> RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
> redeliveryPolicy.setInitialRedeliveryDelay(
>     TimeUnit.MILLISECONDS.convert(getInitialRedeliveryDelaySecs(), TimeUnit.SECONDS));
> redeliveryPolicy.setRedeliveryDelay(
>     TimeUnit.MILLISECONDS.convert(getRedeliveryDelaySecs(), TimeUnit.SECONDS));
> redeliveryPolicy.setMaximumRedeliveryDelay(
>     TimeUnit.MILLISECONDS.convert(getMaxRedeliveryDelaySecs(), TimeUnit.SECONDS));
> redeliveryPolicy.setUseExponentialBackOff(isExponentialBackoff());
> redeliveryPolicy.setMaximumRedeliveries(getMaxRedeliveries());
> redeliveryPolicy.setBackOffMultiplier(getBackOffMultplier());
>
> If processing fails, I am calling  session.recover();
>
> Here are the settings
>
> ((ActiveMQConnectionFactory) connectionFactory).setAlwaysSessionAsync(false);
> ((ActiveMQConnectionFactory) connectionFactory).setRedeliveryPolicy(getRedeliveryPolicy());
>
> ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
> prefetchPolicy.setQueuePrefetch(10000);
> prefetchPolicy.setQueueBrowserPrefetch(5000);
> ((ActiveMQConnectionFactory) connectionFactory).setMaxThreadPoolSize(processThreads);
>
> ((ActiveMQConnectionFactory) connectionFactory).setPrefetchPolicy(prefetchPolicy);
>
> ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
> connection = connectionFactory.createConnection();
> connection.start();
>
> System Usage Constraints
> ================
>
> String brokerName = app.getFileSystem().getApplicationName();
> brokerService = new BrokerService();
> brokerService.setBrokerName(brokerName);
> brokerService.setUseJmx(true);
> brokerService.setUseLoggingForShutdownErrors(true);
> DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
> strategy.setProcessNonPersistent(true);
> strategy.setProcessExpired(true);
> ((IndividualDeadLetterStrategy) strategy).setUseQueueForQueueMessages(true);
> ((IndividualDeadLetterStrategy) strategy).setQueuePrefix("DLQ.");
> PolicyMap policyMap = new PolicyMap();
> List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
> PolicyEntry policy = new PolicyEntry();
> policy.setProducerFlowControl(producerFlowControl);
> policy.setUseCache(true);
> policy.setQueue(">");
> policy.setDeadLetterStrategy(strategy);
> entries.add(policy);
> policyMap.setPolicyEntries(entries);
> brokerService.setDestinationPolicy(policyMap);
>
> //PERSISTENCE
> brokerService.setPersistent(true);
> KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
> persistenceAdapter.setDirectory(kahaPath);
> TFile dataDir = new TFile(app.getFileSystem().getAppQueuePath() + File.separator + "data");
> brokerService.setDataDirectoryFile(dataDir);
> dataDir.mkdirs();
> TFile tempQueuePath = getTempQueuePath();
> brokerService.setTmpDataDirectory(tempQueuePath);
> SystemUsage systemUsage = brokerService.getSystemUsage();
> systemUsage.getStoreUsage().setLimit(storeUsageMB * 1024 * 1024);
> systemUsage.getTempUsage().setLimit(tempUsageMB * 1024 * 1024);
> systemUsage.getMemoryUsage().setLimit(memoryUsageMB * 1024 * 1024);
> brokerService.setSystemUsage(systemUsage);
> brokerService.setPersistenceAdapter(persistenceAdapter);
>
> protected static final String defaultStoreUsageMB = "40";
> protected static final String defaultMemoryUsageMB = "32";
> protected static final String defaultTempUsageMB = "40";
> protected static final String defaultSendFailIfNoSpace = "no";
> protected static final String defaultProducerFlowControl = "yes";
>
>
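
One thing that may be worth ruling out in the quoted limit setup: the snippet does not show the types of storeUsageMB, tempUsageMB and memoryUsageMB. If they are ints, `storeUsageMB * 1024 * 1024` is evaluated in 32-bit arithmetic and wraps around at 2048 MB and above. With the quoted 40 MB defaults this would not bite, so treat the sketch below as a sanity check under that assumption, not as a diagnosis. The class and method names are hypothetical.

```java
public class UsageLimitBytes {

    // Overflow-safe: the long literal forces 64-bit multiplication.
    public static long megabytesToBytes(int megabytes) {
        return megabytes * 1024L * 1024L;
    }

    // Mirrors the quoted expression, ASSUMING the MB value is an int:
    // the product is computed as an int and only then widened to long.
    public static long megabytesToBytesIntMath(int megabytes) {
        return megabytes * 1024 * 1024;
    }

    public static void main(String[] args) {
        System.out.println(megabytesToBytes(40));          // the quoted default
        System.out.println(megabytesToBytes(2048));
        System.out.println(megabytesToBytesIntMath(2048)); // wraps around
    }
}
```

If the limits are configured above 2 GB anywhere, logging the values actually passed to setLimit() would show quickly whether this is a factor.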


-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta
