Return-Path: Delivered-To: apmail-geronimo-scm-archive@www.apache.org Received: (qmail 73527 invoked from network); 18 Jul 2004 22:11:01 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur-2.apache.org with SMTP; 18 Jul 2004 22:11:01 -0000 Received: (qmail 41509 invoked by uid 500); 18 Jul 2004 22:11:00 -0000 Delivered-To: apmail-geronimo-scm-archive@geronimo.apache.org Received: (qmail 41490 invoked by uid 500); 18 Jul 2004 22:10:59 -0000 Mailing-List: contact scm-help@geronimo.apache.org; run by ezmlm Precedence: bulk list-help: list-unsubscribe: list-post: Reply-To: dev@geronimo.apache.org Delivered-To: mailing list scm@geronimo.apache.org Received: (qmail 41476 invoked by uid 500); 18 Jul 2004 22:10:59 -0000 Delivered-To: apmail-incubator-geronimo-cvs@apache.org Received: (qmail 41472 invoked by uid 99); 18 Jul 2004 22:10:59 -0000 X-ASF-Spam-Status: No, hits=0.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.27.1) with SMTP; Sun, 18 Jul 2004 15:10:58 -0700 Received: (qmail 73498 invoked by uid 1712); 18 Jul 2004 22:10:57 -0000 Date: 18 Jul 2004 22:10:57 -0000 Message-ID: <20040718221057.73497.qmail@minotaur.apache.org> From: djencks@apache.org To: incubator-geronimo-cvs@apache.org Subject: cvs commit: incubator-geronimo/modules/timer/src/test/org/apache/geronimo/timer/jdbc JDBCWorkerPersistenceTest.java X-Virus-Checked: Checked X-Spam-Rating: minotaur-2.apache.org 1.6.2 0/1000/N djencks 2004/07/18 15:10:57 Added: modules/timer project.properties project.xml modules/timer/src/java/org/apache/geronimo/timer ExecutorFeedingTimerTask.java ExecutorTask.java ExecutorTaskFactory.java NontransactionalExecutorTask.java NontransactionalExecutorTaskFactory.java PersistenceException.java PersistentTimer.java Playback.java ThreadPooledTimer.java TransactionalExecutorTask.java 
TransactionalExecutorTaskFactory.java UserTaskFactory.java WorkInfo.java WorkerPersistence.java modules/timer/src/java/org/apache/geronimo/timer/jdbc JDBCStoreThreadPooledNonTransactionalTimer.java JDBCStoreThreadPooledTransactionalTimer.java JDBCWorkerPersistence.java modules/timer/src/java/org/apache/geronimo/timer/vm VMStoreThreadPooledNonTransactionalTimer.java VMStoreThreadPooledTransactionalTimer.java VMWorkerPersistence.java modules/timer/src/test/org/apache/geronimo/timer AbstractThreadPooledTimerTest.java NontransactionalThreadPooledTimerTest.java TransactionalThreadPooledTimerTest.java modules/timer/src/test/org/apache/geronimo/timer/jdbc JDBCWorkerPersistenceTest.java Log: Geronimo support for transactional timer that executes in a thread pool. Revision Changes Path 1.1 incubator-geronimo/modules/timer/project.properties Index: project.properties =================================================================== ## ## $Revision: 1.1 $ $Date: 2004/07/18 22:10:56 $ ## maven.repo.remote=http://www.apache.org/~djencks/maven, http://www.ibiblio.org/maven 1.1 incubator-geronimo/modules/timer/project.xml Index: project.xml =================================================================== 3 ${basedir}/../../etc/project.xml Geronimo :: Timer geronimo-timer Geronimo Timer Geronimo Timer http://incubator.apache.org/projects/geronimo/timer/ /www/incubator.apache.org/projects/geronimo/timer /www/incubator.apache.org/projects/geronimo/builds/timer org.apache.geronimo.timer geronimo geronimo-connector ${pom.currentVersion} geronimo geronimo-core ${pom.currentVersion} geronimo geronimo-kernel ${pom.currentVersion} geronimo geronimo-transaction ${pom.currentVersion} geronimo-spec geronimo-spec-jta 1.0.1B-rc2 geronimo-spec geronimo-spec-j2ee-connector 1.5-rc2 xstream xstream 1.0.1 xpp3 xpp3 http://www.extreme.indiana.edu/xgws/xsoap/xpp 1.1.3.3 axion axion 1.0-M3-dev true commons-primitives commons-primitives 1.0 true concurrent concurrent 1.3.4 commons-logging 
commons-logging 1.0.3 http://jakarta.apache.org/commons/logging/ mx4j mx4j 2.0.1 regexp regexp 1.3 http://jakarta.apache.org/regexp tranql tranql 1.0-SNAPSHOT http://tranql.codehaus.org 1.1 incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/ExecutorFeedingTimerTask.java Index: ExecutorFeedingTimerTask.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.geronimo.timer; import java.util.TimerTask; import javax.transaction.RollbackException; import javax.transaction.SystemException; import javax.transaction.Synchronization; import javax.transaction.Status; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:56 $ * * */ public class ExecutorFeedingTimerTask extends TimerTask { private static final Log log = LogFactory.getLog(ExecutorFeedingTimerTask.class); private final WorkInfo workInfo; private final ThreadPooledTimer threadPooledTimer; public ExecutorFeedingTimerTask(WorkInfo workInfo, ThreadPooledTimer threadPooledTimer) { this.workInfo = workInfo; this.threadPooledTimer = threadPooledTimer; } public void run() { try { threadPooledTimer.getExecutor().execute(workInfo.getExecutorTask()); } catch (InterruptedException e) { log.warn(e); } } public boolean cancel() { try { 
threadPooledTimer.getWorkerPersistence().cancel(workInfo.getId()); } catch (PersistenceException e) { log.warn(e); } try { threadPooledTimer.registerSynchronization(new CancelSynchronization(this)); } catch (RollbackException e) { log.info(e); throw (IllegalStateException) new IllegalStateException("RollbackException when trying to register cacel synchronization").initCause(e); } catch (SystemException e) { log.info(e); throw (IllegalStateException) new IllegalStateException("SystemException when trying to register cacel synchronization").initCause(e); } //return value is meaningless. return true; } private void doCancel() { super.cancel(); } private static class CancelSynchronization implements Synchronization { private final ExecutorFeedingTimerTask worker; public CancelSynchronization(ExecutorFeedingTimerTask worker) { this.worker = worker; } public void beforeCompletion() { } public void afterCompletion(int status) { if (status == Status.STATUS_COMMITTED) { worker.doCancel(); } } } } 1.1 incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/ExecutorTask.java Index: ExecutorTask.java =================================================================== package org.apache.geronimo.timer; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:56 $ * * */ public interface ExecutorTask extends Runnable { } 1.1 incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/ExecutorTaskFactory.java Index: ExecutorTaskFactory.java =================================================================== package org.apache.geronimo.timer; import java.util.Date; import org.apache.geronimo.timer.ExecutorTask; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:56 $ * * */ public interface ExecutorTaskFactory { //TODO make the WorkerPersistence smarter so the oneTime and atFixedRate parameters are not needed. // This could be done by eg. using a stored procedure for update/delete. 
ExecutorTask createExecutorTask(Runnable userTask, WorkInfo workInfo, ThreadPooledTimer threadPooledTimer); } 1.1 incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/NontransactionalExecutorTask.java Index: NontransactionalExecutorTask.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.geronimo.timer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.geronimo.timer.ExecutorTask; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:56 $ * * */ public class NontransactionalExecutorTask implements ExecutorTask { private static final Log log = LogFactory.getLog(NontransactionalExecutorTask.class); private final Runnable userTask; private final WorkInfo workInfo; private final ThreadPooledTimer threadPooledTimer; public NontransactionalExecutorTask(Runnable userTask, WorkInfo workInfo, ThreadPooledTimer threadPooledTimer) { this.userTask = userTask; this.workInfo = workInfo; this.threadPooledTimer = threadPooledTimer; } public void run() { try { userTask.run(); } catch (Exception e) { log.info(e); } try { threadPooledTimer.workPerformed(workInfo); } catch (PersistenceException e) { log.info(e); } if (workInfo.isOneTime()) { threadPooledTimer.removeWorkInfo(workInfo); } } } 1.1 
incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/NontransactionalExecutorTaskFactory.java Index: NontransactionalExecutorTaskFactory.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.geronimo.timer; import org.apache.geronimo.timer.ExecutorTask; import org.apache.geronimo.timer.ExecutorTaskFactory; import org.apache.geronimo.timer.NontransactionalExecutorTask; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:56 $ * * */ public class NontransactionalExecutorTaskFactory implements ExecutorTaskFactory { public ExecutorTask createExecutorTask(Runnable userTask, WorkInfo workInfo, ThreadPooledTimer threadPooledTimer) { return new NontransactionalExecutorTask(userTask, workInfo, threadPooledTimer); } } 1.1 incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/PersistenceException.java Index: PersistenceException.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
/**
 * Checked exception declared by the timer's persistence and scheduling methods.
 * It mirrors the four standard {@link Exception} constructors.
 *
 * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:56 $
 */
public class PersistenceException extends Exception {

    /**
     * Creates an exception with a detail message and a cause.
     *
     * @param message the detail message
     * @param cause the underlying failure
     */
    public PersistenceException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates an exception that wraps a cause; the detail message is derived
     * from the cause by {@link Exception#Exception(Throwable)}.
     *
     * @param cause the underlying failure
     */
    public PersistenceException(Throwable cause) {
        super(cause);
    }

    /**
     * Creates an exception with a detail message and no cause.
     *
     * @param message the detail message
     */
    public PersistenceException(String message) {
        super(message);
    }

    /** Creates an exception with neither a detail message nor a cause. */
    public PersistenceException() {
    }
}
SystemException; WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userInfo, long delay, long period) throws PersistenceException, RollbackException, SystemException; WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException, SystemException; Collection playback(String key, UserTaskFactory userTaskFactory) throws PersistenceException; Collection getIdsByKey(String key) throws PersistenceException; WorkInfo getWorkInfo(Long id); } 1.1 incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/Playback.java Index: Playback.java =================================================================== package org.apache.geronimo.timer; import java.util.Date; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:56 $ * * */ public interface Playback { void schedule(WorkInfo workInfo); } 1.1 incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/ThreadPooledTimer.java Index: ThreadPooledTimer.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/ package org.apache.geronimo.timer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Timer; import javax.transaction.RollbackException; import javax.transaction.Status; import javax.transaction.Synchronization; import javax.transaction.SystemException; import javax.transaction.Transaction; import EDU.oswego.cs.dl.util.concurrent.Executor; import org.apache.geronimo.gbean.GBeanLifecycle; import org.apache.geronimo.gbean.WaitingException; import org.apache.geronimo.gbean.GBeanInfo; import org.apache.geronimo.gbean.GBeanInfoFactory; import org.apache.geronimo.transaction.context.TransactionContext; import org.apache.geronimo.timer.ExecutorFeedingTimerTask; import org.apache.geronimo.timer.ExecutorTask; import org.apache.geronimo.timer.ExecutorTaskFactory; import org.apache.geronimo.timer.PersistenceException; import org.apache.geronimo.timer.PersistentTimer; import org.apache.geronimo.timer.Playback; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:56 $ * * */ public class ThreadPooledTimer implements PersistentTimer, GBeanLifecycle { private final ExecutorTaskFactory executorTaskFactory; private final WorkerPersistence workerPersistence; private final Executor executor; private Timer delegate; private final Map idToWorkInfoMap = Collections.synchronizedMap(new HashMap()); //default constructor for use as reference endpoint. 
public ThreadPooledTimer() { this(null, null, null); } public ThreadPooledTimer(ExecutorTaskFactory executorTaskFactory, WorkerPersistence workerPersistence, Executor executor) { this.executorTaskFactory = executorTaskFactory; this.workerPersistence = workerPersistence; this.executor = executor; } public void doStart() throws WaitingException, Exception { delegate = new Timer(true); } public void doStop() { if (delegate != null) { delegate.cancel(); delegate = null; } } public void doFail() { doStop(); } public WorkInfo schedule(UserTaskFactory userTaskFactory, String key, Object userInfo, long delay) throws PersistenceException, RollbackException, SystemException { Date time = new Date(System.currentTimeMillis() + delay); return schedule(key, userTaskFactory, userInfo, time); } public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userInfo, Date time) throws PersistenceException, RollbackException, SystemException { WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, false, userInfo, time, null); registerSynchronization(new ScheduleSynchronization(worker.getExecutorFeedingTimerTask(), time)); addWorkInfo(worker); return worker; } public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userInfo, long delay, long period) throws PersistenceException, RollbackException, SystemException { Date time = new Date(System.currentTimeMillis() + delay); return schedule(key, userTaskFactory, userInfo, time, period); } public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException, SystemException { WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, false, userInfo, firstTime, new Long(period)); registerSynchronization(new ScheduleRepeatedSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period)); addWorkInfo(worker); return worker; } public WorkInfo scheduleAtFixedRate(String 
key, UserTaskFactory userTaskFactory, Object userInfo, long delay, long period) throws PersistenceException, RollbackException, SystemException { Date time = new Date(System.currentTimeMillis() + delay); return scheduleAtFixedRate(key, userTaskFactory, userInfo, time, period); } public WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException, SystemException { WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, true, userInfo, firstTime, new Long(period)); registerSynchronization(new ScheduleAtFixedRateSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period)); addWorkInfo(worker); return worker; } public Collection playback(String key, UserTaskFactory userTaskFactory) throws PersistenceException { PlaybackImpl playback = new PlaybackImpl(userTaskFactory); workerPersistence.playback(key, playback); return playback.getWorkInfos(); } public Collection getIdsByKey(String key) throws PersistenceException { return workerPersistence.getIdsByKey(key); } public WorkInfo getWorkInfo(Long id) { return (WorkInfo) idToWorkInfoMap.get(id); } private void addWorkInfo(WorkInfo worker) { idToWorkInfoMap.put(new Long(worker.getId()), worker); } void removeWorkInfo(WorkInfo workInfo) { idToWorkInfoMap.remove(new Long(workInfo.getId())); } void workPerformed(WorkInfo workInfo) throws PersistenceException { if (workInfo.isOneTime()) { workerPersistence.cancel(workInfo.getId()); } else if (workInfo.getAtFixedRate()) { workerPersistence.fixedRateWorkPerformed(workInfo.getId()); workInfo.nextTime(); } else { workInfo.nextInterval(); //TODO this is wrong, need different update. 
workerPersistence.fixedRateWorkPerformed(workInfo.getId()); } } private Timer getTimer() { if (delegate == null) { throw new IllegalStateException("Timer is stopped"); } return delegate; } WorkerPersistence getWorkerPersistence() { return workerPersistence; } Executor getExecutor() { return executor; } private WorkInfo createWorker(String key, UserTaskFactory userTaskFactory, ExecutorTaskFactory executorTaskFactory, boolean atFixedRate, Object userInfo, Date time, Long period) throws PersistenceException { WorkInfo workInfo = new WorkInfo(key, userInfo, time, period, atFixedRate); //save and assign id workerPersistence.save(workInfo); Runnable userTask = userTaskFactory.newTask(workInfo.getId()); ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, this); ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, this); workInfo.initialize(worker, executorTask); return workInfo; } void registerSynchronization(Synchronization sync) throws RollbackException, SystemException { TransactionContext transactionContext = TransactionContext.getContext(); Transaction transaction = transactionContext == null ? 
null : transactionContext.getTransaction(); if (transaction == null) { sync.beforeCompletion(); sync.afterCompletion(Status.STATUS_COMMITTED); } else { assert transactionContext.isActive(): "Trying to register a sync on an inactive transaction context"; transaction.registerSynchronization(sync); } } private class ScheduleSynchronization implements Synchronization { private final ExecutorFeedingTimerTask worker; private final Date time; public ScheduleSynchronization(ExecutorFeedingTimerTask worker, Date time) { this.worker = worker; this.time = time; } public void beforeCompletion() { } public void afterCompletion(int status) { if (status == Status.STATUS_COMMITTED) { getTimer().schedule(worker, time); } } } private class ScheduleRepeatedSynchronization implements Synchronization { private final ExecutorFeedingTimerTask worker; private final Date time; private final long period; public ScheduleRepeatedSynchronization(ExecutorFeedingTimerTask worker, Date time, long period) { this.worker = worker; this.time = time; this.period = period; } public void beforeCompletion() { } public void afterCompletion(int status) { if (status == Status.STATUS_COMMITTED) { getTimer().schedule(worker, time, period); } } } private class ScheduleAtFixedRateSynchronization implements Synchronization { private final ExecutorFeedingTimerTask worker; private final Date time; private final long period; public ScheduleAtFixedRateSynchronization(ExecutorFeedingTimerTask worker, Date time, long period) { this.worker = worker; this.time = time; this.period = period; } public void beforeCompletion() { } public void afterCompletion(int status) { if (status == Status.STATUS_COMMITTED) { getTimer().scheduleAtFixedRate(worker, time, period); } } } private class PlaybackImpl implements Playback { private final UserTaskFactory userTaskFactory; private final Collection workInfos = new ArrayList(); public PlaybackImpl(UserTaskFactory userTaskFactory) { this.userTaskFactory = userTaskFactory; } public void 
schedule(WorkInfo workInfo) { Runnable userTask = userTaskFactory.newTask(workInfo.getId()); ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, ThreadPooledTimer.this); ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, ThreadPooledTimer.this); workInfo.initialize(worker, executorTask); if (workInfo.getPeriod() == null) { getTimer().schedule(worker, workInfo.getTime()); } else if (!workInfo.getAtFixedRate()) { getTimer().schedule(worker, workInfo.getTime(), workInfo.getPeriod().longValue()); } else { getTimer().scheduleAtFixedRate(worker, workInfo.getTime(), workInfo.getPeriod().longValue()); } addWorkInfo(workInfo); workInfos.add(workInfo); } public Collection getWorkInfos() { return workInfos; } } } 1.1 incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/TransactionalExecutorTask.java Index: TransactionalExecutorTask.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/ package org.apache.geronimo.timer; import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; import javax.transaction.NotSupportedException; import javax.transaction.RollbackException; import javax.transaction.SystemException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.geronimo.transaction.context.ContainerTransactionContext; import org.apache.geronimo.transaction.context.TransactionContextManager; import org.apache.geronimo.timer.ExecutorTask; import org.apache.geronimo.timer.PersistenceException; import org.apache.geronimo.timer.ThreadPooledTimer; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:56 $ * * */ public class TransactionalExecutorTask implements ExecutorTask { private static final Log log = LogFactory.getLog(TransactionalExecutorTask.class); private final Runnable userTask; private final WorkInfo workInfo; private final ThreadPooledTimer threadPooledTimer; private final TransactionContextManager transactionContextManager; private final int repeatCount; public TransactionalExecutorTask(Runnable userTask, WorkInfo workInfo, ThreadPooledTimer threadPooledTimer, TransactionContextManager transactionContextManager, int repeatCount) { this.userTask = userTask; this.workInfo = workInfo; this.threadPooledTimer = threadPooledTimer; this.transactionContextManager = transactionContextManager; this.repeatCount = repeatCount; } public void run() { ContainerTransactionContext transactionContext = null; for (int tries = 0; tries < repeatCount; tries++) { try { transactionContext = transactionContextManager.newContainerTransactionContext(); } catch (NotSupportedException e) { log.info(e); break; } catch (SystemException e) { log.info(e); break; } try { try { userTask.run(); } catch (Exception e) { log.info(e); } try { threadPooledTimer.workPerformed(workInfo); } catch (PersistenceException e) { log.info(e); } } finally { try { if 
(transactionContext.getRollbackOnly()) { transactionContext.rollback(); } else { transactionContext.commit(); if (workInfo.isOneTime()) { threadPooledTimer.removeWorkInfo(workInfo); } return; } } catch (SystemException e) { log.info(e); } catch (HeuristicMixedException e) { log.info(e); } catch (HeuristicRollbackException e) { log.info(e); } catch (RollbackException e) { log.info(e); } } } if (workInfo.isOneTime()) { threadPooledTimer.removeWorkInfo(workInfo); } log.warn("Failed to execute work successfully"); } } 1.1 incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/TransactionalExecutorTaskFactory.java Index: TransactionalExecutorTaskFactory.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/ package org.apache.geronimo.timer; import org.apache.geronimo.transaction.context.TransactionContextManager; import org.apache.geronimo.timer.ExecutorTask; import org.apache.geronimo.timer.ExecutorTaskFactory; import org.apache.geronimo.timer.ThreadPooledTimer; import org.apache.geronimo.timer.TransactionalExecutorTask; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:56 $ * * */ public class TransactionalExecutorTaskFactory implements ExecutorTaskFactory { private final TransactionContextManager transactionContextManager; private int repeatCount; public TransactionalExecutorTaskFactory(TransactionContextManager transactionContextManager, int repeatCount) { this.transactionContextManager = transactionContextManager; this.repeatCount = repeatCount; } public TransactionContextManager getTransactionContextManager() { return transactionContextManager; } public int getRepeatCount() { return repeatCount; } public ExecutorTask createExecutorTask(Runnable userTask, WorkInfo workInfo, ThreadPooledTimer threadPooledTimer) { return new TransactionalExecutorTask(userTask, workInfo, threadPooledTimer, transactionContextManager, repeatCount); } } 1.1 incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/UserTaskFactory.java Index: UserTaskFactory.java =================================================================== package org.apache.geronimo.timer; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:56 $ * * */ public interface UserTaskFactory { Runnable newTask(long id); } 1.1 incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/WorkInfo.java Index: WorkInfo.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
* You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.geronimo.timer; import java.util.Date; import org.apache.geronimo.timer.ExecutorFeedingTimerTask; import org.apache.geronimo.timer.ExecutorTask; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:56 $ * * */ public class WorkInfo { //these should be persistent. private final String key; private long id = -1; private final Object userInfo; private Date time; private final Long period; private final boolean atFixedRate; //these should not be persistent. private ExecutorFeedingTimerTask worker; private ExecutorTask taskWrapper; private Object clientHandle; public WorkInfo(String key, Object userInfo, Date time, Long period, boolean atFixedRate) { this.key = key; this.userInfo = userInfo; this.time = time; this.period = period; this.atFixedRate = atFixedRate; } public String getKey() { return key; } public long getId() { return id; } public void setId(long id) { if (this.id != -1) { throw new IllegalStateException("Id can be set only once!"); } this.id = id; } public Object getUserInfo() { return userInfo; } public Date getTime() { return time; } public Long getPeriod() { return period; } public boolean getAtFixedRate() { return atFixedRate; } public void initialize(ExecutorFeedingTimerTask worker, ExecutorTask taskWrapper) { this.worker = worker; this.taskWrapper = taskWrapper; } public ExecutorFeedingTimerTask getExecutorFeedingTimerTask() { return worker; } public Runnable getExecutorTask() { return taskWrapper; } public Object getClientHandle() { return clientHandle; } public void setClientHandle(Object clientHandle) { 
this.clientHandle = clientHandle; } public boolean isOneTime() { return period == null; } void nextTime() { if (period == null) { throw new IllegalStateException("This is a one-time timerTask"); } time = new Date(time.getTime() + period.longValue()); } public void nextInterval() { if (period == null) { throw new IllegalStateException("This is a one-time timerTask"); } time = new Date(System.currentTimeMillis() + period.longValue()); } } 1.1 incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/WorkerPersistence.java Index: WorkerPersistence.java =================================================================== package org.apache.geronimo.timer; import java.util.Date; import java.util.Collection; import org.apache.geronimo.timer.PersistenceException; import org.apache.geronimo.timer.Playback; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:56 $ * * */ public interface WorkerPersistence { void save(WorkInfo workInfo) throws PersistenceException; void cancel(long id) throws PersistenceException; void playback(String key, Playback playback) throws PersistenceException; void fixedRateWorkPerformed(long id) throws PersistenceException; void intervalWorkPerformed(long id, long period) throws PersistenceException; Collection getIdsByKey(String key) throws PersistenceException; } 1.1 incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/jdbc/JDBCStoreThreadPooledNonTransactionalTimer.java Index: JDBCStoreThreadPooledNonTransactionalTimer.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
*  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.geronimo.timer.jdbc;

import EDU.oswego.cs.dl.util.concurrent.Executor;
import org.apache.geronimo.connector.outbound.ManagedConnectionFactoryWrapper;
import org.apache.geronimo.gbean.GBeanInfo;
import org.apache.geronimo.gbean.GBeanInfoFactory;
import org.apache.geronimo.kernel.Kernel;
import org.apache.geronimo.timer.NontransactionalExecutorTaskFactory;
import org.apache.geronimo.timer.PersistentTimer;
import org.apache.geronimo.timer.ThreadPooledTimer;

/**
 * {@link ThreadPooledTimer} assembled from a {@link NontransactionalExecutorTaskFactory} and a
 * {@link JDBCWorkerPersistence}, and described as a GBean by {@link #GBEAN_INFO}.
 *
 * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:57 $
 */
public class JDBCStoreThreadPooledNonTransactionalTimer extends ThreadPooledTimer {

    public static final GBeanInfo GBEAN_INFO;

    static {
        GBeanInfoFactory info = new GBeanInfoFactory(JDBCStoreThreadPooledNonTransactionalTimer.class);
        info.addInterface(PersistentTimer.class);

        info.addReference("ManagedConnectionFactoryWrapper", ManagedConnectionFactoryWrapper.class);
        info.addReference("ThreadPool", Executor.class);

        info.addAttribute("kernel", Kernel.class, false);

        // Names are listed in the same order as the constructor parameters below.
        String[] constructorArgs = new String[] {"ManagedConnectionFactoryWrapper", "ThreadPool", "kernel"};
        info.setConstructor(constructorArgs);

        GBEAN_INFO = info.getBeanInfo();
    }

    /**
     * @param managedConnectionFactoryWrapper passed to the {@link JDBCWorkerPersistence} store
     * @param threadPool executor passed to the superclass
     * @param kernel passed to the {@link JDBCWorkerPersistence} store
     */
    public JDBCStoreThreadPooledNonTransactionalTimer(ManagedConnectionFactoryWrapper managedConnectionFactoryWrapper, Executor threadPool, Kernel kernel) {
        super(new NontransactionalExecutorTaskFactory(),
                new JDBCWorkerPersistence(kernel, managedConnectionFactoryWrapper),
                threadPool);
    }

    /**
     * @return the GBean description built in the static initializer
     */
    public static GBeanInfo getGBeanInfo() {
        return GBEAN_INFO;
    }
}



1.1
incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/jdbc/JDBCStoreThreadPooledTransactionalTimer.java Index: JDBCStoreThreadPooledTransactionalTimer.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.geronimo.timer.jdbc; import EDU.oswego.cs.dl.util.concurrent.Executor; import org.apache.geronimo.connector.outbound.ManagedConnectionFactoryWrapper; import org.apache.geronimo.gbean.GBeanInfo; import org.apache.geronimo.gbean.GBeanInfoFactory; import org.apache.geronimo.kernel.Kernel; import org.apache.geronimo.transaction.context.TransactionContextManager; import org.apache.geronimo.timer.PersistentTimer; import org.apache.geronimo.timer.ThreadPooledTimer; import org.apache.geronimo.timer.TransactionalExecutorTaskFactory; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:57 $ * * */ public class JDBCStoreThreadPooledTransactionalTimer extends ThreadPooledTimer { public JDBCStoreThreadPooledTransactionalTimer(int repeatCount, TransactionContextManager transactionContextManager, ManagedConnectionFactoryWrapper managedConnectionFactoryWrapper, Executor threadPool, Kernel kernel) { super(new TransactionalExecutorTaskFactory(transactionContextManager, repeatCount), new JDBCWorkerPersistence(kernel, managedConnectionFactoryWrapper), threadPool); } public static final GBeanInfo GBEAN_INFO; static { GBeanInfoFactory 
infoFactory = new GBeanInfoFactory(JDBCStoreThreadPooledTransactionalTimer.class); infoFactory.addInterface(PersistentTimer.class); infoFactory.addAttribute("repeatCount", int.class, true); infoFactory.addReference("TransactionContextManager", TransactionContextManager.class); infoFactory.addReference("ManagedConnectionFactoryWrapper", ManagedConnectionFactoryWrapper.class); infoFactory.addReference("ThreadPool", Executor.class); infoFactory.addAttribute("kernel", Kernel.class, false); infoFactory.setConstructor(new String[] {"repeatCount", "TransactionContextManager", "ManagedConnectionFactoryWrapper", "ThreadPool", "kernel"}); GBEAN_INFO = infoFactory.getBeanInfo(); } public static GBeanInfo getGBeanInfo() { return GBEAN_INFO; } } 1.1 incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/jdbc/JDBCWorkerPersistence.java Index: JDBCWorkerPersistence.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/

package org.apache.geronimo.timer.jdbc;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Date;
import java.util.Collection;
import java.util.ArrayList;
import javax.sql.DataSource;

import com.thoughtworks.xstream.XStream;
import org.apache.geronimo.connector.outbound.ManagedConnectionFactoryWrapper;
import org.apache.geronimo.gbean.GBeanInfo;
import org.apache.geronimo.gbean.GBeanInfoFactory;
import org.apache.geronimo.gbean.GBeanLifecycle;
import org.apache.geronimo.gbean.WaitingException;
import org.apache.geronimo.kernel.Kernel;
import org.apache.geronimo.timer.PersistenceException;
import org.apache.geronimo.timer.Playback;
import org.apache.geronimo.timer.WorkInfo;
import org.apache.geronimo.timer.WorkerPersistence;

/**
 * {@link WorkerPersistence} that keeps timer tasks in the <code>timertasks</code> table of a
 * JDBC {@link DataSource}. Rows are tagged with a server id so that tasks are only read back
 * for this server; the user info of a task is stored as XStream XML.
 *
 * TODO use an insert returning or stored procedure to insert.
 *
 * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:57 $
 */
public class JDBCWorkerPersistence implements WorkerPersistence, GBeanLifecycle {

    private static final String createSequenceSQL = "create sequence timertasks_seq";
    private static final String createTableSQL = "create table timertasks (id long primary key, serverid varchar(256) not null, timerkey varchar(256) not null, userinfo varchar(4096), firsttime long not null, period long, atfixedrate boolean not null)";
    private static final String sequenceSQL = "select timertasks_seq.nextval";
    private static final String insertSQL = "insert into timertasks (id, serverid, timerkey, userinfo, firsttime, period, atfixedrate) values (?, ?, ?, ?, ?, ?, ?)";
    private static final String deleteSQL = "delete from timertasks where id=?";
    private static final String selectSQL = "select id, userinfo, firsttime, period, atfixedrate from timertasks where serverid = ? and timerkey=?";
    private static final String fixedRateUpdateSQL = "update timertasks set firsttime = firsttime + period where id = ?";
    private static final String intervalUpdateSQL = "update timertasks set firsttime = ? where id = ?";
    // Two placeholders (server id, timer key) to match what getIdsByKey binds; the key column
    // of the table is "timerkey" -- there is no column called "key".
    private static final String selectByKeySQL = "select id from timertasks where serverid = ? and timerkey = ?";

    private final String serverUniqueId;
    private final ManagedConnectionFactoryWrapper managedConnectionFactoryWrapper;
    private DataSource dataSource;

    /**
     * Creates a store whose data source is obtained from the wrapper in {@link #doStart()}.
     *
     * @param kernel its kernel name is used as the server id
     * @param managedConnectionFactoryWrapper source of the DataSource proxy; must not be null
     */
    public JDBCWorkerPersistence(Kernel kernel, ManagedConnectionFactoryWrapper managedConnectionFactoryWrapper) {
        assert managedConnectionFactoryWrapper != null;
        //TODO construct a unique name.
        this.serverUniqueId = kernel.getKernelName();
        this.managedConnectionFactoryWrapper = managedConnectionFactoryWrapper;
    }

    /**
     * Creates a store that works directly on the given data source (no connection factory wrapper).
     *
     * @param serverUniqueId server id written to and matched against the serverid column
     * @param datasource data source to use
     */
    protected JDBCWorkerPersistence(String serverUniqueId, DataSource datasource) {
        this.serverUniqueId = serverUniqueId;
        this.managedConnectionFactoryWrapper = null;
        this.dataSource = datasource;
    }

    public ManagedConnectionFactoryWrapper getManagedConnectionFactoryWrapper() {
        return managedConnectionFactoryWrapper;
    }

    /**
     * Fetches a new id from the sequence, assigns it to the work info and inserts one row.
     *
     * @throws PersistenceException if no id can be obtained, if the insert does not affect
     *         exactly one row, or if JDBC reports an error
     */
    public void save(WorkInfo workInfo) throws PersistenceException {
        try {
            Connection c = dataSource.getConnection();
            try {
                long id = nextId(c);
                workInfo.setId(id);

                PreparedStatement insertStatement = c.prepareStatement(insertSQL);
                try {
                    String serializedUserInfo = serialize(workInfo.getUserInfo());
                    insertStatement.setLong(1, id);
                    insertStatement.setString(2, serverUniqueId);
                    insertStatement.setString(3, workInfo.getKey());
                    insertStatement.setString(4, serializedUserInfo);
                    insertStatement.setLong(5, workInfo.getTime().getTime());
                    if (workInfo.getPeriod() == null) {
                        // one-time task: the period column stays NULL
                        insertStatement.setNull(6, Types.NUMERIC);
                    } else {
                        insertStatement.setLong(6, workInfo.getPeriod().longValue());
                    }
                    insertStatement.setBoolean(7, workInfo.getAtFixedRate());
                    int result = insertStatement.executeUpdate();
                    if (result != 1) {
                        throw new PersistenceException("Could not insert!");
                    }
                } finally {
                    insertStatement.close();
                }
            } finally {
                c.close();
            }
        } catch (SQLException e) {
            throw new PersistenceException(e);
        }
    }

    /**
     * Deletes the row with the given id.
     */
    public void cancel(long id) throws PersistenceException {
        executeForId(deleteSQL, id);
    }

    /**
     * Reads every row stored for this server under the key, rebuilds a {@link WorkInfo} from it
     * and hands it to the playback callback.
     */
    public void playback(String key, Playback playback) throws PersistenceException {
        try {
            Connection c = dataSource.getConnection();
            try {
                PreparedStatement selectStatement = c.prepareStatement(selectSQL);
                try {
                    // bound inside the try so that the statement is closed if binding fails
                    selectStatement.setString(1, serverUniqueId);
                    selectStatement.setString(2, key);
                    ResultSet taskRS = selectStatement.executeQuery();
                    try {
                        while (taskRS.next()) {
                            long id = taskRS.getLong(1);
                            Object userInfo = deserialize(taskRS.getString(2));
                            Date time = new Date(taskRS.getLong(3));
                            // getLong returns 0 for SQL NULL, so wasNull() must be asked right
                            // after the read: NULL means one-time task, i.e. no period.
                            long periodMillis = taskRS.getLong(4);
                            Long period = taskRS.wasNull() ? null : new Long(periodMillis);
                            boolean atFixedRate = taskRS.getBoolean(5);
                            //TODO make sure the reference to this is ok, meaning we can't use a handle to this WorkerPersistence.
                            WorkInfo workInfo = new WorkInfo(key, userInfo, time, period, atFixedRate);
                            workInfo.setId(id);
                            playback.schedule(workInfo);
                        }
                    } finally {
                        taskRS.close();
                    }
                } finally {
                    selectStatement.close();
                }
            } finally {
                c.close();
            }
        } catch (SQLException e) {
            throw new PersistenceException(e);
        }
    }

    /**
     * Advances the stored execution time of the row by its stored period.
     */
    public void fixedRateWorkPerformed(long id) throws PersistenceException {
        executeForId(fixedRateUpdateSQL, id);
    }

    /**
     * Sets the stored execution time of the row to the current system time plus the period.
     */
    public void intervalWorkPerformed(long id, long period) throws PersistenceException {
        long next = System.currentTimeMillis() + period;
        try {
            Connection c = dataSource.getConnection();
            try {
                PreparedStatement updateStatement = c.prepareStatement(intervalUpdateSQL);
                try {
                    updateStatement.setLong(1, next);
                    updateStatement.setLong(2, id);
                    updateStatement.execute();
                } finally {
                    updateStatement.close();
                }
            } finally {
                c.close();
            }
        } catch (SQLException e) {
            throw new PersistenceException(e);
        }
    }

    /**
     * @return the ids (as {@link Long}s) of all rows stored for this server under the key
     */
    public Collection getIdsByKey(String key) throws PersistenceException {
        Collection ids = new ArrayList();
        try {
            Connection c = dataSource.getConnection();
            try {
                PreparedStatement selectStatement = c.prepareStatement(selectByKeySQL);
                try {
                    // bound inside the try so that the statement is closed if binding fails
                    selectStatement.setString(1, serverUniqueId);
                    selectStatement.setString(2, key);
                    ResultSet taskRS = selectStatement.executeQuery();
                    try {
                        while (taskRS.next()) {
                            ids.add(new Long(taskRS.getLong(1)));
                        }
                    } finally {
                        taskRS.close();
                    }
                } finally {
                    selectStatement.close();
                }
            } finally {
                c.close();
            }
        } catch (SQLException e) {
            throw new PersistenceException(e);
        }
        return ids;
    }

    // Reads the next value of the id sequence on the given connection.
    private long nextId(Connection c) throws SQLException, PersistenceException {
        PreparedStatement seqStatement = c.prepareStatement(sequenceSQL);
        try {
            ResultSet seqRS = seqStatement.executeQuery();
            try {
                if (!seqRS.next()) {
                    throw new PersistenceException("Could not obtain an id from the sequence!");
                }
                return seqRS.getLong(1);
            } finally {
                seqRS.close();
            }
        } finally {
            seqStatement.close();
        }
    }

    // Runs a statement whose only parameter is a task id (shared by cancel and fixedRateWorkPerformed).
    private void executeForId(String sql, long id) throws PersistenceException {
        try {
            Connection c = dataSource.getConnection();
            try {
                PreparedStatement statement = c.prepareStatement(sql);
                try {
                    statement.setLong(1, id);
                    statement.execute();
                } finally {
                    statement.close();
                }
            } finally {
                c.close();
            }
        } catch (SQLException e) {
            throw new PersistenceException(e);
        }
    }

    // Renders the user info as XStream XML for the userinfo column.
    private String serialize(Object task) {
        XStream xStream = new XStream();
        return xStream.toXML(task);
    }

    // Rebuilds the user info from the XML stored in the userinfo column.
    // NOTE(review): XStream instantiates whatever types the XML names, so the contents of the
    // timertasks table must be trusted.
    private Object deserialize(String serializedRunnable) {
        XStream xStream = new XStream();
        return xStream.fromXML(serializedRunnable);
    }

    /**
     * Obtains the data source from the connection factory wrapper (when there is one) and
     * attempts to create the sequence and the table.
     */
    public void doStart() throws WaitingException, Exception {
        if (managedConnectionFactoryWrapper != null) {
            dataSource = (DataSource) managedConnectionFactoryWrapper.getProxy();
        }
        if (createSequenceSQL != null && !createSequenceSQL.equals("")) {
            execSQL(createSequenceSQL);
        }
        if (createTableSQL != null && !createTableSQL.equals("")) {
            execSQL(createTableSQL);
        }
    }

    public void doStop() throws WaitingException, Exception {
        dataSource = null;
    }

    public void doFail() {
        dataSource = null;
    }

    // Best-effort DDL: a failure while executing is deliberately ignored because the object
    // normally exists already. Failures to connect or to prepare the statement still propagate.
    private void execSQL(String sql) throws SQLException {
        Connection c = dataSource.getConnection();
        try {
            PreparedStatement updateStatement = c.prepareStatement(sql);
            try {
                updateStatement.execute();
            } catch (SQLException e) {
                //ignore... table already exists.
            } finally {
                updateStatement.close();
            }
        } finally {
            c.close();
        }
    }

    public static final GBeanInfo GBEAN_INFO;

    static {
        GBeanInfoFactory infoFactory = new GBeanInfoFactory(JDBCWorkerPersistence.class);
        infoFactory.addAttribute("sequenceSQL", String.class, true);
        infoFactory.addAttribute("insertSQL", String.class, true);
        infoFactory.addAttribute("deleteSQL", String.class, true);
        infoFactory.addAttribute("fixedRateUpdateSQL", String.class, true);
        infoFactory.addAttribute("selectSQL", String.class, true);
        infoFactory.addAttribute("createSequenceSQL", String.class, true);
        infoFactory.addAttribute("createTableSQL", String.class, true);

        infoFactory.addAttribute("kernel", Kernel.class, false);

        infoFactory.addReference("managedConnectionFactoryWrapper", ManagedConnectionFactoryWrapper.class);

        // NOTE(review): seven constructor arguments are named here, but the public constructor
        // takes only (Kernel, ManagedConnectionFactoryWrapper) and the SQL strings are constants
        // without accessors -- confirm how the GBean framework resolves this before relying on it.
        infoFactory.setConstructor(new String[]{"kernel", "managedConnectionFactoryWrapper", "sequenceSQL", "insertSQL", "deleteSQL", "fixedRateUpdateSQL", "selectSQL"});

        GBEAN_INFO = infoFactory.getBeanInfo();
    }

    public static GBeanInfo getGBeanInfo() {
        return GBEAN_INFO;
    }

}



1.1                  incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/vm/VMStoreThreadPooledNonTransactionalTimer.java

Index:
VMStoreThreadPooledNonTransactionalTimer.java
===================================================================
/**
 *
 * Copyright 2004 The Apache Software Foundation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.geronimo.timer.vm;

import EDU.oswego.cs.dl.util.concurrent.Executor;
import org.apache.geronimo.gbean.GBeanInfo;
import org.apache.geronimo.gbean.GBeanInfoFactory;
import org.apache.geronimo.timer.NontransactionalExecutorTaskFactory;
import org.apache.geronimo.timer.PersistentTimer;
import org.apache.geronimo.timer.ThreadPooledTimer;

/**
 * {@link ThreadPooledTimer} assembled from a {@link NontransactionalExecutorTaskFactory} and a
 * {@link VMWorkerPersistence}, and described as a GBean by {@link #GBEAN_INFO}.
 *
 * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:57 $
 */
public class VMStoreThreadPooledNonTransactionalTimer extends ThreadPooledTimer {

    public static final GBeanInfo GBEAN_INFO;

    static {
        GBeanInfoFactory info = new GBeanInfoFactory(VMStoreThreadPooledNonTransactionalTimer.class);
        info.addInterface(PersistentTimer.class);

        info.addReference("ThreadPool", Executor.class);

        String[] constructorArgs = new String[] {"ThreadPool"};
        info.setConstructor(constructorArgs);

        GBEAN_INFO = info.getBeanInfo();
    }

    /**
     * @param threadPool executor passed to the superclass
     */
    public VMStoreThreadPooledNonTransactionalTimer(Executor threadPool) {
        super(new NontransactionalExecutorTaskFactory(),
                new VMWorkerPersistence(),
                threadPool);
    }

    /**
     * @return the GBean description built in the static initializer
     */
    public static GBeanInfo getGBeanInfo() {
        return GBEAN_INFO;
    }
}



1.1                  incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/vm/VMStoreThreadPooledTransactionalTimer.java

Index: VMStoreThreadPooledTransactionalTimer.java
=================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.geronimo.timer.vm; import EDU.oswego.cs.dl.util.concurrent.Executor; import org.apache.geronimo.connector.outbound.ManagedConnectionFactoryWrapper; import org.apache.geronimo.gbean.GBeanInfo; import org.apache.geronimo.gbean.GBeanInfoFactory; import org.apache.geronimo.kernel.Kernel; import org.apache.geronimo.timer.PersistentTimer; import org.apache.geronimo.timer.ThreadPooledTimer; import org.apache.geronimo.timer.TransactionalExecutorTaskFactory; import org.apache.geronimo.transaction.context.TransactionContextManager; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:57 $ * * */ public class VMStoreThreadPooledTransactionalTimer extends ThreadPooledTimer { public VMStoreThreadPooledTransactionalTimer(int repeatCount, TransactionContextManager transactionContextManager, Executor threadPool) { super(new TransactionalExecutorTaskFactory(transactionContextManager, repeatCount), new VMWorkerPersistence(), threadPool); } public static final GBeanInfo GBEAN_INFO; static { GBeanInfoFactory infoFactory = new GBeanInfoFactory(VMStoreThreadPooledTransactionalTimer.class); infoFactory.addInterface(PersistentTimer.class); infoFactory.addAttribute("repeatCount", int.class, true); infoFactory.addReference("TransactionContextManager", TransactionContextManager.class); 
infoFactory.addReference("ThreadPool", Executor.class); infoFactory.setConstructor(new String[] {"repeatCount", "TransactionContextManager", "ThreadPool"}); GBEAN_INFO = infoFactory.getBeanInfo(); } public static GBeanInfo getGBeanInfo() { return GBEAN_INFO; } } 1.1 incubator-geronimo/modules/timer/src/java/org/apache/geronimo/timer/vm/VMWorkerPersistence.java Index: VMWorkerPersistence.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/ package org.apache.geronimo.timer.vm; import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Collection; import java.util.ArrayList; import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong; import org.apache.geronimo.timer.PersistenceException; import org.apache.geronimo.timer.Playback; import org.apache.geronimo.timer.WorkInfo; import org.apache.geronimo.timer.WorkerPersistence; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:57 $ * * */ public class VMWorkerPersistence implements WorkerPersistence { private final Map tasks = Collections.synchronizedMap(new LinkedHashMap()); private final SynchronizedLong counter = new SynchronizedLong(0); public void save(WorkInfo workInfo) throws PersistenceException { long id = counter.increment(); workInfo.setId(id); tasks.put(new Long(id), workInfo); } public void cancel(long id) throws PersistenceException { tasks.remove(new Long(id)); } public void playback(String key, Playback playback) throws PersistenceException { synchronized (tasks) { for (Iterator iterator = tasks.entrySet().iterator(); iterator.hasNext();) { Map.Entry entry = (Map.Entry) iterator.next(); WorkInfo workInfo = (WorkInfo) entry.getValue(); playback.schedule(workInfo); } } } public void fixedRateWorkPerformed(long id) throws PersistenceException { //don't do anything, we are sharing the object with NonTransactionalWork, which is incrementing the time itself. // Long key = new Long(id); // synchronized (tasks) { // WorkInfo task = (WorkInfo) tasks.get(key); // task.nextTime(); // //see if task was cancelled while we executed. // if (task != null) { // tasks.put(key, TaskWrapper.nextTask(task)); // } // } } public void intervalWorkPerformed(long id, long period) throws PersistenceException { //dont do anything... sharing data with WorkInfo. 
} public Collection getIdsByKey(String key) throws PersistenceException { Collection ids = new ArrayList(); for (Iterator iterator = tasks.values().iterator(); iterator.hasNext();) { WorkInfo workInfo = (WorkInfo) iterator.next(); if (key.equals(workInfo.getKey())) { ids.add(new Long(workInfo.getId())); } } return ids; } } 1.1 incubator-geronimo/modules/timer/src/test/org/apache/geronimo/timer/AbstractThreadPooledTimerTest.java Index: AbstractThreadPooledTimerTest.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/ package org.apache.geronimo.timer; import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; import junit.framework.TestCase; import org.apache.geronimo.pool.ThreadPool; import org.apache.geronimo.transaction.context.TransactionContext; import org.apache.geronimo.transaction.context.TransactionContextManager; import org.apache.geronimo.timer.vm.VMWorkerPersistence; import org.apache.geronimo.timer.ExecutorTaskFactory; import org.apache.geronimo.timer.ThreadPooledTimer; import org.apache.geronimo.timer.UserTaskFactory; import org.apache.geronimo.timer.WorkerPersistence; import org.apache.geronimo.timer.WorkInfo; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:57 $ * * */ public abstract class AbstractThreadPooledTimerTest extends TestCase { private static final int COUNT = 20; //should run with much higher counts, but fails sometimes on slow hardware. private static final long DELAY = 1000; private static final long SLOP = 200; private static final String key = "testThreadPooledTimer"; private ThreadPool threadPool; private ThreadPooledTimer timer; private SynchronizedInt counter = new SynchronizedInt(0); protected TransactionContextManager transactionContextManager; protected ExecutorTaskFactory executableWorkFactory; protected UserTaskFactory userTaskFactory; private Object userKey = "test user info"; protected void setUp() throws Exception { userTaskFactory = new MockUserTaskFactory(); threadPool = new ThreadPool(); threadPool.setPoolSize(30); threadPool.setKeepAliveTime(10000); threadPool.setPoolName("TestPool"); threadPool.doStart(); WorkerPersistence workerPersistence = new VMWorkerPersistence(); timer = new ThreadPooledTimer(executableWorkFactory, workerPersistence, threadPool); timer.doStart(); counter.set(0); TransactionContext.setContext(null); } protected void tearDown() throws Exception { timer.doStop(); threadPool.doStop(); } public void testTasks() throws Exception { for (long i = 0; i < COUNT; i++) { timer.schedule(userTaskFactory, key, 
userKey, i); } Thread.currentThread().sleep(COUNT + SLOP); assertEquals(COUNT, counter.get()); } public void testCancel() throws Exception { WorkInfo[] workInfos = new WorkInfo[COUNT]; for (long i = 0; i < COUNT; i++) { workInfos[(int) i] = timer.schedule(userTaskFactory, key, userKey, DELAY); } for (int i = 0; i < workInfos.length; i++) { workInfos[i].getExecutorFeedingTimerTask().cancel(); } Thread.currentThread().sleep(SLOP + DELAY); assertEquals(0, counter.get()); } public void testPersistence() throws Exception { for (long i = 0; i < COUNT; i++) { timer.schedule(userTaskFactory, key, userKey, DELAY); } timer.doStop(); assertEquals(0, counter.get()); timer.doStart(); timer.playback(key, userTaskFactory); Thread.currentThread().sleep(2 * SLOP + DELAY); assertEquals(COUNT, counter.get()); } public void testTasksInUnspecifiedTxContext() throws Exception { transactionContextManager.newUnspecifiedTransactionContext(); try { testTasks(); } finally { transactionContextManager.setContext(null); } } public void testCancelInUnspecifiedTxContext() throws Exception { transactionContextManager.newUnspecifiedTransactionContext(); try { testCancel(); } finally { transactionContextManager.setContext(null); } } public void testPersistenceInUnspecifiedTxContext() throws Exception { transactionContextManager.newUnspecifiedTransactionContext(); try { testPersistence(); } finally { transactionContextManager.setContext(null); } } public void testTasksInTransaction() throws Exception { TransactionContext transactionContext = transactionContextManager.newContainerTransactionContext(); for (long i = 0; i < COUNT; i++) { timer.schedule(userTaskFactory, key, userKey, i); } Thread.currentThread().sleep(COUNT + SLOP); assertEquals(0, counter.get()); transactionContext.commit(); Thread.currentThread().sleep(COUNT + SLOP); assertEquals(COUNT, counter.get()); } public void testCancelInTransaction() throws Exception { Thread.currentThread().sleep(SLOP + DELAY); WorkInfo[] workInfos = new 
WorkInfo[COUNT]; for (long i = 0; i < COUNT; i++) { workInfos[(int) i] = timer.scheduleAtFixedRate(key, userTaskFactory, userKey, DELAY, DELAY); } Thread.currentThread().sleep(SLOP + DELAY); assertEquals(COUNT, counter.get()); TransactionContext transactionContext = transactionContextManager.newContainerTransactionContext(); for (int i = 0; i < workInfos.length; i++) { workInfos[i].getExecutorFeedingTimerTask().cancel(); } Thread.currentThread().sleep(SLOP + DELAY); assertEquals(2 * COUNT, counter.get()); transactionContext.commit(); Thread.currentThread().sleep(SLOP + DELAY); assertEquals(2 * COUNT, counter.get()); } public void testRepeatCountFromPersisted() throws Exception { assert DELAY > 2 * SLOP; timer.scheduleAtFixedRate(key, userTaskFactory, userKey, 0L, DELAY); Thread.currentThread().sleep(4 * DELAY + SLOP); timer.doStop(); assertEquals(5, counter.get()); timer.doStart(); timer.playback(key, userTaskFactory); Thread.currentThread().sleep(5 * DELAY + SLOP); assertEquals(2 * 5, counter.get()); } private class MockUserTaskFactory implements UserTaskFactory { public Runnable newTask(long id) { return new Runnable() { public void run() { counter.increment(); } }; } } } 1.1 incubator-geronimo/modules/timer/src/test/org/apache/geronimo/timer/NontransactionalThreadPooledTimerTest.java Index: NontransactionalThreadPooledTimerTest.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.geronimo.timer; import javax.transaction.TransactionManager; import org.apache.geronimo.transaction.manager.TransactionManagerImpl; import org.apache.geronimo.transaction.context.TransactionContextManager; import org.apache.geronimo.timer.NontransactionalExecutorTaskFactory; import org.apache.geronimo.timer.AbstractThreadPooledTimerTest; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:57 $ * * */ public class NontransactionalThreadPooledTimerTest extends AbstractThreadPooledTimerTest { protected void setUp() throws Exception { TransactionManager transactionManager = new TransactionManagerImpl(); transactionContextManager = new TransactionContextManager(transactionManager); executableWorkFactory = new NontransactionalExecutorTaskFactory(); super.setUp(); } } 1.1 incubator-geronimo/modules/timer/src/test/org/apache/geronimo/timer/TransactionalThreadPooledTimerTest.java Index: TransactionalThreadPooledTimerTest.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/ package org.apache.geronimo.timer; import javax.transaction.TransactionManager; import org.apache.geronimo.transaction.manager.TransactionManagerImpl; import org.apache.geronimo.transaction.context.TransactionContextManager; import org.apache.geronimo.timer.TransactionalExecutorTaskFactory; import org.apache.geronimo.timer.AbstractThreadPooledTimerTest; /** * * * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:57 $ * * */ public class TransactionalThreadPooledTimerTest extends AbstractThreadPooledTimerTest { protected void setUp() throws Exception { TransactionManager transactionManager = new TransactionManagerImpl(); transactionContextManager = new TransactionContextManager(transactionManager); executableWorkFactory = new TransactionalExecutorTaskFactory(transactionContextManager, 1); super.setUp(); } } 1.1 incubator-geronimo/modules/timer/src/test/org/apache/geronimo/timer/jdbc/JDBCWorkerPersistenceTest.java Index: JDBCWorkerPersistenceTest.java =================================================================== /** * * Copyright 2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/

package org.apache.geronimo.timer.jdbc;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Date;
import javax.sql.DataSource;

import junit.framework.TestCase;
import org.apache.geronimo.timer.Playback;
import org.apache.geronimo.timer.WorkInfo;
import org.apache.geronimo.timer.jdbc.JDBCWorkerPersistence;
import org.axiondb.jdbc.AxionDataSource;

/**
 * Exercises {@link JDBCWorkerPersistence} against an Axion data source:
 * saving, cancelling, recording performed work and playing tasks back.
 * Row counts are checked by querying the <code>timertasks</code> table directly.
 *
 * @version $Revision: 1.1 $ $Date: 2004/07/18 22:10:57 $
 */
public class JDBCWorkerPersistenceTest extends TestCase {

    private final String countSQL = "select count(*) from timertasks";
    private final String serverUniqueId = "TestServerUniqueID";
    private final String key = "test:service=Timer";
    private final Object userKey = "test user info";
    private JDBCWorkerPersistence jdbcWorkerPersistence;
    private DataSource datasource;
    private WorkInfo workInfo;
    protected Date time;
    protected Long period;

    /**
     * Starts a persistence instance on the Axion test database and prepares a
     * single {@link WorkInfo} scheduled for "now" with a 1000 ms period.
     *
     * @throws Exception if the persistence layer cannot be started
     */
    protected void setUp() throws Exception {
        datasource = new AxionDataSource("jdbc:axiondb:testdb");
        jdbcWorkerPersistence = new JDBCWorkerPersistence(serverUniqueId, datasource);
        jdbcWorkerPersistence.doStart();
        time = new Date();
        period = new Long(1000);
        // NOTE(review): the meaning of the trailing boolean is not visible
        // from this file -- confirm against the WorkInfo constructor.
        workInfo = new WorkInfo(key, userKey, time, period, true);
    }

    /**
     * Stops the persistence instance started in {@link #setUp()}.
     *
     * @throws Exception if stopping fails
     */
    protected void tearDown() throws Exception {
        jdbcWorkerPersistence.doStop();
    }

    /**
     * Saving a task adds exactly one row; cancelling it removes the row again.
     *
     * @throws Exception on any persistence or JDBC failure
     */
    public void testSaveCancel() throws Exception {
        assertEquals(0, countRows());
        jdbcWorkerPersistence.save(workInfo);
        assertEquals(1, countRows());
        jdbcWorkerPersistence.cancel(workInfo.getId());
        assertEquals(0, countRows());
    }

    /**
     * Verifies the time reported on playback after each kind of "work
     * performed" notification:
     * after <code>fixedRateWorkPerformed</code> the time is the original time
     * plus one period; after <code>intervalWorkPerformed</code> it lies one
     * period after the moment of the call. Finally the task is cancelled and
     * the table must be empty again.
     *
     * @throws Exception on any persistence or JDBC failure
     */
    public void testSaveUpdate() throws Exception {
        assertEquals(0, countRows());
        long scheduledTime = workInfo.getTime().getTime();
        jdbcWorkerPersistence.save(workInfo);
        assertEquals(1, countRows());

        jdbcWorkerPersistence.fixedRateWorkPerformed(workInfo.getId());
        // Call showRows() here to dump the table when debugging this test.
        PlaybackImpl playback = new PlaybackImpl();
        jdbcWorkerPersistence.playback(key, playback);
        // Check the count first: if nothing was played back, getTime() is
        // null and the time assertion would fail with a NullPointerException.
        assertEquals(1, playback.getCount());
        assertEquals(scheduledTime + period.longValue(), playback.getTime().getTime());

        // The persistence layer picks its own "now", so bracket the call.
        long before = System.currentTimeMillis();
        jdbcWorkerPersistence.intervalWorkPerformed(workInfo.getId(), period.longValue());
        long after = System.currentTimeMillis();

        playback = new PlaybackImpl();
        jdbcWorkerPersistence.playback(key, playback);
        assertEquals(1, playback.getCount());
        long replayedTime = playback.getTime().getTime();
        assertTrue("played-back time " + replayedTime + " is earlier than expected",
                before + period.longValue() <= replayedTime);
        assertTrue("played-back time " + replayedTime + " is later than expected",
                after + period.longValue() >= replayedTime);

        jdbcWorkerPersistence.cancel(workInfo.getId());
        assertEquals(0, countRows());
    }

    /**
     * Debugging aid: prints the id and task columns of every row in
     * <code>timertasks</code> to standard output. Not called by any test.
     *
     * @throws Exception on any JDBC failure
     */
    private void showRows() throws Exception {
        Connection c = datasource.getConnection();
        try {
            PreparedStatement p = c.prepareStatement("select id, task from timertasks");
            try {
                ResultSet rows = p.executeQuery();
                try {
                    while (rows.next()) {
                        System.out.println("id: " + rows.getLong(1) + " task: " + rows.getString(2));
                    }
                } finally {
                    rows.close();
                }
            } finally {
                p.close();
            }
        } finally {
            c.close();
        }
    }

    /**
     * Counts the rows currently in <code>timertasks</code>.
     *
     * @return the number of rows reported by the count query
     * @throws Exception on any JDBC failure
     */
    private int countRows() throws Exception {
        Connection c = datasource.getConnection();
        try {
            PreparedStatement p = c.prepareStatement(countSQL);
            try {
                ResultSet countRS = p.executeQuery();
                try {
                    // Fail with a clear message instead of reading from an
                    // unpositioned cursor.
                    assertTrue("count query returned no row", countRS.next());
                    return countRS.getInt(1);
                } finally {
                    countRS.close();
                }
            } finally {
                p.close();
            }
        } finally {
            c.close();
        }
    }

    /**
     * {@link Playback} stub that records how many tasks were scheduled and
     * the time of the most recent one.
     */
    private static class PlaybackImpl implements Playback {

        private int count = 0;
        private Date time;

        public void schedule(WorkInfo workInfo) {
            count++;
            this.time = workInfo.getTime();
        }

        /** @return the number of times <code>schedule</code> was called */
        public int getCount() {
            return count;
        }

        /** @return the time of the last scheduled task, or null if none was scheduled */
        public Date getTime() {
            return time;
        }
    }
}