camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/4] git commit: CAMEL-6635: PollingConsumer from a scheduled consumer such as file/ftp can use a regular thread pool instead of being scheduled
Date Wed, 14 Aug 2013 11:01:03 GMT
Updated Branches:
  refs/heads/master 99365506f -> 12fd86170


CAMEL-6635: PollingConsumer from a scheduled consumer such as file/ftp can use a regular thread
pool instead of being scheduled


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/36f48fb0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/36f48fb0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/36f48fb0

Branch: refs/heads/master
Commit: 36f48fb0942630cb1179b2eb0bb474affb5b3742
Parents: 9936550
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Wed Aug 14 11:43:10 2013 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Wed Aug 14 11:58:10 2013 +0200

----------------------------------------------------------------------
 .../DefaultScheduledPollConsumerScheduler.java  |   7 ++
 .../camel/impl/EventDrivenPollingConsumer.java  |   5 +-
 .../camel/impl/ScheduledPollConsumer.java       |  29 +++---
 .../camel/impl/ScheduledPollEndpoint.java       |   1 +
 .../SingleScheduledPollConsumerScheduler.java   | 103 +++++++++++++++++++
 .../spi/ScheduledPollConsumerScheduler.java     |   7 ++
 .../file/FileConsumerCustomSchedulerTest.java   |   5 +
 .../QuartzScheduledPollConsumerScheduler.java   |  12 +++
 8 files changed, 152 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
index 28d1cc0..8c0af76 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
@@ -98,6 +98,13 @@ public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp
     }
 
     @Override
+    public void unscheduleTask() {
+        if (future != null) {
+            future.cancel(false);
+        }
+    }
+
+    @Override
     public void startScheduler() {
         // only schedule task if we have not already done that
         if (future == null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
index b257f28..296c8c4 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
@@ -143,10 +143,9 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport
implement
         if (consumer instanceof PollingConsumerPollingStrategy) {
             PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer;
             strategy.onInit();
-        } else {
-            // for regular consumers start it
-            ServiceHelper.startService(consumer);
         }
+
+        ServiceHelper.startService(consumer);
     }
 
     protected void doStop() throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
index 2bb28b3..1670354 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
@@ -27,7 +27,6 @@ import org.apache.camel.FailedToCreateConsumerException;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.PollingConsumerPollingStrategy;
 import org.apache.camel.Processor;
-import org.apache.camel.ResolveEndpointFailedException;
 import org.apache.camel.SuspendableService;
 import org.apache.camel.spi.PollingConsumerPollStrategy;
 import org.apache.camel.spi.ScheduledPollConsumerScheduler;
@@ -398,7 +397,11 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements
R
             scheduler = new DefaultScheduledPollConsumerScheduler();
         }
         scheduler.setCamelContext(getEndpoint().getCamelContext());
-        scheduler.scheduleTask(this, this);
+
+        if (!(scheduler instanceof SingleScheduledPollConsumerScheduler)) {
+            // schedule task if its not the single scheduled
+            scheduler.scheduleTask(this, this);
+        }
 
         // configure scheduler with options from this consumer
         Map<String, Object> properties = new HashMap<String, Object>();
@@ -454,28 +457,26 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer
implements R
 
     @Override
     public void onInit() throws Exception {
-        // noop
+        // use a single scheduler so we do not have it running it periodically when we use
+        // this consumer as a EventDrivenPollingConsumer
+        scheduler = new SingleScheduledPollConsumerScheduler(this);
     }
 
     @Override
     public long beforePoll(long timeout) throws Exception {
-        LOG.trace("Before poll {}", getEndpoint());
-        // resume or start our self
-        if (!ServiceHelper.resumeService(this)) {
-            ServiceHelper.startService(this);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Before poll {}", getEndpoint());
         }
-
-        // ensure at least timeout is as long as one poll delay
-        return Math.max(timeout, getDelay());
+        scheduler.scheduleTask(this, this);
+        return timeout;
     }
 
     @Override
     public void afterPoll() throws Exception {
-        LOG.trace("After poll {}", getEndpoint());
-        // suspend or stop our self
-        if (!ServiceHelper.suspendService(this)) {
-            ServiceHelper.stopService(this);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("After poll {}", getEndpoint());
         }
+        scheduler.unscheduleTask();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
index 5ccbedb..191f9d1 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Component;
+import org.apache.camel.PollingConsumer;
 import org.apache.camel.util.IntrospectionSupport;
 
 /**

http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
b/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
new file mode 100644
index 0000000..3d9e22e
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.spi.ScheduledPollConsumerScheduler;
+
+/**
+ * A {@link ScheduledPollConsumerScheduler} which is <b>not</b> scheduled but
uses a regular single-threaded {@link ExecutorService}
+ * to execute the task when {@link #scheduleTask(org.apache.camel.Consumer, Runnable)} is
invoked.
+ * <p/>
+ * This is used when the {@link org.apache.camel.PollingConsumer} EIP is implemented using
the {@link EventDrivenPollingConsumer}
+ * bridging a {@link ScheduledPollConsumer} implementation. In this case we use this single
threaded regular thread pool
+ * to execute the poll task on-demand, instead of using the usual scheduled thread pool which
does not fit well with a
+ * on-demand poll attempt.
+ */
+public class SingleScheduledPollConsumerScheduler extends org.apache.camel.support.ServiceSupport
implements ScheduledPollConsumerScheduler {
+
+    private final Consumer consumer;
+    private CamelContext camelContext;
+    private ExecutorService executorService;
+    private Future future;
+
+    public SingleScheduledPollConsumerScheduler(Consumer consumer) {
+        this.consumer = consumer;
+    }
+
+    @Override
+    public void scheduleTask(Consumer consumer, Runnable task) {
+        if (isSchedulerStarted()) {
+            future = executorService.submit(task);
+        }
+    }
+
+    @Override
+    public void unscheduleTask() {
+        if (future != null) {
+            future.cancel(false);
+            future = null;
+        }
+    }
+
+    @Override
+    public void startScheduler() {
+        // noop
+    }
+
+    @Override
+    public boolean isSchedulerStarted() {
+        return executorService != null && !executorService.isShutdown();
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (executorService == null) {
+            executorService = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this,
consumer.getEndpoint().getEndpointKey());
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (future != null) {
+            future.cancel(false);
+            future = null;
+        }
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        if (executorService != null) {
+            camelContext.getExecutorServiceManager().shutdownNow(executorService);
+            executorService = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
b/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
index fdf99d3..db9b41f 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
@@ -40,6 +40,13 @@ public interface ScheduledPollConsumerScheduler extends ShutdownableService,
Cam
     void scheduleTask(Consumer consumer, Runnable task);
 
     /**
+     * Attempts to unschedules the last task which was scheduled.
+     * <p/>
+     * An implementation may not implement this method.
+     */
+    void unscheduleTask();
+
+    /**
      * Starts the scheduler.
      * <p/>
      * If the scheduler is already started, then this is a noop method call.

http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
index cfa6b03..ca22bf0 100644
--- a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
@@ -88,6 +88,11 @@ public class FileConsumerCustomSchedulerTest extends ContextTestSupport
{
             };
         }
 
+        @Override
+        public void unscheduleTask() {
+            // noop
+        }
+
         public int getCounter() {
             return counter;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
index e4e3c05..fec05c5 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
@@ -63,6 +63,18 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport
impleme
     }
 
     @Override
+    public void unscheduleTask() {
+        if (trigger != null) {
+            LOG.debug("Unscheduling trigger: {}", trigger.getKey());
+            try {
+                quartzScheduler.unscheduleJob(trigger.getKey());
+            } catch (SchedulerException e) {
+                throw ObjectHelper.wrapRuntimeCamelException(e);
+            }
+        }
+    }
+
+    @Override
     public void startScheduler() {
         // the quartz component starts the scheduler
     }


Mime
View raw message