incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r1021247 [5/6] - in /sling/branches/eventing-3.0: ./ .settings/ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/sling/ src/main/java/org/apache/sling/event/ src/main/java/org/apache/sling/...
Date Mon, 11 Oct 2010 06:54:14 GMT
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.jobs.JobUtil;
+
+/**
+ * An ordered job queue processes its jobs FIFO and strictly one after
+ * the other. If a job fails it is rescheduled and the reschedule is processed
+ * next - this basically means that a failing job blocks the queue
+ * until it is finished!
+ */
+public final class OrderedJobQueue extends AbstractJobQueue {
+
+    /** The job event for rescheduling - guarded by the monitor of this queue. */
+    private JobEvent jobEvent;
+
+    /**
+     * Set as soon as the finish notification for the job started last has
+     * arrived. Guarded by the monitor of this queue.
+     */
+    private boolean finishNotified = false;
+
+    /** Marker indicating that this queue is currently sleeping. */
+    private volatile boolean isSleeping = false;
+
+    /** The sleeping thread. */
+    private volatile Thread sleepingThread;
+
+    /** The queue. */
+    private final BlockingQueue<JobEvent> queue = new LinkedBlockingQueue<JobEvent>();
+
+    /**
+     * Create a new ordered queue.
+     * All arguments are handed over unchanged to the super class.
+     * @param name The queue name
+     * @param config The queue configuration
+     * @param env The environment component
+     */
+    public OrderedJobQueue(final String name,
+                           final InternalQueueConfiguration config,
+                           final EnvironmentComponent env) {
+        super(name, config, env);
+    }
+
+    /**
+     * Append the sleeping state to the status info of the super class.
+     */
+    @Override
+    public String getStatusInfo() {
+        return super.getStatusInfo() + ", isSleeping=" + this.isSleeping;
+    }
+
+    /**
+     * Execute the job and block until its finish notification arrived.
+     * @param processInfo The job to start
+     * @return The job event handed over to {@link #notifyFinished(JobEvent)}
+     *         or <code>null</code> if the job could not be executed.
+     */
+    @Override
+    protected JobEvent start(final JobEvent processInfo) {
+        JobEvent rescheduleInfo = null;
+
+        // forget about the notification of the previous job before
+        // the new job gets a chance to send its own one
+        synchronized ( this ) {
+            this.finishNotified = false;
+        }
+        // if we are ordered we simply wait for the finish
+        if ( this.executeJob(processInfo) ) {
+            rescheduleInfo = this.waitForFinish();
+        }
+        return rescheduleInfo;
+    }
+
+    /**
+     * Update the sleeping marker; leaving the sleeping state
+     * also forgets the sleeping thread.
+     */
+    private void setSleeping(boolean flag) {
+        this.isSleeping = flag;
+        if ( !flag ) {
+            this.sleepingThread = null;
+        }
+    }
+
+    /**
+     * Remember the thread which is going to sleep and mark the queue as sleeping.
+     */
+    private void setSleeping(Thread sleepingThread) {
+        this.sleepingThread = sleepingThread;
+        this.setSleeping(true);
+    }
+
+    /**
+     * Resume the queue: a thread sleeping in {@link #reschedule(JobEvent)}
+     * is interrupted before the super class is resumed.
+     */
+    @Override
+    public void resume() {
+        if ( this.isSleeping ) {
+            // copy the reference, the field might be reset concurrently
+            final Thread thread = this.sleepingThread;
+            if ( thread != null ) {
+                thread.interrupt();
+            }
+        }
+        super.resume();
+    }
+
+    /**
+     * Wait for the job to be finished.
+     * This is called if the queue is ordered.
+     * @return The job event handed over to {@link #notifyFinished(JobEvent)},
+     *         might be <code>null</code>.
+     */
+    private JobEvent waitForFinish() {
+        synchronized ( this ) {
+            // the notification might already have arrived before we got here;
+            // waiting in that case would miss it and block the queue forever
+            if ( !this.finishNotified ) {
+                this.isWaiting = true;
+                this.logger.debug("Job queue {} is waiting for finish.", this.queueName);
+                while ( this.isWaiting ) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        this.ignoreException(e);
+                    }
+                }
+            }
+            this.logger.debug("Job queue {} is continuing.", this.queueName);
+            final JobEvent object = this.jobEvent;
+            this.jobEvent = null;
+            return object;
+        }
+    }
+
+    /**
+     * Add the job at the end of the queue.
+     */
+    @Override
+    protected void put(final JobEvent event) {
+        try {
+            this.queue.put(event);
+        } catch (final InterruptedException e) {
+            // this should never happen
+            this.ignoreException(e);
+        }
+    }
+
+    /**
+     * Take the next job from the head of the queue, blocks if the queue is empty.
+     * @return The next job or <code>null</code> if the thread was interrupted.
+     */
+    @Override
+    protected JobEvent take() {
+        try {
+            return this.queue.take();
+        } catch (final InterruptedException e) {
+            // this should never happen
+            this.ignoreException(e);
+        }
+        return null;
+    }
+
+    @Override
+    protected boolean isEmpty() {
+        return this.queue.isEmpty();
+    }
+
+    /**
+     * Store the reschedule info and wake up the thread blocked in
+     * {@link #start(JobEvent)}.
+     * All state is changed while holding the monitor, therefore the waiting
+     * thread sees either everything or nothing of this notification.
+     * @param rescheduleInfo The job event to return from {@link #start(JobEvent)}
+     */
+    @Override
+    protected void notifyFinished(final JobEvent rescheduleInfo) {
+        this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
+        synchronized ( this ) {
+            this.jobEvent = rescheduleInfo;
+            this.finishNotified = true;
+            this.isWaiting = false;
+            this.notify();
+        }
+    }
+
+    /**
+     * Reschedule the job by blocking the calling thread for the retry delay.
+     * The delay is taken from the job property {@link JobUtil#PROPERTY_JOB_RETRY_DELAY}
+     * if that property holds a number, otherwise from the queue configuration.
+     * The sleep is ended early by {@link #resume()}.
+     * @param info The job to reschedule
+     * @return The unchanged job
+     */
+    @Override
+    protected JobEvent reschedule(final JobEvent info) {
+        // we just sleep for the delay time - if none, we continue and retry
+        // this job again
+        long delay = this.configuration.getRetryDelayInMs();
+        // accept every numeric type, a plain cast to Long fails for e.g. an Integer
+        final Object jobDelay = info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY);
+        if ( jobDelay instanceof Number ) {
+            delay = ((Number)jobDelay).longValue();
+        }
+        if ( delay > 0 ) {
+            setSleeping(Thread.currentThread());
+            try {
+                this.logger.debug("Job queue {} is sleeping for {}ms.", this.queueName, delay);
+                Thread.sleep(delay);
+            } catch (InterruptedException e) {
+                this.ignoreException(e);
+            } finally {
+                setSleeping(false);
+            }
+        }
+        return info;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Queue#clear()
+     */
+    @Override
+    public void clear() {
+        this.queue.clear();
+        super.clear();
+    }
+
+    /**
+     * Drop a pending reschedule info before delegating to the super class.
+     */
+    @Override
+    public void removeAll() {
+        synchronized ( this ) {
+            this.jobEvent = null;
+        }
+        super.removeAll();
+    }
+}
+

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+
+/**
+ * Default queue for parallel job processing: jobs are handed out
+ * in the order they were added (FIFO).
+ * Failing jobs are rescheduled and put at the end of the queue.
+ */
+public final class ParallelJobQueue extends AbstractParallelJobQueue {
+
+    /** Holds the jobs waiting to be processed. */
+    private final BlockingQueue<JobEvent> queue = new LinkedBlockingQueue<JobEvent>();
+
+    /**
+     * Create a new parallel queue; all arguments are passed on to the super class.
+     */
+    public ParallelJobQueue(final String name,
+                           final InternalQueueConfiguration config,
+                           final EnvironmentComponent env,
+                           final Scheduler scheduler) {
+        super(name, config, env, scheduler);
+    }
+
+    /**
+     * Append the job to the end of the queue.
+     */
+    @Override
+    protected void put(final JobEvent event) {
+        try {
+            this.queue.put(event);
+        } catch (final InterruptedException ie) {
+            // not expected to happen
+            this.ignoreException(ie);
+        }
+    }
+
+    /**
+     * Remove and return the job at the head of the queue, waiting for
+     * one if the queue is empty.
+     * @return The next job or <code>null</code> if waiting was interrupted.
+     */
+    @Override
+    protected JobEvent take() {
+        JobEvent next = null;
+        try {
+            next = this.queue.take();
+        } catch (final InterruptedException ie) {
+            // not expected to happen
+            this.ignoreException(ie);
+        }
+        return next;
+    }
+
+    /**
+     * @return <code>true</code> if no job is waiting in this queue.
+     */
+    @Override
+    protected boolean isEmpty() {
+        return this.queue.isEmpty();
+    }
+
+    /**
+     * Discard all waiting jobs, then let the super class do its part.
+     * @see org.apache.sling.event.jobs.Queue#clear()
+     */
+    public void clear() {
+        this.queue.clear();
+        super.clear();
+    }
+}
+

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.jobs.JobUtil;
+
+/**
+ * This queue acts similar to the parallel job queue. Except that
+ * new jobs are selected based on a round robin topic selection scheme.
+ * Failing jobs are rescheduled and put at the end of the queue.
+ */
+public final class TopicRoundRobinJobQueue extends AbstractParallelJobQueue {
+
+    /** All known topics in the order of their first occurrence. */
+    private final List<String> topics = new ArrayList<String>();
+
+    /**
+     * The queued jobs per topic.
+     * The monitor of this map guards the queue state and is used
+     * for the wait/notify handshake between put() and take().
+     */
+    private final Map<String, List<JobEvent>> topicMap = new HashMap<String, List<JobEvent>>();
+
+    /** Index of the topic in {@link #topics} to be served next. */
+    private int topicIndex;
+
+    /** Number of queued jobs over all topics. */
+    private int eventCount;
+
+    /** Set while take() is blocked waiting for a new job. */
+    private boolean isWaitingForNext = false;
+
+    /**
+     * Create a new round robin queue; all arguments are passed on to the super class.
+     */
+    public TopicRoundRobinJobQueue(final String name,
+                           final InternalQueueConfiguration config,
+                           final EnvironmentComponent env,
+                           final Scheduler scheduler) {
+        super(name, config, env, scheduler);
+    }
+
+    /**
+     * Append the number of queued jobs and the waiting state
+     * to the status info of the super class.
+     */
+    @Override
+    public String getStatusInfo() {
+        return super.getStatusInfo() + ", eventCount=" + this.eventCount + ", isWaitingForNext=" + this.isWaitingForNext;
+    }
+
+    /**
+     * In addition to the check of the super class this queue
+     * can't be cleaned up while take() is waiting for a new job.
+     */
+    @Override
+    protected boolean canBeMarkedForCleanUp() {
+        boolean result = super.canBeMarkedForCleanUp();
+        if ( result ) {
+            result = !this.isWaitingForNext;
+        }
+        return result;
+    }
+
+    /**
+     * Add the job to the list of its topic and wake up a waiting take().
+     * The topic is read from the job property {@link JobUtil#PROPERTY_JOB_TOPIC}.
+     */
+    @Override
+    protected void put(final JobEvent event) {
+        final String topic = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+        synchronized ( this.topicMap ) {
+            List<JobEvent> events = this.topicMap.get(topic);
+            if ( events == null ) {
+                // first job for this topic
+                events = new LinkedList<JobEvent>();
+                this.topicMap.put(topic, events);
+                this.topics.add(topic);
+            }
+            events.add(event);
+            this.eventCount++;
+            if ( this.isWaitingForNext ) {
+                this.isWaitingForNext = false;
+                // wake up take()
+                this.topicMap.notify();
+            }
+        }
+    }
+
+    /**
+     * Return the next job. Starting at the current topic index the topics
+     * are visited round robin until one with a queued job is found.
+     * Blocks if no job is queued at all.
+     * @return The next job; <code>null</code> only if the queue is still
+     *         empty after waiting (e.g. it was cleared in the meantime).
+     */
+    @Override
+    protected JobEvent take() {
+        JobEvent e = null;
+        synchronized ( this.topicMap ) {
+            if ( this.eventCount == 0 ) {
+                // wait for a new event
+                this.isWaitingForNext = true;
+                while ( this.isWaitingForNext ) {
+                    try {
+                        // wait on the monitor we hold and put() notifies;
+                        // waiting on "this" throws an IllegalMonitorStateException
+                        this.topicMap.wait();
+                    } catch (final InterruptedException ie) {
+                        this.ignoreException(ie);
+                    }
+                }
+            }
+            if ( this.eventCount > 0 ) {
+                while ( e == null ) {
+                    final String topic = this.topics.get(this.topicIndex);
+                    final List<JobEvent> events = this.topicMap.get(topic);
+                    if ( !events.isEmpty() ) {
+                        e = events.remove(0);
+                    }
+                    // continue with the next topic, wrap around at the end
+                    this.topicIndex++;
+                    if ( this.topicIndex == this.topics.size() ) {
+                        this.topicIndex = 0;
+                    }
+                }
+                this.eventCount--;
+            }
+        }
+        return e;
+    }
+
+    /**
+     * @return <code>true</code> if no job is queued for any topic.
+     */
+    @Override
+    protected boolean isEmpty() {
+        synchronized ( this.topicMap ) {
+            return this.eventCount == 0;
+        }
+    }
+
+    /**
+     * Remove all queued jobs and all known topics.
+     * @see org.apache.sling.event.jobs.Queue#clear()
+     */
+    @Override
+    public void clear() {
+        synchronized ( this.topicMap ) {
+            this.eventCount = 0;
+            // the topic list is emptied, a left over index would point
+            // behind the end of the list on the next take()
+            this.topicIndex = 0;
+            this.topics.clear();
+            this.topicMap.clear();
+        }
+        super.clear();
+    }
+}
+

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/support/Environment.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/support/Environment.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/support/Environment.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/support/Environment.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.support;
+
+import org.apache.sling.commons.threads.ThreadPool;
+
+/**
+ * This class provides "global settings"
+ * to all services, like the application id and the thread pool.
+ * Both settings are plain mutable static fields; they are declared volatile
+ * so that a new value is visible to every thread reading them.
+ * @since 3.0
+ */
+public class Environment {
+
+    /** Global application id. */
+    public static volatile String APPLICATION_ID;
+
+    /** Global thread pool. */
+    public static volatile ThreadPool THREAD_POOL;
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/support/Environment.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/support/Environment.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/support/Environment.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs;
+
+import java.util.Map;
+
+import org.osgi.service.event.Event;
+
+
+/**
+ * Central service of the job event handling: it gives access to the
+ * queues and their statistics and allows to query and remove jobs.
+ * @since 3.0
+ */
+public interface JobManager {
+
+    /**
+     * Selects which jobs a query returns:
+     * all jobs, only the activated (started) ones or only the queued ones.
+     */
+    enum QueryType {
+        ALL,
+        ACTIVE,
+        QUEUED
+    }
+
+    /**
+     * Get the statistics covering all queues.
+     */
+    Statistics getStatistics();
+
+    /**
+     * Look up a running queue by its name.
+     * @param name The queue name
+     * @return The queue or <code>null</code> if no queue with this name is running.
+     */
+    Queue getQueue(String name);
+
+    /**
+     * Get all queues which are currently available.
+     */
+    Iterable<Queue> getQueues();
+
+    /**
+     * Query the jobs, running as well as scheduled ones.
+     *
+     * @param type Required - selects all jobs, only the queued or only the started ones.
+     * @param topic Optional filter - if non-null, only jobs with this topic are returned.
+     * @param templates Optional property filters. A job matches a single map if it matches
+     *                    all entries of that map (AND query); if several maps are given
+     *                    it is sufficient to match one of them (OR query).
+     * @return A non null collection.
+     */
+    JobsIterator queryJobs(QueryType type, String topic, Map<String, Object>... templates);
+
+    /**
+     * Search a single job - either scheduled or active - by topic and filter properties.
+     * If several jobs match, the first one found is returned; this can be
+     * any of the matching jobs.
+     *
+     * @param topic Topic is required.
+     * @param template Property filter, the job must match all entries (AND query).
+     * @return An event or <code>null</code>
+     */
+    Event findJob(String topic, Map<String, Object> template);
+
+    /**
+     * Cancel a job.
+     * Cancelling might fail if the job is currently in processing.
+     * @param jobId The unique identifier as found in the property {@link JobUtil#JOB_ID}.
+     * @return <code>true</code> if the job could be cancelled or does not exist anymore,
+     *         <code>false</code> otherwise.
+     */
+    boolean removeJob(String jobId);
+
+    /**
+     * Cancel a job and wait for it if required.
+     * Works like {@link #removeJob(String)} but a job in processing is waited for:
+     * the job is removed when this method returns - however
+     * this method blocks until the job is finished!
+     * @param jobId The unique identifier as found in the property {@link JobUtil#JOB_ID}.
+     */
+    void forceRemoveJob(String jobId);
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobProcessor.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobProcessor.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobProcessor.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs;
+
+import org.osgi.service.event.Event;
+
+/**
+ * Callback for processing a job in the background.
+ * An implementation is handed over to {@link JobUtil#processJob(Event, JobProcessor)}.
+ * @since 3.0
+ */
+public interface JobProcessor {
+
+    /**
+     * Process the given job.
+     * A job failing with a thrown exception/throwable is not rescheduled.
+     *
+     * @param job The event containing the job description.
+     * @return <code>true</code> if the job is finished - regardless whether it
+     *         succeeded or ended with an error, <code>false</code> if the job
+     *         should be rescheduled.
+     */
+    boolean process(Event job);
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs;
+
+import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.event.impl.jobs.JobStatusNotifier;
+import org.apache.sling.event.impl.support.Environment;
+import org.osgi.service.event.Event;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The <code>JobUtil</code> class is a utility class for
+ * creating and processing jobs.
+ * @since 3.0
+ */
+public abstract class JobUtil {
+
+    /** The job topic property. */
+    public static final String PROPERTY_JOB_TOPIC = "event.job.topic";
+
+    /** The property for the unique event name. Value is of type String (this property is optional). */
+    public static final String PROPERTY_JOB_NAME = "event.job.id";
+
+    /** The property to set if a job can be run in parallel to any other job.
+     * The following values are supported:
+     * - boolean value <code>true</code> and <code>false</code>
+     * - string value <code>true</code> and <code>false</code>
+     * - integer value higher than 1 - if this is specified jobs are run in
+     * parallel but never more than the specified number.
+     *
+     * We might want to use different values in the future for enhanced
+     * parallel job handling.
+     *
+     * This value is only used if {@link JobUtil#PROPERTY_JOB_QUEUE_NAME} is
+     * specified and the referenced queue is not started yet.
+     */
+    public static final String PROPERTY_JOB_PARALLEL = "event.job.parallel";
+
+    /** The property to set if a job should only be run on the same app it has been created on. */
+    public static final String PROPERTY_JOB_RUN_LOCAL = "event.job.run.local";
+
+    /** The property to track the retry count for jobs. Value is of type Integer. */
+    public static final String PROPERTY_JOB_RETRY_COUNT = "event.job.retrycount";
+
+    /** The property for setting the maximum number of retries. Value is of type Integer. */
+    public static final String PROPERTY_JOB_RETRIES = "event.job.retries";
+
+    /** The property to set a retry delay. Value is of type Long and specifies milliseconds. */
+    public static final String PROPERTY_JOB_RETRY_DELAY = "event.job.retrydelay";
+
+    /** The property to set to put the jobs into a separate job queue. This property
+     * specifies the name of the job queue. If the job queue does not exist yet
+     * a new queue is created.
+     * If an ordered job queue is used, the jobs are never executed in parallel
+     * from this queue! For non ordered queues the {@link #PROPERTY_JOB_PARALLEL}
+     * with an integer value higher than 1 can be used to specify the maximum number
+     * of parallel jobs for this queue.
+     */
+    public static final String PROPERTY_JOB_QUEUE_NAME = "event.job.queuename";
+
+    /** If this property is set with any value, the queue processes the jobs in the same
+     * order as they have arrived.
+     * This property only has an effect if {@link #PROPERTY_JOB_QUEUE_NAME} is specified
+     * and the job queue has not been started yet.
+     */
+    public static final String PROPERTY_JOB_QUEUE_ORDERED = "event.job.queueordered";
+
+    /** This property allows overriding the priority of the thread used to run this job.
+     * The property is evaluated by the {@link #processJob(Event, JobProcessor)} method.
+     * If another way of executing the job is used, it is up to the client to ensure
+     * the job priority.
+     * For possible values see {@link JobPriority}.
+     */
+    public static final String PROPERTY_JOB_PRIORITY = "event.job.priority";
+
+    /**
+     * The priority for jobs.
+     * {@link #processJob(Event, JobProcessor)} maps these values to thread priorities:
+     * <code>NORM</code> to {@link Thread#NORM_PRIORITY}, <code>MIN</code> to
+     * {@link Thread#MIN_PRIORITY} and <code>MAX</code> to {@link Thread#MAX_PRIORITY}.
+     */
+    public enum JobPriority {
+        NORM,
+        MIN,
+        MAX
+    }
+
+    /** The topic for jobs. */
+    public static final String TOPIC_JOB = "org/apache/sling/event/job";
+
+    /**
+     * This is a unique identifier which can be used to cancel the job.
+     */
+    public static final String JOB_ID = "slingevent:eventId";
+
+    /**
+     * Notification events for jobs.
+     */
+
+    /** Asynchronous notification event when a job is started.
+     * The property {@link #PROPERTY_NOTIFICATION_JOB} contains the job event and the
+     * property {@link org.osgi.service.event.EventConstants#TIMESTAMP} contains the
+     * timestamp of the event (as a Long).
+     */
+    public static final String TOPIC_JOB_STARTED = "org/apache/sling/event/notification/job/START";
+
+    /** Asynchronous notification event when a job is finished.
+     * The property {@link #PROPERTY_NOTIFICATION_JOB} contains the job event and the
+     * property {@link org.osgi.service.event.EventConstants#TIMESTAMP} contains the
+     * timestamp of the event (as a Long).
+     */
+    public static final String TOPIC_JOB_FINISHED = "org/apache/sling/event/notification/job/FINISHED";
+
+    /** Asynchronous notification event when a job failed.
+     * If a job execution fails, it is rescheduled for another try.
+     * The property {@link #PROPERTY_NOTIFICATION_JOB} contains the job event and the
+     * property {@link org.osgi.service.event.EventConstants#TIMESTAMP} contains the
+     * timestamp of the event (as a Long).
+     */
+    public static final String TOPIC_JOB_FAILED = "org/apache/sling/event/notification/job/FAILED";
+
+    /** Asynchronous notification event when a job is cancelled.
+     * If a job execution is cancelled it is not rescheduled.
+     * The property {@link #PROPERTY_NOTIFICATION_JOB} contains the job event and the
+     * property {@link org.osgi.service.event.EventConstants#TIMESTAMP} contains the
+     * timestamp of the event (as a Long).
+     */
+    public static final String TOPIC_JOB_CANCELLED = "org/apache/sling/event/notification/job/CANCELLED";
+
+    /** Property of a notification event containing the job event. */
+    public static final String PROPERTY_NOTIFICATION_JOB = "event.notification.job";
+
+    /**
+     * Check whether the given event is a job event.
+     * An event is considered a job event if it carries the
+     * {@link #PROPERTY_JOB_TOPIC} property.
+     * @param event The event to check.
+     * @return <code>true</code> if this is a job event.
+     */
+    public static boolean isJobEvent(final Event event) {
+        final Object jobTopic = event.getProperty(PROPERTY_JOB_TOPIC);
+        return jobTopic != null;
+    }
+
+    /**
+     * Look up the notifier context of a job event.
+     * @param job The event to inspect.
+     * @return The notifier context stored in the event properties or <code>null</code>
+     *         if the event is not a job event.
+     * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
+     */
+    private static JobStatusNotifier.NotifierContext getNotifierContext(final Event job) {
+        if ( isJobEvent(job) ) {
+            // a job event has to carry the notifier context as one of its properties
+            final JobStatusNotifier.NotifierContext notifierContext =
+                (JobStatusNotifier.NotifierContext) job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
+            if ( notifierContext != null ) {
+                return notifierContext;
+            }
+            throw new IllegalArgumentException("JobStatusNotifier context is not available in event properties.");
+        }
+        // not a job event - there is no context to return
+        return null;
+    }
+
+    /**
+     * Send an acknowledge.
+     * This signals the job handler that someone is starting to process the job. This method
+     * should be invoked as the first command during job processing.
+     * If this method returns <code>false</code>, either someone else is already
+     * processing this job or the event is not a job event; in both cases the caller
+     * should not process the event anymore.
+     * @param job The job event to acknowledge.
+     * @return Returns <code>true</code> if the acknowledge could be sent
+     * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
+     */
+    public static boolean acknowledgeJob(final Event job) {
+        final JobStatusNotifier.NotifierContext notifierContext = getNotifierContext(job);
+        if ( notifierContext == null ) {
+            // not a job event, so there is nothing to acknowledge
+            return false;
+        }
+        final boolean acknowledged = notifierContext.notifier.sendAcknowledge(job);
+        if ( !acknowledged ) {
+            // if we don't get an ack, someone else is already processing this job
+            // and the caller must not process it.
+            LoggerFactory.getLogger(JobUtil.class).info("Someone else is already processing job {}.", job);
+        }
+        return acknowledged;
+    }
+
+    /**
+     * Notify a finished job.
+     * If the event is not a job event, this method does nothing.
+     * @param job The job event which has been finished.
+     * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
+     */
+    public static void finishedJob(final Event job) {
+        final JobStatusNotifier.NotifierContext notifierContext = getNotifierContext(job);
+        if ( notifierContext == null ) {
+            // not a job event, nobody to notify
+            return;
+        }
+        notifierContext.notifier.finishedJob(job, false);
+    }
+
+    /**
+     * Notify a failed job.
+     * If the event is not a job event, nothing is notified and <code>false</code> is returned.
+     * @param job The job event which has failed.
+     * @return <code>true</code> if the job has been rescheduled, <code>false</code> otherwise.
+     * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
+     */
+    public static boolean rescheduleJob(final Event job) {
+        final JobStatusNotifier.NotifierContext notifierContext = getNotifierContext(job);
+        if ( notifierContext == null ) {
+            // not a job event, nothing can be rescheduled
+            return false;
+        }
+        return notifierContext.notifier.finishedJob(job, true);
+    }
+
+    /**
+     * Process a job in the background and notify its success.
+     * This method also sends an acknowledge message to the job event handler.
+     * If the acknowledge is not accepted (someone else is already processing the job),
+     * the processor is still invoked but the job event handler is not notified
+     * about the result.
+     * The processor is executed through the thread pool of the environment; if no
+     * thread pool is available, a new thread is started.
+     * While the processor runs, the name of the thread is extended with the queue name
+     * and the job topic, and the thread priority is set according to the optional
+     * {@link #PROPERTY_JOB_PRIORITY} property. Name and priority are restored afterwards.
+     * If the processor returns <code>true</code> or fails with a throwable, the job is
+     * reported as finished; if it returns <code>false</code>, the job is rescheduled.
+     *
+     * @param job The job event.
+     * @param processor The processor executing the job.
+     * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
+     */
+    public static void processJob(final Event job, final JobProcessor processor) {
+        // first check for a notifier context to send an acknowledge
+        boolean notify = true;
+        final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+        if ( ctx != null ) {
+            if ( !ctx.notifier.sendAcknowledge(job) ) {
+                // if we don't get an ack, someone else is already processing this job.
+                // we process but do not notify the job event handler.
+                LoggerFactory.getLogger(JobUtil.class).info("Someone else is already processing job {}.", job);
+                notify = false;
+            }
+        }
+        // the acknowledge has already been sent at this point, therefore an unusable
+        // priority value must not abort the processing - it is ignored instead
+        final JobPriority priority = getJobPriority(job);
+        final boolean notifyResult = notify;
+
+        final Runnable task = new Runnable() {
+
+            /**
+             * @see java.lang.Runnable#run()
+             */
+            public void run() {
+                final Thread currentThread = Thread.currentThread();
+                // update priority and name
+                final String oldName = currentThread.getName();
+                final int oldPriority = currentThread.getPriority();
+
+                currentThread.setName(oldName + "-" + job.getProperty(PROPERTY_JOB_QUEUE_NAME) + "(" + job.getProperty(PROPERTY_JOB_TOPIC) + ")");
+                if ( priority != null ) {
+                    switch ( priority ) {
+                        case NORM : currentThread.setPriority(Thread.NORM_PRIORITY);
+                                    break;
+                        case MIN  : currentThread.setPriority(Thread.MIN_PRIORITY);
+                                    break;
+                        case MAX  : currentThread.setPriority(Thread.MAX_PRIORITY);
+                                    break;
+                    }
+                }
+                boolean result = false;
+                try {
+                    result = processor.process(job);
+                } catch (Throwable t) { //NOSONAR
+                    LoggerFactory.getLogger(JobUtil.class).error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + job, t);
+                    // we don't reschedule if an exception occurs
+                    result = true;
+                } finally {
+                    // the thread might be a pooled one, so restore its state before it is reused
+                    currentThread.setPriority(oldPriority);
+                    currentThread.setName(oldName);
+                    if ( notifyResult ) {
+                        if ( result ) {
+                            JobUtil.finishedJob(job);
+                        } else {
+                            JobUtil.rescheduleJob(job);
+                        }
+                    }
+                }
+            }
+
+        };
+        // check if the thread pool is available
+        final ThreadPool pool = Environment.THREAD_POOL;
+        if ( pool != null ) {
+            pool.execute(task);
+        } else {
+            // if we don't have a thread pool, we create the thread directly
+            // (this should never happen for jobs, but is a safe fallback and
+            // allows to call this method for other background processing).
+            new Thread(task).start();
+        }
+    }
+
+    /**
+     * Read the optional {@link #PROPERTY_JOB_PRIORITY} property of a job event.
+     * A {@link JobPriority} value is returned as is; any other non-null value is
+     * converted through its string representation. A value which does not denote
+     * a priority is logged and ignored.
+     * @param job The job event.
+     * @return The priority or <code>null</code> if no usable priority is set.
+     */
+    private static JobPriority getJobPriority(final Event job) {
+        final Object value = job.getProperty(PROPERTY_JOB_PRIORITY);
+        if ( value instanceof JobPriority ) {
+            return (JobPriority) value;
+        }
+        if ( value != null ) {
+            try {
+                return JobPriority.valueOf(value.toString());
+            } catch (final IllegalArgumentException ignored) {
+                // not a valid priority name: keep the current thread priority
+                LoggerFactory.getLogger(JobUtil.class).warn("Ignoring invalid job priority '{}' of job {}.", value, job);
+            }
+        }
+        return null;
+    }
+
+    private JobUtil() {
+        // this class only provides constants and static methods - avoid instantiation
+    }
+}
\ No newline at end of file

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobsIterator.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobsIterator.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobsIterator.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobsIterator.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs;
+
+import java.util.Iterator;
+
+import org.osgi.service.event.Event;
+
+/**
+ * This <code>Iterator</code> allows iterating over job {@link Event}s.
+ * In addition to the plain iterator functionality it might return the number
+ * of elements in the collection and it allows skipping several elements.
+ * @since 3.0
+ */
+public interface JobsIterator extends Iterator<Event>, Iterable<Event> {
+
+    /**
+     * Skip a number of jobs.
+     * @param skipNum the non-negative number of elements to skip
+     * @throws java.util.NoSuchElementException
+     *          if skipped past the last job in the iterator.
+     */
+    void skip(long skipNum);
+
+    /**
+     * Returns the total number of jobs. In some cases precise information
+     * is not available. In these cases -1 is returned.
+     * @return The total number of jobs or -1 if the number is not known.
+     */
+    long getSize();
+
+    /**
+     * Returns the current position within the iterator. The number returned is
+     * the 0-based index of the next job.
+     * @return The 0-based index of the next job.
+     */
+    long getPosition();
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobsIterator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobsIterator.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobsIterator.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs;
+
+
+/**
+ * This is a job queue processing job events.
+ * @since 3.0
+ */
+public interface Queue {
+
+    /**
+     * Get the queue name.
+     * @return The name of this queue.
+     */
+    String getName();
+
+    /**
+     * Return statistics information about this queue.
+     * @return The statistics of this queue.
+     */
+    Statistics getStatistics();
+
+    /**
+     * Return some information about the current status of the queue.
+     * @return A status description of this queue.
+     */
+    String getStatusInfo();
+
+    /**
+     * Get the corresponding configuration.
+     * @return The configuration of this queue.
+     */
+    QueueConfiguration getConfiguration();
+
+    /**
+     * Suspend the queue - when a queue is suspended it stops processing
+     * jobs - however already started jobs are finished (but not rescheduled).
+     * Depending on the queue implementation, the queue is only suspended
+     * for a specific time.
+     * A queue can be resumed with {@link #resume()}.
+     */
+    void suspend();
+
+    /**
+     * Resume a suspended queue (see {@link #suspend()}). If the queue is not
+     * suspended, calling this method has no effect.
+     * Depending on the queue implementation, if a job failed a job queue might
+     * sleep for a configured time, before a new job is processed. By calling this
+     * method, the job queue can be woken up and forced into an immediate reprocessing.
+     * This feature is only supported by ordered queues at the moment. If a queue
+     * does not support this feature, calling this method has only an effect if
+     * the queue is really suspended.
+     */
+    void resume();
+
+    /**
+     * Is the queue currently suspended?
+     * @return <code>true</code> if the queue is suspended.
+     */
+    boolean isSuspended();
+
+    /**
+     * Remove all outstanding jobs from the queue. This does not delete
+     * the jobs. The jobs are either processed by a different cluster node
+     * or on restart.
+     */
+    void clear();
+
+    /**
+     * Remove all outstanding jobs and delete them. This actually cancels
+     * all outstanding jobs (but no notifications are sent).
+     */
+    void removeAll();
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs;
+
+
+/**
+ * The configuration of a queue.
+ * @since 3.0
+ */
+public interface QueueConfiguration {
+
+    /** The queue type. */
+    static enum Type {
+        UNORDERED,
+        ORDERED,
+        TOPIC_ROUND_ROBIN,
+        IGNORE
+    }
+
+    /**
+     * Return the retry delay in milliseconds.
+     */
+    long getRetryDelayInMs();
+
+    /**
+     * Return the maximum number of retries, -1 for endless retry!
+     */
+    int getMaxRetries();
+
+    /**
+     * Return the queue type.
+     */
+    Type getType();
+
+    /**
+     * Return the thread priority for the job thread.
+     */
+    JobUtil.JobPriority getPriority();
+
+    /**
+     * Return the maximum number of parallel processes.
+     */
+    int getMaxParallel();
+
+    /**
+     * Is this a local running queue (= processing only
+     * jobs started on the same instance)?
+     */
+    boolean isLocalQueue();
+
+    /**
+     * Application ids - returns an array of application
+     * ids if this queue is bound to some cluster nodes.
+     */
+    String[] getApplicationIds();
+
+    /**
+     * The list of topics this queue is bound to.
+     */
+    String[] getTopics();
+
+    /**
+     * Get the ranking of this configuration.
+     */
+    int getRanking();
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Statistics.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Statistics.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Statistics.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Statistics.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs;
+
+/**
+ * Statistic information.
+ * This information is not preserved between restarts of the service.
+ * Once a service is restarted, the counters start at zero!
+ * @since 3.0
+ */
+public interface Statistics {
+
+    /**
+     * The time this service has been started.
+     */
+    long getStartTime();
+
+    /**
+     * Number of successfully finished jobs.
+     */
+    long getNumberOfFinishedJobs();
+
+    /**
+     * Number of permanently failing or cancelled jobs.
+     */
+    long getNumberOfCancelledJobs();
+
+    /**
+     * Number of failing jobs.
+     */
+    long getNumberOfFailedJobs();
+
+    /**
+     * Number of already processed jobs. This is the sum of
+     * {@link #getNumberOfFinishedJobs()}, {@link #getNumberOfCancelledJobs()}
+     * and {@link #getNumberOfFailedJobs()}.
+     */
+    long getNumberOfProcessedJobs();
+
+    /**
+     * Number of jobs currently in processing.
+     */
+    long getNumberOfActiveJobs();
+
+    /**
+     * Number of jobs currently waiting in a queue.
+     */
+    long getNumberOfQueuedJobs();
+
+    /**
+     * Total number of jobs. This is the sum of {@link #getNumberOfActiveJobs()}
+     * and {@link #getNumberOfQueuedJobs()}.
+     */
+    long getNumberOfJobs();
+
+    /**
+     * The time a job has been started last.
+     */
+    long getLastActivatedJobTime();
+
+    /**
+     * The time a job has been finished/failed/cancelled last.
+     */
+    long getLastFinishedJobTime();
+
+    /**
+     * The average waiting time of a job in the queue.
+     */
+    long getAverageWaitingTime();
+
+    /**
+     * The average processing time of a job - this only counts finished jobs.
+     */
+    long getAverageProcessingTime();
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Statistics.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Statistics.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Statistics.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/resources/OSGI-INF/metatype/metatype.properties (added)
+++ sling/branches/eventing-3.0/src/main/resources/OSGI-INF/metatype/metatype.properties Mon Oct 11 06:54:12 2010
@@ -0,0 +1,162 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+#
+
+#
+# This file contains localization strings for configuration labels and
+# descriptions as used in the metatype.xml descriptor generated by the
+# SCR plugin
+
+#
+# Distributing Event Handler
+dist.events.name = Apache Sling Distributing Event Handler 
+dist.events.description = Distributes local OSGi Event Admin events to \
+ other nodes of the same cluster. The events are written to the JCR \
+ repository for distribution to other nodes while events written to the \
+ repository are picked up and distributed locally through the OSGi Event Admin \
+ Service.   
+
+scheduler.period.name = Cleanup Interval
+scheduler.period.description = Interval in seconds in which events older than \
+ a specific age (see Event Cleanup Age) are purged from the repository. \
+ The default value is 30 minutes (1800 seconds).
+
+cleanup.period.name = Event Cleanup Age
+cleanup.period.description = The maximum age in minutes of persisted events to \
+ be purged from the repository during the cleanup run. The default is 15 \
+ minutes. Note that this setting defines the minimum time an event remains \
+ in the repository. 
+
+
+#
+# Queue Configuration and Job Event Handler
+queue.name = Apache Sling Job Queue Configuration
+queue.description = The configuration of a job processing queue.
+
+queue.name.name = Name
+queue.name.description = The name of the queue. If matching is used \
+ the token \{0\} can be used to substitute the real value.
+
+queue.type.name = Type
+queue.type.description = The queue type.
+
+queue.topics.name = Topics
+queue.topics.description = This value is required and lists the topics processed by \
+ this queue. The value is a list of strings. If a string ends with a dot, \
+ all topics in exactly this package match. If the string ends with a star, \
+ all topics in this package and all subpackages match. If the string neither \
+ ends with a dot nor with a star, this is assumed to define an exact topic.
+
+queue.priority.name = Priority
+queue.priority.description = The priority for the threads from this queue. Default is norm.
+
+queue.retries.name = Maximum Retries
+queue.retries.description = The maximum number of times a failed job slated \
+ for retries is actually retried. If a job has been retried this number of \
+ times and still fails, it is not rescheduled and assumed to have failed. The \
+ default value is 10.
+
+queue.retrydelay.name = Retry Delay
+queue.retrydelay.description = The number of milliseconds to sleep between two \
+ consecutive retries of a job which failed and was set to be retried. The \
+ default value is 2 seconds. This value is only relevant if there is a single \
+ failed job in the queue. If there are multiple failed jobs, each job is \
+ retried in turn without an intervening delay.
+
+queue.maxparallel.name = Maximum Parallel Jobs
+queue.maxparallel.description = The maximum number of parallel jobs started for this queue. \
+ A value of -1 is substituted with the number of available processors.
+
+queue.runlocal.name = Run Local
+queue.runlocal.description = Jobs for this queue are only processed on the cluster node \
+ where the job has been started.
+ 
+queue.applicationids.name = Application Ids
+queue.applicationids.description = An optional list of application ids. If configured, \
+ jobs for this queue are only processed on those cluster nodes.
+
+ 
+#
+# Job Event Handler
+job.events.name = Apache Sling Job Event Handler 
+job.events.description = Manages job scheduling on a single system as well \
+ as on a cluster. A Job runs only on a single cluster node. \
+ The respective scheduling is persisted in the repository and distributed \
+ amongst the cluster nodes through repository events. The jobs are started \
+ locally on a single cluster node through the OSGi Event Admin.
+
+jobscheduler.period.name = Cleanup Interval
+jobscheduler.period.description = Interval in seconds in which unused \
+ queues are stopped. The default value is 5 minutes (300 seconds).
+
+
+#
+# Persistence Handler
+job.persistence.name = Apache Sling Job Persistence Manager
+job.persistence.description = This service persists and loads jobs from the repository.
+
+persscheduler.period.name = Event Cleanup Interval
+persscheduler.period.description = Interval in seconds in which jobs older than \
+ a specific age (see Event Cleanup Age) are purged from the repository. \
+ The default value is 5 minutes (300 seconds).
+
+sleep.time.name = Retry Interval
+sleep.time.description = The number of milliseconds to sleep between two \
+ consecutive retries of a job which failed and was set to be retried. The \
+ default value is 30 seconds. This value is only relevant if there is a single \
+ failed job in the queue. If there are multiple failed jobs, each job is \
+ retried in turn without an intervening delay.
+
+jobcleanup.period.name = Event Cleanup Age
+jobcleanup.period.description = The maximum age in minutes of persisted jobs to \
+ be purged from the repository during the cleanup run. The default is 5 \
+ minutes. Note that this setting defines the minimum time an event remains \
+ in the repository. 
+
+max.load.jobs.name = Max Load Jobs
+max.load.jobs.description = The maximum number of jobs being loaded from the repository on startup. \
+ Default is 1000 jobs.
+
+load.threshold.name = Load Threshold
+load.threshold.description = If the queue is lower than this threshold the repository is checked \
+ for events. The default value is 400. This works together with the maximum load jobs.
+
+load.delay.name = Background Load Delay
+load.delay.description = The background loader waits this number of seconds after startup before \
+ loading events from the repository. Default value is 30 seconds.
+
+load.checkdelay.name = Background Check Delay
+load.checkdelay.description = The background loader sleeps this number of seconds before \
+ checking the repository for jobs. Default value is 240 seconds.
+
+#
+# Event Pool
+event.pool.name = Apache Sling Event Thread Pool 
+event.pool.description = This is the thread pool used by the Apache Sling eventing support.
+
+minPoolSize.name = Min Pool Size
+minPoolSize.description = The minimum pool size. The minimum pool size should be \
+ higher than 20. Approx 10 threads are in use by the system, so a pool size of 20 \
+ allows processing 10 events in parallel.
+
+maxPoolSize.name = Max Pool Size
+maxPoolSize.description = The maximum pool size. The maximum pool size should be higher than \
+ or equal to the minimum pool size.
+
+priority.name = Priority
+priority.description = The priority for the threads from this pool. Default is norm.

Propchange: sling/branches/eventing-3.0/src/main/resources/OSGI-INF/metatype/metatype.properties
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/resources/OSGI-INF/metatype/metatype.properties
------------------------------------------------------------------------------
    svn:keywords = Id

Propchange: sling/branches/eventing-3.0/src/main/resources/OSGI-INF/metatype/metatype.properties
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/resources/SLING-INF/nodetypes/event.cnd
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/resources/SLING-INF/nodetypes/event.cnd?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/resources/SLING-INF/nodetypes/event.cnd (added)
+++ sling/branches/eventing-3.0/src/main/resources/SLING-INF/nodetypes/event.cnd Mon Oct 11 06:54:12 2010
@@ -0,0 +1,42 @@
+//
+//  Licensed to the Apache Software Foundation (ASF) under one
+//  or more contributor license agreements.  See the NOTICE file
+//  distributed with this work for additional information
+//  regarding copyright ownership.  The ASF licenses this file
+//  to you under the Apache License, Version 2.0 (the
+//  "License"); you may not use this file except in compliance
+//  with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing,
+//  software distributed under the License is distributed on an
+//  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//  KIND, either express or implied.  See the License for the
+//  specific language governing permissions and limitations
+//  under the License.
+//
+
+<slingevent='http://sling.apache.org/jcr/event/1.0'>
+<nt='http://www.jcp.org/jcr/nt/1.0'>
+<mix='http://www.jcp.org/jcr/mix/1.0'>
+
+[slingevent:Event] > nt:unstructured, nt:hierarchyNode
+  - slingevent:topic (string)
+  - slingevent:application (string)
+  - slingevent:created (date)
+  - slingevent:properties (binary)
+  
+[slingevent:Job] > slingevent:Event, mix:lockable
+  - slingevent:processor (string)
+  - slingevent:id (string)
+  - slingevent:finished (date)
+ 
+[slingevent:TimedEvent] > slingevent:Event, mix:lockable
+  - slingevent:processor (string)
+  - slingevent:id (string)
+  - slingevent:expression (string)
+  - slingevent:date (date)
+  - slingevent:period (long)
+
+  

Propchange: sling/branches/eventing-3.0/src/main/resources/SLING-INF/nodetypes/event.cnd
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/resources/SLING-INF/nodetypes/event.cnd
------------------------------------------------------------------------------
    svn:keywords = Id

Added: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/EventUtilTest.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/EventUtilTest.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/EventUtilTest.java (added)
+++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/EventUtilTest.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Calendar;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.Properties;
+
+import javax.jcr.PropertyType;
+import javax.jcr.Value;
+import javax.jcr.ValueFactory;
+
+import org.apache.sling.event.impl.jobs.jcr.JCRHelper;
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.jmock.integration.junit4.JUnit4Mockery;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.osgi.service.event.Event;
+
+/**
+ * Tests for the EventUtil utility methods.
+ */
+@RunWith(JMock.class)
+public class EventUtilTest {
+
+    protected Mockery context;
+
+    public EventUtilTest() {
+        this.context = new JUnit4Mockery();
+    }
+
+    @Test public void testDistributeFlag() {
+        final Event distributableEvent = EventUtil.createDistributableEvent("some/topic", null);
+        assertTrue(EventUtil.shouldDistribute(distributableEvent));
+        final Event nonDistributableEvent = new Event("another/topic", (Dictionary<String, Object>)null);
+        assertFalse(EventUtil.shouldDistribute(nonDistributableEvent));
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put("a", "a");
+        props.put("b", "b");
+        final Event distributableEvent2 = EventUtil.createDistributableEvent("some/topic", props);
+        assertTrue(EventUtil.shouldDistribute(distributableEvent2));
+        // we should have four properties: 2 custom, one for the dist flag and the fourth for the topic
+        assertEquals(4, distributableEvent2.getPropertyNames().length);
+        assertEquals("a", distributableEvent2.getProperty("a"));
+        assertEquals("b", distributableEvent2.getProperty("b"));
+    }
+
+    @Test public void testLocalFlag() {
+        final Event localEvent = new Event("local/event", (Dictionary<String, Object>)null);
+        assertTrue(EventUtil.isLocal(localEvent));
+        final Properties props = new Properties();
+        props.put(EventUtil.PROPERTY_APPLICATION, "application1");
+        final Event remoteEvent = new Event("remote/event", (Dictionary<Object, Object>)props);
+        assertFalse(EventUtil.isLocal(remoteEvent));
+    }
+
+    protected Value getValueOfType(final int type, String name) {
+        final Value v = this.context.mock(Value.class, name);
+        this.context.checking(new Expectations() {{
+            allowing(v).getType();will(returnValue(type));
+        }});
+        return v;
+    }
+
+    @Test public void testGetNodePropertyValue() {
+        final ValueFactory factory = this.context.mock(ValueFactory.class);
+        this.context.checking(new Expectations() {{
+            allowing(factory).createValue(true);
+            will(returnValue(getValueOfType(PropertyType.BOOLEAN, "booleanValue1")));
+            allowing(factory).createValue(false);
+            will(returnValue(getValueOfType(PropertyType.BOOLEAN, "booleanValue2")));
+            allowing(factory).createValue(with(any(Long.class)));
+            will(returnValue(getValueOfType(PropertyType.LONG, "longValue")));
+            allowing(factory).createValue(with(any(String.class)));
+            will(returnValue(getValueOfType(PropertyType.STRING, "stringValue")));
+            allowing(factory).createValue(with(any(Calendar.class)));
+            will(returnValue(getValueOfType(PropertyType.DATE, "dateValue")));
+        }});
+        // boolean
+        assertEquals(PropertyType.BOOLEAN, JCRHelper.getNodePropertyValue(factory, true).getType());
+        assertEquals(PropertyType.BOOLEAN, JCRHelper.getNodePropertyValue(factory, false).getType());
+        assertEquals(PropertyType.BOOLEAN, JCRHelper.getNodePropertyValue(factory, Boolean.TRUE).getType());
+        assertEquals(PropertyType.BOOLEAN, JCRHelper.getNodePropertyValue(factory, Boolean.FALSE).getType());
+        // long
+        assertEquals(PropertyType.LONG, JCRHelper.getNodePropertyValue(factory, (long)5).getType());
+        // int = not possible
+        assertEquals(null, JCRHelper.getNodePropertyValue(factory, 5));
+        // string
+        assertEquals(PropertyType.STRING, JCRHelper.getNodePropertyValue(factory, "something").getType());
+        // calendar
+        assertEquals(PropertyType.DATE, JCRHelper.getNodePropertyValue(factory, Calendar.getInstance()).getType());
+    }
+}

Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/EventUtilTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/EventUtilTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/EventUtilTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java (added)
+++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.util.Dictionary;
+
+import org.jmock.Expectations;
+import org.jmock.integration.junit4.JMock;
+import org.junit.runner.RunWith;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.event.EventAdmin;
+
+@RunWith(JMock.class)
+public abstract class AbstractRepositoryEventHandlerTest extends AbstractTest {
+
+    protected volatile AbstractRepositoryEventHandler handler;
+
+    protected abstract AbstractRepositoryEventHandler createHandler();
+
+    protected void activate(final EventAdmin ea) throws Throwable {
+        super.activate(ea);
+        this.handler = this.createHandler();
+
+        handler.environment = this.environment;
+
+        // lets set up the bundle context
+        final BundleContext bundleContext = this.getMockery().mock(BundleContext.class, "beforeBundleContext" + activateCount);
+
+        // lets set up the component configuration
+        final Dictionary<String, Object> componentConfig = this.getComponentConfig();
+
+        // lets set up the component context
+        final ComponentContext componentContext = this.getMockery().mock(ComponentContext.class, "beforeComponentContext" + activateCount);
+        this.getMockery().checking(new Expectations() {{
+            allowing(componentContext).getBundleContext();
+            will(returnValue(bundleContext));
+            allowing(componentContext).getProperties();
+            will(returnValue(componentConfig));
+        }});
+
+        this.handler.activate(componentContext);
+
+        // the session is initialized in the background, so let's sleep some seconds
+        try {
+            Thread.sleep(2 * 1000);
+        } catch (InterruptedException e) {
+            // ignore
+        }
+    }
+
+    protected void deactivate() throws Throwable {
+        // lets set up the bundle context with the sling id
+        final BundleContext bundleContext = this.getMockery().mock(BundleContext.class, "afterBundleContext" + activateCount);
+
+        final ComponentContext componentContext = this.getMockery().mock(ComponentContext.class, "afterComponentContext" + activateCount);
+        this.getMockery().checking(new Expectations() {{
+            allowing(componentContext).getBundleContext();
+            will(returnValue(bundleContext));
+        }});
+        this.handler.deactivate(componentContext);
+        this.handler = null;
+        super.deactivate();
+    }
+}

Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractTest.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractTest.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractTest.java (added)
+++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractTest.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import static org.junit.Assert.assertTrue;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.Hashtable;
+import java.util.Set;
+
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+
+import junitx.util.PrivateAccessor;
+
+import org.apache.sling.commons.classloader.DynamicClassLoaderManager;
+import org.apache.sling.commons.threads.ModifiableThreadPoolConfig;
+import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.commons.threads.ThreadPoolConfig;
+import org.apache.sling.jcr.api.SlingRepository;
+import org.apache.sling.settings.SlingSettingsService;
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.junit.runner.RunWith;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+@RunWith(JMock.class)
+public abstract class AbstractTest {
+
+    protected static final String REPO_PATH = "/test/events";
+    protected static final String SLING_ID = "4711";
+
+    protected static Session session;
+
+    protected abstract Mockery getMockery();
+
+    protected EnvironmentComponent environment;
+
+    protected Hashtable<String, Object> getComponentConfig() {
+        final Hashtable<String, Object> config = new Hashtable<String, Object>();
+        config.put(AbstractRepositoryEventHandler.CONFIG_PROPERTY_REPO_PATH, REPO_PATH);
+
+        return config;
+    }
+
+    @org.junit.BeforeClass public static void setupRepository() throws Exception {
+        RepositoryTestUtil.startRepository();
+        final SlingRepository repository = RepositoryTestUtil.getSlingRepository();
+        session = repository.loginAdministrative(repository.getDefaultWorkspace());
+        assertTrue(RepositoryTestUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/event.cnd")));
+        assertTrue(RepositoryTestUtil.registerNodeType(session, DistributingEventHandler.class.getResourceAsStream("/SLING-INF/nodetypes/folder.cnd")));
+        if ( session.itemExists(REPO_PATH) ) {
+            session.getItem(REPO_PATH).remove();
+            session.save();
+        }
+    }
+
+    @org.junit.AfterClass public static void shutdownRepository() throws Exception {
+        if ( session != null ) {
+            session.logout();
+            session = null;
+        }
+        RepositoryTestUtil.stopRepository();
+    }
+
+    @org.junit.Before public void setup() throws Throwable {
+        // activate
+        this.activate(null);
+    }
+
+    protected int activateCount = 1;
+
+    protected void activate(final EventAdmin ea) throws Throwable {
+        this.environment = new EnvironmentComponent();
+        PrivateAccessor.setField(this.environment, "repository", RepositoryTestUtil.getSlingRepository());
+        PrivateAccessor.setField(this.environment, "classLoaderManager", new DynamicClassLoaderManager() {
+
+            public ClassLoader getDynamicClassLoader() {
+                return this.getClass().getClassLoader();
+            }
+        });
+
+        // the event admin
+        if ( ea != null ) {
+            PrivateAccessor.setField(this.environment, "eventAdmin", ea);
+        } else {
+            final EventAdmin eventAdmin = this.getMockery().mock(EventAdmin.class, "eventAdmin" + activateCount);
+            PrivateAccessor.setField(this.environment, "eventAdmin", eventAdmin);
+            this.getMockery().checking(new Expectations() {{
+                allowing(eventAdmin).postEvent(with(any(Event.class)));
+                allowing(eventAdmin).sendEvent(with(any(Event.class)));
+            }});
+        }
+        // sling settings service
+        PrivateAccessor.setField(this.environment, "settingsService", new SlingSettingsService() {
+            public String getSlingId() {
+                return SLING_ID;
+            }
+
+            public URL getSlingHome() {
+                return null;
+            }
+
+            public String getSlingHomePath() {
+                return null;
+            }
+
+            public Set<String> getRunModes() {
+                return Collections.<String> emptySet();
+            }
+        });
+
+        // we need a thread pool
+        PrivateAccessor.setField(this.environment, "threadPool", new ThreadPoolImpl());
+        this.environment.activate();
+    }
+
+    protected void deactivate() throws Throwable {
+        this.environment.deactivate();
+        this.environment = null;
+        activateCount++;
+    }
+
+    protected void setEventAdmin(final EventAdmin ea) throws Exception {
+        PrivateAccessor.setField(this.environment, "eventAdmin", ea);
+    }
+
+    @org.junit.After public void shutdown() throws Throwable {
+        this.deactivate();
+        try {
+            // delete all child nodes to get a clean repository again
+            final Node rootNode = (Node) session.getItem(REPO_PATH);
+            final NodeIterator iter = rootNode.getNodes();
+            while ( iter.hasNext() ) {
+                final Node child = iter.nextNode();
+                child.remove();
+            }
+            session.save();
+        } catch ( RepositoryException re) {
+            // we ignore this for the test
+        }
+    }
+
+    @org.junit.Test public void testPathCreation() throws RepositoryException {
+        assertTrue(session.itemExists(REPO_PATH));
+    }
+
+    final class ThreadPoolImpl implements ThreadPool {
+
+        public void execute(Runnable runnable) {
+            final Thread t = new Thread(runnable);
+            t.start();
+        }
+
+        public String getName() {
+            return "default";
+        }
+
+        public ThreadPoolConfig getConfiguration() {
+            return new ModifiableThreadPoolConfig();
+        }
+
+    }
+}

Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/Barrier.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/Barrier.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/Barrier.java (added)
+++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/Barrier.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Simplified version of the cyclic barrier class for testing.
+ * The blocking methods never throw: a failed wait is either silently ended
+ * ({@link #block()}) or reported through the return value ({@link #block(int)}).
+ * An interruption is not lost, though - the interrupt status of the calling
+ * thread is restored before the method returns.
+ */
+public class Barrier extends CyclicBarrier {
+
+    /**
+     * Create a barrier for the given number of parties.
+     * @param parties number of threads that must call a block method before
+     *                all of them are released
+     */
+    public Barrier(int parties) {
+        super(parties);
+    }
+
+    /**
+     * Wait until all parties have arrived at the barrier.
+     * Returns early if the calling thread is interrupted (its interrupt status
+     * is set again on return) or if the barrier is broken.
+     */
+    public void block() {
+        try {
+            this.await();
+        } catch (InterruptedException e) {
+            // await() cleared the flag when throwing; restore it so the
+            // caller can still detect the interruption
+            Thread.currentThread().interrupt();
+        } catch (BrokenBarrierException e) {
+            // ignore
+        }
+    }
+
+    /**
+     * Wait at most the given number of seconds for all parties to arrive.
+     * If the wait fails (timeout, interruption or broken barrier) the barrier
+     * is reset so that it can be used again.
+     * @param seconds maximum time to wait, in seconds
+     * @return <code>true</code> if all parties arrived in time,
+     *         <code>false</code> otherwise
+     */
+    public boolean block(int seconds) {
+        try {
+            this.await(seconds, TimeUnit.SECONDS);
+            return true;
+        } catch (InterruptedException e) {
+            // restore the interrupt status cleared by await()
+            Thread.currentThread().interrupt();
+        } catch (BrokenBarrierException e) {
+            // ignore
+        } catch (TimeoutException e) {
+            // ignore
+        }
+        this.reset();
+        return false;
+    }
+}

Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/Barrier.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/Barrier.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/Barrier.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message