Return-Path: Delivered-To: apmail-sling-commits-archive@www.apache.org Received: (qmail 76186 invoked from network); 11 Oct 2010 06:55:28 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 11 Oct 2010 06:55:28 -0000 Received: (qmail 45839 invoked by uid 500); 11 Oct 2010 06:55:27 -0000 Delivered-To: apmail-sling-commits-archive@sling.apache.org Received: (qmail 45765 invoked by uid 500); 11 Oct 2010 06:55:26 -0000 Mailing-List: contact commits-help@sling.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@sling.apache.org Delivered-To: mailing list commits@sling.apache.org Received: (qmail 45758 invoked by uid 99); 11 Oct 2010 06:55:26 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Oct 2010 06:55:26 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Oct 2010 06:55:22 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 594DA23888CF; Mon, 11 Oct 2010 06:54:15 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1021247 [2/6] - in /sling/branches/eventing-3.0: ./ .settings/ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/sling/ src/main/java/org/apache/sling/event/ src/main/java/org/apache/sling/... Date: Mon, 11 Oct 2010 06:54:14 -0000 To: commits@sling.apache.org From: cziegeler@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101011065444.594DA23888CF@eris.apache.org> Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java?rev=1021247&view=auto ============================================================================== --- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java (added) +++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java Mon Oct 11 06:54:12 2010 @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.impl; + +import java.util.Calendar; +import java.util.Dictionary; + +import javax.jcr.Node; +import javax.jcr.NodeIterator; +import javax.jcr.RepositoryException; +import javax.jcr.Session; +import javax.jcr.observation.EventIterator; +import javax.jcr.query.Query; +import javax.jcr.query.qom.QueryObjectModelFactory; + +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.ConfigurationPolicy; +import org.apache.felix.scr.annotations.Properties; +import org.apache.felix.scr.annotations.Property; +import org.apache.felix.scr.annotations.Service; +import org.apache.sling.commons.osgi.OsgiUtil; +import org.apache.sling.event.EventUtil; +import org.apache.sling.event.impl.jobs.jcr.JCRHelper; +import org.osgi.service.component.ComponentContext; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventAdmin; + +/** + * This event handler distributes events across an application cluster. + * + * We schedule this event handler to run in the background and clean up + * obsolete events. + */ +@Component(label="%dist.events.name", + description="%dist.events.description", + immediate=true, + metatype=true, + policy=ConfigurationPolicy.REQUIRE) +@Service(value=Runnable.class) +@Properties({ + @Property(name="event.topics",value="*",propertyPrivate=true), + @Property(name="event.filter",value="(event.distribute=*)",propertyPrivate=true), + @Property(name="repository.path",value="/var/eventing/distribution",propertyPrivate=true), + @Property(name="scheduler.period", longValue=1800), + @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true) +}) +public class DistributingEventHandler + extends AbstractRepositoryEventHandler + implements Runnable { + + /** Default clean up time is 15 minutes. */ + private static final int DEFAULT_CLEANUP_PERIOD = 15; + + @Property(intValue=DEFAULT_CLEANUP_PERIOD) + private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period"; + + /** We remove everything which is older than 15min by default. */ + private int cleanupPeriod = DEFAULT_CLEANUP_PERIOD; + + /** + * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#activate(org.osgi.service.component.ComponentContext) + */ + protected void activate(ComponentContext context) { + @SuppressWarnings("unchecked") + final Dictionary props = context.getProperties(); + this.cleanupPeriod = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD); + super.activate(context); + } + + /** + * Return the query for the clean up. + */ + protected Query getCleanUpQuery(final Session s) + throws RepositoryException { + final String selectorName = "nodetype"; + final Calendar deleteBefore = Calendar.getInstance(); + deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod); + + final QueryObjectModelFactory qomf = s.getWorkspace().getQueryManager().getQOMFactory(); + + final Query q = qomf.createQuery( + qomf.selector(getEventNodeType(), selectorName), + qomf.and(qomf.descendantNode(selectorName, this.repositoryPath), + qomf.comparison(qomf.propertyValue(selectorName, JCRHelper.NODE_PROPERTY_CREATED), + QueryObjectModelFactory.JCR_OPERATOR_LESS_THAN, + qomf.literal(s.getValueFactory().createValue(deleteBefore)))), + null, + null + ); + return q; + } + + /** + * This method is invoked periodically. + * @see java.lang.Runnable#run() + */ + public void run() { + if ( this.cleanupPeriod > 0 ) { + this.logger.debug("Cleaning up repository, removing all entries older than {} minutes.", this.cleanupPeriod); + + // we create an own session for concurrency issues + Session s = null; + try { + s = this.environment.createAdminSession(); + final Query q = this.getCleanUpQuery(s); + if ( logger.isDebugEnabled() ) { + logger.debug("Executing query {}", q.getStatement()); + } + final NodeIterator iter = q.execute().getNodes(); + int count = 0; + while ( iter.hasNext() ) { + final Node eventNode = iter.nextNode(); + eventNode.remove(); + count++; + } + s.save(); + logger.debug("Removed {} entries from the repository.", count); + + } catch (RepositoryException e) { + // in the case of an error, we just log this as a warning + this.logger.warn("Exception during repository cleanup.", e); + } finally { + if ( s != null ) { + s.logout(); + } + } + } + } + + /** + * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue() + */ + protected void processWriteQueue() { + while ( this.running ) { + // so let's wait/get the next job from the queue + Event event = null; + try { + event = this.writeQueue.take(); + } catch (InterruptedException e) { + // we ignore this + this.ignoreException(e); + } + if ( event != null && this.running ) { + try { + this.writeEvent(event, null); + } catch (Exception e) { + this.logger.error("Exception during writing the event to the repository.", e); + } + } + } + } + + /** + * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#runInBackground() + */ + protected void runInBackground() { + while ( this.running ) { + // so let's wait/get the next job from the queue + EventInfo info = null; + try { + info = this.queue.take(); + } catch (InterruptedException e) { + // we ignore this + this.ignoreException(e); + } + if ( info != null && this.running ) { + if ( info.nodePath != null) { + Session session = null; + try { + session = this.environment.createAdminSession(); + final Node eventNode = (Node)session.getItem(info.nodePath); + if ( eventNode.isNodeType(this.getEventNodeType()) ) { + final EventAdmin localEA = this.environment.getEventAdmin(); + if ( localEA != null ) { + localEA.postEvent(this.readEvent(eventNode)); + } else { + this.logger.error("Unable to post event as no event admin is available."); + } + } + } catch (Exception ex) { + this.logger.error("Exception during reading the event from the repository.", ex); + } finally { + if ( session != null ) { + session.logout(); + } + } + } + } + } + } + + /** + * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event) + */ + public void handleEvent(final Event event) { + try { + this.writeQueue.put(event); + } catch (InterruptedException ex) { + // we ignore this + this.ignoreException(ex); + } + } + + /** + * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator) + */ + public void onEvent(final EventIterator iterator) { + while ( iterator.hasNext() ) { + final javax.jcr.observation.Event event = iterator.nextEvent(); + try { + final EventInfo info = new EventInfo(); + info.nodePath = event.getPath(); + this.queue.put(info); + } catch (InterruptedException ex) { + // we ignore this + this.ignoreException(ex); + } catch (RepositoryException ex) { + this.logger.error("Exception during reading the event from the repository.", ex); + } + } + } + + /** + * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#addEventProperties(javax.jcr.Node, java.util.Dictionary) + */ + protected void addEventProperties(Node eventNode, Dictionary properties) + throws RepositoryException { + super.addEventProperties(eventNode, properties); + properties.put(EventUtil.PROPERTY_APPLICATION, eventNode.getProperty(JCRHelper.NODE_PROPERTY_APPLICATION).getString()); + } + + + /** + * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#startWriterSession() + */ + protected void startWriterSession() throws RepositoryException { + super.startWriterSession(); + this.writerSession.getWorkspace().getObservationManager() + .addEventListener(this, javax.jcr.observation.Event.NODE_ADDED, this.repositoryPath, true, null, null, true); + } +} Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java?rev=1021247&view=auto ============================================================================== --- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java (added) +++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java Mon Oct 11 06:54:12 2010 @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.impl; + +import javax.jcr.RepositoryException; +import javax.jcr.Session; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.ReferencePolicy; +import org.apache.felix.scr.annotations.Service; +import org.apache.sling.commons.classloader.DynamicClassLoaderManager; +import org.apache.sling.commons.threads.ThreadPool; +import org.apache.sling.event.impl.support.Environment; +import org.apache.sling.jcr.api.SlingRepository; +import org.apache.sling.settings.SlingSettingsService; +import org.osgi.service.event.EventAdmin; + +/** + * Environment component. This component provides "global settings" + * to all services, like the application id and the thread pool. + * @since 3.0 + */ +@Component() +@Service(value=EnvironmentComponent.class) +public class EnvironmentComponent { + + @Reference + private SlingRepository repository; + + @Reference + private EventAdmin eventAdmin; + + @Reference(policy=ReferencePolicy.DYNAMIC,cardinality=ReferenceCardinality.OPTIONAL_UNARY) + private DynamicClassLoaderManager classLoaderManager; + + /** + * Our thread pool. + */ + @Reference(referenceInterface=EventingThreadPool.class) + private ThreadPool threadPool; + + /** Sling settings service. */ + @Reference + private SlingSettingsService settingsService; + + /** + * Activate this component. + */ + @Activate + protected void activate() { + // Set the application id and the thread pool + Environment.APPLICATION_ID = this.settingsService.getSlingId(); + Environment.THREAD_POOL = this.threadPool; + } + + /** + * Dectivate this component. + */ + @Deactivate + protected void deactivate() { + // Unset the thread pool + if ( Environment.THREAD_POOL == this.threadPool ) { + Environment.THREAD_POOL = null; + } + } + + /** + * Return the dynamic classloader for loading events from the repository. + */ + public ClassLoader getDynamicClassLoader() { + final DynamicClassLoaderManager dclm = this.classLoaderManager; + if ( dclm != null ) { + return dclm.getDynamicClassLoader(); + } + // if we don't have a dynamic classloader, we return our classloader + return this.getClass().getClassLoader(); + } + + /** + * Create a new admin session. + * @return A new admin session. + * @throws RepositoryException + */ + public Session createAdminSession() + throws RepositoryException { + final SlingRepository repo = this.repository; + if ( repo == null ) { + // as the repo is a hard dependency for this service, the repo + // is always available, but we check for null anyway! + throw new RepositoryException("Repository is currently not available."); + } + return repo.loginAdministrative(null); + } + + /** + * Return the event admin. + */ + public EventAdmin getEventAdmin() { + return this.eventAdmin; + } + + protected void bindThreadPool(final EventingThreadPool etp) { + this.threadPool = etp; + } + + protected void unbindThreadPool(final EventingThreadPool etp) { + if ( this.threadPool == etp ) { + this.threadPool = null; + } + } +} Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java?rev=1021247&view=auto ============================================================================== --- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java (added) +++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java Mon Oct 11 06:54:12 2010 @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.impl; + +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Property; +import org.apache.felix.scr.annotations.PropertyOption; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.Service; +import org.apache.sling.commons.osgi.OsgiUtil; +import org.apache.sling.commons.threads.ModifiableThreadPoolConfig; +import org.apache.sling.commons.threads.ThreadPool; +import org.apache.sling.commons.threads.ThreadPoolConfig; +import org.apache.sling.commons.threads.ThreadPoolConfig.ThreadPriority; +import org.apache.sling.commons.threads.ThreadPoolManager; +import org.osgi.service.component.ComponentContext; + + +/** + * The configurable eventing thread pool. + */ +@Component(label="%event.pool.name", + description="%event.pool.description", + metatype=true) +@Service(value=EventingThreadPool.class) +public class EventingThreadPool implements ThreadPool { + + @Reference + protected ThreadPoolManager threadPoolManager; + + /** The real thread pool used. */ + private org.apache.sling.commons.threads.ThreadPool threadPool; + + private static final int DEFAULT_MIN_POOL_SIZE = 35; // this is sufficient for all threads + approx 25 job queues + private static final int DEFAULT_MAX_POOL_SIZE = 50; + + @Property(intValue=DEFAULT_MIN_POOL_SIZE) + private static final String PROPERTY_MIN_POOL_SIZE = "minPoolSize"; + @Property(intValue=DEFAULT_MAX_POOL_SIZE) + private static final String PROPERTY_MAX_POOL_SIZE = "maxPoolSize"; + + @Property(value="NORM", + options={@PropertyOption(name="NORM",value="Norm"), + @PropertyOption(name="MIN",value="Min"), + @PropertyOption(name="MAX",value="Max")}) + private static final String PROPERTY_PRIORITY = "priority"; + + /** + * Activate this component. + * @param context + */ + protected void activate(final ComponentContext ctx) { + final ModifiableThreadPoolConfig config = new ModifiableThreadPoolConfig(); + config.setMinPoolSize(OsgiUtil.toInteger(ctx.getProperties().get(PROPERTY_MIN_POOL_SIZE), DEFAULT_MIN_POOL_SIZE)); + config.setMaxPoolSize(OsgiUtil.toInteger(ctx.getProperties().get(PROPERTY_MAX_POOL_SIZE), DEFAULT_MAX_POOL_SIZE)); + config.setQueueSize(-1); // unlimited + config.setShutdownGraceful(true); + config.setPriority(ThreadPriority.valueOf(OsgiUtil.toString(ctx.getProperties().get(PROPERTY_PRIORITY), "NORM"))); + config.setDaemon(true); + this.threadPool = threadPoolManager.create(config, "Apache Sling Eventing Thread Pool"); + } + + /** + * Deactivate this component. + * @param context + */ + protected void deactivate(final ComponentContext context) { + this.threadPoolManager.release(this.threadPool); + } + + /** + * @see org.apache.sling.commons.threads.ThreadPool#execute(java.lang.Runnable) + */ + public void execute(Runnable runnable) { + threadPool.execute(runnable); + } + + /** + * @see org.apache.sling.commons.threads.ThreadPool#getConfiguration() + */ + public ThreadPoolConfig getConfiguration() { + return threadPool.getConfiguration(); + } + + /** + * @see org.apache.sling.commons.threads.ThreadPool#getName() + */ + public String getName() { + return threadPool.getName(); + } +} Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java?rev=1021247&view=auto ============================================================================== --- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java (added) +++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java Mon Oct 11 06:54:12 2010 @@ -0,0 +1,809 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.impl; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collection; +import java.util.Date; +import java.util.Dictionary; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; + +import javax.jcr.Item; +import javax.jcr.Node; +import javax.jcr.NodeIterator; +import javax.jcr.RepositoryException; +import javax.jcr.Session; +import javax.jcr.Value; +import javax.jcr.lock.Lock; +import javax.jcr.lock.LockException; +import javax.jcr.observation.EventIterator; +import javax.jcr.query.Query; +import javax.jcr.query.QueryManager; +import javax.jcr.query.qom.Comparison; +import javax.jcr.query.qom.Constraint; +import javax.jcr.query.qom.QueryObjectModelFactory; + +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Properties; +import org.apache.felix.scr.annotations.Property; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.Service; +import org.apache.sling.commons.scheduler.Job; +import org.apache.sling.commons.scheduler.JobContext; +import org.apache.sling.commons.scheduler.Scheduler; +import org.apache.sling.event.EventUtil; +import org.apache.sling.event.TimedEventStatusProvider; +import org.apache.sling.event.impl.jobs.Utility; +import org.apache.sling.event.impl.jobs.jcr.JCRHelper; +import org.apache.sling.event.impl.support.Environment; +import org.apache.sling.event.jobs.JobUtil; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventAdmin; + + +/** + * An event handler for timed events. + * + */ +@Component(immediate=true) +@Service(value=TimedEventStatusProvider.class) +@Properties({ + @Property(name="event.topics",propertyPrivate=true, + value={"org/osgi/framework/BundleEvent/UPDATED", + "org/osgi/framework/BundleEvent/STARTED", + EventUtil.TOPIC_TIMED_EVENT}), + @Property(name="repository.path",value="/var/eventing/timed-jobs",propertyPrivate=true) +}) +public class TimedJobHandler + extends AbstractRepositoryEventHandler + implements Job, TimedEventStatusProvider { + + private static final String JOB_TOPIC = "topic"; + + private static final String JOB_CONFIG = "config"; + + private static final String JOB_SCHEDULE_INFO = "info"; + + @Reference + private Scheduler scheduler; + + /** Unloaded events. */ + private SetunloadedEvents = new HashSet(); + + /** + * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#startWriterSession() + */ + protected void startWriterSession() throws RepositoryException { + super.startWriterSession(); + // load timed events from repository + this.loadEvents(); + this.writerSession.getWorkspace().getObservationManager() + .addEventListener(this, javax.jcr.observation.Event.PROPERTY_CHANGED|javax.jcr.observation.Event.PROPERTY_REMOVED, this.repositoryPath, true, null, null, true); + } + + /** + * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue() + */ + protected void processWriteQueue() { + while ( this.running ) { + Event event = null; + try { + event = this.writeQueue.take(); + } catch (InterruptedException e) { + // we ignore this + this.ignoreException(e); + } + if ( this.running && event != null ) { + ScheduleInfo scheduleInfo = null; + try { + scheduleInfo = new ScheduleInfo(event); + } catch (IllegalArgumentException iae) { + this.logger.error(iae.getMessage()); + } + if ( scheduleInfo != null ) { + final EventInfo info = new EventInfo(); + info.event = event; + + // write event and update path + // if something went wrong we get the node path and reschedule + synchronized ( this.writeLock ) { + info.nodePath = this.persistEvent(info.event, scheduleInfo); + } + if ( info.nodePath != null ) { + try { + this.queue.put(info); + } catch (InterruptedException e) { + // this should never happen, so we ignore it + this.ignoreException(e); + } + } + } + } + } + } + + /** + * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#runInBackground() + */ + protected void runInBackground() { + while ( this.running ) { + // so let's wait/get the next info from the queue + EventInfo info = null; + try { + info = this.queue.take(); + } catch (InterruptedException e) { + // we ignore this + this.ignoreException(e); + } + if ( info != null && this.running ) { + synchronized ( this.writeLock ) { + ScheduleInfo scheduleInfo = null; + try { + scheduleInfo = new ScheduleInfo(info.event); + } catch (IllegalArgumentException iae) { + this.logger.error(iae.getMessage()); + } + if ( scheduleInfo != null ) { + try { + this.writerSession.refresh(true); + if ( this.writerSession.itemExists(info.nodePath) ) { + final Node eventNode = (Node) this.writerSession.getItem(info.nodePath); + if ( !eventNode.isLocked() ) { + // lock node + Lock lock = null; + try { + lock = eventNode.getSession().getWorkspace().getLockManager().lock(info.nodePath, false, true, Long.MAX_VALUE, "TimedJobHandler"); + } catch (RepositoryException re) { + // lock failed which means that the node is locked by someone else, so we don't have to requeue + } + if ( lock != null ) { + // if something went wrong, we reschedule + if ( !this.processEvent(info.event, scheduleInfo) ) { + try { + this.queue.put(info); + } catch (InterruptedException e) { + // this should never happen, so we ignore it + this.ignoreException(e); + } + } + } + } + } + } catch (RepositoryException e) { + // ignore + this.ignoreException(e); + } + } + } + } + } + } + + protected String persistEvent(final Event event, final ScheduleInfo scheduleInfo) { + try { + // get parent node + final Node parentNode = this.getWriterRootNode(); + final String nodeName = scheduleInfo.jobId; + // is there already a node? + final Node foundNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null; + Lock lock = null; + if ( scheduleInfo.isStopEvent() ) { + // if this is a stop event, we should remove the node from the repository + // if there is no node someone else was faster and we can ignore this + if ( foundNode != null ) { + try { + foundNode.remove(); + writerSession.save(); + } catch (LockException le) { + // if someone else has the lock this is fine + } + } + // stop the scheduler + processEvent(event, scheduleInfo); + } else { + // if there is already a node, it means we must handle an update + if ( foundNode != null ) { + try { + foundNode.remove(); + writerSession.save(); + } catch (LockException le) { + // if someone else has the lock this is fine + } + // create a stop event + processEvent(event, scheduleInfo.getStopInfo()); + } + // we only write the event if this is a local one + if ( EventUtil.isLocal(event) ) { + + // write event to repository, lock it and schedule the event + final Node eventNode = writeEvent(event, nodeName); + lock = eventNode.getSession().getWorkspace().getLockManager().lock(eventNode.getPath(), false, true, Long.MAX_VALUE, "TimedJobHandler"); + } + } + + if ( lock != null ) { + // if something went wrong, we reschedule + if ( !this.processEvent(event, scheduleInfo) ) { + final String path = lock.getNode().getPath(); + writerSession.getWorkspace().getLockManager().unlock(path); + return path; + } + } + } catch (RepositoryException re ) { + // something went wrong, so let's log it + this.logger.error("Exception during writing new job to repository.", re); + } + return null; + } + + /** + * Process the event. + * If a scheduler is available, a job is scheduled or stopped. + * @param event The incomming event. + * @return + */ + protected boolean processEvent(final Event event, final ScheduleInfo scheduleInfo) { + final Scheduler localScheduler = this.scheduler; + if ( localScheduler != null ) { + // is this a stop event? + if ( scheduleInfo.isStopEvent() ) { + if ( this.logger.isDebugEnabled() ) { + this.logger.debug("Stopping timed event " + event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC) + "(" + scheduleInfo.jobId + ")"); + } + try { + localScheduler.removeJob(scheduleInfo.jobId); + } catch (NoSuchElementException nsee) { + // this can happen if the job is scheduled on another node + // so we can just ignore this + } + return true; + } + // we ignore remote job events + if ( !EventUtil.isLocal(event) ) { + return true; + } + + // Create configuration for scheduled job + final Map config = new HashMap(); + // copy properties + final Hashtable properties = new Hashtable(); + config.put(JOB_TOPIC, (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC)); + final String[] names = event.getPropertyNames(); + if ( names != null ) { + for(int i=0; i 0 ) { + doIt = true; + } + } + if ( doIt ) { + final Runnable t = new Runnable() { + + public void run() { + synchronized (unloadedEvents) { + Session s = null; + final Set newUnloadedEvents = new HashSet(); + newUnloadedEvents.addAll(unloadedEvents); + try { + s = environment.createAdminSession(); + for(String path : unloadedEvents ) { + newUnloadedEvents.remove(path); + try { + if ( s.itemExists(path) ) { + final Node eventNode = (Node) s.getItem(path); + if ( !eventNode.isLocked() ) { + try { + final EventInfo info = new EventInfo(); + info.event = readEvent(eventNode); + info.nodePath = path; + try { + queue.put(info); + } catch (InterruptedException e) { + // we ignore this exception as this should never occur + ignoreException(e); + } + } catch (ClassNotFoundException cnfe) { + newUnloadedEvents.add(path); + ignoreException(cnfe); + } + } + } + } catch (RepositoryException re) { + // we ignore this and readd + newUnloadedEvents.add(path); + ignoreException(re); + } + } + } catch (RepositoryException re) { + // unable to create session, so we try it again next time + ignoreException(re); + } finally { + if ( s != null ) { + s.logout(); + } + unloadedEvents.clear(); + unloadedEvents.addAll(newUnloadedEvents); + } + } + } + + }; + Environment.THREAD_POOL.execute(t); + } + } + } + + /** + * @see org.apache.sling.commons.scheduler.Job#execute(org.apache.sling.commons.scheduler.JobContext) + */ + public void execute(JobContext context) { + final String topic = (String) context.getConfiguration().get(JOB_TOPIC); + @SuppressWarnings("unchecked") + final Dictionary properties = (Dictionary) context.getConfiguration().get(JOB_CONFIG); + final EventAdmin ea = this.environment.getEventAdmin(); + if ( ea != null ) { + try { + ea.postEvent(new Event(topic, properties)); + } catch (IllegalArgumentException iae) { + this.logger.error("Scheduled event has illegal topic: " + topic, iae); + } + } else { + this.logger.warn("Unable to send timed event as no event admin service is available."); + } + final ScheduleInfo info = (ScheduleInfo) context.getConfiguration().get(JOB_SCHEDULE_INFO); + // is this job scheduled for a specific date? + if ( info.date != null ) { + // we can remove it from the repository + // we create an own session here + Session s = null; + try { + s = this.environment.createAdminSession(); + if ( s.itemExists(this.repositoryPath) ) { + final Node parentNode = (Node)s.getItem(this.repositoryPath); + final String nodeName = info.jobId; + final Node eventNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null; + if ( eventNode != null ) { + try { + eventNode.remove(); + s.save(); + } catch (RepositoryException re) { + // we ignore the exception if removing fails + ignoreException(re); + } + } + } + } catch (RepositoryException re) { + this.logger.error("Unable to create a session.", re); + } finally { + if ( s != null ) { + s.logout(); + } + } + } + } + + /** + * Load all active timed events from the repository. + * @throws RepositoryException + */ + protected void loadEvents() { + try { + final QueryManager qManager = this.writerSession.getWorkspace().getQueryManager(); + final String selectorName = "nodetype"; + + final QueryObjectModelFactory qomf = qManager.getQOMFactory(); + + final Query q = qomf.createQuery( + qomf.selector(getEventNodeType(), selectorName), + qomf.descendantNode(selectorName, this.repositoryPath), + null, + null + ); + final NodeIterator result = q.execute().getNodes(); + while ( result.hasNext() ) { + final Node eventNode = result.nextNode(); + if ( !eventNode.isLocked() ) { + final String nodePath = eventNode.getPath(); + try { + final Event event = this.readEvent(eventNode); + final EventInfo info = new EventInfo(); + info.event = event; + info.nodePath = nodePath; + try { + this.queue.put(info); + } catch (InterruptedException e) { + // we ignore this exception as this should never occur + this.ignoreException(e); + } + } catch (ClassNotFoundException cnfe) { + // add it to the unloaded set + synchronized (unloadedEvents) { + this.unloadedEvents.add(nodePath); + } + this.ignoreException(cnfe); + } catch (RepositoryException re) { + // if reading an event fails, we ignore this + this.ignoreException(re); + } + } + } + } catch (RepositoryException re) { + this.logger.error("Exception during initial loading of stored timed events.", re); + } + } + + /** + * @see org.apache.sling.engine.event.impl.JobPersistenceHandler#addNodeProperties(javax.jcr.Node, org.osgi.service.event.Event) + */ + protected void addNodeProperties(Node eventNode, Event event) + throws RepositoryException { + super.addNodeProperties(eventNode, event); + eventNode.setProperty(JCRHelper.NODE_PROPERTY_TOPIC, (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC)); + final ScheduleInfo info = new ScheduleInfo(event); + if ( info.date != null ) { + final Calendar c = Calendar.getInstance(); + c.setTime(info.date); + eventNode.setProperty(JCRHelper.NODE_PROPERTY_TE_DATE, c); + } + if ( info.expression != null ) { + eventNode.setProperty(JCRHelper.NODE_PROPERTY_TE_EXPRESSION, info.expression); + } + if ( info.period != null ) { + eventNode.setProperty(JCRHelper.NODE_PROPERTY_TE_PERIOD, info.period.longValue()); + } + } + + /** + * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#getEventNodeType() + */ + protected String getEventNodeType() { + return JCRHelper.TIMED_EVENT_NODE_TYPE; + } + + protected static final class ScheduleInfo implements Serializable { + + private static final long serialVersionUID = 8667701700547811142L; + + public final String expression; + public final Long period; + public final Date date; + public final String jobId; + + public ScheduleInfo(final Event event) + throws IllegalArgumentException { + // let's see if a schedule information is specified or if the job should be stopped + this.expression = (String) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_SCHEDULE); + this.period = (Long) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_PERIOD); + this.date = (Date) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_DATE); + int count = 0; + if ( this.expression != null) { + count++; + } + if ( this.period != null ) { + count++; + } + if ( this.date != null ) { + count++; + } + if ( count > 1 ) { + throw new IllegalArgumentException("Only one configuration property from " + EventUtil.PROPERTY_TIMED_EVENT_SCHEDULE + + ", " + EventUtil.PROPERTY_TIMED_EVENT_PERIOD + + ", or " + EventUtil.PROPERTY_TIMED_EVENT_DATE + " should be used."); + } + // we create a job id consisting of the real event topic and an (optional) id + // if the event contains a timed event id or a job id we'll append that to the name + String topic = (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC); + if ( topic == null ) { + throw new IllegalArgumentException("Timed event does not contain required property " + EventUtil.PROPERTY_TIMED_EVENT_TOPIC); + } + String id = (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_ID); + String jId = (String)event.getProperty(JobUtil.PROPERTY_JOB_NAME); + + //this.jobId = getJobId(topic, id, jId); + this.jobId = getJobId(topic, id, jId); + } + + private ScheduleInfo(String jobId) { + this.expression = null; + this.period = null; + this.date = null; + this.jobId = jobId; + } + + public ScheduleInfo getStopInfo() { + return new ScheduleInfo(this.jobId); + } + + public boolean isStopEvent() { + return this.expression == null && this.period == null && this.date == null; + } + + public static String getJobId(String topic, String timedEventId, String jobId) { + return topic.replace('/', '.') + "/TimedEvent " + (timedEventId != null ? Utility.filter(timedEventId) : "") + '_' + (jobId != null ? Utility.filter(jobId) : ""); + } + } + + /** + * @see org.apache.sling.event.TimedEventStatusProvider#getScheduledEvent(java.lang.String, java.lang.String, java.lang.String) + */ + public Event getScheduledEvent(String topic, String eventId, String jobId) { + Session s = null; + try { + s = this.environment.createAdminSession(); + if ( s.itemExists(this.repositoryPath) ) { + final Node parentNode = (Node)s.getItem(this.repositoryPath); + final String nodeName = ScheduleInfo.getJobId(topic, eventId, jobId); + final Node eventNode = parentNode.hasNode(nodeName) ? parentNode.getNode(nodeName) : null; + if ( eventNode != null ) { + return this.readEvent(eventNode); + } + } + } catch (RepositoryException re) { + this.logger.error("Unable to create a session.", re); + } catch (ClassNotFoundException e) { + this.ignoreException(e); + } finally { + if ( s != null ) { + s.logout(); + } + } + return null; + } + + /** + * @see org.apache.sling.event.TimedEventStatusProvider#getScheduledEvents(java.lang.String, java.util.Map...) + */ + public Collection getScheduledEvents(String topic, Map... filterProps) { + // we create a new session + Session s = null; + final List jobs = new ArrayList(); + try { + s = this.environment.createAdminSession(); + final QueryManager qManager = s.getWorkspace().getQueryManager(); + final String selectorName = "nodetype"; + + final QueryObjectModelFactory qomf = qManager.getQOMFactory(); + + final String path; + if ( topic == null ) { + path = this.repositoryPath; + } else { + path = this.repositoryPath + '/' + topic.replace('/', '.'); + } + Constraint constraint = qomf.descendantNode(selectorName, path); + if ( filterProps != null && filterProps.length > 0 ) { + Constraint orConstraint = null; + for (Map template : filterProps) { + Constraint comp = null; + final Iterator> i = template.entrySet().iterator(); + while ( i.hasNext() ) { + final Map.Entry current = i.next(); + // check prop name first + final String propName = JCRHelper.getNodePropertyName(current.getKey()); + if ( propName != null ) { + // check value + final Value value = JCRHelper.getNodePropertyValue(s.getValueFactory(), current.getValue()); + if ( value != null ) { + final Comparison newComp = qomf.comparison(qomf.propertyValue(selectorName, propName), + QueryObjectModelFactory.JCR_OPERATOR_EQUAL_TO, + qomf.literal(value)); + if ( comp == null ) { + comp = newComp; + } else { + comp = qomf.and(comp, newComp); + } + } + } + } + if ( comp != null ) { + if ( orConstraint == null ) { + orConstraint = comp; + } else { + orConstraint = qomf.or(orConstraint, comp); + } + } + } + if ( orConstraint != null ) { + constraint = qomf.and(constraint, orConstraint); + } + } + final Query q = qomf.createQuery( + qomf.selector(getEventNodeType(), selectorName), + constraint, + null, + null + ); + if ( logger.isDebugEnabled() ) { + logger.debug("Executing job query {}.", q.getStatement()); + } + + final NodeIterator iter = q.execute().getNodes(); + while ( iter.hasNext() ) { + final Node eventNode = iter.nextNode(); + try { + final Event event = this.readEvent(eventNode); + jobs.add(event); + } catch (ClassNotFoundException cnfe) { + // in the case of a class not found exception we just ignore the exception + this.ignoreException(cnfe); + } + } + } catch (RepositoryException e) { + // in the case of an error, we return an empty list + this.ignoreException(e); + } finally { + if ( s != null) { + s.logout(); + } + } + return jobs; + } + + /** + * @see org.apache.sling.event.TimedEventStatusProvider#cancelTimedEvent(java.lang.String) + */ + public void cancelTimedEvent(String jobId) { + synchronized ( this.writeLock ) { + try { + // is there a node? + final Item foundNode = this.writerSession.itemExists(jobId) ? this.writerSession.getItem(jobId) : null; + // we should remove the node from the repository + // if there is no node someone else was faster and we can ignore this + if ( foundNode != null ) { + try { + foundNode.remove(); + writerSession.save(); + } catch (LockException le) { + // if someone else has the lock this is fine + } + } + } catch ( RepositoryException re) { + this.logger.error("Unable to cancel timed event: " + jobId, re); + } + // stop the scheduler + if ( this.logger.isDebugEnabled() ) { + this.logger.debug("Stopping timed event " + jobId); + } + final Scheduler localScheduler = this.scheduler; + if ( localScheduler != null ) { + try { + localScheduler.removeJob(jobId); + } catch (NoSuchElementException nsee) { + // this can happen if the job is scheduled on another node + // so we can just ignore this + } + } + } + } +} Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java?rev=1021247&view=auto ============================================================================== --- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java (added) +++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java Mon Oct 11 06:54:12 2010 @@ -0,0 +1,625 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.impl.jobs; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Modified; +import org.apache.felix.scr.annotations.Properties; +import org.apache.felix.scr.annotations.Property; +import org.apache.felix.scr.annotations.PropertyOption; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.Service; +import org.apache.felix.scr.annotations.Services; +import org.apache.sling.commons.osgi.OsgiUtil; +import org.apache.sling.commons.scheduler.Scheduler; +import org.apache.sling.event.EventUtil; +import org.apache.sling.event.impl.EnvironmentComponent; +import org.apache.sling.event.impl.jobs.config.ConfigurationConstants; +import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration; +import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager; +import org.apache.sling.event.impl.jobs.queues.AbstractJobQueue; +import org.apache.sling.event.impl.jobs.queues.OrderedJobQueue; +import org.apache.sling.event.impl.jobs.queues.ParallelJobQueue; +import org.apache.sling.event.impl.jobs.queues.TopicRoundRobinJobQueue; +import org.apache.sling.event.impl.support.Environment; +import org.apache.sling.event.jobs.JobManager; +import org.apache.sling.event.jobs.JobUtil; +import org.apache.sling.event.jobs.JobsIterator; +import org.apache.sling.event.jobs.Queue; +import org.apache.sling.event.jobs.Statistics; +import org.osgi.service.event.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * An event handler for special job events. + * + * We schedule this event handler to run in the background + * and clean up obsolete queues. + * + */ +@Component(label="%job.events.name", + description="%job.events.description", + metatype=true,immediate=true) +@Services({ + @Service(value=Runnable.class), + @Service(value=JobManager.class) +}) +@Properties({ + @Property(name="scheduler.period", longValue=300, + label="%jobscheduler.period.name", + description="%jobscheduler.period.description"), + @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true), + @Property(name=ConfigurationConstants.PROP_PRIORITY, + value=ConfigurationConstants.DEFAULT_PRIORITY, + options={@PropertyOption(name="NORM",value="Norm"), + @PropertyOption(name="MIN",value="Min"), + @PropertyOption(name="MAX",value="Max")}), + @Property(name=ConfigurationConstants.PROP_RETRIES, + intValue=ConfigurationConstants.DEFAULT_RETRIES), + @Property(name=ConfigurationConstants.PROP_RETRY_DELAY, + longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY), + @Property(name=ConfigurationConstants.PROP_MAX_PARALLEL, + intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL) +}) +public class DefaultJobManager + implements Runnable, JobManager { + + /** Default logger. */ + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + /** The environment component. */ + @Reference + private EnvironmentComponent environment; + + /** The configuration manager. */ + @Reference + private QueueConfigurationManager configManager; + + /** The scheduler service. */ + @Reference + private Scheduler scheduler; + + /** All active queues. */ + private final Map queues = new ConcurrentHashMap(); + + /** Main configuration. */ + private InternalQueueConfiguration mainConfiguration; + + /** Base statistics. */ + private final StatisticsImpl baseStatistics = new StatisticsImpl(); + + /** Current statistics. */ + private StatisticsImpl currentStatistics; + + /** Last update for current statistics. */ + private long lastUpdatedStatistics; + + /** All existing events. */ + private final Map allEvents = new HashMap(); + + /** All existing events by topic. */ + private final Map> allEventsByTopic = new HashMap>(); + + /** + * Activate this component. + * @param props Configuration properties + */ + @Activate + protected void activate(final Map props) { + this.update(props); + logger.info("Apache Sling Job Event Handler started on instance {}", Environment.APPLICATION_ID); + } + + /** + * Configure this component. + * @param props Configuration properties + */ + @Modified + protected void update(final Map props) { + // create a new dictionary with the missing info and do some sanety puts + final Map queueProps = new HashMap(props); + queueProps.remove(ConfigurationConstants.PROP_APP_IDS); + queueProps.put(ConfigurationConstants.PROP_TOPICS, "*"); + queueProps.put(ConfigurationConstants.PROP_NAME, "
"); + queueProps.put(ConfigurationConstants.PROP_RUN_LOCAL, false); + queueProps.put(ConfigurationConstants.PROP_TYPE, InternalQueueConfiguration.Type.UNORDERED); + + // check max parallel - this should never be lower than 2! + final int maxParallel = OsgiUtil.toInteger(queueProps.get(ConfigurationConstants.PROP_MAX_PARALLEL), + ConfigurationConstants.DEFAULT_MAX_PARALLEL); + if ( maxParallel < 2 ) { + this.logger.debug("Ignoring invalid setting of {} for {}. Setting to minimum value: 2", + maxParallel, ConfigurationConstants.PROP_MAX_PARALLEL); + queueProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, 2); + } + this.mainConfiguration = InternalQueueConfiguration.fromConfiguration(queueProps); + } + + /** + * Dectivate this component. + */ + @Deactivate + protected void deactivate() { + final Iterator i = this.queues.values().iterator(); + while ( i.hasNext() ) { + final AbstractJobQueue jbq = i.next(); + jbq.close(); + } + this.queues.clear(); + logger.info("Apache Sling Job Event Handler stopped on instance {}", Environment.APPLICATION_ID); + } + + /** + * This method is invoked periodically by the scheduler. + * It searches for idle queues and stops them after a timeout. If a queue + * is idle for two consecutive clean up calls, it is removed. + * @see java.lang.Runnable#run() + */ + public void cleanup() { + // check for idle queue + // we synchronize to avoid creating a queue which is about to be removed during cleanup + synchronized ( this ) { + final Iterator> i = this.queues.entrySet().iterator(); + while ( i.hasNext() ) { + final Map.Entry current = i.next(); + final AbstractJobQueue jbq = current.getValue(); + if ( jbq.isMarkedForCleanUp() ) { + // close + jbq.close(); + // copy statistics + this.baseStatistics.add(jbq); + // remove + i.remove(); + } else { + // mark to be removed during next cycle + jbq.markForCleanUp(); + } + } + } + } + + /** + * Process a new job event. + * This method first searches the corresponding queue - if such a queue + * does not exist yet, it is created and started. + * @param event The job event + */ + public void process(final JobEvent event) { + // get the queue configuration + InternalQueueConfiguration config = configManager.getQueueConfiguration(event); + + // if no queue config is found, we either create a new queue or use the main queue + if ( config == null ) { + final String customQueueName = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_QUEUE_NAME); + if ( customQueueName != null ) { + synchronized ( this ) { + final AbstractJobQueue queue = this.queues.get(customQueueName); + if ( queue != null ) { + config = queue.getConfiguration(); + } else { + config = new InternalQueueConfiguration(event.event); + } + event.queueName = customQueueName; + } + } else { + config = this.mainConfiguration; + event.queueName = this.mainConfiguration.getName(); + } + } + + // get the queue name + final String queueName = event.queueName; + + if ( config.isSkipped(event) ) { + if ( logger.isDebugEnabled() ) { + logger.debug("Ignoring event due to configuration of queue {} : {}", queueName, EventUtil.toString(event.event)); + } + return; + } + + // get or create queue + AbstractJobQueue queue = null; + // we synchronize to avoid creating a queue which is about to be removed during cleanup + synchronized ( this ) { + queue = this.queues.get(queueName); + // check for reconfiguration, we really do an identity check here(!) + if ( queue != null && queue.getConfiguration() != config ) { + // remove the queue with the old name + this.queues.remove(queueName); + // check if we can close or have to rename + queue.markForCleanUp(); + if ( queue.isMarkedForCleanUp() ) { + // close + queue.close(); + // copy statistics + this.baseStatistics.add(queue); + } else { + // notify queue + queue.rename(queueName + ""); + // readd with new name + this.queues.put(queue.getName(), queue); + } + // we use a new queue with the configuration + queue = null; + } + if ( queue == null ) { + if ( config.getType() == InternalQueueConfiguration.Type.ORDERED ) { + queue = new OrderedJobQueue(queueName, config, this.environment); + } else if ( config.getType() == InternalQueueConfiguration.Type.UNORDERED ) { + queue = new ParallelJobQueue(queueName, config, this.environment, this.scheduler); + } else if ( config.getType() == InternalQueueConfiguration.Type.TOPIC_ROUND_ROBIN ) { + queue = new TopicRoundRobinJobQueue(queueName, config, this.environment, this.scheduler); + } + if ( queue == null ) { + // this is just a sanety check, actually we can never get here + logger.warn("Ignoring event due to unknown queue type of queue {} : {}", queueName, EventUtil.toString(event.event)); + return; + } + queues.put(queueName, queue); + queue.start(); + } + } + + // and put event + queue.process(event); + } + + /** + * This method is invoked periodically by the scheduler. + * @see java.lang.Runnable#run() + */ + public void run() { + this.cleanup(); + } + + /** + * Return our internal statistics object. + * We recalculate this every 1.5sec (if requested) + * + * @see org.apache.sling.event.jobs.JobManager#getStatistics() + */ + public synchronized Statistics getStatistics() { + final long now = System.currentTimeMillis(); + if ( this.currentStatistics == null || this.lastUpdatedStatistics + 1500 < now ) { + this.currentStatistics = this.baseStatistics.copy(); + for(final AbstractJobQueue jq : this.queues.values() ) { + this.currentStatistics.add(jq); + } + } + return this.currentStatistics; + } + + /** + * @see org.apache.sling.event.jobs.JobManager#getQueue(java.lang.String) + */ + public Queue getQueue(final String name) { + return this.queues.get(name); + } + + /** + * @see org.apache.sling.event.jobs.JobManager#getQueues() + */ + public Iterable getQueues() { + final Iterator jqI = this.queues.values().iterator(); + return new Iterable() { + + public Iterator iterator() { + return new Iterator() { + + public boolean hasNext() { + return jqI.hasNext(); + } + + public Queue next() { + return jqI.next(); + } + + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + } + + public InternalQueueConfiguration getMainQueueConfiguration() { + return this.mainConfiguration; + } + + /** + * Add a job to all jobs. + */ + public void notifyAddJob(final JobEvent job) { + final String key = job.uniqueId; + final String topic = (String)job.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC); + final JobEvent oldJob; + synchronized ( this.allEvents ) { + oldJob = this.allEvents.put(key, job); + } + List l; + synchronized ( this.allEventsByTopic ) { + l = this.allEventsByTopic.get(topic); + if ( l == null ) { + l = new ArrayList(); + this.allEventsByTopic.put(topic, l); + } + } + synchronized ( l ) { + if ( oldJob != null ) { + l.remove(oldJob); + } + l.add(job); + } + } + + /** + * Remove a job from all jobs. + */ + public void notifyRemoveJob(final String key) { + final JobEvent oldJob; + synchronized ( this.allEvents ) { + oldJob = this.allEvents.remove(key); + } + if ( oldJob != null ) { + final String topic = (String)oldJob.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC); + final List l; + synchronized ( this.allEventsByTopic ) { + l = this.allEventsByTopic.get(topic); + } + if ( l != null ) { + synchronized ( l ) { + l.remove(oldJob); + } + } + } + } + + /** + * Job started + */ + public void notifyActiveJob(final String key) { + final JobEvent job; + synchronized ( this.allEvents ) { + job = this.allEvents.get(key); + } + if ( job != null ) { + job.started = 1; + } + } + + /** + * Check the requested job type + */ + private boolean checkType(final QueryType type, final JobEvent event) { + if ( type == QueryType.ALL ) { + return true; + } + if ( type == QueryType.ACTIVE && event.started == 1 ) { + return true; + } + if ( type == QueryType.QUEUED && event.started == -1 ) { + return true; + } + return false; + } + + private enum Operation { + LESS, + LESS_OR_EQUALS, + EQUALS, + GREATER_OR_EQUALS, + GREATER + } + /** + * Check if the job matches the template + */ + private boolean match(final JobEvent job, final Map template) { + if ( template != null ) { + for(final Map.Entry current : template.entrySet()) { + final String key = current.getKey(); + final char firstChar = key.length() > 0 ? key.charAt(0) : 0; + final String propName; + final Operation op; + if ( firstChar == '=' ) { + propName = key.substring(1); + op = Operation.EQUALS; + } else if ( firstChar == '<' ) { + final char secondChar = key.length() > 1 ? key.charAt(1) : 0; + if ( secondChar == '=' ) { + op = Operation.LESS_OR_EQUALS; + propName = key.substring(2); + } else { + op = Operation.LESS; + propName = key.substring(1); + } + } else if ( firstChar == '>' ) { + final char secondChar = key.length() > 1 ? key.charAt(1) : 0; + if ( secondChar == '=' ) { + op = Operation.GREATER_OR_EQUALS; + propName = key.substring(2); + } else { + op = Operation.GREATER; + propName = key.substring(1); + } + } else { + propName = key; + op = Operation.EQUALS; + } + final Object value = current.getValue(); + + if ( op == Operation.EQUALS ) { + if ( !value.equals(job.event.getProperty(propName)) ) { + return false; + } + } else { + if ( value instanceof Comparable ) { + @SuppressWarnings({ "unchecked", "rawtypes" }) + final int result = ((Comparable)value).compareTo(job.event.getProperty(propName)); + if ( op == Operation.LESS && result != -1 ) { + return false; + } else if ( op == Operation.LESS_OR_EQUALS && result == 1 ) { + return false; + } else if ( op == Operation.GREATER_OR_EQUALS && result == -1 ) { + return false; + } else if ( op == Operation.GREATER && result != 1 ) { + return false; + } + } else { + // if the value is not comparable we simply don't match + return false; + } + } + } + } + return true; + } + + private void queryCollection( + final List result, + final QueryType type, + final Collection collection, + final Map... filterProps) { + synchronized ( collection ) { + final Iterator iter = collection.iterator(); + while ( iter.hasNext() ) { + final JobEvent job = iter.next(); + boolean add = checkType(type, job); + if ( add && filterProps != null && filterProps.length != 0 ) { + add = false; + for (Map template : filterProps) { + add = this.match(job, template); + if ( add ) { + break; + } + } + } + if ( add ) { + result.add(job.event); + } + } + } + } + /** + * @see org.apache.sling.event.jobs.JobManager#queryJobs(QueryType, java.lang.String, java.util.Map...) + */ + public JobsIterator queryJobs(final QueryType type, + final String topic, + final Map... filterProps) { + final List result = new ArrayList(); + if ( topic != null ) { + final List l; + synchronized ( this.allEventsByTopic ) { + l = this.allEventsByTopic.get(topic); + } + if ( l != null ) { + queryCollection(result, type, l, filterProps); + } + } else { + final Set> topics; + synchronized ( this.allEventsByTopic ) { + topics = new HashSet>(this.allEventsByTopic.values()); + } + for(final Collection l : topics) { + queryCollection(result, type, l, filterProps); + } + } + return new JobsIteratorImpl(result); + } + + /** + * @see org.apache.sling.event.jobs.JobManager#findJob(java.lang.String, java.util.Map) + */ + public Event findJob(final String topic, final Map template) { + Event result = null; + if ( topic != null ) { + final List l; + synchronized ( this.allEventsByTopic ) { + l = this.allEventsByTopic.get(topic); + } + if ( l != null ) { + synchronized ( l ) { + final Iterator iter = l.iterator(); + while ( result == null && iter.hasNext() ) { + final JobEvent job = iter.next(); + if ( match(job, template) ) { + result = job.event; + } + } + } + } + } + return result; + } + + /** + * @see org.apache.sling.event.jobs.JobManager#removeJob(java.lang.String) + */ + public boolean removeJob(final String jobId) { + final JobEvent job; + synchronized ( this.allEvents ) { + job = this.allEvents.get(jobId); + } + boolean result = true; + if ( job != null ) { + result = job.remove(); + if ( result ) { + this.notifyRemoveJob(jobId); + } + } + return result; + } + + /** + * @see org.apache.sling.event.jobs.JobManager#forceRemoveJob(java.lang.String) + */ + public void forceRemoveJob(final String jobId) { + while ( !this.removeJob(jobId) ) { + // instead of using complicated syncs, waits and notifies we simply poll + try { + Thread.sleep(80); + } catch (final InterruptedException ignore) { + this.ignoreException(ignore); + } + } + } + + /** + * Helper method which just logs the exception in debug mode. + * @param e + */ + private void ignoreException(final Exception e) { + if ( this.logger.isDebugEnabled() ) { + this.logger.debug("Ignored exception " + e.getMessage(), e); + } + } +} Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java?rev=1021247&view=auto ============================================================================== --- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java (added) +++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java Mon Oct 11 06:54:12 2010 @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.impl.jobs; + +import org.osgi.service.event.Event; + +/** + * This object encapsulates all information about a job. + */ +public abstract class JobEvent { + + public Event event; + public final String uniqueId; + + public String queueName; + + public long queued = -1; + public long started = -1; + + public JobEvent(final Event e, final String uniqueId) { + this.event = e; + this.uniqueId = uniqueId; + } + + public abstract boolean lock(); + public abstract void unlock(); + public abstract void finished(); + public abstract boolean reschedule(); + public abstract boolean remove(); + + @Override + public int hashCode() { + return this.uniqueId.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if ( ! (obj instanceof JobEvent) ) { + return false; + } + return this.uniqueId.equals(((JobEvent)obj).uniqueId); + } +} \ No newline at end of file Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java ------------------------------------------------------------------------------ svn:mime-type = text/plain