activemq-users mailing list archives

From Jamie <ja...@mailarchiva.com>
Subject redelivery policy & system usage limits not being applied correctly (Activemq 5.8)
Date Tue, 29 Oct 2013 09:59:20 GMT
Greetings

I am experiencing a couple of issues while using ActiveMQ 5.8. They are:

1) Redelivery policy is not being applied correctly.

2) The ActiveMQ broker is not adhering to the configured usage constraints.
The queue keeps filling up until it runs out of disk space, even though
store limits are set.

The queues are otherwise working fine.

What am I missing here?

Much appreciated

Jamie


Redelivery Policy
=================

Despite redelivery delay being set to 30 seconds, redelivery is always 
instant. The policy is not being adhered to. Here's my code:

RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(
        TimeUnit.MILLISECONDS.convert(getInitialRedeliveryDelaySecs(), TimeUnit.SECONDS));
redeliveryPolicy.setRedeliveryDelay(
        TimeUnit.MILLISECONDS.convert(getRedeliveryDelaySecs(), TimeUnit.SECONDS));
redeliveryPolicy.setMaximumRedeliveryDelay(
        TimeUnit.MILLISECONDS.convert(getMaxRedeliveryDelaySecs(), TimeUnit.SECONDS));
redeliveryPolicy.setUseExponentialBackOff(isExponentialBackoff());
redeliveryPolicy.setMaximumRedeliveries(getMaxRedeliveries());
redeliveryPolicy.setBackOffMultiplier(getBackOffMultplier());

If processing fails, I call session.recover().
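
To make the expected behaviour concrete, here is a minimal sketch in plain Java of the delay schedule that settings like these are intended to produce. It is a simplified model, not ActiveMQ's own implementation: the class name RedeliverySchedule and the method delayMs are hypothetical, and the 30-second base delay, 2.0 multiplier and 300-second cap are example values only.

```java
public class RedeliverySchedule {

    /**
     * Simplified model (an assumption, not ActiveMQ code): the first
     * redelivery waits baseDelayMs, each later attempt multiplies the
     * previous delay by the multiplier, and the result is capped at
     * maxDelayMs.
     */
    public static long delayMs(int attempt, long baseDelayMs,
                               double multiplier, long maxDelayMs) {
        double delay = Math.min(baseDelayMs, maxDelayMs);
        for (int i = 1; i < attempt; i++) {
            // Cap inside the loop so the value can never grow unbounded.
            delay = Math.min(delay * multiplier, maxDelayMs);
        }
        return (long) delay;
    }

    public static void main(String[] args) {
        // Example values only: 30 s base delay, doubling, capped at 300 s.
        for (int attempt = 1; attempt <= 5; attempt++) {
            System.out.println("attempt " + attempt + ": "
                    + delayMs(attempt, 30_000L, 2.0, 300_000L) + " ms");
        }
    }
}
```

With a schedule like this, the first redelivery should arrive no sooner than 30 seconds after the failure, which is not what I am observing.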

Here are the connection settings:

((ActiveMQConnectionFactory)connectionFactory).setAlwaysSessionAsync(false);
((ActiveMQConnectionFactory)connectionFactory).setRedeliveryPolicy(getRedeliveryPolicy());

ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(10000);
prefetchPolicy.setQueueBrowserPrefetch(5000);
((ActiveMQConnectionFactory)connectionFactory).setMaxThreadPoolSize(processThreads);
((ActiveMQConnectionFactory)connectionFactory).setPrefetchPolicy(prefetchPolicy);
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
connection = connectionFactory.createConnection();
connection.start();

System Usage Constraints
========================

String brokerName = app.getFileSystem().getApplicationName();
brokerService = new BrokerService();
brokerService.setBrokerName(brokerName);
brokerService.setUseJmx(true);
brokerService.setUseLoggingForShutdownErrors(true);
DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
strategy.setProcessNonPersistent(true);
strategy.setProcessExpired(true);
((IndividualDeadLetterStrategy)strategy).setUseQueueForQueueMessages(true);
((IndividualDeadLetterStrategy)strategy).setQueuePrefix("DLQ.");
PolicyMap policyMap = new PolicyMap();
List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
PolicyEntry policy = new PolicyEntry();
policy.setProducerFlowControl(producerFlowControl);
policy.setUseCache(true);
policy.setQueue(">");
policy.setDeadLetterStrategy(strategy);
entries.add(policy);
policyMap.setPolicyEntries(entries);
brokerService.setDestinationPolicy(policyMap);

//PERSISTENCE
brokerService.setPersistent(true);
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(kahaPath);
TFile dataDir = new TFile(app.getFileSystem().getAppQueuePath() + File.separator + "data");
brokerService.setDataDirectoryFile(dataDir);
dataDir.mkdirs();
TFile tempQueuePath = getTempQueuePath();
brokerService.setTmpDataDirectory(tempQueuePath);
SystemUsage systemUsage = brokerService.getSystemUsage();
// Limits are in bytes; long arithmetic avoids int overflow for values of 2048 MB or more.
systemUsage.getStoreUsage().setLimit(storeUsageMB * 1024L * 1024L);
systemUsage.getTempUsage().setLimit(tempUsageMB * 1024L * 1024L);
systemUsage.getMemoryUsage().setLimit(memoryUsageMB * 1024L * 1024L);
brokerService.setSystemUsage(systemUsage);
brokerService.setPersistenceAdapter(persistenceAdapter);

protected static final String defaultStoreUsageMB = "40";
protected static final String defaultMemoryUsageMB = "32";
protected static final String defaultTempUsageMB = "40";
protected static final String defaultSendFailIfNoSpace = "no";
protected static final String defaultProducerFlowControl = "yes";
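
For reference, the usage limits above are passed to the broker in bytes, while the defaults are stored as strings holding megabyte values. A minimal sketch of that conversion in plain Java follows. The class name UsageLimits and the method mbToBytes are hypothetical and do not appear in my code. With the 40 MB and 32 MB defaults, int arithmetic would not overflow, so this is only a sanity check of the numbers rather than a suspected cause.

```java
public class UsageLimits {

    /** Parses a megabyte value such as "40" and returns the size in bytes. */
    public static long mbToBytes(String megabytes) {
        // The long literals keep the whole multiplication in 64-bit arithmetic.
        return Long.parseLong(megabytes.trim()) * 1024L * 1024L;
    }

    public static void main(String[] args) {
        System.out.println("store  limit: " + mbToBytes("40") + " bytes");
        System.out.println("memory limit: " + mbToBytes("32") + " bytes");

        // For contrast: the same product in 32-bit int arithmetic wraps
        // around once the value reaches 2048 MB.
        int largeMb = 4096;
        System.out.println("int  product for 4096 MB: " + (largeMb * 1024 * 1024));
        System.out.println("long product for 4096 MB: " + mbToBytes("4096"));
    }
}
```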

