camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [3/4] camel git commit: CAMEL-7809 : Quartz PollConsumerScheduler in a cluster tries to create duplicate triggers, fails
Date Mon, 27 Mar 2017 13:23:09 GMT
CAMEL-7809 : Quartz PollConsumerScheduler in a cluster tries to create duplicate triggers,
fails



Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/40966458
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/40966458
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/40966458

Branch: refs/heads/master
Commit: 409664582f532d8b9799e9525ae0e7a34918485f
Parents: 51d27c5
Author: [a556724] etienne dethoor <etienne.dethoor@atos.net>
Authored: Mon Mar 27 13:55:06 2017 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Mar 27 15:12:07 2017 +0200

----------------------------------------------------------------------
 .../QuartzScheduledPollConsumerScheduler.java   | 76 +++++++++++++-------
 1 file changed, 50 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/40966458/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
index 51c17dc..436683a 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
@@ -35,13 +35,15 @@ import org.quartz.JobDataMap;
 import org.quartz.JobDetail;
 import org.quartz.Scheduler;
 import org.quartz.SchedulerException;
+import org.quartz.Trigger;
 import org.quartz.TriggerBuilder;
+import org.quartz.TriggerKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A quartz based {@link ScheduledPollConsumerScheduler} which uses a {@link CronTrigger}
to define when the
- * poll should be triggered.
+ * A quartz based {@link ScheduledPollConsumerScheduler} which uses a
+ * {@link CronTrigger} to define when the poll should be triggered.
  */
 public class QuartzScheduledPollConsumerScheduler extends ServiceSupport implements ScheduledPollConsumerScheduler,
NonManagedService {
 
@@ -161,36 +163,50 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport
impleme
             setQuartzScheduler(quartz.getScheduler());
         }
 
-        JobDataMap map = new JobDataMap();
-        // do not store task as its not serializable, if we have route id
-        if (routeId != null) {
-            map.put("routeId", routeId);
-        } else {
-            map.put("task", runnable);
-        }
-        map.put(QuartzConstants.QUARTZ_TRIGGER_TYPE, "cron");
-        map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, getCron());
-        map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, getTimeZone().getID());
-
-        job = JobBuilder.newJob(QuartzScheduledPollConsumerJob.class)
-                .usingJobData(map)
-                .build();
-
-        // store additional information on job such as camel context etc
-        QuartzHelper.updateJobDataMap(getCamelContext(), job, null);
-
         String id = triggerId;
         if (id == null) {
             id = "trigger-" + getCamelContext().getUuidGenerator().generateUuid();
         }
+        TriggerKey triggerKey = new TriggerKey(triggerId, triggerGroup);
+        Trigger existingTrigger = quartzScheduler.getTrigger(triggerKey);
+
+        // Is an trigger already exist for this triggerId ?
+        if (existingTrigger == null) {
+            JobDataMap map = new JobDataMap();
+            // do not store task as its not serializable, if we have route id
+            if (routeId != null) {
+                map.put("routeId", routeId);
+            } else {
+                map.put("task", runnable);
+            }
+            map.put(QuartzConstants.QUARTZ_TRIGGER_TYPE, "cron");
+            map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, getCron());
+            map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, getTimeZone().getID());
+
+            job = JobBuilder.newJob(QuartzScheduledPollConsumerJob.class).usingJobData(map).build();
 
-        trigger = TriggerBuilder.newTrigger()
-                .withIdentity(id, triggerGroup)
-                .withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone()))
-                .build();
+            // store additional information on job such as camel context etc
+            QuartzHelper.updateJobDataMap(getCamelContext(), job, null);
+
+            trigger = TriggerBuilder.newTrigger().withIdentity(id, triggerGroup).withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone())).build();
+
+            LOG.debug("Scheduling job: {} with trigger: {}", job, trigger.getKey());
+            quartzScheduler.scheduleJob(job, trigger);
+        } else {
+            checkTriggerIsNonConflicting(existingTrigger);
+
+            LOG.debug("Trigger with key {} is already present in scheduler. Only updating
it.", triggerKey);
+            job = quartzScheduler.getJobDetail(existingTrigger.getJobKey());
+            JobDataMap jobData = job.getJobDataMap();
+            jobData.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, getCron());
+            jobData.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, getTimeZone().getID());
+
+            QuartzHelper.updateJobDataMap(getCamelContext(), job, null);
+            LOG.debug("Updated jobData map to {}", jobData);
+
+            quartzScheduler.rescheduleJob(triggerKey, existingTrigger);
+        }
 
-        LOG.debug("Scheduling job: {} with trigger: {}", job, trigger.getKey());
-        quartzScheduler.scheduleJob(job, trigger);
     }
 
     @Override
@@ -205,4 +221,12 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport
impleme
     protected void doShutdown() throws Exception {
     }
 
+    private void checkTriggerIsNonConflicting(Trigger trigger) {
+        JobDataMap jobDataMap = trigger.getJobDataMap();
+        String routeIdFromTrigger = jobDataMap.getString("routeId");
+        if (routeIdFromTrigger != null && !routeIdFromTrigger.equals(routeId)) {
+            throw new IllegalArgumentException("Trigger key " + trigger.getKey() + " is already
used by route" + routeIdFromTrigger + ". Can't re-use it for route " + routeId);
+        }
+    }
+
 }


Mime
View raw message