Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 72623 invoked from network); 6 Feb 2010 08:58:01 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 6 Feb 2010 08:58:01 -0000 Received: (qmail 15832 invoked by uid 500); 6 Feb 2010 08:58:01 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 15764 invoked by uid 500); 6 Feb 2010 08:57:59 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 15755 invoked by uid 99); 6 Feb 2010 08:57:57 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 06 Feb 2010 08:57:57 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 06 Feb 2010 08:57:46 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 3AF1923888CF; Sat, 6 Feb 2010 08:57:25 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r907197 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/scheduler/ test/java/org/apache/activemq/bro... Date: Sat, 06 Feb 2010 08:57:24 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100206085725.3AF1923888CF@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Sat Feb 6 08:57:24 2010 New Revision: 907197 URL: http://svn.apache.org/viewvc?rev=907197&view=rev Log: Added JMX support for JobScheduling Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java?rev=907197&r1=907196&r2=907197&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java Sat Feb 6 08:57:24 2010 @@ -29,6 +29,10 @@ * The number of times to repeat scheduling a message for delivery */ public static final String AMQ_SCHEDULED_REPEAT = "AMQ_SCHEDULED_REPEAT"; + /** + * Use a Cron tab entry to set the schedule + */ + public static final String AMQ_SCHEDULED_CRON = "AMQ_SCHEDULED_CRON"; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=907197&r1=907196&r2=907197&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Sat Feb 6 08:57:24 2010 @@ -45,6 +45,8 @@ import org.apache.activemq.broker.jmx.ConnectorViewMBean; import org.apache.activemq.broker.jmx.FTConnectorView; import org.apache.activemq.broker.jmx.JmsConnectorView; +import org.apache.activemq.broker.jmx.JobSchedulerView; +import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; import org.apache.activemq.broker.jmx.ManagedRegionBroker; import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.broker.jmx.NetworkConnectorView; @@ -1764,7 +1766,23 @@ protected Broker addInterceptors(Broker broker) throws Exception { broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore()); if (isSchedulerSupport()) { - broker = new SchedulerBroker(broker,getSchedulerDirectoryFile()); + SchedulerBroker sb = new SchedulerBroker(broker, getSchedulerDirectoryFile()); + if (isUseJmx()) { + JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler()); + try { + ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + + "Type=jobScheduler," + "jobSchedulerName=JMS"); + + AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); + this.adminView.setJMSJobScheduler(objectName); + } catch (Throwable e) { + throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: " + + e.getMessage(), e); + } + + } + broker = sb; } if (isAdvisorySupport()) { broker = new AdvisoryBroker(broker); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=907197&r1=907196&r2=907197&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Sat Feb 6 08:57:24 2010 @@ -44,6 +44,7 @@ ManagedRegionBroker broker; private final BrokerService brokerService; private final AtomicInteger sessionIdCounter = new AtomicInteger(0); + private ObjectName jmsJobScheduler; public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception { this.brokerService = brokerService; @@ -365,4 +366,12 @@ return ""; } } + + public ObjectName getJMSJobScheduler() { + return this.jmsJobScheduler; + } + + public void setJMSJobScheduler(ObjectName name) { + this.jmsJobScheduler=name; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=907197&r1=907196&r2=907197&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Sat Feb 6 08:57:24 2010 @@ -17,7 +17,6 @@ package org.apache.activemq.broker.jmx; import javax.management.ObjectName; - import org.apache.activemq.Service; @@ -244,4 +243,7 @@ @MBeanInfo("The location of the data directory") public String getDataDirectory(); + @MBeanInfo("JMSJobScheduler") + ObjectName getJMSJobScheduler(); + } Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java?rev=907197&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java Sat Feb 6 08:57:24 2010 @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.jmx; + +import java.io.IOException; +import java.util.List; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; +import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; +import org.apache.activemq.broker.scheduler.Job; +import org.apache.activemq.broker.scheduler.JobImpl; +import org.apache.activemq.broker.scheduler.JobScheduler; + +public class JobSchedulerView implements JobSchedulerViewMBean { + + private final JobScheduler jobScheduler; + + public JobSchedulerView(JobScheduler jobScheduler) { + this.jobScheduler = jobScheduler; + } + + public TabularData getAllJobs() throws Exception { + OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class); + CompositeType ct = factory.getCompositeType(); + TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs", ct, new String[] { "jobId" }); + TabularDataSupport rc = new TabularDataSupport(tt); + List jobs = this.jobScheduler.getAllJobs(); + for (Job job : jobs) { + rc.put(new CompositeDataSupport(ct, factory.getFields(job))); + } + return rc; + } + + public TabularData getAllJobs(String startTime, String finishTime) throws Exception { + OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class); + CompositeType ct = factory.getCompositeType(); + TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs", ct, new String[] { "jobId" }); + TabularDataSupport rc = new TabularDataSupport(tt); + long start = JobImpl.getDataTime(startTime); + long finish = JobImpl.getDataTime(finishTime); + List jobs = this.jobScheduler.getAllJobs(start, finish); + for (Job job : jobs) { + rc.put(new CompositeDataSupport(ct, factory.getFields(job))); + } + return rc; + } + + public TabularData getNextScheduleJobs() throws Exception { + OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class); + CompositeType ct = factory.getCompositeType(); + TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs", ct, new String[] { "jobId" }); + TabularDataSupport rc = new TabularDataSupport(tt); + List jobs = this.jobScheduler.getNextScheduleJobs(); + for (Job job : jobs) { + rc.put(new CompositeDataSupport(ct, factory.getFields(job))); + } + return rc; + } + + public String getNextScheduleTime() throws Exception { + long time = this.jobScheduler.getNextScheduleTime(); + return JobImpl.getDateTime(time); + } + + public void removeAllJobs() throws Exception { + this.jobScheduler.removeAllJobs(); + + } + + public void removeAllJobs(String startTime, String finishTime) throws Exception { + long start = JobImpl.getDataTime(startTime); + long finish = JobImpl.getDataTime(finishTime); + this.jobScheduler.removeAllJobs(start, finish); + + } + + public void removeJob(String jobId) throws Exception { + this.jobScheduler.remove(jobId); + + } + + public void removeJobAtScheduledTime(String time) throws IOException { + // TODO Auto-generated method stub + + } + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java?rev=907197&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java Sat Feb 6 08:57:24 2010 @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.jmx; + +import javax.management.openmbean.TabularData; + + + +public interface JobSchedulerViewMBean { + /** + * remove all jobs scheduled to run at this time + * @param time + * @throws Exception + */ + @MBeanInfo("remove jobs with matching execution time") + public abstract void removeJobAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception; + + /** + * remove a job with the matching jobId + * @param jobId + * @throws Exception + */ + @MBeanInfo("remove jobs with matching jobId") + public abstract void removeJob(@MBeanInfo("jobId")String jobId) throws Exception; + + /** + * remove all the Jobs from the scheduler + * @throws Exception + */ + @MBeanInfo("remove all scheduled jobs") + public abstract void removeAllJobs() throws Exception; + + /** + * remove all the Jobs from the scheduler that are due between the start and finish times + * @param start time + * @param finish time + * @throws Exception + */ + @MBeanInfo("remove all scheduled jobs between time ranges ") + public abstract void removeAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish) throws Exception; + + + + /** + * Get the next time jobs will be fired + * @return the time in milliseconds + * @throws Exception + */ + @MBeanInfo("get the next time a job is due to be scheduled ") + public abstract String getNextScheduleTime() throws Exception; + + /** + * Get all the jobs scheduled to run next + * @return a list of jobs that will be scheduled next + * @throws Exception + */ + @MBeanInfo("get the next job(s) to be scheduled. Not HTML friendly ") + public abstract TabularData getNextScheduleJobs() throws Exception; + + /** + * Get all the outstanding Jobs + * @return a table of all jobs + * @throws Exception + + */ + @MBeanInfo("get the scheduled Jobs in the Store. Not HTML friendly ") + public abstract TabularData getAllJobs() throws Exception; + + /** + * Get all outstanding jobs due to run between start and finish + * @param start + * @param finish + * @return a table of jobs in the range + * @throws Exception + + */ + @MBeanInfo("get the scheduled Jobs in the Store within the time range. Not HTML friendly ") + public abstract TabularData getAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish)throws Exception; +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java?rev=907197&r1=907196&r2=907197&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java Sat Feb 6 08:57:24 2010 @@ -28,7 +28,6 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; - import javax.management.Attribute; import javax.management.JMException; import javax.management.MBeanServer; @@ -41,7 +40,6 @@ import javax.management.remote.JMXConnectorServer; import javax.management.remote.JMXConnectorServerFactory; import javax.management.remote.JMXServiceURL; - import org.apache.activemq.Service; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -69,12 +67,12 @@ private int connectorPort = 1099; private int rmiServerPort; private String connectorPath = "/jmxrmi"; - private AtomicBoolean started = new AtomicBoolean(false); - private AtomicBoolean connectorStarting = new AtomicBoolean(false); + private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean connectorStarting = new AtomicBoolean(false); private JMXConnectorServer connectorServer; private ObjectName namingServiceObjectName; private Registry registry; - private List registeredMBeanNames = new CopyOnWriteArrayList(); + private final List registeredMBeanNames = new CopyOnWriteArrayList(); public ManagementContext() { this(null); @@ -94,6 +92,7 @@ } catch (Throwable ignore) { } Thread t = new Thread("JMX connector") { + @Override public void run() { try { JMXConnectorServer server = connectorServer; @@ -314,6 +313,10 @@ return getMBeanServer().queryNames(name, query); } + public ObjectInstance getObjectInstance(ObjectName name) throws Exception { + return getMBeanServer().getObjectInstance(name); + } + /** * Unregister an MBean * Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java?rev=907197&r1=907196&r2=907197&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java Sat Feb 6 08:57:24 2010 @@ -16,6 +16,13 @@ */ package org.apache.activemq.broker.jmx; +import static org.apache.activemq.broker.jmx.CompositeDataConstants.BODY_LENGTH; +import static org.apache.activemq.broker.jmx.CompositeDataConstants.BODY_PREVIEW; +import static org.apache.activemq.broker.jmx.CompositeDataConstants.CONTENT_MAP; +import static org.apache.activemq.broker.jmx.CompositeDataConstants.JMSXGROUP_ID; +import static org.apache.activemq.broker.jmx.CompositeDataConstants.JMSXGROUP_SEQ; +import static org.apache.activemq.broker.jmx.CompositeDataConstants.MESSAGE_TEXT; +import static org.apache.activemq.broker.jmx.CompositeDataConstants.ORIGINAL_DESTINATION; import java.io.IOException; import java.util.ArrayList; import java.util.Date; @@ -23,10 +30,8 @@ import java.util.List; import java.util.Map; import java.util.Set; - import javax.jms.DeliveryMode; import javax.jms.JMSException; -import javax.jms.Destination; import javax.management.openmbean.ArrayType; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; @@ -34,9 +39,9 @@ import javax.management.openmbean.OpenDataException; import javax.management.openmbean.OpenType; import javax.management.openmbean.SimpleType; -import javax.management.openmbean.TabularType; import javax.management.openmbean.TabularDataSupport; - +import javax.management.openmbean.TabularType; +import org.apache.activemq.broker.scheduler.Job; import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQMapMessage; import org.apache.activemq.command.ActiveMQMessage; @@ -44,7 +49,6 @@ import org.apache.activemq.command.ActiveMQStreamMessage; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.Message; -import static org.apache.activemq.broker.jmx.CompositeDataConstants.*; public final class OpenTypeSupport { @@ -54,14 +58,14 @@ Map getFields(Object o) throws OpenDataException; } - private static final Map OPEN_TYPE_FACTORIES = new HashMap(); + private static final Map OPEN_TYPE_FACTORIES = new HashMap(); abstract static class AbstractOpenTypeFactory implements OpenTypeFactory { private CompositeType compositeType; - private List itemNamesList = new ArrayList(); - private List itemDescriptionsList = new ArrayList(); - private List itemTypesList = new ArrayList(); + private final List itemNamesList = new ArrayList(); + private final List itemDescriptionsList = new ArrayList(); + private final List itemTypesList = new ArrayList(); public CompositeType getCompositeType() throws OpenDataException { if (compositeType == null) { @@ -109,10 +113,12 @@ protected TabularType floatPropertyTabularType; protected TabularType doublePropertyTabularType; + @Override protected String getTypeName() { return ActiveMQMessage.class.getName(); } + @Override protected void init() throws OpenDataException { super.init(); addItem("JMSCorrelationID", "JMSCorrelationID", SimpleType.STRING); @@ -150,6 +156,7 @@ addItem(CompositeDataConstants.DOUBLE_PROPERTIES, "User Double Properties", doublePropertyTabularType); } + @Override public Map getFields(Object o) throws OpenDataException { ActiveMQMessage m = (ActiveMQMessage)o; Map rc = super.getFields(o); @@ -255,16 +262,19 @@ static class ByteMessageOpenTypeFactory extends MessageOpenTypeFactory { + @Override protected String getTypeName() { return ActiveMQBytesMessage.class.getName(); } + @Override protected void init() throws OpenDataException { super.init(); addItem(BODY_LENGTH, "Body length", SimpleType.LONG); addItem(BODY_PREVIEW, "Body preview", new ArrayType(1, SimpleType.BYTE)); } + @Override public Map getFields(Object o) throws OpenDataException { ActiveMQBytesMessage m = (ActiveMQBytesMessage)o; Map rc = super.getFields(o); @@ -298,15 +308,18 @@ static class MapMessageOpenTypeFactory extends MessageOpenTypeFactory { + @Override protected String getTypeName() { return ActiveMQMapMessage.class.getName(); } + @Override protected void init() throws OpenDataException { super.init(); addItem(CONTENT_MAP, "Content map", SimpleType.STRING); } + @Override public Map getFields(Object o) throws OpenDataException { ActiveMQMapMessage m = (ActiveMQMapMessage)o; Map rc = super.getFields(o); @@ -320,14 +333,17 @@ } static class ObjectMessageOpenTypeFactory extends MessageOpenTypeFactory { + @Override protected String getTypeName() { return ActiveMQObjectMessage.class.getName(); } + @Override protected void init() throws OpenDataException { super.init(); } + @Override public Map getFields(Object o) throws OpenDataException { Map rc = super.getFields(o); return rc; @@ -335,14 +351,17 @@ } static class StreamMessageOpenTypeFactory extends MessageOpenTypeFactory { + @Override protected String getTypeName() { return ActiveMQStreamMessage.class.getName(); } + @Override protected void init() throws OpenDataException { super.init(); } + @Override public Map getFields(Object o) throws OpenDataException { Map rc = super.getFields(o); return rc; @@ -351,15 +370,18 @@ static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory { + @Override protected String getTypeName() { return ActiveMQTextMessage.class.getName(); } + @Override protected void init() throws OpenDataException { super.init(); addItem(MESSAGE_TEXT, MESSAGE_TEXT, SimpleType.STRING); } + @Override public Map getFields(Object o) throws OpenDataException { ActiveMQTextMessage m = (ActiveMQTextMessage)o; Map rc = super.getFields(o); @@ -371,6 +393,41 @@ return rc; } } + + + static class JobOpenTypeFactory extends AbstractOpenTypeFactory { + + @Override + protected String getTypeName() { + return Job.class.getName(); + } + + @Override + protected void init() throws OpenDataException { + super.init(); + addItem("jobId", "jobId", SimpleType.STRING); + addItem("cronEntry", "Cron entry", SimpleType.STRING); + addItem("start", "start time", SimpleType.STRING); + addItem("next", "next time", SimpleType.STRING); + addItem("period", "period between jobs", SimpleType.LONG); + addItem("repeat", "number of times to repeat", SimpleType.INTEGER); + } + + @Override + public Map getFields(Object o) throws OpenDataException { + Job job = (Job) o; + Map rc = super.getFields(o); + rc.put("jobId", job.getJobId()); + rc.put("cronEntry", "" + job.getCronEntry()); + rc.put("start", job.getStartTime()); + rc.put("next", job.getNextExecutionTime()); + rc.put("period", job.getPeriod()); + rc.put("repeat", job.getRepeat()); + return rc; + } + } + + static { OPEN_TYPE_FACTORIES.put(ActiveMQMessage.class, new MessageOpenTypeFactory()); @@ -379,12 +436,13 @@ OPEN_TYPE_FACTORIES.put(ActiveMQObjectMessage.class, new ObjectMessageOpenTypeFactory()); OPEN_TYPE_FACTORIES.put(ActiveMQStreamMessage.class, new StreamMessageOpenTypeFactory()); OPEN_TYPE_FACTORIES.put(ActiveMQTextMessage.class, new TextMessageOpenTypeFactory()); + OPEN_TYPE_FACTORIES.put(Job.class, new JobOpenTypeFactory()); } private OpenTypeSupport() { } - public static OpenTypeFactory getFactory(Class clazz) throws OpenDataException { + public static OpenTypeFactory getFactory(Class clazz) throws OpenDataException { return OPEN_TYPE_FACTORIES.get(clazz); } Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java?rev=907197&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java Sat Feb 6 08:57:24 2010 @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler; + + +public interface Job { + + /** + * @return the jobId + */ + public abstract String getJobId(); + + /** + * @return the repeat + */ + public abstract int getRepeat(); + + /** + * @return the start + */ + public abstract long getStart(); + + /** + * @return the period + */ + public abstract long getPeriod(); + + /** + * @return the cron entry + */ + public abstract String getCronEntry(); + + /** + * @return the payload + */ + public abstract byte[] getPayload(); + + /** + * Get the start time as a Date time string + * @return the date time + */ + public String getStartTime(); + + /** + * Get the time the job is next due to execute + * @return the date time + */ + public String getNextExecutionTime(); + +} \ No newline at end of file Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java?rev=907197&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java Sat Feb 6 08:57:24 2010 @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import org.apache.kahadb.util.ByteSequence; + + +public class JobImpl implements Job { + private final JobLocation location; + private final byte[] payload; + + protected JobImpl(JobLocation location,ByteSequence bs) { + this.location=location; + this.payload = new byte[bs.getLength()]; + System.arraycopy(bs.getData(), bs.getOffset(), this.payload, 0, bs.getLength()); + } + + public String getJobId() { + return this.location.getJobId(); + } + + public byte[] getPayload() { + return this.payload; + } + + public long getPeriod() { + return this.location.getPeriod(); + } + + public int getRepeat() { + return this.location.getRepeat(); + } + + public long getStart() { + return this.location.getStart(); + } + + public String getCronEntry() { + return this.location.getCronEntry(); + } + + + + public String getNextExecutionTime() { + // TODO Auto-generated method stub + return null; + } + + public String getStartTime() { + return JobImpl.getDateTime(getStart()); + } + + public static long getDataTime(String value) throws Exception { + DateFormat dfm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + Date date = dfm.parse(value); + return date.getTime(); + } + + public static String getDateTime(long value) { + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date date = new Date(value); + return dateFormat.format(date); + } + + + + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java?rev=907197&r1=907196&r2=907197&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java Sat Feb 6 08:57:24 2010 @@ -30,6 +30,7 @@ private int repeat; private long start; private long period; + private String cronEntry; private final Location location; public JobLocation(Location location) { @@ -46,14 +47,19 @@ this.repeat = in.readInt(); this.start = in.readLong(); this.period = in.readLong(); + this.cronEntry=in.readUTF(); this.location.readExternal(in); } public void writeExternal(DataOutput out) throws IOException { out.writeUTF(this.jobId); - out.writeInt(repeat); - out.writeLong(start); - out.writeLong(period); + out.writeInt(this.repeat); + out.writeLong(this.start); + out.writeLong(this.period); + if (this.cronEntry==null) { + this.cronEntry=""; + } + out.writeUTF(this.cronEntry); this.location.writeExternal(out); } @@ -117,6 +123,20 @@ public void setPeriod(long period) { this.period = period; } + + /** + * @return the cronEntry + */ + public synchronized String getCronEntry() { + return this.cronEntry; + } + + /** + * @param cronEntry the cronEntry to set + */ + public synchronized void setCronEntry(String cronEntry) { + this.cronEntry = cronEntry; + } /** * @return the location Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java?rev=907197&r1=907196&r2=907197&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java Sat Feb 6 08:57:24 2010 @@ -16,35 +16,37 @@ */ package org.apache.activemq.broker.scheduler; -import java.io.IOException; import java.util.List; import org.apache.kahadb.util.ByteSequence; -interface JobScheduler { +public interface JobScheduler { /** * @return the name of the scheduler + * @throws Exception */ - public abstract String getName(); + public abstract String getName() throws Exception; /** * Add a Job listener * @param l + * @throws Exception */ - public abstract void addListener(JobListener l); + public abstract void addListener(JobListener l) throws Exception; /** * remove a JobListener * @param l + * @throws Exception */ - public abstract void removeListener(JobListener l); + public abstract void removeListener(JobListener l) throws Exception; /** * Add a job to be scheduled * @param jobId a unique identifier for the job * @param payload the message to be sent when the job is scheduled * @param delay the time in milliseconds before the job will be run - * @throws IOException + * @throws Exception */ - public abstract void schedule(String jobId, ByteSequence payload,long delay) throws IOException; + public abstract void schedule(String jobId, ByteSequence payload,long delay) throws Exception; /** @@ -54,36 +56,68 @@ * @param start * @param period the time in milliseconds between successive executions of the Job * @param repeat the number of times to execute the job - less than 0 will be repeated forever - * @throws IOException + * @throws Exception */ - public abstract void schedule(String jobId, ByteSequence payload,long start, long period, int repeat) throws IOException; + public abstract void schedule(String jobId, ByteSequence payload,long start, long period, int repeat) throws Exception; /** * remove all jobs scheduled to run at this time * @param time - * @throws IOException + * @throws Exception */ - public abstract void remove(long time) throws IOException; + public abstract void remove(long time) throws Exception; /** * remove a job with the matching jobId * @param jobId - * @throws IOException + * @throws Exception */ - public abstract void remove(String jobId) throws IOException; - + public abstract void remove(String jobId) throws Exception; + /** - * Get all the jobs scheduled to run next - * @return a list of messages that will be scheduled next - * @throws IOException + * remove all the Jobs from the scheduler + * @throws Exception */ - public abstract List getNextScheduleJobs() throws IOException; + public abstract void removeAllJobs() throws Exception; + + /** + * remove all the Jobs from the scheduler that are due between the start and finish times + * @param start time in milliseconds + * @param finish time in milliseconds + * @throws Exception + */ + public abstract void removeAllJobs(long start,long finish) throws Exception; + + /** * Get the next time jobs will be fired * @return the time in milliseconds - * @throws IOException + * @throws Exception + */ + public abstract long getNextScheduleTime() throws Exception; + + /** + * Get all the jobs scheduled to run next + * @return a list of jobs that will be scheduled next + * @throws Exception + */ + public abstract List getNextScheduleJobs() throws Exception; + + /** + * Get all the outstanding Jobs + * @return a list of all jobs + * @throws Exception + */ + public abstract List getAllJobs() throws Exception; + + /** + * Get all outstanding jobs due to run between start and finish + * @param start + * @param finish + * @return a list of jobs + * @throws Exception */ - public abstract long getNextScheduleTime() throws IOException; + public abstract List getAllJobs(long start,long finish)throws Exception; } \ No newline at end of file Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java?rev=907197&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java Sat Feb 6 08:57:24 2010 @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler; + +import java.util.Collections; +import java.util.List; +import org.apache.kahadb.util.ByteSequence; + +public class JobSchedulerFacade implements JobScheduler { + + private final SchedulerBroker broker; + + JobSchedulerFacade(SchedulerBroker broker){ + this.broker=broker; + } + public void addListener(JobListener l) throws Exception { + JobScheduler js = this.broker.getInternalScheduler(); + if (js !=null) { + js.addListener(l); + } + } + + public List getAllJobs() throws Exception { + JobScheduler js = this.broker.getInternalScheduler(); + if (js !=null) { + return js.getAllJobs(); + } + return Collections.emptyList(); + } + + public List getAllJobs(long start, long finish) throws Exception { + JobScheduler js = this.broker.getInternalScheduler(); + if (js !=null) { + return js.getAllJobs(start,finish); + } + return Collections.emptyList(); + } + + public String getName() throws Exception { + JobScheduler js = this.broker.getInternalScheduler(); + if (js !=null) { + return js.getName(); + } + return ""; + } + + public List getNextScheduleJobs() throws Exception { + JobScheduler js = this.broker.getInternalScheduler(); + if (js !=null) { + return js.getNextScheduleJobs(); + } + return Collections.emptyList(); + } + + public long getNextScheduleTime() throws Exception { + JobScheduler js = this.broker.getInternalScheduler(); + if (js !=null) { + return js.getNextScheduleTime(); + } + return 0; + } + + public void remove(long time) throws Exception { + JobScheduler js = this.broker.getInternalScheduler(); + if (js !=null) { + js.remove(time); + } + } + + public void remove(String jobId) throws Exception { + JobScheduler js = this.broker.getInternalScheduler(); + if (js !=null) { + js.remove(jobId); + } + + } + + public void removeAllJobs() throws Exception { + JobScheduler js = this.broker.getInternalScheduler(); + if (js !=null) { + js.removeAllJobs(); + } + } + + public void removeAllJobs(long start, long finish) throws Exception { + JobScheduler js = this.broker.getInternalScheduler(); + if (js !=null) { + js.removeAllJobs(start,finish); + } + + } + + public void removeListener(JobListener l) throws Exception { + JobScheduler js = this.broker.getInternalScheduler(); + if (js !=null) { + js.removeListener(l); + } + + } + + public void schedule(String jobId, ByteSequence payload, long delay) throws Exception { + JobScheduler js = this.broker.getInternalScheduler(); + if (js !=null) { + js.schedule(jobId, payload, delay); + } + } + + public void schedule(String jobId, ByteSequence payload, long start, long period, int repeat) throws Exception { + JobScheduler js = this.broker.getInternalScheduler(); + if (js !=null) { + js.schedule(jobId, payload, start,period,repeat); + } + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java?rev=907197&r1=907196&r2=907197&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java Sat Feb 6 08:57:24 2010 @@ -82,7 +82,6 @@ this.jobListeners.remove(l); } - public void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException { this.store.getPageFile().tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { @@ -91,14 +90,14 @@ }); } - - public void schedule(final String jobId, final ByteSequence payload, final long start, final long period, final int repeat) throws IOException { + public void schedule(final String jobId, final ByteSequence payload, final long start, final long period, + final int repeat) throws IOException { this.store.getPageFile().tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { schedule(tx, jobId, payload, start, period, repeat); } }); - + } /* @@ -147,8 +146,8 @@ * (non-Javadoc) * @see org.apache.activemq.beanstalk.JobScheduler#getNextScheduleJobs() */ - public synchronized List getNextScheduleJobs() throws IOException { - final List result = new ArrayList(); + public synchronized List getNextScheduleJobs() throws IOException { + final List result = new ArrayList(); this.store.getPageFile().tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { @@ -156,26 +155,90 @@ if (first != null) { for (JobLocation jl : first.getValue()) { ByteSequence bs = getJob(jl.getLocation()); - result.add(bs); + Job job = new JobImpl(jl, bs); + result.add(job); + } + } + } + }); + return result; + } + + public synchronized List getAllJobs() throws IOException { + final List result = new ArrayList(); + this.store.getPageFile().tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + Iterator>> iter = index.iterator(store.getPageFile().tx()); + while (iter.hasNext()) { + Map.Entry> next = iter.next(); + if (next != null) { + for (JobLocation jl : next.getValue()) { + ByteSequence bs = getJob(jl.getLocation()); + Job job = new JobImpl(jl, bs); + result.add(job); + } + } else { + break; + } + } + + } + }); + return result; + } + + public synchronized List getAllJobs(final long start, final long finish) throws IOException { + final List result = new ArrayList(); + this.store.getPageFile().tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + Iterator>> iter = index.iterator(store.getPageFile().tx(), start); + while (iter.hasNext()) { + Map.Entry> next = iter.next(); + if (next != null && next.getKey().longValue() <= finish) { + for (JobLocation jl : next.getValue()) { + ByteSequence bs = getJob(jl.getLocation()); + Job job = new JobImpl(jl, bs); + result.add(job); + } + } else { + break; } } + } }); return result; } + public synchronized void removeAllJobs() throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + destroy(tx); + } + }); + } + + public synchronized void removeAllJobs(final long start, final long finish) throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + destroy(tx,start,finish); + } + }); + + } + ByteSequence getJob(Location location) throws IllegalStateException, IOException { return this.store.getJob(location); } - void schedule(Transaction tx, String jobId, ByteSequence payload,long start, long period, int repeat) + void schedule(Transaction tx, String jobId, ByteSequence payload, long start, long period, int repeat) throws IOException { List values = null; long startTime; long time; if (start > 0) { time = startTime = start; - }else { + } else { startTime = System.currentTimeMillis(); time = startTime + period; } @@ -239,15 +302,40 @@ } synchronized void destroy(Transaction tx) throws IOException { + List keys = new ArrayList(); for (Iterator>> i = this.index.iterator(tx); i.hasNext();) { Map.Entry> entry = i.next(); + keys.add(entry.getKey()); List values = entry.getValue(); if (values != null) { for (JobLocation jl : values) { this.store.decrementJournalCount(tx, jl.getLocation()); } } + } + for (Long l : keys) { + this.index.remove(tx, l); + } + } + synchronized void destroy(Transaction tx, long start, long finish) throws IOException { + List keys = new ArrayList(); + for (Iterator>> i = this.index.iterator(tx, start); i.hasNext();) { + Map.Entry> entry = i.next(); + if (entry.getKey().longValue() <= finish) { + keys.add(entry.getKey()); + List values = entry.getValue(); + if (values != null) { + for (JobLocation jl : values) { + this.store.decrementJournalCount(tx, jl.getLocation()); + } + } + } else { + break; + } + } + for (Long l : keys) { + this.index.remove(tx, l); } } @@ -311,7 +399,7 @@ ByteSequence payload = this.store.getJob(jl.getLocation()); String jobId = jl.getJobId(); long period = jl.getPeriod(); - schedule(jobId, payload,0, period, repeat); + schedule(jobId, payload, 0, period, repeat); } } // now remove jobs from this execution time @@ -408,7 +496,4 @@ } } } - - - } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=907197&r1=907196&r2=907197&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Sat Feb 6 08:57:24 2010 @@ -60,6 +60,11 @@ LOG.info("Scheduler using directory: " + directory); } + + public synchronized JobScheduler getJobScheduler() throws Exception { + return new JobSchedulerFacade(this); + } + /** * @return the directory */ @@ -114,7 +119,7 @@ repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class); } org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(messageSend); - getScheduler().schedule( messageSend.getMessageId().toString(), + getInternalScheduler().schedule( messageSend.getMessageId().toString(), new ByteSequence(packet.data, packet.offset, packet.length),start, period, repeat); } else { @@ -137,12 +142,15 @@ messageSend.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId())); } } + //Add the jobId as a property + messageSend.setProperty("scheduledJobId", id); //if this goes across a network - we don't want it rescheduled messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD); messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_START); messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT); + final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); producerExchange.setConnectionContext(context); producerExchange.setMutable(true); @@ -153,11 +161,11 @@ } } - - private JobScheduler getScheduler() throws Exception { + + protected synchronized JobScheduler getInternalScheduler() throws Exception { if (this.started.get()) { if (this.scheduler == null) { - this.scheduler = getStore().getJobScheduler("ActiveMQ"); + this.scheduler = getStore().getJobScheduler("JMS"); this.scheduler.addListener(this); } return this.scheduler; @@ -165,6 +173,8 @@ return null; } + + private JobSchedulerStore getStore() throws Exception { if (started.get()) { if (this.store == null) { Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java?rev=907197&r1=907196&r2=907197&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java Sat Feb 6 08:57:24 2010 @@ -44,7 +44,7 @@ for (ByteSequence job:list) { js.schedule("id:"+(count++), job,startTime,10000,-1); } - Listtest = js.getNextScheduleJobs(); + Listtest = js.getNextScheduleJobs(); assertEquals(list.size(),test.size()); store.stop(); @@ -54,7 +54,7 @@ assertEquals(list.size(),test.size()); for (int i = 0; i < list.size();i++) { String orig = new String(list.get(i).getData()); - String payload = new String(test.get(i).getData()); + String payload = new String(test.get(i).getPayload()); assertEquals(orig,payload); } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java?rev=907197&r1=907196&r2=907197&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java Sat Feb 6 08:57:24 2010 @@ -19,7 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.File; -import java.io.IOException; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.activemq.util.IOHelper; @@ -116,7 +116,7 @@ } @Test - public void testRemoveString() throws IOException { + public void testRemoveString() throws Exception { final int COUNT = 10; final String test = "TESTREMOVE"; long time = System.currentTimeMillis() + 20000; @@ -135,6 +135,82 @@ assertEquals(size, COUNT); } + @Test + public void testgetAllJobs() throws Exception { + final int COUNT = 10; + final String ID = "id:"; + final String test = "TEST"; + long time = System.currentTimeMillis() + 20000; + for (int i = 0; i < COUNT; i++) { + String str = new String("test" + i); + scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), time, 10 + i, -1); + } + List list = scheduler.getAllJobs(); + + assertEquals(list.size(), COUNT); + int count = 0; + for (Job job : list) { + + assertEquals(job.getJobId(), ID + count); + count++; + } + } + + @Test + public void testgetAllJobsInRange() throws Exception { + final int COUNT = 10; + final String ID = "id:"; + final String test = "TEST"; + long start = System.currentTimeMillis() + 10000; + + long time = System.currentTimeMillis() + 20000; + for (int i = 0; i < COUNT; i++) { + String str = new String("test" + i); + if (i < (COUNT - 2)) { + scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), start + (i * 1000), 10000 + i, 0); + } else { + scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), start + start, 10000 + i, 0); + } + } + long finish = start + ((COUNT - 2) * 1000); + List list = scheduler.getAllJobs(start, finish); + + assertEquals(list.size(), COUNT - 2); + int count = 0; + for (Job job : list) { + + assertEquals(job.getJobId(), ID + count); + count++; + } + } + + @Test + public void testRemoveAllJobsInRange() throws Exception { + final int COUNT = 10; + final String ID = "id:"; + final String test = "TEST"; + long start = System.currentTimeMillis() + 10000; + + long time = System.currentTimeMillis() + 20000; + for (int i = 0; i < COUNT; i++) { + String str = new String("test" + i); + if (i < (COUNT - 2)) { + scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), start + (i * 1000), 10000 + i, 0); + } else { + scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), start + start, 10000 + i, 0); + } + } + long finish = start + ((COUNT - 2) * 1000); + scheduler.removeAllJobs(start, finish); + List list = scheduler.getAllJobs(); + assertEquals(list.size(), 2); + int count = COUNT - 2; + for (Job job : list) { + assertEquals(job.getJobId(), ID + count); + count++; + } + } + @Before public void setUp() throws Exception { File directory = new File("target/test/ScheduledDB");