Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 077E0110DF for ; Mon, 7 Jul 2014 16:43:55 +0000 (UTC) Received: (qmail 44688 invoked by uid 500); 7 Jul 2014 16:43:54 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 44559 invoked by uid 500); 7 Jul 2014 16:43:54 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 44347 invoked by uid 99); 7 Jul 2014 16:43:54 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Jul 2014 16:43:54 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 864059A7C4B; Mon, 7 Jul 2014 16:43:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Date: Mon, 07 Jul 2014 16:43:58 -0000 Message-Id: <163c7a0f35b64a31b843fb82d0a3e23b@git.apache.org> In-Reply-To: <1e9e3e211f2246e88fb4a5820b753684@git.apache.org> References: <1e9e3e211f2246e88fb4a5820b753684@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/5] git commit: https://issues.apache.org/jira/browse/AMQ-3758 https://issues.apache.org/jira/browse/AMQ-3758 Refactor the scheduler store into a more KahaDB style store that can recover from various problems like missing journal files or corruption as well as rebuild its index when needed. Move the scheduler store into a more configurable style that allows for users to plug in their own implementations. Store update from legacy versions is automatic. 
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/74846bb2 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/74846bb2 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/74846bb2 Branch: refs/heads/trunk Commit: 74846bb2b40e6183f2accdcab388c2d309c1eaad Parents: aa79c7e Author: Timothy Bish Authored: Mon Jul 7 12:28:11 2014 -0400 Committer: Timothy Bish Committed: Mon Jul 7 12:28:11 2014 -0400 ---------------------------------------------------------------------- .../apache/activemq/broker/BrokerService.java | 25 +- .../activemq/broker/jmx/JobSchedulerView.java | 56 +- .../broker/jmx/JobSchedulerViewMBean.java | 113 +- .../apache/activemq/broker/scheduler/Job.java | 23 +- .../activemq/broker/scheduler/JobListener.java | 16 +- .../activemq/broker/scheduler/JobScheduler.java | 33 +- .../broker/scheduler/JobSchedulerFacade.java | 6 + .../broker/scheduler/JobSchedulerStore.java | 43 + .../activemq/broker/scheduler/JobSupport.java | 5 +- .../activemq/store/PersistenceAdapter.java | 119 +- .../store/memory/MemoryPersistenceAdapter.java | 36 +- .../java/org/apache/activemq/util/IOHelper.java | 68 +- .../store/jdbc/JDBCPersistenceAdapter.java | 7 + .../journal/JournalPersistenceAdapter.java | 71 +- .../store/kahadb/AbstractKahaDBMetaData.java | 57 + .../store/kahadb/AbstractKahaDBStore.java | 745 ++++++++++++ .../activemq/store/kahadb/KahaDBMetaData.java | 135 +++ .../store/kahadb/KahaDBPersistenceAdapter.java | 15 +- .../activemq/store/kahadb/KahaDBStore.java | 55 +- .../kahadb/MultiKahaDBPersistenceAdapter.java | 56 +- .../kahadb/MultiKahaDBTransactionStore.java | 18 +- .../activemq/store/kahadb/TempKahaDBStore.java | 138 ++- .../apache/activemq/store/kahadb/Visitor.java | 20 + .../store/kahadb/scheduler/JobImpl.java | 21 +- .../store/kahadb/scheduler/JobLocation.java | 77 +- .../scheduler/JobLocationsMarshaller.java | 53 + .../kahadb/scheduler/JobSchedulerImpl.java | 837 
++++++++------ .../scheduler/JobSchedulerKahaDBMetaData.java | 246 ++++ .../kahadb/scheduler/JobSchedulerStoreImpl.java | 1076 +++++++++++++----- .../scheduler/UnknownStoreVersionException.java | 24 + .../kahadb/scheduler/legacy/LegacyJobImpl.java | 72 ++ .../scheduler/legacy/LegacyJobLocation.java | 296 +++++ .../legacy/LegacyJobSchedulerImpl.java | 222 ++++ .../legacy/LegacyJobSchedulerStoreImpl.java | 378 ++++++ .../scheduler/legacy/LegacyStoreReplayer.java | 155 +++ .../src/main/proto/journal-data.proto | 61 + .../apache/activemq/leveldb/LevelDBStore.scala | 5 + .../leveldb/replicated/ProxyLevelDBStore.scala | 5 + .../JobSchedulerBrokerShutdownTest.java | 1 + .../JobSchedulerJmxManagementTests.java | 155 +++ .../scheduler/JobSchedulerManagementTest.java | 84 +- .../JobSchedulerStoreCheckpointTest.java | 125 ++ .../broker/scheduler/JobSchedulerStoreTest.java | 46 +- .../broker/scheduler/JobSchedulerTest.java | 36 + .../scheduler/JobSchedulerTestSupport.java | 112 ++ .../KahaDBSchedulerIndexRebuildTest.java | 179 +++ .../KahaDBSchedulerMissingJournalLogsTest.java | 204 ++++ .../scheduler/SchedulerDBVersionTest.java | 164 +++ .../src/test/resources/log4j.properties | 1 + .../activemq/store/schedulerDB/legacy/db-1.log | Bin 0 -> 524288 bytes .../store/schedulerDB/legacy/scheduleDB.data | Bin 0 -> 20480 bytes .../store/schedulerDB/legacy/scheduleDB.redo | Bin 0 -> 16408 bytes 52 files changed, 5584 insertions(+), 911 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 044af08..1ff4a21 100644 --- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -1868,6 +1868,23 @@ public class BrokerService implements Service { try { PersistenceAdapter pa = getPersistenceAdapter(); + if (pa != null) { + this.jobSchedulerStore = pa.createJobSchedulerStore(); + jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); + configureService(jobSchedulerStore); + jobSchedulerStore.start(); + return this.jobSchedulerStore; + } + } catch (IOException e) { + throw new RuntimeException(e); + } catch (UnsupportedOperationException ex) { + // It's ok if the store doesn't implement a scheduler. + } catch (Exception e) { + throw new RuntimeException(e); + } + + try { + PersistenceAdapter pa = getPersistenceAdapter(); if (pa != null && pa instanceof JobSchedulerStore) { this.jobSchedulerStore = (JobSchedulerStore) pa; configureService(jobSchedulerStore); @@ -1877,9 +1894,13 @@ public class BrokerService implements Service { throw new RuntimeException(e); } + // Load the KahaDB store as a last resort, this only works if KahaDB is + // included at runtime, otherwise this will fail. User should disable + // scheduler support if this fails. 
try { - String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl"; - jobSchedulerStore = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance(); + String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; + PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); + jobSchedulerStore = adaptor.createJobSchedulerStore(); jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); configureService(jobSchedulerStore); jobSchedulerStore.start(); http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java index 9e5a1fb..2118a96 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java @@ -16,23 +16,39 @@ */ package org.apache.activemq.broker.jmx; +import java.util.List; + +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; + import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.scheduler.Job; import org.apache.activemq.broker.scheduler.JobScheduler; import org.apache.activemq.broker.scheduler.JobSupport; -import javax.management.openmbean.*; -import java.io.IOException; -import java.util.List; - +/** + * MBean object that can be used to manage a single instance of a JobScheduler. 
The object + * provides methods for querying for jobs and removing some or all of the jobs that are + * scheduled in the managed store. + */ public class JobSchedulerView implements JobSchedulerViewMBean { private final JobScheduler jobScheduler; + /** + * Creates a new instance of the JobScheduler management MBean. + * + * @param jobScheduler + * The scheduler instance to manage. + */ public JobSchedulerView(JobScheduler jobScheduler) { this.jobScheduler = jobScheduler; } + @Override public TabularData getAllJobs() throws Exception { OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class); CompositeType ct = factory.getCompositeType(); @@ -45,6 +61,7 @@ public class JobSchedulerView implements JobSchedulerViewMBean { return rc; } + @Override public TabularData getAllJobs(String startTime, String finishTime) throws Exception { OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class); CompositeType ct = factory.getCompositeType(); @@ -59,6 +76,7 @@ public class JobSchedulerView implements JobSchedulerViewMBean { return rc; } + @Override public TabularData getNextScheduleJobs() throws Exception { OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class); CompositeType ct = factory.getCompositeType(); @@ -71,31 +89,51 @@ public class JobSchedulerView implements JobSchedulerViewMBean { return rc; } + @Override public String getNextScheduleTime() throws Exception { long time = this.jobScheduler.getNextScheduleTime(); return JobSupport.getDateTime(time); } + @Override public void removeAllJobs() throws Exception { this.jobScheduler.removeAllJobs(); - } + @Override public void removeAllJobs(String startTime, String finishTime) throws Exception { long start = JobSupport.getDataTime(startTime); long finish = JobSupport.getDataTime(finishTime); this.jobScheduler.removeAllJobs(start, finish); + } + @Override + public void removeAllJobsAtScheduledTime(String time) throws Exception { + long removeAtTime = JobSupport.getDataTime(time); + 
this.jobScheduler.remove(removeAtTime); } + @Override + public void removeJobAtScheduledTime(String time) throws Exception { + removeAllJobsAtScheduledTime(time); + } + + @Override public void removeJob(String jobId) throws Exception { this.jobScheduler.remove(jobId); - } - public void removeJobAtScheduledTime(String time) throws IOException { - // TODO Auto-generated method stub + @Override + public int getExecutionCount(String jobId) throws Exception { + int result = 0; - } + List jobs = this.jobScheduler.getAllJobs(); + for (Job job : jobs) { + if (job.getJobId().equals(jobId)) { + result = job.getExecutionCount(); + } + } + return result; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java index f5745ea..76a7926 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java @@ -18,76 +18,125 @@ package org.apache.activemq.broker.jmx; import javax.management.openmbean.TabularData; - - public interface JobSchedulerViewMBean { + /** - * remove all jobs scheduled to run at this time + * Remove all jobs scheduled to run at this time. If there are no jobs scheduled + * at the given time this methods returns without making any modifications to the + * scheduler store. + * * @param time - * @throws Exception + * the string formated time that should be used to remove jobs. + * + * @throws Exception if an error occurs while performing the remove. + * + * @deprecated use removeAllJobsAtScheduledTime instead as it is more explicit about what + * the method is actually doing. 
*/ + @Deprecated @MBeanInfo("remove jobs with matching execution time") public abstract void removeJobAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception; /** - * remove a job with the matching jobId + * Remove all jobs scheduled to run at this time. If there are no jobs scheduled + * at the given time this method returns without making any modifications to the + * scheduler store. + * + * @param time + * the string formatted time that should be used to remove jobs. + * + * @throws Exception if an error occurs while performing the remove. + */ + @MBeanInfo("remove jobs with matching execution time") + public abstract void removeAllJobsAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception; + + /** + * Remove a job with the matching jobId. If the method does not find a matching job + * then it returns without throwing an error or making any modifications to the job + * scheduler store. + * * @param jobId - * @throws Exception + * the Job Id to remove from the scheduler store. + * + * @throws Exception if an error occurs while attempting to remove the Job. */ @MBeanInfo("remove jobs with matching jobId") public abstract void removeJob(@MBeanInfo("jobId")String jobId) throws Exception; - + /** - * remove all the Jobs from the scheduler - * @throws Exception + * Remove all the Jobs from the scheduler. + * + * @throws Exception if an error occurs while purging the store. */ @MBeanInfo("remove all scheduled jobs") public abstract void removeAllJobs() throws Exception; - + /** - * remove all the Jobs from the scheduler that are due between the start and finish times - * @param start time - * @param finish time - * @throws Exception + * Remove all the Jobs from the scheduler that are due between the start and finish times. + * + * @param start + * the starting time to remove jobs from. + * @param finish + * the finish time for the remove operation. 
+ * + * @throws Exception if an error occurs while attempting to remove the jobs. */ @MBeanInfo("remove all scheduled jobs between time ranges ") public abstract void removeAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish) throws Exception; - - /** - * Get the next time jobs will be fired - * @return the time in milliseconds - * @throws Exception + * Get the next time jobs will be fired from this scheduler store. + * + * @return the time in milliseconds of the next job to execute. + * + * @throws Exception if an error occurs while accessing the store. */ @MBeanInfo("get the next time a job is due to be scheduled ") public abstract String getNextScheduleTime() throws Exception; - + + /** + * Gets the number of times a scheduled Job has been executed. + * + * @return the total number of time a scheduled job has executed. + * + * @throws Exception if an error occurs while querying for the Job. + */ + @MBeanInfo("get the next time a job is due to be scheduled ") + public abstract int getExecutionCount(@MBeanInfo("jobId")String jobId) throws Exception; + /** - * Get all the jobs scheduled to run next + * Get all the jobs scheduled to run next. + * * @return a list of jobs that will be scheduled next - * @throws Exception + * + * @throws Exception if an error occurs while reading the scheduler store. */ @MBeanInfo("get the next job(s) to be scheduled. Not HTML friendly ") public abstract TabularData getNextScheduleJobs() throws Exception; - - /** - * Get all the outstanding Jobs - * @return a table of all jobs - * @throws Exception + /** + * Get all the outstanding Jobs that are scheduled in this scheduler store. + * + * @return a table of all jobs in this scheduler store. + * + * @throws Exception if an error occurs while reading the store. */ @MBeanInfo("get the scheduled Jobs in the Store. 
Not HTML friendly ") public abstract TabularData getAllJobs() throws Exception; - + /** - * Get all outstanding jobs due to run between start and finish + * Get all outstanding jobs due to run between start and finish time range. + * * @param start + * the starting time range to query the store for jobs. * @param finish - * @return a table of jobs in the range - * @throws Exception - + * the ending time of this query for scheduled jobs. + * + * @return a table of jobs in the range given. + * + * @throws Exception if an error occurs while querying the scheduler store. */ @MBeanInfo("get the scheduled Jobs in the Store within the time range. Not HTML friendly ") public abstract TabularData getAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish)throws Exception; + } http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java index 7b28a5b..047fe23 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java @@ -16,7 +16,12 @@ */ package org.apache.activemq.broker.scheduler; - +/** + * Interface for a scheduled Job object. + * + * Each Job is identified by a unique Job Id which can be used to reference the Job + * in the Job Scheduler store for updates or removal. 
+ */ public interface Job { /** @@ -38,11 +43,12 @@ public interface Job { * @return the Delay */ public abstract long getDelay(); + /** * @return the period */ public abstract long getPeriod(); - + /** * @return the cron entry */ @@ -52,17 +58,24 @@ public interface Job { * @return the payload */ public abstract byte[] getPayload(); - + /** * Get the start time as a Date time string * @return the date time */ public String getStartTime(); - + /** - * Get the time the job is next due to execute + * Get the time the job is next due to execute * @return the date time */ public String getNextExecutionTime(); + /** + * Gets the total number of times this job has executed. + * + * @return the number of times this job has been executed. + */ + public int getExecutionCount(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java index c53d9c6..a453595 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java @@ -18,13 +18,21 @@ package org.apache.activemq.broker.scheduler; import org.apache.activemq.util.ByteSequence; +/** + * Job event listener interface. Provides event points for Job related events + * such as job ready events. + */ public interface JobListener { - + /** - * A Job that has been scheduled is now ready - * @param id + * A Job that has been scheduled is now ready to be fired. The Job is passed + * in its raw byte form and must be un-marshaled before being delivered. + * + * @param id + * The unique Job Id of the Job that is ready to fire. 
* @param job + * The job that is now ready, delivered in byte form. */ - public void scheduledJob(String id,ByteSequence job); + public void scheduledJob(String id, ByteSequence job); } http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java index 2e96eae..e951861 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java @@ -46,20 +46,25 @@ public interface JobScheduler { void stopDispatching() throws Exception; /** - * Add a Job listener + * Add a Job listener which will receive events related to scheduled jobs. + * + * @param listener + * The job listener to add. * - * @param l * @throws Exception */ - void addListener(JobListener l) throws Exception; + void addListener(JobListener listener) throws Exception; /** - * remove a JobListener + * remove a JobListener that was previously registered. If the given listener is not in + * the registry this method has no effect. + * + * @param listener + * The listener that should be removed from the listener registry. * - * @param l * @throws Exception */ - void removeListener(JobListener l) throws Exception; + void removeListener(JobListener listener) throws Exception; /** * Add a job to be scheduled @@ -70,7 +75,8 @@ public interface JobScheduler { * the message to be sent when the job is scheduled * @param delay * the time in milliseconds before the job will be run - * @throws Exception + * + * @throws Exception if an error occurs while scheduling the Job. 
*/ void schedule(String jobId, ByteSequence payload, long delay) throws Exception; @@ -82,8 +88,9 @@ public interface JobScheduler { * @param payload * the message to be sent when the job is scheduled * @param cronEntry - * - cron entry - * @throws Exception + * The cron entry to use to schedule this job. + * + * @throws Exception if an error occurs while scheduling the Job. */ void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception; @@ -95,7 +102,7 @@ public interface JobScheduler { * @param payload * the message to be sent when the job is scheduled * @param cronEntry - * - cron entry + * cron entry * @param delay * time in ms to wait before scheduling * @param period @@ -110,6 +117,8 @@ public interface JobScheduler { * remove all jobs scheduled to run at this time * * @param time + * The UTC time to use to remove a batch of scheduled Jobs. + * * @throws Exception */ void remove(long time) throws Exception; @@ -118,7 +127,9 @@ public interface JobScheduler { * remove a job with the matching jobId * * @param jobId - * @throws Exception + * The unique Job Id to search for and remove from the scheduled set of jobs. + * + * @throws Exception if an error occurs while removing the Job. 
*/ void remove(String jobId) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java index d46d04a..24a216a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java @@ -21,6 +21,12 @@ import java.util.List; import org.apache.activemq.util.ByteSequence; +/** + * A wrapper for instances of the JobScheduler interface that ensures that methods + * provide safe and sane return values and can deal with null values being passed + * in etc. Provides a measure of safety when using unknown implementations of the + * JobSchedulerStore which might not always do the right thing. 
+ */ public class JobSchedulerFacade implements JobScheduler { private final SchedulerBroker broker; http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java index 3cbc367..c6863c7 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java @@ -26,13 +26,56 @@ import org.apache.activemq.Service; */ public interface JobSchedulerStore extends Service { + /** + * Gets the location where the Job Scheduler will write the persistent data used + * to preserve and recover scheduled Jobs. + * + * If the scheduler implementation does not utilize a file system based store this + * method returns null. + * + * @return the directory where persistent store data is written. + */ File getDirectory(); + /** + * Sets the directory where persistent store data will be written. This method + * must be called before the scheduler store is started to have any effect. + * + * @param directory + * The directory where the job scheduler store is to be located. + */ void setDirectory(File directory); + /** + * The size of the current store on disk if the store utilizes a disk based store + * mechanism. + * + * @return the current store size on disk. + */ long size(); + /** + * Returns the JobScheduler instance identified by the given name. + * + * @param name + * the name of the JobScheduler instance to lookup. + * + * @return the named JobScheduler or null if none exists with the given name. + * + * @throws Exception if an error occurs while loading the named scheduler. 
+ */ JobScheduler getJobScheduler(String name) throws Exception; + /** + * Removes the named JobScheduler if it exists, purging all scheduled messages + * assigned to it. + * + * @param name + * the name of the scheduler instance to remove. + * + * @return true if there was a scheduler with the given name to remove. + * + * @throws Exception if an error occurs while removing the scheduler. + */ boolean removeJobScheduler(String name) throws Exception; } http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java index 6b78d77..fc5b8dd 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java @@ -20,7 +20,11 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; +/** + * A class to provide common Job Scheduler related methods. 
+ */ public class JobSupport { + public static String getDateTime(long value) { DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(value); @@ -32,5 +36,4 @@ public class JobSupport { Date date = dfm.parse(value); return date.getTime(); } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java index 31efd32..01a9634 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -31,74 +32,99 @@ import org.apache.activemq.usage.SystemUsage; /** * Adapter to the actual persistence mechanism used with ActiveMQ * - * + * */ public interface PersistenceAdapter extends Service { /** - * Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination} - * objects that the persistence store is aware exist. + * Returns a set of all the + * {@link org.apache.activemq.command.ActiveMQDestination} objects that the + * persistence store is aware exist. 
* * @return active destinations */ Set getDestinations(); /** - * Factory method to create a new queue message store with the given destination name + * Factory method to create a new queue message store with the given + * destination name + * * @param destination * @return the message store - * @throws IOException + * @throws IOException */ MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException; /** - * Factory method to create a new topic message store with the given destination name - * @param destination + * Factory method to create a new topic message store with the given + * destination name + * + * @param destination * @return the topic message store - * @throws IOException + * @throws IOException */ TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException; /** + * Creates and returns a new Job Scheduler store instance. + * + * @return a new JobSchedulerStore instance if this Persistence adapter provides its own. + * + * @throws IOException If an error occurs while creating the new JobSchedulerStore. + * @throws UnsupportedOperationException If this adapter does not provide its own + * scheduler store implementation. + */ + JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException; + + /** * Cleanup method to remove any state associated with the given destination. * This method does not stop the message store (it might not be cached). - * @param destination Destination to forget + * + * @param destination + * Destination to forget */ void removeQueueMessageStore(ActiveMQQueue destination); /** * Cleanup method to remove any state associated with the given destination * This method does not stop the message store (it might not be cached). 
- * @param destination Destination to forget + * + * @param destination + * Destination to forget */ void removeTopicMessageStore(ActiveMQTopic destination); /** - * Factory method to create a new persistent prepared transaction store for XA recovery + * Factory method to create a new persistent prepared transaction store for + * XA recovery + * * @return transaction store - * @throws IOException + * @throws IOException */ TransactionStore createTransactionStore() throws IOException; /** - * This method starts a transaction on the persistent storage - which is nothing to - * do with JMS or XA transactions - its purely a mechanism to perform multiple writes - * to a persistent store in 1 transaction as a performance optimization. + * This method starts a transaction on the persistent storage - which is + * nothing to do with JMS or XA transactions - its purely a mechanism to + * perform multiple writes to a persistent store in 1 transaction as a + * performance optimization. *

- * Typically one transaction will require one disk synchronization point and so for - * real high performance its usually faster to perform many writes within the same - * transaction to minimize latency caused by disk synchronization. This is especially - * true when using tools like Berkeley Db or embedded JDBC servers. - * @param context - * @throws IOException + * Typically one transaction will require one disk synchronization point and + * so for real high performance its usually faster to perform many writes + * within the same transaction to minimize latency caused by disk + * synchronization. This is especially true when using tools like Berkeley + * Db or embedded JDBC servers. + * + * @param context + * @throws IOException */ void beginTransaction(ConnectionContext context) throws IOException; - /** * Commit a persistence transaction - * @param context - * @throws IOException + * + * @param context + * @throws IOException * * @see PersistenceAdapter#beginTransaction(ConnectionContext context) */ @@ -106,40 +132,45 @@ public interface PersistenceAdapter extends Service { /** * Rollback a persistence transaction - * @param context - * @throws IOException + * + * @param context + * @throws IOException * * @see PersistenceAdapter#beginTransaction(ConnectionContext context) */ void rollbackTransaction(ConnectionContext context) throws IOException; - + /** - * + * * @return last broker sequence * @throws IOException */ long getLastMessageBrokerSequenceId() throws IOException; - + /** * Delete's all the messages in the persistent store. - * + * * @throws IOException */ void deleteAllMessages() throws IOException; - + /** - * @param usageManager The UsageManager that is controlling the broker's memory usage. + * @param usageManager + * The UsageManager that is controlling the broker's memory + * usage. 
*/ void setUsageManager(SystemUsage usageManager); - + /** * Set the name of the broker using the adapter + * * @param brokerName */ void setBrokerName(String brokerName); - + /** * Set the directory where any data files should be created + * * @param dir */ void setDirectory(File dir); @@ -148,26 +179,30 @@ public interface PersistenceAdapter extends Service { * @return the directory used by the persistence adaptor */ File getDirectory(); - + /** * checkpoint any - * @param sync - * @throws IOException + * + * @param sync + * @throws IOException * */ void checkpoint(boolean sync) throws IOException; - + /** * A hint to return the size of the store on disk + * * @return disk space used in bytes of 0 if not implemented */ long size(); /** - * return the last stored producer sequenceId for this producer Id - * used to suppress duplicate sends on failover reconnect at the transport - * when a reconnect occurs - * @param id the producerId to find a sequenceId for + * return the last stored producer sequenceId for this producer Id used to + * suppress duplicate sends on failover reconnect at the transport when a + * reconnect occurs + * + * @param id + * the producerId to find a sequenceId for * @return the last stored sequence id or -1 if no suppression needed */ long getLastProducerSequenceId(ProducerId id) throws IOException; http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java index 0fd6bfc..73ea104 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java +++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory; /** * @org.apache.xbean.XBean - * + * */ public class MemoryPersistenceAdapter implements PersistenceAdapter { private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class); @@ -49,6 +50,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { ConcurrentHashMap queues = new ConcurrentHashMap(); private boolean useExternalMessageReferences; + @Override public Set getDestinations() { Set rc = new HashSet(queues.size() + topics.size()); for (Iterator iter = queues.keySet().iterator(); iter.hasNext();) { @@ -64,6 +66,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { return new MemoryPersistenceAdapter(); } + @Override public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { MessageStore rc = queues.get(destination); if (rc == null) { @@ -76,6 +79,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { return rc; } + @Override public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { TopicMessageStore rc = topics.get(destination); if (rc == null) { @@ -93,6 +97,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { * * @param destination Destination to forget */ + @Override public void removeQueueMessageStore(ActiveMQQueue destination) { queues.remove(destination); } @@ -102,10 +107,12 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { * * @param destination Destination to 
forget */ + @Override public void removeTopicMessageStore(ActiveMQTopic destination) { topics.remove(destination); } + @Override public TransactionStore createTransactionStore() throws IOException { if (transactionStore == null) { transactionStore = new MemoryTransactionStore(this); @@ -113,25 +120,32 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { return transactionStore; } + @Override public void beginTransaction(ConnectionContext context) { } + @Override public void commitTransaction(ConnectionContext context) { } + @Override public void rollbackTransaction(ConnectionContext context) { } + @Override public void start() throws Exception { } + @Override public void stop() throws Exception { } + @Override public long getLastMessageBrokerSequenceId() throws IOException { return 0; } + @Override public void deleteAllMessages() throws IOException { for (Iterator iter = topics.values().iterator(); iter.hasNext();) { MemoryMessageStore store = asMemoryMessageStore(iter.next()); @@ -177,38 +191,52 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { * @param usageManager The UsageManager that is controlling the broker's * memory usage. 
*/ + @Override public void setUsageManager(SystemUsage usageManager) { } + @Override public String toString() { return "MemoryPersistenceAdapter"; } + @Override public void setBrokerName(String brokerName) { } + @Override public void setDirectory(File dir) { } - + + @Override public File getDirectory(){ return null; } + @Override public void checkpoint(boolean sync) throws IOException { } - + + @Override public long size(){ return 0; } - + public void setCreateTransactionStore(boolean create) throws IOException { if (create) { createTransactionStore(); } } + @Override public long getLastProducerSequenceId(ProducerId id) { // memory map does duplicate suppression return -1; } + + @Override + public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { + // We could eventuall implement an in memory scheduler. + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java index a623de9..2a70194 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java +++ b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java @@ -61,8 +61,9 @@ public final class IOHelper { } /** - * Converts any string into a string that is safe to use as a file name. - * The result will only include ascii characters and numbers, and the "-","_", and "." characters. + * Converts any string into a string that is safe to use as a file name. The + * result will only include ascii characters and numbers, and the "-","_", + * and "." characters. 
* * @param name * @return @@ -76,15 +77,16 @@ public final class IOHelper { } /** - * Converts any string into a string that is safe to use as a file name. - * The result will only include ascii characters and numbers, and the "-","_", and "." characters. + * Converts any string into a string that is safe to use as a file name. The + * result will only include ascii characters and numbers, and the "-","_", + * and "." characters. * * @param name * @param dirSeparators * @param maxFileLength * @return */ - public static String toFileSystemSafeName(String name,boolean dirSeparators,int maxFileLength) { + public static String toFileSystemSafeName(String name, boolean dirSeparators, int maxFileLength) { int size = name.length(); StringBuffer rc = new StringBuffer(size * 2); for (int i = 0; i < size; i++) { @@ -92,8 +94,7 @@ public final class IOHelper { boolean valid = c >= 'a' && c <= 'z'; valid = valid || (c >= 'A' && c <= 'Z'); valid = valid || (c >= '0' && c <= '9'); - valid = valid || (c == '_') || (c == '-') || (c == '.') || (c=='#') - ||(dirSeparators && ( (c == '/') || (c == '\\'))); + valid = valid || (c == '_') || (c == '-') || (c == '.') || (c == '#') || (dirSeparators && ((c == '/') || (c == '\\'))); if (valid) { rc.append(c); @@ -105,7 +106,7 @@ public final class IOHelper { } String result = rc.toString(); if (result.length() > maxFileLength) { - result = result.substring(result.length()-maxFileLength,result.length()); + result = result.substring(result.length() - maxFileLength, result.length()); } return result; } @@ -168,8 +169,7 @@ public final class IOHelper { } else { for (int i = 0; i < files.length; i++) { File file = files[i]; - if (file.getName().equals(".") - || file.getName().equals("..")) { + if (file.getName().equals(".") || file.getName().equals("..")) { continue; } if (file.isDirectory()) { @@ -190,6 +190,27 @@ public final class IOHelper { } } + public static void moveFiles(File srcDirectory, File targetDirectory, FilenameFilter filter) 
throws IOException { + if (!srcDirectory.isDirectory()) { + throw new IOException("source is not a directory"); + } + + if (targetDirectory.exists() && !targetDirectory.isDirectory()) { + throw new IOException("target exists and is not a directory"); + } else { + mkdirs(targetDirectory); + } + + List filesToMove = new ArrayList(); + getFiles(srcDirectory, filesToMove, filter); + + for (File file : filesToMove) { + if (!file.isDirectory()) { + moveFile(file, targetDirectory); + } + } + } + public static void copyFile(File src, File dest) throws IOException { copyFile(src, dest, null); } @@ -222,32 +243,32 @@ public final class IOHelper { File parent = src.getParentFile(); String fromPath = from.getAbsolutePath(); if (parent.getAbsolutePath().equals(fromPath)) { - //one level down + // one level down result = to; - }else { + } else { String parentPath = parent.getAbsolutePath(); String path = parentPath.substring(fromPath.length()); - result = new File(to.getAbsolutePath()+File.separator+path); + result = new File(to.getAbsolutePath() + File.separator + path); } return result; } - static List getFiles(File dir,FilenameFilter filter){ + static List getFiles(File dir, FilenameFilter filter) { List result = new ArrayList(); - getFiles(dir,result,filter); + getFiles(dir, result, filter); return result; } - static void getFiles(File dir,List list,FilenameFilter filter) { + static void getFiles(File dir, List list, FilenameFilter filter) { if (!list.contains(dir)) { list.add(dir); - String[] fileNames=dir.list(filter); - for (int i =0; i < fileNames.length;i++) { - File f = new File(dir,fileNames[i]); + String[] fileNames = dir.list(filter); + for (int i = 0; i < fileNames.length; i++) { + File f = new File(dir, fileNames[i]); if (f.isFile()) { list.add(f); - }else { - getFiles(dir,list,filter); + } else { + getFiles(dir, list, filter); } } } @@ -286,12 +307,13 @@ public final class IOHelper { public static void mkdirs(File dir) throws IOException { if (dir.exists()) { if 
(!dir.isDirectory()) { - throw new IOException("Failed to create directory '" + dir +"', regular file already existed with that name"); + throw new IOException("Failed to create directory '" + dir + + "', regular file already existed with that name"); } } else { if (!dir.mkdirs()) { - throw new IOException("Failed to create directory '" + dir+"'"); + throw new IOException("Failed to create directory '" + dir + "'"); } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index 7ff4ae0..a3a8250 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -34,6 +34,7 @@ import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.Locker; +import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -422,6 +423,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements this.lockDataSource = dataSource; } + @Override public BrokerService getBrokerService() { return brokerService; } @@ -846,4 +848,9 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements } return result; } + + @Override + public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { + throw new 
UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index 565fc9f..cc5282f 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -31,6 +31,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.activeio.journal.InvalidRecordLocationException; import org.apache.activeio.journal.Journal; import org.apache.activeio.journal.JournalEventListener; @@ -40,6 +41,7 @@ import org.apache.activeio.packet.Packet; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -78,14 +80,14 @@ import org.slf4j.LoggerFactory; * An implementation of {@link PersistenceAdapter} designed for use with a * {@link Journal} and then check pointing asynchronously on a timeout with some * other long term persistent storage. 
- * + * * @org.apache.xbean.XBean - * + * */ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware { private BrokerService brokerService; - + protected Scheduler scheduler; private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class); @@ -118,9 +120,9 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve private TaskRunnerFactory taskRunnerFactory; private File directory; - public JournalPersistenceAdapter() { + public JournalPersistenceAdapter() { } - + public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException { setJournal(journal); setTaskRunnerFactory(taskRunnerFactory); @@ -135,13 +137,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve this.journal = journal; journal.setJournalEventListener(this); } - + public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) { this.longTermPersistence = longTermPersistence; } - + final Runnable createPeriodicCheckpointTask() { return new Runnable() { + @Override public void run() { long lastTime = 0; synchronized (this) { @@ -158,11 +161,13 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve * @param usageManager The UsageManager that is controlling the * destination's memory usage. 
*/ + @Override public void setUsageManager(SystemUsage usageManager) { this.usageManager = usageManager; longTermPersistence.setUsageManager(usageManager); } + @Override public Set getDestinations() { Set destinations = new HashSet(longTermPersistence.getDestinations()); destinations.addAll(queues.keySet()); @@ -178,6 +183,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve } } + @Override public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { JournalMessageStore store = queues.get(destination); if (store == null) { @@ -188,6 +194,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve return store; } + @Override public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException { JournalTopicMessageStore store = topics.get(destinationName); if (store == null) { @@ -203,6 +210,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve * * @param destination Destination to forget */ + @Override public void removeQueueMessageStore(ActiveMQQueue destination) { queues.remove(destination); } @@ -212,30 +220,37 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve * * @param destination Destination to forget */ + @Override public void removeTopicMessageStore(ActiveMQTopic destination) { topics.remove(destination); } + @Override public TransactionStore createTransactionStore() throws IOException { return transactionStore; } + @Override public long getLastMessageBrokerSequenceId() throws IOException { return longTermPersistence.getLastMessageBrokerSequenceId(); } + @Override public void beginTransaction(ConnectionContext context) throws IOException { longTermPersistence.beginTransaction(context); } + @Override public void commitTransaction(ConnectionContext context) throws IOException { longTermPersistence.commitTransaction(context); } + @Override public void 
rollbackTransaction(ConnectionContext context) throws IOException { longTermPersistence.rollbackTransaction(context); } + @Override public synchronized void start() throws Exception { if (!started.compareAndSet(false, true)) { return; @@ -246,12 +261,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve } checkpointTask = taskRunnerFactory.createTaskRunner(new Task() { + @Override public boolean iterate() { return doCheckpoint(); } }, "ActiveMQ Journal Checkpoint Worker"); checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { + @Override public Thread newThread(Runnable runable) { Thread t = new Thread(runable, "Journal checkpoint worker"); t.setPriority(7); @@ -279,6 +296,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve } + @Override public void stop() throws Exception { this.usageManager.getMemoryUsage().removeUsageListener(this); @@ -330,16 +348,17 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve /** * The Journal give us a call back so that we can move old data out of the * journal. Taking a checkpoint does this for us. - * + * * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation) */ + @Override public void overflowNotification(RecordLocation safeLocation) { checkpoint(false, true); } /** * When we checkpoint we move all the journalled data to long term storage. - * + * */ public void checkpoint(boolean sync, boolean fullCheckpoint) { try { @@ -369,13 +388,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve } } + @Override public void checkpoint(boolean sync) { checkpoint(sync, sync); } /** * This does the actual checkpoint. 
- * + * * @return */ public boolean doCheckpoint() { @@ -398,7 +418,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve // We do many partial checkpoints (fullCheckpoint==false) to move // topic messages // to long term store as soon as possible. - // + // // We want to avoid doing that for queue messages since removes the // come in the same // checkpoint cycle will nullify the previous message add. @@ -411,6 +431,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve try { final JournalMessageStore ms = iterator.next(); FutureTask task = new FutureTask(new Callable() { + @Override public RecordLocation call() throws Exception { return ms.checkpoint(); } @@ -428,6 +449,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve try { final JournalTopicMessageStore ms = iterator.next(); FutureTask task = new FutureTask(new Callable() { + @Override public RecordLocation call() throws Exception { return ms.checkpoint(); } @@ -505,7 +527,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve /** * Move all the messages that were in the journal into long term storage. We * just replay and do a checkpoint. 
- * + * * @throws IOException * @throws IOException * @throws InvalidRecordLocationException @@ -644,11 +666,11 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException { if (started.get()) { try { - return journal.write(toPacket(wireFormat.marshal(command)), sync); + return journal.write(toPacket(wireFormat.marshal(command)), sync); } catch (IOException ioe) { - LOG.error("Cannot write to the journal", ioe); - brokerService.handleIOException(ioe); - throw ioe; + LOG.error("Cannot write to the journal", ioe); + brokerService.handleIOException(ioe); + throw ioe; } } throw new IOException("closed"); @@ -660,6 +682,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve return writeCommand(trace, sync); } + @Override public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { newPercentUsage = (newPercentUsage / 10) * 10; oldPercentUsage = (oldPercentUsage / 10) * 10; @@ -673,6 +696,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve return transactionStore; } + @Override public void deleteAllMessages() throws IOException { try { JournalTrace trace = new JournalTrace(); @@ -735,6 +759,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength()); } + @Override public void setBrokerName(String brokerName) { longTermPersistence.setBrokerName(brokerName); } @@ -744,18 +769,22 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve return "JournalPersistenceAdapter(" + longTermPersistence + ")"; } + @Override public void setDirectory(File dir) { this.directory=dir; } - + + @Override public File getDirectory(){ return directory; } - + + @Override public long size(){ return 0; } + @Override public void setBrokerService(BrokerService 
brokerService) { this.brokerService = brokerService; PersistenceAdapter pa = getLongTermPersistence(); @@ -764,8 +793,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve } } + @Override public long getLastProducerSequenceId(ProducerId id) { return -1; } + @Override + public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { + return longTermPersistence.createJobSchedulerStore(); + } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java new file mode 100644 index 0000000..edb2750 --- /dev/null +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.store.kahadb; + +import org.apache.activemq.store.kahadb.disk.journal.Location; +import org.apache.activemq.store.kahadb.disk.page.Page; + +public abstract class AbstractKahaDBMetaData implements KahaDBMetaData { + + private int state; + private Location lastUpdateLocation; + private Page page; + + @Override + public Page getPage() { + return page; + } + + @Override + public int getState() { + return state; + } + + @Override + public Location getLastUpdateLocation() { + return lastUpdateLocation; + } + + @Override + public void setPage(Page page) { + this.page = page; + } + + @Override + public void setState(int value) { + this.state = value; + } + + @Override + public void setLastUpdateLocation(Location location) { + this.lastUpdateLocation = location; + } +}