activemq-users mailing list archives

From Bratislav Stojanovic <bratislav1...@gmail.com>
Subject Re: Unable to remove scheduled messages
Date Wed, 27 Mar 2013 18:40:02 GMT
Forgot to mention: every minute, another new job is generated on the
Broker2 side, so the number of jobs will grow indefinitely!

Small picture attached.

On Wed, Mar 27, 2013 at 7:28 PM, Bratislav Stojanovic <
bratislav1983@gmail.com> wrote:

> I have two brokers in my local network, both with schedulerSupport="true".
>
> Broker1 (localhost) <-------------> Broker2 (other machine)
>
> Broker1 doesn't have any network connectors hardcoded; they are added
> dynamically. Broker2, on the other hand, has a hardcoded transport
> connector in its activemq.xml, like this:
>
> <transportConnectors>
>     <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
> </transportConnectors>
>
> Here's the problem:
>
> On the Broker1 side, I dynamically add a connector to Broker2, schedule
> a message using CRON and send it. The message is then sent every minute
> and I get the message back from Broker2. So far so good.
>
> Now I want to remove the scheduled message (stop sending it). To achieve
> this, I get the instance of the JobScheduler, find the job in the list and
> remove it. After doing so, I see in the logs that the job was removed
> successfully. The problem is that Broker1 will KEEP sending messages even
> though the JobScheduler reports 0 jobs! Could it be that when you schedule
> a message, it is also scheduled on ALL network-connected brokers???
>
> Here's the code on the Broker1 side which adds the connector dynamically:
>
> protected boolean addNetworkConnector(Client c) {
>     try {
>         // add network connectors
>         if (broker.getNetworkConnectorByName("nc_"+c.getIp()+"_"+c.getPort()) == null) {
>             NetworkConnector nc = new DiscoveryNetworkConnector(
>                     new URI("static:failover:(tcp://"+c.getIp()+":"+c.getPort()+")"));
>             nc.setName("nc_"+c.getIp()+"_"+c.getPort());
>             broker.addNetworkConnector(nc);
>             nc.start();
>             logger.debug("Network connector added and started successfully");
>             // not sure if this is necessary
>             int tried = 0;
>             while (!nc.isStarted() && tried < 5) {
>                 logger.debug("Nc not started. Trying again in 0.5 seconds...");
>                 Thread.sleep(500);
>                 tried++;
>             }
>             if (!nc.isStarted()) {
>                 logger.error("Unable to start nc");
>                 return false;
>             }
>         }
>         return true;
>     } catch (Exception e) {
>         logger.error("Unable to add network connector",e);
>     }
>     return false;
> }
>
> Here's the code that schedules and sends a message:
>
> try {
>     TextMessage message = session.createTextMessage();
>     message.setJMSType("GetStatus");
>     message.setIntProperty("cid", c.getId());
>     if (num != -1 && !period.isEmpty()) {
>         logger.debug("Scheduling GetStatus message every "+num+" "+period);
>         if (period.equals("minutes")) {
>             message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "*/"+num+" * * * *");
>         }
>         else if (period.equals("hours")) {
>             message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 */"+num+" * * *");
>         }
>         else if (period.equals("days")) {
>             message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 0 */"+num+" * *"); // every day at midnight
>         }
>         else if (period.equals("weeks")) {
>             message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 0 * * 0/"+num); // every sunday at midnight
>         }
>         else {
>             logger.error("Unsupported period "+period);
>             return;
>         }
>     }
>     if (!addNetworkConnector(c)) return;
>     send(message);
> } catch (Exception e) {
>     logger.debug("",e);
> }
>
> Finally, here's the code that removes the scheduled job from the job
> scheduler:
>
> try {
>     JobScheduler js = OMSyncInit.getServer().getJobScheduler();
>     if (js != null) {
>         List<Job> jobs = js.getAllJobs();
>         logger.debug("Total number of jobs is "+jobs.size());
>         for (Job j : jobs) {
>             //logger.debug("Cron entry is "+j.getCronEntry());
>             //logger.debug("Job id is "+j.getJobId());
>             // payload is openwire encoded!
>             // do not mix with message payload! Payload is the whole message here,
>             // headers + real payload!
>             //logger.debug("Job payload is "+new String(j.getPayload()));
>             OpenWireFormatFactory fact = new OpenWireFormatFactory();
>             WireFormat owf = fact.createWireFormat();
>             Message msg = (Message) owf.unmarshal(new ByteSequence(j.getPayload()));
>             int cid = msg.getIntProperty("cid");
>             String jmsType = msg.getJMSType();
>             //logger.debug("Cid is "+cid);
>             //logger.debug("Type is "+jmsType);
>             if (cid == client.getId()) {
>                 if (type.equals("all") || type.equals(jmsType)) {
>                     js.remove(j.getJobId());
>                     logger.info("Job "+j.getJobId()+" for client "+client.getId()+" removed successfully");
>                 }
>             }
>         }
>         logger.debug("New total number of jobs is "+js.getAllJobs().size());
>     }
> } catch (Exception e) {
>     logger.error("",e);
> }
>
> What am I doing wrong? I can't stop the messages from flowing until I stop
> Broker2 and delete the schedule store from disk! Please help!
>
> Thanks in advance.
>
> --
> Bratislav Stojanovic, M.Sc.
>



-- 
Bratislav Stojanovic, M.Sc.
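
For reference, the cron strings built in the quoted scheduling code can be
isolated into one small helper, which makes it easier to check what is
actually being scheduled. This is only a minimal sketch: the class name
CronExpressions and the method forPeriod are hypothetical (they are not part
of ActiveMQ or of the code above), and rejecting num <= 0 is an assumption,
since the original only checks for -1. The five fields are minute, hour,
day of month, month and day of week.

```java
import java.util.Optional;

// Hypothetical helper, not part of ActiveMQ: builds the same 5-field cron
// strings as the quoted scheduling code (minute hour day-of-month month day-of-week).
final class CronExpressions {

    private CronExpressions() {
    }

    // Returns the cron entry for "every <num> <period>", or empty if the
    // period is unsupported. Assumption: num must be positive.
    static Optional<String> forPeriod(int num, String period) {
        if (num <= 0 || period == null) {
            return Optional.empty();
        }
        switch (period) {
            case "minutes":
                return Optional.of("*/" + num + " * * * *");
            case "hours":
                return Optional.of("0 */" + num + " * * *");
            case "days":
                return Optional.of("0 0 */" + num + " * *");
            case "weeks":
                return Optional.of("0 0 * * 0/" + num);
            default:
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Print the entry for each period used in the quoted code.
        for (String period : new String[] {"minutes", "hours", "days", "weeks", "years"}) {
            System.out.println(period + " -> " + forPeriod(1, period).orElse("unsupported"));
        }
    }
}
```

The returned string would then be passed to
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, ...) exactly
as in the quoted code; an empty result corresponds to the "Unsupported
period" branch.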
