activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Bratislav Stojanovic <bratislav1...@gmail.com>
Subject Unable to remove scheduled messages
Date Wed, 27 Mar 2013 18:28:46 GMT
I have two brokers in my local network, both with schedulerSupport="true".

Broker1 (localhost) <-------------> Broker2 (other machine)

Broker1 doesn't
have network connectors hardcoded, they are added dynamically. On the other
side, Broker2
has hardcoded network connector in his activemq.xml like this :

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>

Here's the problem :

On the Broker1 side, I dynamically add connector to the Broker2, schedule a
message using
CRON and send it. Message starts to send every minute and I'm getting the
message back from the
Broker2. So far so good.

Now, I want to remove scheduled messages (stop sending it). To achieve
this, I get the instance of the
JobScheduler, find a job in the list and remove it. After doing so, I'm
seeing in the logs that a message got successfully removed. The problem is,
broker1 will KEEP sending messages no matter if the JobScheduler reported 0
jobs! Does it have to do with the fact that when you schedule a message you
will also
schedule messages in ALL network connected brokers as well???

Here's the code on the broker1 side which adds connector dynamically :

protected boolean addNetworkConnector(Client c) {
try {
// add network connectors
if (broker.getNetworkConnectorByName("nc_"+c.getIp()+"_"+c.getPort()) ==
null) {
NetworkConnector nc = new DiscoveryNetworkConnector(new
URI("static:failover:(tcp://"+c.getIp()+":"+c.getPort()+")"));
nc.setName("nc_"+c.getIp()+"_"+c.getPort());
broker.addNetworkConnector(nc);
nc.start();
logger.debug("Network connector added and started successfully");
// not sure if this is necesarry
int tried = 0;
while (!nc.isStarted() && tried < 5) {
logger.debug("Nc not started. Trying again in 0.5 seconds...");
Thread.sleep(500);
tried++;
}
if (!nc.isStarted()) {
logger.error("Unable to start nc");
return false;
}
}
return true;
} catch (Exception e) {
logger.error("Unable to add network connector",e);
}
return false;
}

Here's the code that shcedules and sends a message :

try {
 TextMessage message = session.createTextMessage();
message.setJMSType("GetStatus");
 message.setIntProperty("cid", c.getId());
 if (num != -1 && !period.isEmpty()) {
logger.debug("Scheduling GetStatus message every "+num+" "+period);
if (period.equals("minutes")) {
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "*/"+num+" *
* * *");
}
else if (period.equals("hours")) {
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 */"+num+"
* * *");
}
else if (period.equals("days")) {
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 0
*/"+num+" * *"); // every day at midnight
}
else if (period.equals("weeks")) {
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 0 * *
0/"+num); // every sunday at midnight
}
else {
logger.error("Unsupported period "+period);
return;
}
}
 if (!addNetworkConnector(c)) return;
 send(message);
 } catch (Exception e) {
logger.debug("",e);
}

Finally, here's the code that removes scheduled job from the job scheduler :

try {
JobScheduler js = OMSyncInit.getServer().getJobScheduler();
if (js != null) {
List<Job> jobs = js.getAllJobs();
logger.debug("Total number of jobs is "+jobs.size());
for (Job j : jobs) {
//logger.debug("Cron entry is "+j.getCronEntry());
//logger.debug("Job id is "+j.getJobId());
// payload is openwire encoded!
// do not mix with message payload! Payload is the whole message here,
headers + real payload!
//logger.debug("Job payload is "+new String(j.getPayload()));
OpenWireFormatFactory fact = new OpenWireFormatFactory();
WireFormat owf = fact.createWireFormat();
Message msg = (Message) owf.unmarshal(new ByteSequence(j.getPayload()));
int cid = msg.getIntProperty("cid");
String jmsType = msg.getJMSType();
//logger.debug("Cid is "+cid);
//logger.debug("Type is "+jmsType);
if (cid == client.getId()) {
if (type.equals("all") || type.equals(jmsType)) {
js.remove(j.getJobId());
logger.info("Job "+j.getJobId()+" for client "+client.getId()+ " removed
successfully");
}
}
}
logger.debug("New total number of jobs is "+js.getAllJobs().size());
}
} catch (Exception e) {
logger.error("",e);
}

What am I doing wrong? I can't stop messages from flowing until I stop
Broker2 and delete schedule
store from the disk! Please help!

Thanks in advance.

-- 
Bratislav Stojanovic, M.Sc.

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message