Forgot to mention: every minute, another new job is generated on the Broker2 side, so the number
of jobs grows indefinitely!

Small picture attached.

On Wed, Mar 27, 2013 at 7:28 PM, Bratislav Stojanovic <bratislav1983@gmail.com> wrote:
I have two brokers in my local network, both with schedulerSupport="true".

Broker1 (localhost) <-------------> Broker2 (other machine)

Broker1 doesn't have any network connectors hardcoded; they are added dynamically. Broker2, on the
other hand, has a hardcoded transport connector in its activemq.xml, like this:

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>

Here's the problem:

On the Broker1 side, I dynamically add a connector to Broker2, schedule a message using
CRON and send it. The message is then sent every minute and I'm getting the message back from
Broker2. So far so good.

Now I want to remove the scheduled message (stop sending it). To achieve this, I get the instance
of the JobScheduler, find the job in the list and remove it. After doing so, I see in the logs that
the job was removed successfully. The problem is that Broker1 KEEPS sending messages even though
the JobScheduler reports 0 jobs! Does this have to do with the fact that when you schedule a
message, it also gets scheduled on ALL network-connected brokers?

Here's the code on the Broker1 side which adds the connector dynamically:

protected boolean addNetworkConnector(Client c) {
    try {
        // add the network connector only if one with this name doesn't exist yet
        String name = "nc_" + c.getIp() + "_" + c.getPort();
        if (broker.getNetworkConnectorByName(name) == null) {
            NetworkConnector nc = new DiscoveryNetworkConnector(
                    new URI("static:failover:(tcp://" + c.getIp() + ":" + c.getPort() + ")"));
            nc.setName(name);
            broker.addNetworkConnector(nc);
            nc.start();
            logger.debug("Network connector added and started successfully");
            // not sure if this is necessary
            int tried = 0;
            while (!nc.isStarted() && tried < 5) {
                logger.debug("Nc not started. Trying again in 0.5 seconds...");
                Thread.sleep(500);
                tried++;
            }
            if (!nc.isStarted()) {
                logger.error("Unable to start nc");
                return false;
            }
        }
        return true;
    } catch (Exception e) {
        logger.error("Unable to add network connector", e);
    }
    return false;
}
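
To make the naming scheme and the bounded wait easier to check in isolation, here is a minimal sketch that has no ActiveMQ dependency. ConnectorHelper, connectorName, connectorUri and waitUntil are hypothetical names introduced only for illustration, and the port is assumed to be an int. The polling mirrors the loop above: check, sleep, then re-check once at the end.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of ActiveMQ or of the code above.
final class ConnectorHelper {

    // Same naming scheme as in addNetworkConnector: nc_<ip>_<port>
    static String connectorName(String ip, int port) {
        return "nc_" + ip + "_" + port;
    }

    // Same URI string that is passed to DiscoveryNetworkConnector above
    static String connectorUri(String ip, int port) {
        return "static:failover:(tcp://" + ip + ":" + port + ")";
    }

    // Checks the condition up to maxTries times, sleeping delayMs after each
    // failed check, then checks once more. Returns false if interrupted.
    static boolean waitUntil(BooleanSupplier condition, int maxTries, long delayMs) {
        for (int tried = 0; tried < maxTries; tried++) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag set
                return false;
            }
        }
        return condition.getAsBoolean();
    }
}
```

With this, the wait in addNetworkConnector would read roughly as ConnectorHelper.waitUntil(nc::isStarted, 5, 500).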

Here's the code that schedules and sends a message:

try {
    TextMessage message = session.createTextMessage();
    message.setJMSType("GetStatus");
    message.setIntProperty("cid", c.getId());
    if (num != -1 && !period.isEmpty()) {
        logger.debug("Scheduling GetStatus message every " + num + " " + period);
        if (period.equals("minutes")) {
            message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "*/" + num + " * * * *");
        }
        else if (period.equals("hours")) {
            message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 */" + num + " * * *");
        }
        else if (period.equals("days")) {
            // at midnight, every <num> days
            message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 0 */" + num + " * *");
        }
        else if (period.equals("weeks")) {
            // intended: every Sunday at midnight
            message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 0 * * 0/" + num);
        }
        else {
            logger.error("Unsupported period " + period);
            return;
        }
    }
    if (!addNetworkConnector(c)) return;
    send(message);
} catch (Exception e) {
    logger.debug("", e);
}
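
The period-to-cron mapping above can be pulled out into a small helper so it can be checked without a broker. This is only a sketch: CronExpressions and forPeriod are hypothetical names, and the strings are exactly the ones built in the code above (I am not claiming the "weeks" pattern does what its comment intends).

```java
// Hypothetical helper, not part of ActiveMQ or of the code above.
final class CronExpressions {

    // Returns the cron entry used above for "every <num> <period>",
    // or null when the period is not one of the supported values.
    static String forPeriod(int num, String period) {
        switch (period) {
            case "minutes": return "*/" + num + " * * * *";
            case "hours":   return "0 */" + num + " * * *";
            case "days":    return "0 0 */" + num + " * *";
            case "weeks":   return "0 0 * * 0/" + num;
            default:        return null;
        }
    }
}
```

The caller would then set ScheduledMessage.AMQ_SCHEDULED_CRON only when forPeriod returns a non-null value, and log the unsupported period otherwise.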

Finally, here's the code that removes the scheduled job from the job scheduler:

try {
    JobScheduler js = OMSyncInit.getServer().getJobScheduler();
    if (js != null) {
        List<Job> jobs = js.getAllJobs();
        logger.debug("Total number of jobs is " + jobs.size());
        // one wire format instance is enough for decoding all job payloads
        OpenWireFormatFactory fact = new OpenWireFormatFactory();
        WireFormat owf = fact.createWireFormat();
        for (Job j : jobs) {
            //logger.debug("Cron entry is "+j.getCronEntry());
            //logger.debug("Job id is "+j.getJobId());
            // payload is openwire encoded!
            // do not mix with message payload! Payload is the whole message here, headers + real payload!
            //logger.debug("Job payload is "+new String(j.getPayload()));
            Message msg = (Message) owf.unmarshal(new ByteSequence(j.getPayload()));
            int cid = msg.getIntProperty("cid");
            String jmsType = msg.getJMSType();
            //logger.debug("Cid is "+cid);
            //logger.debug("Type is "+jmsType);
            if (cid == client.getId()) {
                if (type.equals("all") || type.equals(jmsType)) {
                    js.remove(j.getJobId());
                    logger.info("Job " + j.getJobId() + " for client " + client.getId() + " removed successfully");
                }
            }
        }
        logger.debug("New total number of jobs is " + js.getAllJobs().size());
    }
} catch (Exception e) {
    logger.error("", e);
}
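
The decision of which jobs get removed in the loop above boils down to one condition, sketched here as a standalone method so it can be checked without a JobScheduler. JobFilter and shouldRemove are hypothetical names; the only difference from the code above is that a null type is treated as "no match" instead of throwing.

```java
// Hypothetical helper, not part of ActiveMQ or of the code above.
final class JobFilter {

    // A job is removed when it belongs to the given client and either all
    // types were requested ("all") or the job's JMS type equals the requested one.
    static boolean shouldRemove(int jobCid, String jobType, int clientId, String type) {
        if (jobCid != clientId || type == null) {
            return false;
        }
        return type.equals("all") || type.equals(jobType);
    }
}
```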

What am I doing wrong? I can't stop the messages from flowing until I stop Broker2 and delete its
scheduler store from the disk! Please help!

Thanks in advance.

--
Bratislav Stojanovic, M.Sc.


