activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r899633 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/scheduler/ main/java/org/apache/activemq/command/ test/java/org/apache/activemq/broker...
Date Fri, 15 Jan 2010 14:09:31 GMT
Author: rajdavies
Date: Fri Jan 15 14:09:30 2010
New Revision: 899633

URL: http://svn.apache.org/viewvc?rev=899633&view=rev
Log:
Added fix for https://issues.apache.org/activemq/browse/AMQ-451

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java?rev=899633&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java Fri Jan 15 14:09:30 2010
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+public interface ScheduledMessage {
+    /**
+     * Key for the time, in milliseconds, at which the broker is to deliver a scheduled message.
+     * Note that the string value is "AMQ_SCHEDULED_START_TIME", which differs from this
+     * constant's identifier. Presumably used as a message property name - confirm against the
+     * broker-side reader.
+     */
+    public static final String AMQ_SCHEDULED_START = "AMQ_SCHEDULED_START_TIME";
+    /**
+     * Key for the time, in milliseconds, to wait after the start time before the message is
+     * scheduled again.
+     */
+    public static final String AMQ_SCHEDULED_PERIOD = "AMQ_SCHEDULED_PERIOD";
+    /**
+     * Key for the number of times scheduling of the message for delivery is repeated.
+     */
+    public static final String AMQ_SCHEDULED_REPEAT = "AMQ_SCHEDULED_REPEAT";
+    
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=899633&r1=899632&r2=899633&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Jan 15 14:09:30 2010
@@ -31,10 +31,8 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
-
 import org.apache.activemq.ActiveMQConnectionMetaData;
 import org.apache.activemq.ConfigurationException;
 import org.apache.activemq.Service;
@@ -63,6 +61,7 @@
 import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
 import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.broker.scheduler.SchedulerBroker;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.kaha.Store;
@@ -134,11 +133,11 @@
     private PersistenceAdapterFactory persistenceFactory;
     protected DestinationFactory destinationFactory;
     private MessageAuthorizationPolicy messageAuthorizationPolicy;
-    private List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
-    private List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
-    private List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
-    private List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
-    private List<Service> services = new ArrayList<Service>();
+    private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
+    private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
+    private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
+    private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
+    private final List<Service> services = new ArrayList<Service>();
     private MasterConnector masterConnector;
     private String masterConnectorURI;
     private transient Thread shutdownHook;
@@ -151,8 +150,8 @@
     private boolean advisorySupport = true;
     private URI vmConnectorURI;
     private PolicyMap destinationPolicy;
-    private AtomicBoolean started = new AtomicBoolean(false);
-    private AtomicBoolean stopped = new AtomicBoolean(false);
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean stopped = new AtomicBoolean(false);
     private BrokerPlugin[] plugins;
     private boolean keepDurableSubsActive = true;
     private boolean useVirtualTopics = true;
@@ -164,8 +163,8 @@
     private Store tempDataStore;
     private int persistenceThreadPriority = Thread.MAX_PRIORITY;
     private boolean useLocalHostBrokerName;
-    private CountDownLatch stoppedLatch = new CountDownLatch(1);
-    private CountDownLatch startedLatch = new CountDownLatch(1);
+    private final CountDownLatch stoppedLatch = new CountDownLatch(1);
+    private final CountDownLatch startedLatch = new CountDownLatch(1);
     private boolean supportFailOver;
     private Broker regionBroker;
     private int producerSystemUsagePortion = 60;
@@ -176,12 +175,14 @@
     private boolean dedicatedTaskRunner;
     private boolean cacheTempDestinations = false;// useful for failover
     private int timeBeforePurgeTempDestinations = 5000;
-    private List<Runnable> shutdownHooks = new ArrayList<Runnable>();
+    private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
     private boolean systemExitOnShutdown;
     private int systemExitOnShutdownExitCode;
     private SslContext sslContext;
     private boolean forceStart = false;
     private IOExceptionHandler ioExceptionHandler;
+    private boolean schedulerSupport = true;
+    private File schedulerDirectoryFile;
 
 	static {
         String localHostName = "localhost";
@@ -512,7 +513,8 @@
         
         if (systemExitOnShutdown) {
         	new Thread() {
-        		public void run() {
+        		@Override
+                public void run() {
         			System.exit(systemExitOnShutdownExitCode);
         		}
         	}.start();
@@ -1064,7 +1066,7 @@
     }
 
     public Service[] getServices() {
-        return (Service[]) services.toArray(new Service[0]);
+        return services.toArray(new Service[0]);
     }
 
     /**
@@ -1675,15 +1677,18 @@
         broker = new MutableBrokerFilter(broker) {
             Broker old;
 
+            @Override
             public void stop() throws Exception {
                 old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
                     // Just ignore additional stop actions.
+                    @Override
                     public void stop() throws Exception {
                     }
                 });
                 old.stop();
             }
 
+            @Override
             public void start() throws Exception {
                 if (forceStart && old != null) {
                     this.next.set(old);
@@ -1757,6 +1762,9 @@
      */
     protected Broker addInterceptors(Broker broker) throws Exception {
         broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
+        if (isSchedulerSupport()) {
+            broker = new SchedulerBroker(broker,getSchedulerDirectoryFile());
+        }
         if (isAdvisorySupport()) {
             broker = new AdvisoryBroker(broker);
         }
@@ -1821,6 +1829,7 @@
     protected void addShutdownHook() {
         if (useShutdownHook) {
             shutdownHook = new Thread("ActiveMQ ShutdownHook") {
+                @Override
                 public void run() {
                     containerShutdown();
                 }
@@ -2141,6 +2150,41 @@
         ioExceptionHandler.setBrokerService(this);
         this.ioExceptionHandler = ioExceptionHandler;
     }
+
+    /**
+     * @return whether scheduler support is enabled (the field defaults to true); when enabled,
+     *         addInterceptors wraps the broker in a SchedulerBroker
+     */
+    public boolean isSchedulerSupport() {
+        return this.schedulerSupport;
+    }
+
+    /**
+     * @param schedulerSupport true to have addInterceptors install a SchedulerBroker,
+     *            false to leave it out
+     */
+    public void setSchedulerSupport(boolean schedulerSupport) {
+        this.schedulerSupport = schedulerSupport;
+    }
+
+    /**
+     * Returns the scheduler directory, lazily defaulting it on first call to a "scheduler"
+     * entry under IOHelper.getDefaultDataDirectory() when none has been set.
+     *
+     * @return the scheduler directory; never null after this call
+     */
+    public File getSchedulerDirectoryFile() {
+        if (this.schedulerDirectoryFile == null) {
+            this.schedulerDirectoryFile = new File(IOHelper.getDefaultDataDirectory(),"scheduler");
+        }
+        return schedulerDirectoryFile;
+    }
+
+    /**
+     * @param schedulerDirectory the scheduler directory to use; passing null restores the
+     *            lazily computed default returned by getSchedulerDirectoryFile()
+     */
+    public void setSchedulerDirectoryFile(File schedulerDirectory) {
+        this.schedulerDirectoryFile = schedulerDirectory;
+    }
+    
+    public void setSchedulerDirectory(String schedulerDirectory) { // String-path overload: wraps the path in a File
+        setSchedulerDirectoryFile(new File(schedulerDirectory));
+    }
     
    
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java?rev=899633&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java Fri Jan 15 14:09:30 2010
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import org.apache.kahadb.util.ByteSequence;
+
+public interface JobListener {
+    
+    /**
+     * Callback invoked when a job that has been scheduled is now ready.
+     *
+     * @param id identifier of the job - presumably the jobId passed to
+     *            JobScheduler.schedule; confirm against the caller
+     * @param job the job's payload bytes
+     */
+    public void scheduledJob(String id,ByteSequence job);
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java?rev=899633&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java Fri Jan 15 14:09:30 2010
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.util.VariableMarshaller;
+
+class JobLocation { // one scheduled-job record: job id, repeat/start/period values and a journal Location
+   
+    private String jobId;
+    private int repeat;
+    private long start;
+    private long period;
+    private final Location location;
+
+    public JobLocation(Location location) {
+        this.location = location;
+
+    }
+
+    public JobLocation() {
+        this(new Location());
+    }
+
+    public void readExternal(DataInput in) throws IOException { // field order must mirror writeExternal
+        this.jobId = in.readUTF();
+        this.repeat = in.readInt();
+        this.start = in.readLong();
+        this.period = in.readLong();
+        this.location.readExternal(in);
+    }
+
+    public void writeExternal(DataOutput out) throws IOException { // writes jobId, repeat, start, period, then the Location
+        out.writeUTF(this.jobId);
+        out.writeInt(repeat);
+        out.writeLong(start);
+        out.writeLong(period);
+        this.location.writeExternal(out);
+    }
+
+    /**
+     * @return the identifier of the job; null until setJobId or readExternal has been called
+     */
+    public String getJobId() {
+        return this.jobId;
+    }
+
+    /**
+     * @param jobId
+     *            the identifier of the job; written with writeUTF by writeExternal
+     */
+    public void setJobId(String jobId) {
+        this.jobId = jobId;
+    }
+    
+
+    /**
+     * @return the stored repeat value; per the JobScheduler.schedule documentation this is the
+     *         number of times to execute the job, with less than 0 meaning forever
+     */
+    public int getRepeat() {
+        return this.repeat;
+    }
+
+    /**
+     * @param repeat
+     *            the repeat value to store
+     */
+    public void setRepeat(int repeat) {
+        this.repeat = repeat;
+    }
+
+    /**
+     * @return the stored start value; 0 unless setStart was called or the record was read
+     *         back via readExternal
+     */
+    public long getStart() {
+        return this.start;
+    }
+
+    /**
+     * @param start
+     *            the start value to store
+     */
+    public void setStart(long start) {
+        this.start = start;
+    }
+
+    /**
+     * @return the stored period; per the JobScheduler.schedule documentation this is the time
+     *         in milliseconds between successive executions of the job
+     */
+    public long getPeriod() {
+        return this.period;
+    }
+
+    /**
+     * @param period
+     *            the period value to store
+     */
+    public void setPeriod(long period) {
+        this.period = period;
+    }
+
+    /**
+     * @return the Location supplied at construction (a fresh, empty Location for the
+     *         no-argument constructor); its contents are replaced by readExternal
+     */
+    public Location getLocation() {
+        return this.location;
+    }
+    
+    static class JobLocationMarshaller extends VariableMarshaller<List<JobLocation>> { // list format: int count, then each record
+        static final JobLocationMarshaller INSTANCE = new JobLocationMarshaller();
+        public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
+            List<JobLocation> result = new ArrayList<JobLocation>();
+            int size = dataIn.readInt();
+            for (int i = 0; i < size; i++) {
+                JobLocation jobLocation = new JobLocation();
+                jobLocation.readExternal(dataIn);
+                result.add(jobLocation);
+            }
+            return result;
+        }
+
+        public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
+            dataOut.writeInt(value.size());
+            for (JobLocation jobLocation : value) {
+                jobLocation.writeExternal(dataOut);
+            }
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java?rev=899633&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java Fri Jan 15 14:09:30 2010
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.kahadb.util.ByteSequence;
+
+interface JobScheduler {
+
+    /**
+     * @return the name of the scheduler
+     */
+    public abstract String getName();
+    /**
+     * Add a JobListener
+     * @param l the listener to add
+     */
+    public abstract void addListener(JobListener l);
+    /**
+     * Remove a JobListener
+     * @param l the listener to remove
+     */
+    public abstract void removeListener(JobListener l);
+
+    /**
+     * Add a job to be scheduled
+     * @param jobId a unique identifier for the job
+     * @param payload the message to be sent when the job is scheduled
+     * @param delay the time in milliseconds before the job will be run
+     * @throws IOException
+     */
+    public abstract void schedule(String jobId, ByteSequence payload,long delay) throws IOException;
+
+    
+    /**
+     * Add a job to be scheduled
+     * @param jobId a unique identifier for the job
+     * @param payload the message to be sent when the job is scheduled
+     * @param start the time in milliseconds at which the job should first run; in
+     *        JobSchedulerImpl a value greater than 0 is used directly as the scheduled
+     *        time, otherwise the job is scheduled at the current time plus period
+     * @param period the time in milliseconds between successive executions of the Job
+     * @param repeat the number of times to execute the job - less than 0 will be repeated forever
+     * @throws IOException
+     */
+    public abstract void schedule(String jobId, ByteSequence payload,long start, long period, int repeat) throws IOException;
+
+    /**
+     * Remove all jobs scheduled to run at this time
+     * @param time the scheduled time, in milliseconds, whose jobs are removed
+     * @throws IOException
+     */
+    public abstract void remove(long time) throws IOException;
+
+    /**
+     * Remove a job with the matching jobId
+     * @param jobId the identifier given when the job was scheduled
+     * @throws IOException
+     */
+    public abstract void remove(String jobId) throws IOException;
+
+    /**
+     * Get all the jobs scheduled to run next
+     * @return a list of messages that will be scheduled next
+     * @throws IOException
+     */
+    public abstract List<ByteSequence> getNextScheduleJobs() throws IOException;
+    
+    /**
+     * Get the next time jobs will be fired
+     * @return the time in milliseconds; JobSchedulerImpl returns -1 when nothing is scheduled
+     * @throws IOException
+     */
+    public abstract long getNextScheduleTime() throws IOException;
+
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java?rev=899633&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java Fri Jan 15 14:09:30 2010
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.LongMarshaller;
+import org.apache.kahadb.util.VariableMarshaller;
+
+class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler {
+    private static final Log LOG = LogFactory.getLog(JobSchedulerImpl.class);
+    final JobSchedulerStore store;
+    private final AtomicBoolean running = new AtomicBoolean();
+    private String name;
+    BTreeIndex<Long, List<JobLocation>> index;
+    private Thread thread;
+    private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
+
+    JobSchedulerImpl(JobSchedulerStore store) { // the store supplies the page file and journal used by every operation below

+        this.store = store;
+    }
+
+    public void setName(String name) { // value later returned by getName()
+        this.name = name;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.activemq.broker.scheduler.JobScheduler#getName()
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see
+     * org.apache.activemq.broker.scheduler.JobScheduler#addListener(
+     * org.apache.activemq.broker.scheduler.JobListener)
+     */
+    public void addListener(JobListener l) {
+        this.jobListeners.add(l);
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see
+     * org.apache.activemq.broker.scheduler.JobScheduler#removeListener(
+     * org.apache.activemq.broker.scheduler.JobListener)
+     */
+    public void removeListener(JobListener l) {
+        this.jobListeners.remove(l);
+    }
+
+   
+    /**
+     * Schedules a job after a delay: runs the transactional schedule with start 0, the
+     * delay passed as the period and repeat 0, inside a page-file transaction.
+     *
+     * @see org.apache.activemq.broker.scheduler.JobScheduler#schedule(String, ByteSequence, long)
+     */
+    public void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException {
+        final Transaction.Closure<IOException> work = new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                schedule(tx, jobId, payload, 0, delay, 0);
+            }
+        };
+        this.store.getPageFile().tx().execute(work);
+    }
+
+   
+    /**
+     * Schedules a job with explicit start, period and repeat values by running the
+     * transactional schedule inside a page-file transaction.
+     *
+     * @see org.apache.activemq.broker.scheduler.JobScheduler#schedule(String, ByteSequence, long, long, int)
+     */
+    public void schedule(final String jobId, final ByteSequence payload, final long start, final long period, final int repeat) throws IOException {
+        final Transaction.Closure<IOException> work = new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                schedule(tx, jobId, payload, start, period, repeat);
+            }
+        };
+        this.store.getPageFile().tx().execute(work);
+    }
+
+    /**
+     * Removes every job stored under the given time, inside a page-file transaction.
+     *
+     * @see org.apache.activemq.broker.scheduler.JobScheduler#remove(long)
+     */
+    public synchronized void remove(final long time) throws IOException {
+        final Transaction.Closure<IOException> work = new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                remove(tx, time);
+            }
+        };
+        this.store.getPageFile().tx().execute(work);
+    }
+
+    /**
+     * Removes the job with the given id from the jobs stored under the given time, inside a
+     * page-file transaction. This overload is not declared on the JobScheduler interface.
+     */
+    public synchronized void remove(final long time, final String jobId) throws IOException {
+        final Transaction.Closure<IOException> work = new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                remove(tx, time, jobId);
+            }
+        };
+        this.store.getPageFile().tx().execute(work);
+    }
+
+    /**
+     * Removes the first job found with the given id, inside a page-file transaction.
+     *
+     * @see org.apache.activemq.broker.scheduler.JobScheduler#remove(java.lang.String)
+     */
+    public synchronized void remove(final String jobId) throws IOException {
+        final Transaction.Closure<IOException> work = new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                remove(tx, jobId);
+            }
+        };
+        this.store.getPageFile().tx().execute(work);
+    }
+
+    /**
+     * Returns the key of the first index entry.
+     *
+     * @return the first scheduled time in the index, or -1 when the index has no first entry
+     */
+    public synchronized long getNextScheduleTime() throws IOException {
+        final Map.Entry<Long, List<JobLocation>> head = this.index.getFirst(this.store.getPageFile().tx());
+        if (head == null) {
+            return -1L;
+        }
+        return head.getKey();
+    }
+
+    /**
+     * Collects the payloads of all jobs stored under the first (lowest) time in the index.
+     * Nothing is removed from the index or the journal.
+     *
+     * @return the payloads of the jobs at the first index entry; empty when there is none
+     * @throws IOException if the index or the journal cannot be read
+     * @see org.apache.activemq.broker.scheduler.JobScheduler#getNextScheduleJobs()
+     */
+    public synchronized List<ByteSequence> getNextScheduleJobs() throws IOException {
+        final List<ByteSequence> result = new ArrayList<ByteSequence>();
+
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                // Read through the transaction this closure runs in instead of
+                // opening a second, unrelated one just for the lookup.
+                Map.Entry<Long, List<JobLocation>> first = index.getFirst(tx);
+                if (first != null && first.getValue() != null) {
+                    for (JobLocation jl : first.getValue()) {
+                        result.add(getJob(jl.getLocation()));
+                    }
+                }
+            }
+        });
+        return result;
+    }
+
+    ByteSequence getJob(Location location) throws IllegalStateException, IOException { // pure delegate to the store
+        return this.store.getJob(location);
+    }
+
+    void schedule(Transaction tx,  String jobId, ByteSequence payload,long start, long period, int repeat)
+            throws IOException {
+        List<JobLocation> values = null;
+        long startTime;
+        long time;
+        if (start > 0) {
+            time = startTime = start;
+        }else {
+            startTime = System.currentTimeMillis();
+            time = startTime + period;
+        }
+        if (this.index.containsKey(tx, time)) {
+            values = this.index.remove(tx, time);
+        }
+        if (values == null) {
+            values = new ArrayList<JobLocation>();
+        }
+
+        Location location = this.store.write(payload, false);
+        JobLocation jobLocation = new JobLocation(location);
+        jobLocation.setJobId(jobId);
+        jobLocation.setPeriod(period);
+        jobLocation.setRepeat(repeat);
+        values.add(jobLocation);
+        this.index.put(tx, time, values);
+        this.store.incrementJournalCount(tx, location);
+        poke();
+    }
+
+    /**
+     * Removes the first job with the given id from the jobs stored under the given time and
+     * decrements the journal count for its location.
+     * <p>
+     * The list is taken out of the index while it is edited and is put back whenever it is
+     * still non-empty. This includes the case where no job matched: previously a
+     * non-matching id silently dropped every job scheduled at that time.
+     *
+     * @param tx the transaction to perform the index updates in
+     * @param time the scheduled time whose job list is searched
+     * @param jobId the identifier of the job to remove
+     * @throws IOException if an index or store update fails
+     */
+    void remove(Transaction tx, long time, String jobId) throws IOException {
+        List<JobLocation> values = this.index.remove(tx, time);
+        if (values == null) {
+            return;
+        }
+        JobLocation removed = null;
+        for (int i = 0; i < values.size(); i++) {
+            JobLocation jl = values.get(i);
+            if (jl.getJobId().equals(jobId)) {
+                values.remove(i);
+                removed = jl;
+                break;
+            }
+        }
+        // Restore the remaining jobs whether or not a match was found.
+        if (!values.isEmpty()) {
+            this.index.put(tx, time, values);
+        }
+        if (removed != null) {
+            this.store.decrementJournalCount(tx, removed.getLocation());
+        }
+    }
+
+    void remove(Transaction tx, long time) throws IOException {
+        List<JobLocation> values = this.index.remove(tx, time);
+        if (values != null) {
+            for (JobLocation jl : values) {
+                this.store.decrementJournalCount(tx, jl.getLocation());
+            }
+        }
+    }
+
+    void remove(Transaction tx, String id) throws IOException {
+        for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
+            Map.Entry<Long, List<JobLocation>> entry = i.next();
+            List<JobLocation> values = entry.getValue();
+            if (values != null) {
+                for (JobLocation jl : values) {
+                    if (jl.getJobId().equals(id)) {
+                        remove(tx, entry.getKey(), id);
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+    synchronized void destroy(Transaction tx) throws IOException {
+        for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
+            Map.Entry<Long, List<JobLocation>> entry = i.next();
+            List<JobLocation> values = entry.getValue();
+            if (values != null) {
+                for (JobLocation jl : values) {
+                    this.store.decrementJournalCount(tx, jl.getLocation());
+                }
+            }
+
+        }
+    }
+
+    /**
+     * Returns the first entry of the index, or {@code null} when the store is
+     * stopped or stopping.
+     *
+     * @return the first index entry as reported by the index, or {@code null}
+     * @throws IOException if the index cannot be read
+     */
+    synchronized Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException {
+        if (this.store.isStopped() || this.store.isStopping()) {
+            return null;
+        }
+        return this.index.getFirst(this.store.getPageFile().tx());
+    }
+
+    void fireJobs(List<JobLocation> list) throws IllegalStateException, IOException {
+        for (JobLocation jl : list) {
+            ByteSequence bs = this.store.getJob(jl.getLocation());
+            for (JobListener l : jobListeners) {
+                l.scheduledJob(jl.getJobId(), bs);
+            }
+        }
+    }
+
+    /**
+     * Thread entry point: runs the main loop, logs an unexpected failure if the
+     * scheduler is still running and started, and finally stops the scheduler
+     * when it is still flagged as running.
+     */
+    public void run() {
+        try {
+            mainLoop();
+        } catch (Throwable e) {
+            if (this.running.get() && isStarted()) {
+                LOG.error(this + " Caught exception in mainloop", e);
+            }
+        } finally {
+            if (running.get()) {
+                try {
+                    stop();
+                } catch (Exception e) {
+                    // keep the cause so a failed shutdown can be diagnosed
+                    LOG.error("Failed to stop " + this, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * @return the text "JobScheduler:" followed by this scheduler's name
+     */
+    @Override
+    public String toString() {
+        final String label = "JobScheduler:" + this.name;
+        return label;
+    }
+
+    protected void mainLoop() { // loops while 'running' is set: fires due jobs, re-schedules repeats, otherwise waits
+        while (this.running.get()) {
+            try {
+                // peek the next job
+                long currentTime = System.currentTimeMillis();
+
+                Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
+                if (first != null) {
+                    List<JobLocation> list = new ArrayList<JobLocation>(first.getValue()); // work on a copy of the entry's job list
+                    long executionTime = first.getKey();
+                    if (executionTime <= currentTime) { // the entry is due
+                        fireJobs(list);
+                        for (JobLocation jl : list) {
+                            int repeat = jl.getRepeat();
+                            if (repeat != 0) { // non-zero repeat: schedule the job again with the count decremented
+                                repeat--;
+                                ByteSequence payload = this.store.getJob(jl.getLocation());
+                                String jobId = jl.getJobId();
+                                long period = jl.getPeriod();
+                                schedule(jobId, payload,0, period, repeat); // start argument is 0 for the repeat
+                            }
+                        }
+                        // now remove jobs from this execution time
+                        remove(executionTime);
+                    } else {
+                        long waitTime = executionTime - currentTime;
+                        synchronized (this.running) {
+                            this.running.wait(waitTime); // returns when the time elapses or poke() notifies this monitor
+                        }
+                    }
+                } else {
+                    synchronized (this.running) {
+                        this.running.wait(250); // nothing to schedule: re-check after at most 250 ms or when poke() notifies
+                    }
+                }
+
+            } catch (InterruptedException e) { // ignored; the loop re-checks 'running' and continues
+            } catch (IOException ioe) {
+                LOG.error(this.name + " Failed to schedule job", ioe);
+                try {
+                    this.store.stop(); // an I/O failure stops the whole store, not only this scheduler
+                } catch (Exception e) {
+                    LOG.error(this.name + " Failed to shutdown JobSchedulerStore", e);
+                }
+            }
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        this.running.set(true);
+        this.thread = new Thread(this, "JobScheduler:" + this.name);
+        this.thread.setDaemon(true);
+        this.thread.start();
+
+    }
+
+    @Override
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        this.running.set(false);
+        poke();
+        Thread t = this.thread;
+        if (t != null) {
+            t.join(1000);
+        }
+
+    }
+
+    protected void poke() {
+        synchronized (this.running) {
+            this.running.notifyAll();
+        }
+    }
+
+    /**
+     * Creates this scheduler's index on a page newly allocated from the transaction.
+     *
+     * @param tx transaction used to allocate the index page
+     * @throws IOException if the page cannot be allocated
+     */
+    void createIndexes(Transaction tx) throws IOException {
+        index = new BTreeIndex<Long, List<JobLocation>>(
+                store.getPageFile(),
+                tx.allocate().getPageId());
+    }
+
+    void load(Transaction tx) throws IOException {
+        this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
+        this.index.setValueMarshaller(ValueMarshaller.INSTANCE);
+        this.index.load(tx);
+    }
+
+    void read(DataInput in) throws IOException {
+        this.name = in.readUTF();
+        this.index = new BTreeIndex<Long, List<JobLocation>>(this.store.getPageFile(), in.readLong());
+        this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
+        this.index.setValueMarshaller(ValueMarshaller.INSTANCE);
+    }
+
+    /**
+     * Writes the scheduler's stored form: the name as a UTF string followed by
+     * the page id of the index as a long.
+     *
+     * @param out stream the data is written to
+     * @throws IOException if the data cannot be written
+     */
+    public void write(DataOutput out) throws IOException {
+        out.writeUTF(this.name);
+        final long indexPageId = this.index.getPageId();
+        out.writeLong(indexPageId);
+    }
+
+    /**
+     * Marshals the list of jobs stored under one execution time: an int count
+     * followed by each {@link JobLocation} in its external form.
+     */
+    static class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
+        /** Shared instance; final so the singleton cannot be reassigned. */
+        static final ValueMarshaller INSTANCE = new ValueMarshaller();
+
+        /**
+         * Reads the count and then that many job locations.
+         *
+         * @param dataIn stream positioned at a stored job list
+         * @return the job locations in stored order
+         * @throws IOException if the data cannot be read
+         */
+        public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
+            List<JobLocation> result = new ArrayList<JobLocation>();
+            int size = dataIn.readInt();
+            for (int i = 0; i < size; i++) {
+                JobLocation jobLocation = new JobLocation();
+                jobLocation.readExternal(dataIn);
+                result.add(jobLocation);
+            }
+            return result;
+        }
+
+        /**
+         * Writes the list size followed by every job location.
+         *
+         * @param value job list to store
+         * @param dataOut stream the list is written to
+         * @throws IOException if the data cannot be written
+         */
+        public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
+            dataOut.writeInt(value.size());
+            for (JobLocation jobLocation : value) {
+                jobLocation.writeExternal(dataOut);
+            }
+        }
+    }
+
+ 
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java?rev=899633&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java Fri Jan 15 14:09:30 2010
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.journal.Journal;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Page;
+import org.apache.kahadb.page.PageFile;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.IntegerMarshaller;
+import org.apache.kahadb.util.LockFile;
+import org.apache.kahadb.util.StringMarshaller;
+import org.apache.kahadb.util.VariableMarshaller;
+
+public class JobSchedulerStore extends ServiceSupport {
+    static final Log LOG = LogFactory.getLog(JobSchedulerStore.class);
+    private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
+
+    public static final int CLOSED_STATE = 1;
+    public static final int OPEN_STATE = 2;
+
+    private File directory;
+    PageFile pageFile;
+    private Journal journal;
+    private LockFile lockFile;
+    private boolean failIfDatabaseIsLocked;
+    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
+    private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
+    private boolean enableIndexWriteAsync = false;
+    // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
+    MetaData metaData = new MetaData(this);
+    final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
+    Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
+
+    /**
+     * Root record of the store: holds the index of stored schedulers and the
+     * index of journal reference counts, and knows how to persist their page ids.
+     */
+    protected class MetaData {
+        private final JobSchedulerStore store;
+        Page<MetaData> page;
+        BTreeIndex<Integer, Integer> journalRC;
+        BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
+
+        protected MetaData(JobSchedulerStore store) {
+            this.store = store;
+        }
+
+        /** Creates both indexes on pages newly allocated from the transaction. */
+        void createIndexes(Transaction tx) throws IOException {
+            storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
+            journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
+        }
+
+        /** Installs the marshallers on both indexes and loads them. */
+        void load(Transaction tx) throws IOException {
+            final BTreeIndex<String, JobSchedulerImpl> schedulerIndex = this.storedSchedulers;
+            schedulerIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
+            schedulerIndex.setValueMarshaller(new JobSchedulerMarshaller(this.store));
+            schedulerIndex.load(tx);
+
+            final BTreeIndex<Integer, Integer> countIndex = this.journalRC;
+            countIndex.setKeyMarshaller(IntegerMarshaller.INSTANCE);
+            countIndex.setValueMarshaller(IntegerMarshaller.INSTANCE);
+            countIndex.load(tx);
+        }
+
+        /** Loads every stored scheduler and adds it to the given map under its stored key. */
+        void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
+            final Iterator<Entry<String, JobSchedulerImpl>> stored = this.storedSchedulers.iterator(tx);
+            while (stored.hasNext()) {
+                final Entry<String, JobSchedulerImpl> item = stored.next();
+                item.getValue().load(tx);
+                schedulers.put(item.getKey(), item.getValue());
+            }
+        }
+
+        /** Reads the page ids of the scheduler index and the journal count index, in that order. */
+        public void read(DataInput is) throws IOException {
+            storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
+            storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
+            storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
+            journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
+            journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
+            journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
+        }
+
+        /** Writes the page ids of the scheduler index and the journal count index, in that order. */
+        public void write(DataOutput os) throws IOException {
+            final long schedulersPageId = this.storedSchedulers.getPageId();
+            os.writeLong(schedulersPageId);
+            final long countsPageId = this.journalRC.getPageId();
+            os.writeLong(countsPageId);
+        }
+    }
+
+    /**
+     * Marshaller for the {@link MetaData} root record; delegates to the
+     * record's own read and write methods.
+     */
+    class MetaDataMarshaller extends VariableMarshaller<MetaData> {
+        private final JobSchedulerStore store;
+
+        MetaDataMarshaller(JobSchedulerStore store) {
+            this.store = store;
+        }
+
+        /** Creates a fresh record bound to the store and fills it from the stream. */
+        public MetaData readPayload(DataInput dataIn) throws IOException {
+            final MetaData restored = new MetaData(this.store);
+            restored.read(dataIn);
+            return restored;
+        }
+
+        /** Lets the record write itself to the stream. */
+        public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
+            object.write(dataOut);
+        }
+    }
+
+    /**
+     * Marshals a list of {@link JobLocation}s as an int count followed by each
+     * location in its external form.
+     */
+    class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
+        /** Reads the count and then that many job locations, in stored order. */
+        public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
+            final List<JobLocation> locations = new ArrayList<JobLocation>();
+            int remaining = dataIn.readInt();
+            while (remaining-- > 0) {
+                final JobLocation entry = new JobLocation();
+                entry.readExternal(dataIn);
+                locations.add(entry);
+            }
+            return locations;
+        }
+
+        /** Writes the list size followed by every job location. */
+        public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
+            dataOut.writeInt(value.size());
+            final Iterator<JobLocation> entries = value.iterator();
+            while (entries.hasNext()) {
+                entries.next().writeExternal(dataOut);
+            }
+        }
+    }
+
+    /**
+     * Marshaller for stored schedulers; delegates to the scheduler's own read
+     * and write methods.
+     */
+    class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
+        private final JobSchedulerStore store;
+
+        JobSchedulerMarshaller(JobSchedulerStore store) {
+            this.store = store;
+        }
+
+        /** Creates a scheduler bound to the store and fills it from the stream. */
+        public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
+            final JobSchedulerImpl restored = new JobSchedulerImpl(this.store);
+            restored.read(dataIn);
+            return restored;
+        }
+
+        /** Lets the scheduler write itself to the stream. */
+        public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
+            js.write(dataOut);
+        }
+    }
+
+    /**
+     * @return the directory used for the journal, the page file and the lock file
+     */
+    public File getDirectory() {
+        return this.directory;
+    }
+
+    /**
+     * @param directory the directory to use for the journal, the page file and the lock file
+     */
+    public void setDirectory(File directory) {
+        final File target = directory;
+        this.directory = target;
+    }
+
+    /**
+     * Returns the scheduler registered under {@code name}, creating, persisting
+     * and (when the store is started) starting it on first use.
+     *
+     * @param name name of the scheduler
+     * @return the existing or newly created scheduler
+     * @throws Exception if the scheduler cannot be created or started
+     */
+    public JobScheduler getJobScheduler(final String name) throws Exception {
+        final JobSchedulerImpl existing = this.schedulers.get(name);
+        if (existing != null) {
+            return existing;
+        }
+
+        final JobSchedulerImpl created = new JobSchedulerImpl(this);
+        created.setName(name);
+        getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                created.createIndexes(tx);
+                created.load(tx);
+                metaData.storedSchedulers.put(tx, name, created);
+            }
+        });
+        this.schedulers.put(name, created);
+        if (isStarted()) {
+            created.start();
+        }
+        return created;
+    }
+
+    /**
+     * Unregisters, stops and destroys the scheduler with the given name.
+     *
+     * @param name name of the scheduler to remove
+     * @return {@code true} if a scheduler with that name was registered
+     * @throws Exception if the scheduler cannot be stopped or removed from the store
+     */
+    synchronized public boolean removeJobScheduler(final String name) throws Exception {
+        final JobSchedulerImpl removed = this.schedulers.remove(name);
+        if (removed == null) {
+            return false;
+        }
+        removed.stop();
+        getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                metaData.storedSchedulers.remove(tx, name);
+                removed.destroy(tx);
+            }
+        });
+        return true;
+    }
+
+    /**
+     * Opens the store: resolves the directory, takes the lock file, starts the
+     * journal, loads the page file, creates or loads the root meta data and
+     * starts every stored scheduler.
+     * <p>
+     * When no directory is configured, a "delayedDB" directory below the default
+     * data directory is used. The path is joined with {@link File#separator};
+     * {@link File#pathSeparator} is the separator between entries of a path
+     * list and would produce a wrong directory name.
+     *
+     * @throws Exception if the journal, the page file or the lock cannot be set up
+     */
+    @Override
+    protected synchronized void doStart() throws Exception {
+        if (this.directory == null) {
+            this.directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "delayedDB");
+        }
+        IOHelper.mkdirs(this.directory);
+        lock();
+        this.journal = new Journal();
+        this.journal.setDirectory(directory);
+        this.journal.setMaxFileLength(getJournalMaxFileLength());
+        this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
+        this.journal.start();
+        this.pageFile = new PageFile(directory, "scheduleDB");
+        this.pageFile.load();
+
+        this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                if (pageFile.getPageCount() == 0) {
+                    // empty page file: create the root meta data on page 0
+                    Page<MetaData> page = tx.allocate();
+                    assert page.getPageId() == 0;
+                    page.set(metaData);
+                    metaData.page = page;
+                    metaData.createIndexes(tx);
+                    tx.store(metaData.page, metaDataMarshaller, true);
+
+                } else {
+                    // existing page file: the root meta data lives on page 0
+                    Page<MetaData> page = tx.load(0, metaDataMarshaller);
+                    metaData = page.get();
+                    metaData.page = page;
+                }
+                metaData.load(tx);
+                metaData.loadScheduler(tx, schedulers);
+                for (JobSchedulerImpl js :schedulers.values()) {
+                    try {
+                        js.start();
+                    } catch (Exception e) {
+                        // one scheduler failing to start does not abort the others
+                        JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e);
+                    }
+               }
+            }
+        });
+
+        this.pageFile.flush();
+        LOG.info(this + " started");
+    }
+    
+    @Override
+    protected synchronized void doStop(ServiceStopper stopper) throws Exception {
+        for (JobSchedulerImpl js : this.schedulers.values()) {
+            js.stop();
+        }
+        if (this.pageFile != null) {
+            this.pageFile.unload();
+        }
+        if (this.journal != null) {
+            journal.close();
+        }
+        if (this.lockFile != null) {
+            this.lockFile.unlock();
+        }
+        this.lockFile = null;
+        LOG.info(this + " stopped");
+
+    }
+
+    synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
+        int logId = location.getDataFileId();
+        Integer val = this.metaData.journalRC.get(tx, logId);
+        int refCount = val != null ? val.intValue() + 1 : 1;
+        this.metaData.journalRC.put(tx, logId, refCount);
+
+    }
+
+    /**
+     * Decrements the reference count of the journal data file that holds
+     * {@code location}. When the count drops to zero or below, the count entry
+     * is removed and the journal is asked to remove that data file.
+     * <p>
+     * A data file without a recorded count is logged and left untouched instead
+     * of failing on a null count; incrementJournalCount tolerates a missing
+     * entry in the same way.
+     *
+     * @param tx transaction the count index is updated in
+     * @param location journal location whose data file loses a reference
+     * @throws IOException if the count index or the journal cannot be updated
+     */
+    synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
+        int logId = location.getDataFileId();
+        Integer val = this.metaData.journalRC.get(tx, logId);
+        if (val == null) {
+            // nothing is tracked for this file, so there is nothing to release
+            LOG.warn("No journal reference count recorded for data file " + logId);
+            return;
+        }
+        int refCount = val.intValue() - 1;
+        if (refCount <= 0) {
+            this.metaData.journalRC.remove(tx, logId);
+            Set<Integer> set = new HashSet<Integer>();
+            set.add(logId);
+            this.journal.removeDataFiles(set);
+        } else {
+            this.metaData.journalRC.put(tx, logId, refCount);
+        }
+    }
+
+    /**
+     * Reads the payload stored at the given journal location.
+     *
+     * @param location journal location to read
+     * @return the data the journal returns for that location
+     * @throws IllegalStateException propagated from the journal
+     * @throws IOException if the journal cannot be read
+     */
+    synchronized ByteSequence getJob(Location location) throws IllegalStateException, IOException {
+        return this.journal.read(location);
+    }
+
+    synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
+        return this.journal.write(payload, sync);
+    }
+
+    private void lock() throws IOException {
+        if (lockFile == null) {
+            File lockFileName = new File(directory, "lock");
+            lockFile = new LockFile(lockFileName, true);
+            if (failIfDatabaseIsLocked) {
+                lockFile.lock();
+            } else {
+                while (true) {
+                    try {
+                        lockFile.lock();
+                        break;
+                    } catch (IOException e) {
+                        LOG.info("Database " + lockFileName + " is locked... waiting "
+                                + (DATABASE_LOCKED_WAIT_DELAY / 1000)
+                                + " seconds for the database to be unlocked. Reason: " + e);
+                        try {
+                            Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
+                        } catch (InterruptedException e1) {
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    PageFile getPageFile() {
+        this.pageFile.isLoaded();
+        return this.pageFile;
+    }
+
+    /**
+     * @return {@code true} if locking makes a single attempt instead of retrying
+     */
+    public boolean isFailIfDatabaseIsLocked() {
+        return this.failIfDatabaseIsLocked;
+    }
+
+    /**
+     * @param failIfDatabaseIsLocked {@code true} to make a single lock attempt instead of retrying
+     */
+    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
+        final boolean failFast = failIfDatabaseIsLocked;
+        this.failIfDatabaseIsLocked = failFast;
+    }
+
+    /**
+     * @return the maximum file length handed to the journal on start
+     */
+    public int getJournalMaxFileLength() {
+        return this.journalMaxFileLength;
+    }
+
+    /**
+     * @param journalMaxFileLength the maximum file length to hand to the journal on start
+     */
+    public void setJournalMaxFileLength(int journalMaxFileLength) {
+        final int maxLength = journalMaxFileLength;
+        this.journalMaxFileLength = maxLength;
+    }
+
+    /**
+     * @return the write batch size handed to the journal on start
+     */
+    public int getJournalMaxWriteBatchSize() {
+        return this.journalMaxWriteBatchSize;
+    }
+
+    /**
+     * @param journalMaxWriteBatchSize the write batch size to hand to the journal on start
+     */
+    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
+        final int batchSize = journalMaxWriteBatchSize;
+        this.journalMaxWriteBatchSize = batchSize;
+    }
+
+    /**
+     * @return the current enableIndexWriteAsync setting
+     */
+    public boolean isEnableIndexWriteAsync() {
+        return this.enableIndexWriteAsync;
+    }
+
+    /**
+     * @param enableIndexWriteAsync the new enableIndexWriteAsync setting
+     */
+    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
+        final boolean async = enableIndexWriteAsync;
+        this.enableIndexWriteAsync = async;
+    }
+
+    /**
+     * @return the text "JobSchedulerStore:" followed by the store directory
+     */
+    @Override
+    public String toString() {
+        final String label = "JobSchedulerStore:" + this.directory;
+        return label;
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=899633&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Fri Jan 15 14:09:30 2010
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.security.SecurityContext;
+import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.activemq.util.TypeConversionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.util.ByteSequence;
+
+public class SchedulerBroker extends BrokerFilter implements JobListener {
+    private static final Log LOG = LogFactory.getLog(SchedulerBroker.class);
+    private static final IdGenerator ID_GENERATOR = new IdGenerator();
+    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+    private final AtomicBoolean started = new AtomicBoolean();
+    private final WireFormat wireFormat = new OpenWireFormat();
+    private final ConnectionContext context = new ConnectionContext();
+    private final ProducerId producerId = new ProducerId();
+    private File directory;
+
+    private JobSchedulerStore store;
+    private JobScheduler scheduler;
+
+    public SchedulerBroker(Broker next, File directory) throws Exception {
+        super(next);
+        this.directory = directory;
+        this.producerId.setConnectionId(ID_GENERATOR.generateId());
+        this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
+        context.setBroker(next);
+        LOG.info("Scheduler using directory: " + directory);
+
+    }
+    /**
+     * Returns the directory handed to the scheduler store.
+     *
+     * @return the directory
+     */
+    public File getDirectory() {
+        final File current = this.directory;
+        return current;
+    }
+    /**
+     * Sets the directory handed to the scheduler store when it is created.
+     *
+     * @param directory
+     *            the directory to set
+     */
+    public void setDirectory(File directory) {
+        final File target = directory;
+        this.directory = target;
+    }
+
+    /**
+     * Flags the scheduler broker as started and then starts the rest of the
+     * broker chain.
+     */
+    @Override
+    public void start() throws Exception {
+        started.set(true);
+        super.start();
+    }
+
+    /**
+     * On the first stop after a start, stops the store and detaches from the
+     * scheduler; the rest of the broker chain is always stopped.
+     */
+    @Override
+    public void stop() throws Exception {
+        final boolean wasStarted = this.started.compareAndSet(true, false);
+        if (wasStarted) {
+            if (this.store != null) {
+                this.store.stop();
+            }
+            if (this.scheduler != null) {
+                this.scheduler.removeListener(this);
+                this.scheduler = null;
+            }
+        }
+        super.stop();
+    }
+
+    /**
+     * Intercepts messages that carry the scheduled-period property and hands
+     * them to the job scheduler instead of sending them on; all other messages
+     * go straight down the broker chain.
+     */
+    @Override
+    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
+        final Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
+        if (periodValue == null) {
+            // not a scheduled message
+            super.send(producerExchange, messageSend);
+            return;
+        }
+
+        final long period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
+
+        long start = 0;
+        final Object startValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_START);
+        if (startValue != null) {
+            start = (Long) TypeConversionSupport.convert(startValue, Long.class);
+        }
+
+        int repeat = 0;
+        final Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
+        if (repeatValue != null) {
+            repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
+        }
+
+        // the marshalled message becomes the job payload, keyed by its message id
+        final org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(messageSend);
+        getScheduler().schedule(messageSend.getMessageId().toString(),
+                new ByteSequence(packet.data, packet.offset, packet.length), start, period, repeat);
+    }
+
+    /**
+     * Job callback: unmarshals the stored message, strips the scheduling
+     * properties and sends it down the broker chain. Failures are logged, not
+     * rethrown.
+     *
+     * @param id job identifier, used in the error log message
+     * @param job marshalled message
+     */
+    public void scheduledJob(String id, ByteSequence job) {
+        final org.apache.activemq.util.ByteSequence wire = new org.apache.activemq.util.ByteSequence(
+                job.getData(), job.getOffset(), job.getLength());
+        try {
+            final Message due = (Message) this.wireFormat.unmarshal(wire);
+            due.setOriginalTransactionId(null);
+
+            final Object repeatValue = due.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
+            if (repeatValue != null) {
+                final int repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
+                if (repeat != 0) {
+                    // a repeating message is sent several times, so each send needs its own id
+                    due.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
+                }
+            }
+
+            // if this goes across a network - we don't want it rescheduled
+            due.removeProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
+            due.removeProperty(ScheduledMessage.AMQ_SCHEDULED_START);
+            due.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
+
+            final ProducerBrokerExchange exchange = new ProducerBrokerExchange();
+            exchange.setConnectionContext(context);
+            exchange.setMutable(true);
+            exchange.setProducerState(new ProducerState(new ProducerInfo()));
+            super.send(exchange, due);
+        } catch (Exception e) {
+            LOG.error("Failed to send scheduled message " + id, e);
+        }
+    }
+
+    /**
+     * Returns the "ActiveMQ" job scheduler, creating it and registering this
+     * broker as its listener on first use.
+     * <p>
+     * Synchronized because send() may be invoked from several threads at once;
+     * without it two callers could both see a null scheduler and each create
+     * and start a store on the same directory.
+     *
+     * @return the scheduler, or {@code null} when the broker is not started
+     * @throws Exception if the store or the scheduler cannot be created
+     */
+    private synchronized JobScheduler getScheduler() throws Exception {
+        if (this.started.get()) {
+            if (this.scheduler == null) {
+                this.scheduler = getStore().getJobScheduler("ActiveMQ");
+                this.scheduler.addListener(this);
+            }
+            return this.scheduler;
+        }
+        return null;
+    }
+
+    /**
+     * Returns the scheduler store, creating and starting it in the configured
+     * directory on first use.
+     *
+     * @return the store, or {@code null} when the broker is not started
+     * @throws Exception if the store cannot be started
+     */
+    private JobSchedulerStore getStore() throws Exception {
+        if (!started.get()) {
+            return null;
+        }
+        if (this.store == null) {
+            this.store = new JobSchedulerStore();
+            this.store.setDirectory(directory);
+            this.store.start();
+        }
+        return this.store;
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java?rev=899633&r1=899632&r2=899633&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java Fri Jan 15 14:09:30 2010
@@ -24,14 +24,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Vector;
-
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
 import javax.jms.MessageNotWriteableException;
-
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ScheduledMessage;
 import org.apache.activemq.filter.PropertyExpression;
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.util.Callback;
@@ -42,8 +41,7 @@
  * @version $Revision:$
  * @openwire:marshaller code="23"
  */
-public class ActiveMQMessage extends Message implements org.apache.activemq.Message {
-
+public class ActiveMQMessage extends Message implements org.apache.activemq.Message, ScheduledMessage {
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MESSAGE;
     private static final Map<String, PropertySetter> JMS_PROPERTY_SETERS = new HashMap<String, PropertySetter>();
 
@@ -54,6 +52,7 @@
     }
 
 
+    @Override
     public Message copy() {
         ActiveMQMessage copy = new ActiveMQMessage();
         copy(copy);
@@ -65,6 +64,7 @@
         copy.acknowledgeCallback = acknowledgeCallback;
     }
 
+    @Override
     public int hashCode() {
         MessageId id = getMessageId();
         if (id != null) {
@@ -74,6 +74,7 @@
         }
     }
 
+    @Override
     public boolean equals(Object o) {
         if (this == o) {
             return true;
@@ -100,6 +101,7 @@
         }
     }
 
+    @Override
     public void clearBody() throws JMSException {
         setContent(null);
         readOnlyBody = false;
@@ -262,6 +264,7 @@
         this.setPriority((byte) priority);
     }
 
+    @Override
     public void clearProperties() {
         super.clearProperties();
         readOnlyProperties = false;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=899633&r1=899632&r2=899633&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Fri Jan 15 14:09:30 2010
@@ -23,7 +23,6 @@
 import java.util.HashMap;
 import java.util.Map;
 import javax.jms.JMSException;
-
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.region.Destination;
@@ -173,6 +172,11 @@
         lazyCreateProperties();
         properties.put(name, value);
     }
+    
+    /**
+     * Removes the entry with the given name from this message's property map.
+     * {@code lazyCreateProperties()} is called first so the map is prepared
+     * before the removal.
+     *
+     * @param name the name of the property to remove
+     * @throws IOException propagated from {@code lazyCreateProperties()}
+     */
+    public void removeProperty(String name) throws IOException {
+        lazyCreateProperties();
+        properties.remove(name);
+    }
 
     protected void lazyCreateProperties() throws IOException {
         if (properties == null) {
@@ -583,6 +587,7 @@
         this.memoryUsage=usage;
     }
 
+    @Override
     public boolean isMarshallAware() {
         return true;
     }
@@ -687,6 +692,7 @@
         this.cluster = cluster;
     }
 
+    @Override
     public boolean isMessage() {
         return true;
     }
@@ -717,10 +723,12 @@
         return false;
     }
     
+    @Override
     public String toString() {
         return toString(null);
     }
     
+    @Override
     public String toString(Map<String, Object>overrideFields) {
         try {
             getProperties();

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java?rev=899633&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java Fri Jan 15 14:09:30 2010
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+
+/**
+ * Verifies scheduled delivery through the JMS API: messages carry
+ * {@link ScheduledMessage} properties and a listener counts what arrives.
+ */
+public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
+
+    /**
+     * Sends one message with a 5000 ms {@code AMQ_SCHEDULED_PERIOD} and checks
+     * that nothing has arrived after 2 seconds, then that the message arrives
+     * within a further 5 seconds.
+     */
+    public void testSchedule() throws Exception {
+        final int COUNT = 1;
+        Connection connection = createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            final CountDownLatch latch = new CountDownLatch(COUNT);
+            consumer.setMessageListener(new MessageListener() {
+                public void onMessage(Message message) {
+                    latch.countDown();
+                }
+            });
+
+            connection.start();
+            long time = 5000;
+            MessageProducer producer = session.createProducer(destination);
+            TextMessage message = session.createTextMessage("test msg");
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, time);
+
+            producer.send(message);
+            producer.close();
+            // make sure the message isn't delivered early
+            Thread.sleep(2000);
+            assertEquals(COUNT, latch.getCount());
+            latch.await(5, TimeUnit.SECONDS);
+            assertEquals(0, latch.getCount());
+        } finally {
+            // close even when an assertion fails so the connection is not leaked
+            connection.close();
+        }
+    }
+
+    /**
+     * Sends one message scheduled to start in 1 second with a 50 ms period and
+     * {@code NUMBER - 1} repeats, then checks that exactly {@code NUMBER}
+     * deliveries are observed.
+     */
+    public void testScheduleRepeated() throws Exception {
+        final int NUMBER = 10;
+        final AtomicInteger count = new AtomicInteger();
+        Connection connection = createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            final CountDownLatch latch = new CountDownLatch(NUMBER);
+            consumer.setMessageListener(new MessageListener() {
+                public void onMessage(Message message) {
+                    latch.countDown();
+                    count.incrementAndGet();
+                }
+            });
+
+            connection.start();
+            MessageProducer producer = session.createProducer(destination);
+            TextMessage message = session.createTextMessage("test msg");
+            long time = System.currentTimeMillis() + 1000;
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_START, time);
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 50);
+            message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, NUMBER - 1);
+            producer.send(message);
+            producer.close();
+            assertEquals(NUMBER, latch.getCount());
+            latch.await(5, TimeUnit.SECONDS);
+            assertEquals(0, latch.getCount());
+            // wait a little longer - make sure we only get NUMBER of replays
+            Thread.sleep(1000);
+            assertEquals(NUMBER, count.get());
+        } finally {
+            // close even when an assertion fails so the connection is not leaked
+            connection.close();
+        }
+    }
+
+    /** Selects the in-VM bind address before the inherited set-up runs. */
+    @Override
+    protected void setUp() throws Exception {
+        bindAddress = "vm://localhost";
+        super.setUp();
+    }
+
+    /**
+     * Builds the broker used by the tests: JMX disabled, data directory
+     * {@code target}, and a connector on {@code bindAddress}.
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(isPersistent());
+        answer.setDataDirectory("target");
+        answer.setUseJmx(false);
+        answer.addConnector(bindAddress);
+        return answer;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java?rev=899633&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java Fri Jan 15 14:09:30 2010
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import junit.framework.TestCase;
+import org.apache.activemq.util.IOHelper;
+import org.apache.kahadb.util.ByteSequence;
+
+/**
+ * Checks that jobs written to a {@link JobSchedulerStore} are still returned,
+ * with the same payloads, after the store is stopped and started again.
+ */
+public class JobSchedulerStoreTest extends TestCase {
+
+    /**
+     * Schedules 1000 jobs for the same start time, restarts the store and
+     * verifies that the next scheduled jobs match the originals in order.
+     */
+    public void testRestart() throws Exception {
+        File directory = new File("target/test/ScheduledDB");
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+
+        JobSchedulerStore store = new JobSchedulerStore();
+        store.setDirectory(directory);
+        final int NUMBER = 1000;
+        List<ByteSequence> list = new ArrayList<ByteSequence>();
+        for (int i = 0; i < NUMBER; i++) {
+            // explicit charset: the payload is decoded again below and compared
+            list.add(new ByteSequence(("testjob" + i).getBytes("UTF-8")));
+        }
+
+        store.start();
+        try {
+            JobScheduler js = store.getJobScheduler("test");
+            int count = 0;
+            long startTime = System.currentTimeMillis() + 10000;
+            for (ByteSequence job : list) {
+                js.schedule("id:" + (count++), job, startTime, 10000, -1);
+            }
+            List<ByteSequence> test = js.getNextScheduleJobs();
+            assertEquals(list.size(), test.size());
+        } finally {
+            store.stop();
+        }
+
+        // second run against the same directory: the jobs must have survived
+        store.start();
+        try {
+            JobScheduler js = store.getJobScheduler("test");
+            List<ByteSequence> test = js.getNextScheduleJobs();
+            assertEquals(list.size(), test.size());
+            for (int i = 0; i < list.size(); i++) {
+                String orig = new String(list.get(i).getData(), "UTF-8");
+                String payload = new String(test.get(i).getData(), "UTF-8");
+                assertEquals(orig, payload);
+            }
+        } finally {
+            // stop the store even when an assertion fails so it is not left running
+            store.stop();
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java?rev=899633&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java Fri Jan 15 14:09:30 2010
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.util.IOHelper;
+import org.apache.kahadb.util.ByteSequence;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests scheduling, delivery after a store restart, and removal of jobs on a
+ * {@link JobScheduler} obtained from a {@link JobSchedulerStore}.
+ */
+public class JobSchedulerTest {
+
+    private JobSchedulerStore store;
+    private JobScheduler scheduler;
+
+    /**
+     * Schedules ten jobs through the three-argument {@code schedule} overload
+     * (last argument 10) and expects all of them to fire within one second.
+     */
+    @Test
+    public void testAddLongStringByteSequence() throws Exception {
+        final int COUNT = 10;
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        scheduler.addListener(countDownListener(latch));
+        for (int i = 0; i < COUNT; i++) {
+            String test = "test" + i;
+            scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), 10);
+        }
+        latch.await(1, TimeUnit.SECONDS);
+        assertEquals(0, latch.getCount());
+    }
+
+    /**
+     * Schedules ten jobs to start two seconds from now and expects none to have
+     * fired immediately and all to have fired within three seconds.
+     */
+    @Test
+    public void testAddLongLongIntStringByteSequence() throws Exception {
+        final int COUNT = 10;
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        scheduler.addListener(countDownListener(latch));
+        long time = System.currentTimeMillis() + 2000;
+        for (int i = 0; i < COUNT; i++) {
+            String test = "test" + i;
+            scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), time, 10, -1);
+        }
+        assertEquals(COUNT, latch.getCount());
+        // milliseconds, matching the 2000 ms start offset; a failure must not block for 3000 s
+        latch.await(3000, TimeUnit.MILLISECONDS);
+        assertEquals(0, latch.getCount());
+    }
+
+    /**
+     * Schedules ten jobs, stops the store, starts a new one on the same
+     * directory and expects the jobs to be delivered to a listener added
+     * after the restart.
+     */
+    @Test
+    public void testAddStopThenDeliver() throws Exception {
+        final int COUNT = 10;
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        long time = System.currentTimeMillis() + 2000;
+        for (int i = 0; i < COUNT; i++) {
+            String test = "test" + i;
+            scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), time, 10, -1);
+        }
+        File directory = store.getDirectory();
+        tearDown();
+        startStore(directory);
+        scheduler.addListener(countDownListener(latch));
+        assertEquals(COUNT, latch.getCount());
+        // milliseconds, matching the 2000 ms start offset; a failure must not block for 3000 s
+        latch.await(3000, TimeUnit.MILLISECONDS);
+        assertEquals(0, latch.getCount());
+    }
+
+    /**
+     * Schedules ten jobs for the same time, removes everything at the next
+     * schedule time and expects no next jobs to remain.
+     */
+    @Test
+    public void testRemoveLong() throws Exception {
+        final int COUNT = 10;
+
+        long time = System.currentTimeMillis() + 20000;
+        for (int i = 0; i < COUNT; i++) {
+            String str = "test" + i;
+            scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), time, 10, -1);
+        }
+        int size = scheduler.getNextScheduleJobs().size();
+        assertEquals(COUNT, size);
+        long removeTime = scheduler.getNextScheduleTime();
+        scheduler.remove(removeTime);
+        size = scheduler.getNextScheduleJobs().size();
+        assertEquals(0, size);
+    }
+
+    /**
+     * Schedules ten jobs plus one extra job with a known id, removes the extra
+     * job by id and expects the original ten to remain.
+     */
+    @Test
+    public void testRemoveString() throws IOException {
+        final int COUNT = 10;
+        final String test = "TESTREMOVE";
+        long time = System.currentTimeMillis() + 20000;
+        for (int i = 0; i < COUNT; i++) {
+            String str = "test" + i;
+            scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), time, 10, -1);
+            if (i == COUNT / 2) {
+                scheduler.schedule(test, new ByteSequence(test.getBytes()), time, 10, -1);
+            }
+        }
+
+        int size = scheduler.getNextScheduleJobs().size();
+        assertEquals(COUNT + 1, size);
+        scheduler.remove(test);
+        size = scheduler.getNextScheduleJobs().size();
+        assertEquals(COUNT, size);
+    }
+
+    /** Empties the test directory and starts a fresh store on it. */
+    @Before
+    public void setUp() throws Exception {
+        File directory = new File("target/test/ScheduledDB");
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+        startStore(directory);
+    }
+
+    /**
+     * Creates and starts a store on the given directory and fetches the
+     * scheduler named "test" from it.
+     *
+     * @param directory the directory the store is pointed at
+     */
+    protected void startStore(File directory) throws Exception {
+        store = new JobSchedulerStore();
+        store.setDirectory(directory);
+        store.start();
+        scheduler = store.getJobScheduler("test");
+    }
+
+    /** Stops the store, if one was created. */
+    @After
+    public void tearDown() throws Exception {
+        // setUp may have failed before assigning the store; @After still runs then
+        if (store != null) {
+            store.stop();
+        }
+    }
+
+    /** Returns a listener that counts the given latch down once per scheduled job. */
+    private static JobListener countDownListener(final CountDownLatch latch) {
+        return new JobListener() {
+            public void scheduledJob(String id, ByteSequence job) {
+                latch.countDown();
+            }
+        };
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message