sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r1615054 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: BackgroundLoader.java JobManagerConfiguration.java console/WebConsolePlugin.java
Date Fri, 01 Aug 2014 09:16:06 GMT
Author: cziegeler
Date: Fri Aug  1 09:16:05 2014
New Revision: 1615054

URL: http://svn.apache.org/r1615054
Log:
SLING-3812 : Avoid search when loading jobs

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java?rev=1615054&r1=1615053&r2=1615054&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
Fri Aug  1 09:16:05 2014
@@ -18,9 +18,13 @@
  */
 package org.apache.sling.event.impl.jobs;
 
+import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -85,16 +89,21 @@ public class BackgroundLoader implements
     /** Boolean to detect the initial start. */
     private boolean firstRun = true ;
 
+    /** Use search or traverse? */
+    private boolean useSearch;
+
     /**
      * Create and activate the loader.
      */
     public BackgroundLoader(final JobManagerImpl jobManagerImpl,
             final JobManagerConfiguration configuration2,
             final ResourceResolverFactory resourceResolverFactory2) {
+        this.useSearch = JobManagerConfiguration.DEFAULT_BACKGROUND_LOAD_SEARCH;
         this.resourceResolverFactory = resourceResolverFactory2;
         this.configuration = configuration2;
         this.jobManager = jobManagerImpl;
         this.active.set(true);
+        logger.debug("Activating Sling Job Background Loader");
         // start background thread
         final Thread loaderThread = new Thread(this, "Apache Sling Job Background Loader");
         loaderThread.setDaemon(true);
@@ -105,6 +114,7 @@ public class BackgroundLoader implements
      * Deactivate the loader.
      */
     public void deactivate() {
+        logger.debug("Deactivating Sling Job Background Loader");
         this.active.set(false);
         // make sure to stop background thread
         synchronized ( this.loadLock ) {
@@ -132,6 +142,7 @@ public class BackgroundLoader implements
      */
     public void start() {
         synchronized ( this.loadLock ) {
+            logger.debug("Starting Sling Job Background Loader");
             this.running = true;
             // make sure to clear out old information
             this.actionQueue.clear();
@@ -148,6 +159,7 @@ public class BackgroundLoader implements
      */
     public void stop() {
         synchronized ( this.loadLock ) {
+            logger.debug("Stopping Sling Job Background Loader");
             this.running = false;
         }
         // stop action queue
@@ -164,6 +176,7 @@ public class BackgroundLoader implements
      */
     public void restart() {
         if ( this.isRunning() ) {
+            logger.debug("Restarting Sling Job Background Loader");
             this.stop();
             this.start();
         }
@@ -171,90 +184,100 @@ public class BackgroundLoader implements
 
     @Override
     public void run() {
-        while ( this.active.get() ) {
-            final long startTime;
-            // we have to wait to get started
-            synchronized ( this.loadLock ) {
-                while ( this.active.get() && !this.running ) {
-                    try {
-                        this.loadLock.wait();
-                    } catch (final InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        this.active.set(false);
+        logger.debug("Started Sling Job Background Thread");
+        try {
+            while ( this.active.get() ) {
+                final long startTime;
+                // we have to wait to get started
+                synchronized ( this.loadLock ) {
+                    while ( this.active.get() && !this.running ) {
+                        logger.debug("Sling Job Background Thread is waiting to be started");
+                        try {
+                            this.loadLock.wait();
+                        } catch (final InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            this.active.set(false);
+                        }
                     }
+                    startTime = System.currentTimeMillis();
                 }
-                startTime = System.currentTimeMillis();
-            }
 
-            // give the system some time to start
-            if ( this.isRunning() ) {
-                synchronized ( this.stopLock ) {
+                // give the system some time to start
+                if ( this.isRunning() ) {
+                    logger.debug("Sling Job Background Thread is waiting for system to be
ready");
+                    synchronized ( this.stopLock ) {
+                        try {
+                            this.stopLock.wait(1000 * this.configuration.getBackgroundLoadDelay());
+                        } catch (final InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            this.active.set(false);
+                        }
+                    }
+                }
+
+                // load jobs from the resource tree
+                if ( this.isRunning() ) {
+                    logger.debug("Sling Job Background Thread starts loading jobs");
+                    this.loadJobsInTheBackground(startTime);
+                }
+                // if we're still running we can clear the first run flag
+                if ( this.isRunning() ) {
+                    this.firstRun = false;
+                }
+                // and finally process the action queue
+                while ( this.isRunning() ) {
+                    Object nextPathOrJob = null;
                     try {
-                        this.stopLock.wait(1000 * this.configuration.getBackgroundLoadDelay());
+                        nextPathOrJob = this.actionQueue.take();
                     } catch (final InterruptedException e) {
+                        this.ignoreException(e);
                         Thread.currentThread().interrupt();
                         this.active.set(false);
                     }
-                }
-            }
-
-            // load jobs from the resource tree
-            if ( this.isRunning() ) {
-                this.loadJobsInTheBackground(startTime);
-            }
-            // if we're still running we can clear the first run flag
-            if ( this.isRunning() ) {
-                this.firstRun = false;
-            }
-            // and finally process the action queue
-            while ( this.isRunning() ) {
-                Object nextPathOrJob = null;
-                try {
-                    nextPathOrJob = this.actionQueue.take();
-                } catch (final InterruptedException e) {
-                    this.ignoreException(e);
-                    Thread.currentThread().interrupt();
-                    this.active.set(false);
-                }
-                if ( nextPathOrJob instanceof JobImpl ) {
-                    this.jobManager.process((JobImpl)nextPathOrJob);
-                } else if ( nextPathOrJob instanceof String ) {
-                    final String path = (String)nextPathOrJob;
-                    if ( !END_TOKEN.equals(path) && this.isRunning() ) {
-                        ResourceResolver resolver = null;
-                        try {
-                            resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
-                            final Resource resource = resolver.getResource(path);
-                            if ( resource == null ) {
-                                // this should actually never happen, just a sanity check
(see SLING-2971)
-                                logger.warn("No job resource found for path {}. Potential
job will not be processed.", path);
-                            } else {
-                                if (ResourceHelper.RESOURCE_TYPE_JOB.equals(resource.getResourceType())
) {
-                                    this.logger.debug("Reading local job from {}", path);
-                                    final JobImpl job = this.jobManager.readJob(resource);
-                                    if ( job != null ) {
-                                        if ( job.hasReadErrors() ) {
-                                            synchronized ( this.unloadedJobs ) {
-                                                this.unloadedJobs.add(path);
+                    if ( nextPathOrJob instanceof JobImpl ) {
+                        this.jobManager.process((JobImpl)nextPathOrJob);
+                    } else if ( nextPathOrJob instanceof String ) {
+                        final String path = (String)nextPathOrJob;
+                        if ( !END_TOKEN.equals(path) && this.isRunning() ) {
+                            ResourceResolver resolver = null;
+                            try {
+                                resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+                                final Resource resource = resolver.getResource(path);
+                                if ( resource == null ) {
+                                    // this should actually never happen, just a sanity check
(see SLING-2971)
+                                    logger.warn("No job resource found for path {}. Potential
job will not be processed.", path);
+                                } else {
+                                    if (ResourceHelper.RESOURCE_TYPE_JOB.equals(resource.getResourceType())
) {
+                                        this.logger.debug("Reading local job from {}", path);
+                                        final JobImpl job = this.jobManager.readJob(resource);
+                                        if ( job != null ) {
+                                            if ( job.hasReadErrors() ) {
+                                                synchronized ( this.unloadedJobs ) {
+                                                    this.unloadedJobs.add(path);
+                                                }
+                                            } else {
+                                                this.jobManager.process(job);
                                             }
-                                        } else {
-                                            this.jobManager.process(job);
                                         }
                                     }
                                 }
-                            }
-                        } catch ( final LoginException le ) {
-                            // administrative login should always work
-                            this.ignoreException(le);
-                        } finally {
-                            if ( resolver != null ) {
-                                resolver.close();
+                            } catch ( final LoginException le ) {
+                                // administrative login should always work
+                                this.ignoreException(le);
+                            } finally {
+                                if ( resolver != null ) {
+                                    resolver.close();
+                                }
                             }
                         }
                     }
                 }
             }
+        } catch ( final Throwable t) {
+            // make sure we at least log each unexpected exiting
+            logger.error("Unexpected error in background loader thread." + t.getMessage(),
t);
         }
+        logger.debug("Stopped Sling Job Background Thread");
     }
 
     /**
@@ -263,50 +286,240 @@ public class BackgroundLoader implements
     private void loadJobsInTheBackground(final long startTime) {
         logger.debug("Starting background loading...");
 
-        ResourceResolver resolver = null;
         long count = 0;
-        try {
-            resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
-            final Calendar startDate = Calendar.getInstance();
-            startDate.setTimeInMillis(startTime);
-
-            final StringBuilder buf = new StringBuilder(64);
-
-            buf.append("//element(*,");
-            buf.append(ResourceHelper.RESOURCE_TYPE_JOB);
-            buf.append(")[@");
-            buf.append(ISO9075.encode(Job.PROPERTY_JOB_TARGET_INSTANCE));
-            buf.append(" = '");
-            buf.append(Environment.APPLICATION_ID);
-            buf.append("' and @");
-            buf.append(ISO9075.encode(Job.PROPERTY_JOB_CREATED));
-            buf.append(" < xs:dateTime('");
-            buf.append(ISO8601.format(startDate));
-            buf.append("')");
-            buf.append("] order by @");
-            buf.append(ISO9075.encode(Job.PROPERTY_JOB_CREATED));
-            buf.append(" ascending");
-            if ( this.isRunning() ) {
-                final Iterator<Resource> result = resolver.findResources(buf.toString(),
"xpath");
-
-                while ( this.isRunning() && result.hasNext() ) {
-                    final Resource jobResource = result.next();
-                    if ( this.loadJobInTheBackground(jobResource) ) {
-                        count++;
+
+        if ( this.useSearch ) {
+            logger.debug("Using search for background loading...");
+            ResourceResolver resolver = null;
+            try {
+                resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+                final Calendar startDate = Calendar.getInstance();
+                startDate.setTimeInMillis(startTime);
+
+                final StringBuilder buf = new StringBuilder(64);
+
+                buf.append("//element(*,");
+                buf.append(ResourceHelper.RESOURCE_TYPE_JOB);
+                buf.append(")[@");
+                buf.append(ISO9075.encode(Job.PROPERTY_JOB_TARGET_INSTANCE));
+                buf.append(" = '");
+                buf.append(Environment.APPLICATION_ID);
+                buf.append("' and @");
+                buf.append(ISO9075.encode(Job.PROPERTY_JOB_CREATED));
+                buf.append(" < xs:dateTime('");
+                buf.append(ISO8601.format(startDate));
+                buf.append("')");
+                buf.append("] order by @");
+                buf.append(ISO9075.encode(Job.PROPERTY_JOB_CREATED));
+                buf.append(" ascending");
+                if ( this.isRunning() ) {
+                    final Iterator<Resource> result = resolver.findResources(buf.toString(),
"xpath");
+
+                    while ( this.isRunning() && result.hasNext() ) {
+                        final Resource jobResource = result.next();
+                        if ( this.loadJobInTheBackground(jobResource) ) {
+                            count++;
+                        }
                     }
                 }
+            } catch (final QuerySyntaxException qse) {
+                this.ignoreException(qse);
+            } catch (final LoginException le) {
+                this.ignoreException(le);
+            } catch (final UnsupportedOperationException t ) {
+                // this is thrown by Oak if the search is taking "too long"
+                this.logger.error("Unexpected unsupported operation exception. This is most
probably because of Apache Jackrabbit Oak " +
+                                  "complaining about to long running query. Switching to
traversal now.");
+                this.useSearch = false;
+            } finally {
+                if ( resolver != null ) {
+                    resolver.close();
+                }
             }
-        } catch (final QuerySyntaxException qse) {
-            this.ignoreException(qse);
-        } catch (final LoginException le) {
-            this.ignoreException(le);
-        } finally {
-            if ( resolver != null ) {
-                resolver.close();
+        }
+        if ( !useSearch ) {
+            logger.debug("Using traversal for background loading...");
+            ResourceResolver resolver = null;
+            try {
+                resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+                final Resource baseResource = resolver.getResource(this.configuration.getLocalJobsPath());
+
+                final Comparator<Resource> resourceComparator = new Comparator<Resource>()
{
+
+                    @Override
+                    public int compare(final Resource o1, final Resource o2) {
+                        final int value1 = Integer.valueOf(o1.getName());
+                        final int value2 = Integer.valueOf(o2.getName());
+                        if ( value1 < value2 ) {
+                            return -1;
+                        } else if ( value1 > value2 ) {
+                            return 1;
+                        }
+                        return 0;
+                    }
+                };
+
+                // sanity check - should never be null
+                if ( baseResource != null ) {
+                    final Calendar now = Calendar.getInstance();
+                    now.setTimeInMillis(startTime);
+
+                    final Iterator<Resource> topicIter = baseResource.listChildren();
+                    while ( this.isRunning() && topicIter.hasNext() ) {
+                        final Resource topicResource = topicIter.next();
+                        logger.debug("Processing topic {}", topicResource.getName());
+
+                        // now years
+                        final List<Resource> years = new ArrayList<Resource>();
+                        final Iterator<Resource> yearIter = topicResource.listChildren();
+                        while ( this.isRunning() && yearIter.hasNext() ) {
+                            final Resource yearResource = yearIter.next();
+                            years.add(yearResource);
+                            logger.debug("Found year {}", yearResource.getName());
+                        }
+                        Collections.sort(years, resourceComparator);
+
+                        for(final Resource yearResource: years) {
+                            final int year = Integer.valueOf(yearResource.getName());
+                            if ( year > now.get(Calendar.YEAR) ) {
+                                logger.debug("Skipping year {}", year);
+                                continue;
+                            }
+                            logger.debug("Processing year {}", year);
+
+                            // now months
+                            final List<Resource> months = new ArrayList<Resource>();
+                            final Iterator<Resource> monthIter = yearResource.listChildren();
+                            while ( this.isRunning() && monthIter.hasNext() ) {
+                                final Resource monthResource = monthIter.next();
+                                months.add(monthResource);
+                                logger.debug("Found month {}",  monthResource.getName());
+                            }
+                            Collections.sort(months, resourceComparator);
+
+                            for(final Resource monthResource: months) {
+                                final int month = Integer.valueOf(monthResource.getName());
+                                if ( year == now.get(Calendar.YEAR) && (month >
now.get(Calendar.MONTH) + 1)) {
+                                    logger.debug("Skipping month {}", month);
+                                    continue;
+                                }
+                                logger.debug("Processing month {}", month);
+
+                                // now days
+                                final List<Resource> days = new ArrayList<Resource>();
+                                final Iterator<Resource> dayIter = monthResource.listChildren();
+                                while ( this.isRunning() && dayIter.hasNext() ) {
+                                    final Resource dayResource = dayIter.next();
+                                    days.add(dayResource);
+                                    logger.debug("Found day {}",  dayResource.getName());
+                                }
+                                Collections.sort(days, resourceComparator);
+
+                                for(final Resource dayResource: days) {
+                                    final int day = Integer.valueOf(dayResource.getName());
+
+                                    if ( year == now.get(Calendar.YEAR)
+                                         && month == now.get(Calendar.MONTH) + 1
+                                         && day > now.get(Calendar.DAY_OF_MONTH)
) {
+                                        logger.debug("Skipping day {}", day);
+                                        continue;
+                                    }
+                                    logger.debug("Processing day {}", day);
+
+                                    // now hours
+                                    final List<Resource> hours = new ArrayList<Resource>();
+                                    final Iterator<Resource> hourIter = dayResource.listChildren();
+                                    while ( this.isRunning() && hourIter.hasNext()
) {
+                                        final Resource hourResource = hourIter.next();
+                                        hours.add(hourResource);
+                                        logger.debug("Found hour {}",  hourResource.getName());
+                                    }
+                                    Collections.sort(hours, resourceComparator);
+
+                                    for(final Resource hourResource: hours) {
+                                        final int hour = Integer.valueOf(hourResource.getName());
+
+                                        if ( year == now.get(Calendar.YEAR)
+                                             && month == now.get(Calendar.MONTH)
+ 1
+                                             && day == now.get(Calendar.DAY_OF_MONTH)
+                                             && hour > now.get(Calendar.HOUR_OF_DAY)
) {
+                                            logger.debug("Skipping hour {}", hour);
+                                            continue;
+                                        }
+                                        logger.debug("Processing hour {}", hour);
+
+                                        // now minutes
+                                        final List<Resource> minutes = new ArrayList<Resource>();
+                                        final Iterator<Resource> minuteIter = hourResource.listChildren();
+                                        while ( this.isRunning() && minuteIter.hasNext()
) {
+                                            final Resource minuteResource = minuteIter.next();
+                                            minutes.add(minuteResource);
+                                            logger.debug("Found minute {}",  minuteResource.getName());
+                                        }
+                                        Collections.sort(minutes, resourceComparator);
+
+                                        for(final Resource minuteResource: minutes) {
+                                            final int minute = Integer.valueOf(minuteResource.getName());
+
+                                            if ( year == now.get(Calendar.YEAR)
+                                                 && month == now.get(Calendar.MONTH)
+ 1
+                                                 && day == now.get(Calendar.DAY_OF_MONTH)
+                                                 && hour == now.get(Calendar.HOUR_OF_DAY)
+                                                 && minute > now.get(Calendar.MINUTE)
) {
+                                                logger.debug("Skipping minute {}", minute);
+                                                continue;
+                                            }
+                                            logger.debug("Processing minute {}", minute);
+
+                                            // now jobs
+                                            final List<JobImpl> jobs = new ArrayList<JobImpl>();
+                                            final Iterator<Resource> jobIter = minuteResource.listChildren();
+                                            while ( this.isRunning() && jobIter.hasNext()
) {
+                                                final Resource jobResource = jobIter.next();
+
+                                                final JobImpl job = this.jobManager.readJob(jobResource);
+                                                if ( job != null && job.getCreated().compareTo(now)
<= 0 ) {
+                                                    logger.debug("Found job {}", jobResource.getName());
+                                                    jobs.add(job);
+                                                } else {
+                                                    logger.debug("Skipping job {}", jobResource.getName());
+                                                }
+                                            }
+
+                                            Collections.sort(jobs, new Comparator<Job>()
{
+
+                                                @Override
+                                                public int compare(final Job o1, final Job
o2) {
+                                                    return o1.getCreated().compareTo(o2.getCreated());
+                                                }
+                                            });
+
+                                            for(final JobImpl job : jobs) {
+                                                final Resource jobResource = resolver.getResource(job.getResourcePath());
+                                                if ( jobResource != null &&  this.isRunning()
&& this.loadJobInTheBackground(jobResource) ) {
+                                                    count++;
+                                                }
+                                            }
+
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            } catch (final LoginException le) {
+                this.ignoreException(le);
+            } finally {
+                if ( resolver != null ) {
+                    resolver.close();
+                }
             }
         }
 
-        logger.debug("Finished background loading of {} jobs.", count);
+        logger.info("Finished background loading of {} jobs.", count);
     }
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java?rev=1615054&r1=1615053&r2=1615054&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
Fri Aug  1 09:16:05 2014
@@ -55,6 +55,9 @@ public class JobManagerConfiguration {
     /** Configuration property for the scheduled jobs path. */
     public static final String PROPERTY_SCHEDULED_JOBS_PATH = "job.scheduled.jobs.path";
 
+    /** Default value for background loading. */
+    public static final boolean DEFAULT_BACKGROUND_LOAD_SEARCH = true;
+
     /** The jobs base path with a slash. */
     private String jobsBasePathWithSlash;
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java?rev=1615054&r1=1615053&r2=1615054&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
(original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
Fri Aug  1 09:16:05 2014
@@ -213,6 +213,19 @@ public class WebConsolePlugin extends Ht
         String topics = this.jobConsumerManager.getTopics();
         if ( topics == null ) {
             topics = "";
+        } else {
+            final String[] allTopics = topics.split(",");
+            final StringBuilder sb = new StringBuilder();
+            boolean first = true;
+            for(final String t : allTopics) {
+                if ( first) {
+                    first = false;
+                } else {
+                    sb.append("<br/>");
+                }
+                sb.append(t);
+            }
+            topics = sb.toString();
         }
         Statistics s = this.jobManager.getStatistics();
         pw.printf("<tr><td>Start Time</td><td>%s</td></tr>",
formatDate(s.getStartTime()));



Mime
View raw message