Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A523D105D9 for ; Fri, 23 Aug 2013 16:01:49 +0000 (UTC) Received: (qmail 82304 invoked by uid 500); 23 Aug 2013 16:01:48 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 82210 invoked by uid 500); 23 Aug 2013 16:01:43 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 82203 invoked by uid 99); 23 Aug 2013 16:01:42 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Aug 2013 16:01:42 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Aug 2013 16:01:35 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 55A5E23888E7; Fri, 23 Aug 2013 16:01:13 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1516912 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/ activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/ Date: Fri, 23 Aug 2013 16:01:13 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20130823160113.55A5E23888E7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Fri Aug 23 16:01:12 2013 New Revision: 1516912 URL: http://svn.apache.org/r1516912 Log: fix for: https://issues.apache.org/jira/browse/AMQ-4683 Make scheduler job dispatching start more deterministic Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java?rev=1516912&r1=1516911&r2=1516912&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java Fri Aug 23 16:01:12 2013 @@ -17,117 +17,161 @@ package org.apache.activemq.broker.scheduler; import java.util.List; + import org.apache.activemq.util.ByteSequence; public interface JobScheduler { /** * @return the name of the scheduler - * @throws Exception + * @throws Exception + */ + String getName() throws Exception; + + /** + * Starts dispatch of scheduled Jobs to registered listeners. + * + * Any listener added after the start dispatch method can miss jobs so its + * important to register critical listeners before the start of job dispatching. + * + * @throws Exception + */ + void startDispatching() throws Exception; + + /** + * Stops dispatching of scheduled Jobs to registered listeners. + * + * @throws Exception + */ + void stopDispatching() throws Exception; + + /** + * Add a Job listener + * + * @param l + * @throws Exception + */ + void addListener(JobListener l) throws Exception; + + /** + * remove a JobListener + * + * @param l + * @throws Exception */ - public abstract String getName() throws Exception; -/** - * Add a Job listener - * @param l - * @throws Exception - */ - public abstract void addListener(JobListener l) throws Exception; -/** - * remove a JobListener - * @param l - * @throws Exception - */ - public abstract void removeListener(JobListener l) throws Exception; + void removeListener(JobListener l) throws Exception; /** * Add a job to be scheduled - * @param jobId a unique identifier for the job - * @param payload the message to be sent when the job is scheduled - * @param delay the time in milliseconds before the job will be run + * + * @param jobId + * a unique identifier for the job + * @param payload + * the message to be sent when the job is scheduled + * @param delay + * the time in milliseconds before the job will be run * @throws Exception */ - public abstract void schedule(String jobId, ByteSequence payload,long delay) throws Exception; + void schedule(String jobId, ByteSequence payload, long delay) throws Exception; /** * Add a job to be scheduled - * @param jobId a unique identifier for the job - * @param payload the message to be sent when the job is scheduled - * @param cronEntry - cron entry + * + * @param jobId + * a unique identifier for the job + * @param payload + * the message to be sent when the job is scheduled + * @param cronEntry + * - cron entry * @throws Exception */ - public abstract void schedule(String jobId, ByteSequence payload,String cronEntry) throws Exception; + void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception; - /** * Add a job to be scheduled - * @param jobId a unique identifier for the job - * @param payload the message to be sent when the job is scheduled - * @param cronEntry - cron entry - * @param delay time in ms to wait before scheduling - * @param period the time in milliseconds between successive executions of the Job - * @param repeat the number of times to execute the job - less than 0 will be repeated forever + * + * @param jobId + * a unique identifier for the job + * @param payload + * the message to be sent when the job is scheduled + * @param cronEntry + * - cron entry + * @param delay + * time in ms to wait before scheduling + * @param period + * the time in milliseconds between successive executions of the Job + * @param repeat + * the number of times to execute the job - less than 0 will be repeated forever * @throws Exception */ - public abstract void schedule(String jobId, ByteSequence payload,String cronEntry,long delay, long period, int repeat) throws Exception; + void schedule(String jobId, ByteSequence payload, String cronEntry, long delay, long period, int repeat) throws Exception; /** * remove all jobs scheduled to run at this time + * * @param time - * @throws Exception + * @throws Exception */ - public abstract void remove(long time) throws Exception; + void remove(long time) throws Exception; /** * remove a job with the matching jobId + * * @param jobId - * @throws Exception + * @throws Exception */ - public abstract void remove(String jobId) throws Exception; - + void remove(String jobId) throws Exception; + /** * remove all the Jobs from the scheduler + * * @throws Exception */ - public abstract void removeAllJobs() throws Exception; - + void removeAllJobs() throws Exception; + /** * remove all the Jobs from the scheduler that are due between the start and finish times - * @param start time in milliseconds - * @param finish time in milliseconds + * + * @param start + * time in milliseconds + * @param finish + * time in milliseconds * @throws Exception */ - public abstract void removeAllJobs(long start,long finish) throws Exception; - + void removeAllJobs(long start, long finish) throws Exception; - /** * Get the next time jobs will be fired + * * @return the time in milliseconds - * @throws Exception + * @throws Exception */ - public abstract long getNextScheduleTime() throws Exception; - + long getNextScheduleTime() throws Exception; + /** * Get all the jobs scheduled to run next + * * @return a list of jobs that will be scheduled next * @throws Exception */ - public abstract List getNextScheduleJobs() throws Exception; - + List getNextScheduleJobs() throws Exception; + /** * Get all the outstanding Jobs - * @return a list of all jobs - * @throws Exception + * + * @return a list of all jobs + * @throws Exception */ - public abstract List getAllJobs() throws Exception; - + List getAllJobs() throws Exception; + /** * Get all outstanding jobs due to run between start and finish + * * @param start * @param finish * @return a list of jobs * @throws Exception */ - public abstract List getAllJobs(long start,long finish)throws Exception; + List getAllJobs(long start, long finish) throws Exception; } \ No newline at end of file Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java?rev=1516912&r1=1516911&r2=1516912&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java Fri Aug 23 16:01:12 2013 @@ -18,118 +18,147 @@ package org.apache.activemq.broker.sched import java.util.Collections; import java.util.List; + import org.apache.activemq.util.ByteSequence; public class JobSchedulerFacade implements JobScheduler { private final SchedulerBroker broker; - - JobSchedulerFacade(SchedulerBroker broker){ - this.broker=broker; + + JobSchedulerFacade(SchedulerBroker broker) { + this.broker = broker; } + + @Override public void addListener(JobListener l) throws Exception { JobScheduler js = this.broker.getInternalScheduler(); - if (js !=null) { + if (js != null) { js.addListener(l); } } + @Override public List getAllJobs() throws Exception { JobScheduler js = this.broker.getInternalScheduler(); - if (js !=null) { + if (js != null) { return js.getAllJobs(); } return Collections.emptyList(); } + @Override public List getAllJobs(long start, long finish) throws Exception { JobScheduler js = this.broker.getInternalScheduler(); - if (js !=null) { - return js.getAllJobs(start,finish); + if (js != null) { + return js.getAllJobs(start, finish); } return Collections.emptyList(); } + @Override public String getName() throws Exception { JobScheduler js = this.broker.getInternalScheduler(); - if (js !=null) { + if (js != null) { return js.getName(); } return ""; } + @Override public List getNextScheduleJobs() throws Exception { JobScheduler js = this.broker.getInternalScheduler(); - if (js !=null) { + if (js != null) { return js.getNextScheduleJobs(); } return Collections.emptyList(); } + @Override public long getNextScheduleTime() throws Exception { JobScheduler js = this.broker.getInternalScheduler(); - if (js !=null) { + if (js != null) { return js.getNextScheduleTime(); } return 0; } + @Override public void remove(long time) throws Exception { JobScheduler js = this.broker.getInternalScheduler(); - if (js !=null) { + if (js != null) { js.remove(time); } } + @Override public void remove(String jobId) throws Exception { JobScheduler js = this.broker.getInternalScheduler(); - if (js !=null) { + if (js != null) { js.remove(jobId); } - } + @Override public void removeAllJobs() throws Exception { JobScheduler js = this.broker.getInternalScheduler(); - if (js !=null) { + if (js != null) { js.removeAllJobs(); } } + @Override public void removeAllJobs(long start, long finish) throws Exception { JobScheduler js = this.broker.getInternalScheduler(); - if (js !=null) { - js.removeAllJobs(start,finish); + if (js != null) { + js.removeAllJobs(start, finish); } - } + @Override public void removeListener(JobListener l) throws Exception { JobScheduler js = this.broker.getInternalScheduler(); - if (js !=null) { + if (js != null) { js.removeListener(l); } - } + @Override public void schedule(String jobId, ByteSequence payload, long delay) throws Exception { JobScheduler js = this.broker.getInternalScheduler(); - if (js !=null) { + if (js != null) { js.schedule(jobId, payload, delay); } } - public void schedule(String jobId, ByteSequence payload,String cronEntry, long start, long period, int repeat) throws Exception { + @Override + public void schedule(String jobId, ByteSequence payload, String cronEntry, long start, long period, int repeat) throws Exception { JobScheduler js = this.broker.getInternalScheduler(); - if (js !=null) { - js.schedule(jobId, payload, cronEntry,start,period,repeat); + if (js != null) { + js.schedule(jobId, payload, cronEntry, start, period, repeat); } } + + @Override public void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception { JobScheduler js = this.broker.getInternalScheduler(); - if (js !=null) { + if (js != null) { js.schedule(jobId, payload, cronEntry); } - + } + + @Override + public void startDispatching() throws Exception { + JobScheduler js = this.broker.getInternalScheduler(); + if (js != null) { + js.startDispatching(); + } + } + + @Override + public void stopDispatching() throws Exception { + JobScheduler js = this.broker.getInternalScheduler(); + if (js != null) { + js.stopDispatching(); + } } } Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java?rev=1516912&r1=1516911&r2=1516912&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java Fri Aug 23 16:01:12 2013 @@ -16,14 +16,16 @@ */ package org.apache.activemq.broker.scheduler; -import org.apache.activemq.Service; - import java.io.File; +import org.apache.activemq.Service; + /** - * @author Hiram Chirino + * A Job Scheduler Store interface use to manage delay processing of Messaging + * related jobs. */ public interface JobSchedulerStore extends Service { + File getDirectory(); void setDirectory(File directory); Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=1516912&r1=1516911&r2=1516912&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Fri Aug 23 16:01:12 2013 @@ -292,6 +292,7 @@ public class SchedulerBroker extends Bro if (this.scheduler == null) { this.scheduler = store.getJobScheduler("JMS"); this.scheduler.addListener(this); + this.scheduler.startDispatching(); } return this.scheduler; } Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java?rev=1516912&r1=1516911&r2=1516912&view=diff ============================================================================== --- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java (original) +++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java Fri Aug 23 16:01:12 2013 @@ -51,7 +51,7 @@ class JobSchedulerImpl extends ServiceSu private String name; BTreeIndex> index; private Thread thread; - private final Object listenerLock = new Object(); + private final AtomicBoolean started = new AtomicBoolean(false); private final List jobListeners = new CopyOnWriteArrayList(); private static final IdGenerator ID_GENERATOR = new IdGenerator(); private final ScheduleTime scheduleTime = new ScheduleTime(); @@ -82,9 +82,6 @@ class JobSchedulerImpl extends ServiceSu @Override public void addListener(JobListener l) { this.jobListeners.add(l); - synchronized (this.listenerLock) { - this.listenerLock.notify(); - } } /* @@ -480,19 +477,6 @@ class JobSchedulerImpl extends ServiceSu protected void mainLoop() { while (this.running.get()) { - - // Can't start pumping messages until a listener is added otherwise we'd discard messages - // without any warning. - synchronized (listenerLock) { - while (this.running.get() && this.jobListeners.isEmpty()) { - try { - LOG.debug("Scheduled Message dispatch paused while awaiting a Job Listener"); - this.listenerLock.wait(); - } catch (InterruptedException e) { - } - } - } - this.scheduleTime.clearNewJob(); try { // peek the next job @@ -584,24 +568,39 @@ class JobSchedulerImpl extends ServiceSu } @Override + public void startDispatching() throws Exception { + if (!this.running.get()) { + return; + } + + if (started.compareAndSet(false, true)) { + this.thread = new Thread(this, "JobScheduler:" + this.name); + this.thread.setDaemon(true); + this.thread.start(); + } + } + + @Override + public void stopDispatching() throws Exception { + if (started.compareAndSet(true, false)) { + this.scheduleTime.wakeup(); + Thread t = this.thread; + this.thread = null; + if (t != null) { + t.join(1000); + } + } + } + + @Override protected void doStart() throws Exception { this.running.set(true); - synchronized (this.listenerLock) { - this.listenerLock.notify(); - } - this.thread = new Thread(this, "JobScheduler:" + this.name); - this.thread.setDaemon(true); - this.thread.start(); } @Override protected void doStop(ServiceStopper stopper) throws Exception { this.running.set(false); - this.scheduleTime.wakeup(); - Thread t = this.thread; - if (t != null) { - t.join(1000); - } + stopDispatching(); } long calculateNextExecutionTime(final JobLocation job, long currentTime, int repeat) throws MessageFormatException {