camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r1143974 - in /camel/branches/camel-2.7.x: ./ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/test/java/org/apache/camel/component/file/ camel-core/src/test/java/org/apache/camel/impl/ camel-core/src/test/java/org/apache/came...
Date Thu, 07 Jul 2011 18:57:25 GMT
Author: dkulp
Date: Thu Jul  7 18:57:25 2011
New Revision: 1143974

URL: http://svn.apache.org/viewvc?rev=1143974&view=rev
Log:
Merged revisions 1136290 via svnmerge from 
https://svn.apache.org/repos/asf/camel/trunk

........
  r1136290 | davsclaus | 2011-06-16 01:22:02 -0400 (Thu, 16 Jun 2011) | 1 line
  
  CAMEL-4105: ScheduledPollConsumer improved run thread to catch Error to avoid thread from dying. Added runLoggingLevel to control log level for start/complete logs for when the poll task runs. Changed default to useFixedDelay.
........

Added:
    camel/branches/camel-2.7.x/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeRunLoggingLevelTest.java
      - copied unchanged from r1136290, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeRunLoggingLevelTest.java
Modified:
    camel/branches/camel-2.7.x/   (props changed)
    camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
    camel/branches/camel-2.7.x/camel-core/src/test/java/org/apache/camel/impl/DefaultComponentValidateURITest.java
    camel/branches/camel-2.7.x/camel-core/src/test/java/org/apache/camel/management/ManagedScheduledPollConsumerTest.java

Propchange: camel/branches/camel-2.7.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jul  7 18:57:25 2011
@@ -1 +1 @@
-/camel/trunk:1083696,1084150,1085905,1086231,1087276,1087612,1087856,1088583,1088916-1088917,1089275,1090166,1090960-1090969,1091518,1091771,1091799,1092068,1092577,1092667,1093978,1094156,1095405,1095469,1095471,1095475-1095476,1097909,1097912,1097978,1098630,1099417,1100975,1102162,1102181,1104076,1124497,1127744,1127988,1131411,1134252,1134501,1135223,1135364,1138285,1139163,1143925
+/camel/trunk:1083696,1084150,1085905,1086231,1087276,1087612,1087856,1088583,1088916-1088917,1089275,1090166,1090960-1090969,1091518,1091771,1091799,1092068,1092577,1092667,1093978,1094156,1095405,1095469,1095471,1095475-1095476,1097909,1097912,1097978,1098630,1099417,1100975,1102162,1102181,1104076,1124497,1127744,1127988,1131411,1134252,1134501,1135223,1135364,1136290,1138285,1139163,1143925

Propchange: camel/branches/camel-2.7.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1143974&r1=1143973&r2=1143974&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Thu Jul  7 18:57:25 2011
@@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledFut
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Endpoint;
+import org.apache.camel.LoggingLevel;
 import org.apache.camel.Processor;
 import org.apache.camel.SuspendableService;
 import org.apache.camel.spi.PollingConsumerPollStrategy;
@@ -43,8 +44,9 @@ public abstract class ScheduledPollConsu
     private long initialDelay = 1000;
     private long delay = 500;
     private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
-    private boolean useFixedDelay;
+    private boolean useFixedDelay = true;
     private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
+    private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;
 
     public ScheduledPollConsumer(Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -65,6 +67,44 @@ public abstract class ScheduledPollConsu
      * Invoked whenever we should be polled
      */
     public void run() {
+        // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread
+        try {
+            // log starting
+            if (LoggingLevel.ERROR == runLoggingLevel) {
+                LOG.error("Scheduled task started on:   {}", this.getEndpoint());
+            } else if (LoggingLevel.WARN == runLoggingLevel) {
+                LOG.warn("Scheduled task started on:   {}", this.getEndpoint());
+            } else if (LoggingLevel.INFO == runLoggingLevel) {
+                LOG.info("Scheduled task started on:   {}", this.getEndpoint());
+            } else if (LoggingLevel.DEBUG == runLoggingLevel) {
+                LOG.debug("Scheduled task started on:   {}", this.getEndpoint());
+            } else {
+                LOG.trace("Scheduled task started on:   {}", this.getEndpoint());
+            }
+
+            // execute scheduled task
+            doRun();
+
+            // log completed
+            if (LoggingLevel.ERROR == runLoggingLevel) {
+                LOG.error("Scheduled task completed on: {}", this.getEndpoint());
+            } else if (LoggingLevel.WARN == runLoggingLevel) {
+                LOG.warn("Scheduled task completed on: {}", this.getEndpoint());
+            } else if (LoggingLevel.INFO == runLoggingLevel) {
+                LOG.info("Scheduled task completed on: {}", this.getEndpoint());
+            } else if (LoggingLevel.DEBUG == runLoggingLevel) {
+                LOG.debug("Scheduled task completed on: {}", this.getEndpoint());
+            } else {
+                LOG.trace("Scheduled task completed on: {}", this.getEndpoint());
+            }
+
+        } catch (Error e) {
+            // must catch Error, to ensure the task is re-scheduled
+            LOG.error("Error occurred during running scheduled task on: " + this.getEndpoint() + ", due: " + e.getMessage(), e);
+        }
+    }
+
+    private void doRun() {
         if (isSuspended()) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Cannot start to poll: " + this.getEndpoint() + " as its suspended");
@@ -170,6 +210,14 @@ public abstract class ScheduledPollConsu
         this.useFixedDelay = useFixedDelay;
     }
 
+    public LoggingLevel getRunLoggingLevel() {
+        return runLoggingLevel;
+    }
+
+    public void setRunLoggingLevel(LoggingLevel runLoggingLevel) {
+        this.runLoggingLevel = runLoggingLevel;
+    }
+
     public PollingConsumerPollStrategy getPollStrategy() {
         return pollStrategy;
     }
@@ -192,6 +240,9 @@ public abstract class ScheduledPollConsu
     @Override
     protected void doStart() throws Exception {
         super.doStart();
+        ObjectHelper.notNull(executor, "executor", this);
+        ObjectHelper.notNull(pollStrategy, "pollStrategy", this);
+
         if (isUseFixedDelay()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Scheduling poll (fixed delay) with initialDelay: " + getInitialDelay() + ", delay: " + getDelay()
@@ -210,6 +261,7 @@ public abstract class ScheduledPollConsu
     @Override
     protected void doStop() throws Exception {
         if (future != null) {
+            LOG.debug("This consumer is stopping, so cancelling scheduled task: " + future);
             future.cancel(false);
         }
         super.doStop();

Modified: camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java?rev=1143974&r1=1143973&r2=1143974&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java (original)
+++ camel/branches/camel-2.7.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java Thu Jul  7 18:57:25 2011
@@ -91,7 +91,8 @@ public abstract class ScheduledPollEndpo
         Object timeUnit = options.remove("timeUnit");
         Object useFixedDelay = options.remove("useFixedDelay");
         Object pollStrategy = options.remove("pollStrategy");
-        if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null) {
+        Object runLoggingLevel = options.remove("runLoggingLevel");
+        if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null || runLoggingLevel != null) {
             if (consumerProperties == null) {
                 consumerProperties = new HashMap<String, Object>();
             }
@@ -110,6 +111,9 @@ public abstract class ScheduledPollEndpo
             if (pollStrategy != null) {
                 consumerProperties.put("pollStrategy", pollStrategy);
             }
+            if (runLoggingLevel != null) {
+                consumerProperties.put("runLoggingLevel", runLoggingLevel);
+            }
         }
     }
 }

Modified: camel/branches/camel-2.7.x/camel-core/src/test/java/org/apache/camel/impl/DefaultComponentValidateURITest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/camel-core/src/test/java/org/apache/camel/impl/DefaultComponentValidateURITest.java?rev=1143974&r1=1143973&r2=1143974&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/camel-core/src/test/java/org/apache/camel/impl/DefaultComponentValidateURITest.java (original)
+++ camel/branches/camel-2.7.x/camel-core/src/test/java/org/apache/camel/impl/DefaultComponentValidateURITest.java Thu Jul  7 18:57:25 2011
@@ -76,7 +76,7 @@ public class DefaultComponentValidateURI
         endpint = context.getEndpoint("file://target/foo?consumer.delay=1000&consumer.initialDelay=5000");
         assertNotNull(endpint);
 
-        endpint = context.getEndpoint("file://target/foo?consumer.delay=1000&consumer.initialDelay=5000&consumer.useFixedDelay=true");
+        endpint = context.getEndpoint("file://target/foo?consumer.delay=1000&consumer.initialDelay=5000&consumer.useFixedDelay=false");
         assertNotNull(endpint);
 
         // without consumer. prefix
@@ -86,11 +86,11 @@ public class DefaultComponentValidateURI
         endpint = context.getEndpoint("file://foo2?delay=1000&initialDelay=5000");
         assertNotNull(endpint);
 
-        endpint = context.getEndpoint("file://foo2?delay=1000&initialDelay=5000&useFixedDelay=true");
+        endpint = context.getEndpoint("file://foo2?delay=1000&initialDelay=5000&useFixedDelay=false");
         assertNotNull(endpint);
 
         // combined with and without consumer. prefix
-        endpint = context.getEndpoint("file://foo3?delay=1000&consumer.initialDelay=5000&useFixedDelay=true");
+        endpint = context.getEndpoint("file://foo3?delay=1000&consumer.initialDelay=5000&useFixedDelay=false");
         assertNotNull(endpint);
     }
 

Modified: camel/branches/camel-2.7.x/camel-core/src/test/java/org/apache/camel/management/ManagedScheduledPollConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/camel-core/src/test/java/org/apache/camel/management/ManagedScheduledPollConsumerTest.java?rev=1143974&r1=1143973&r2=1143974&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/camel-core/src/test/java/org/apache/camel/management/ManagedScheduledPollConsumerTest.java (original)
+++ camel/branches/camel-2.7.x/camel-core/src/test/java/org/apache/camel/management/ManagedScheduledPollConsumerTest.java Thu Jul  7 18:57:25 2011
@@ -48,7 +48,7 @@ public class ManagedScheduledPollConsume
         assertEquals(1000, initialDelay.longValue());
 
         Boolean fixedDelay = (Boolean) mbeanServer.getAttribute(on, "UseFixedDelay");
-        assertEquals(Boolean.FALSE, fixedDelay);
+        assertEquals(Boolean.TRUE, fixedDelay);
 
         String timeUnit = (String) mbeanServer.getAttribute(on, "TimeUnit");
         assertEquals(TimeUnit.MILLISECONDS.toString(), timeUnit);
@@ -69,9 +69,9 @@ public class ManagedScheduledPollConsume
         assertEquals(2000, delay.longValue());
 
         // change some options
-        mbeanServer.setAttribute(on, new Attribute("UseFixedDelay", Boolean.TRUE));
+        mbeanServer.setAttribute(on, new Attribute("UseFixedDelay", Boolean.FALSE));
         fixedDelay = (Boolean) mbeanServer.getAttribute(on, "UseFixedDelay");
-        assertEquals(Boolean.TRUE, fixedDelay);
+        assertEquals(Boolean.FALSE, fixedDelay);
 
         mbeanServer.setAttribute(on, new Attribute("TimeUnit", TimeUnit.SECONDS.name()));
         timeUnit = (String) mbeanServer.getAttribute(on, "TimeUnit");



Mime
View raw message