camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r831812 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ test/java/org/apache/camel/component/file/
Date Mon, 02 Nov 2009 07:32:02 GMT
Author: davsclaus
Date: Mon Nov  2 07:32:02 2009
New Revision: 831812

URL: http://svn.apache.org/viewvc?rev=831812&view=rev
Log:
CAMEL-1048: Added file based unit tests for suspend and resume triggered by a route policy.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java
  (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendTest.java
  (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=831812&r1=831811&r2=831812&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Mon Nov  2 07:32:02 2009
@@ -48,7 +48,7 @@
     private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
     private boolean useFixedDelay;
     private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
-    private boolean suspended;
+    private volatile boolean suspended;
 
     public ScheduledPollConsumer(DefaultEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -76,6 +76,9 @@
      */
     public void run() {
         if (suspended) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Cannot start to poll: " + this.getEndpoint() + " as its suspended");
+            }
             return;
         }
 
@@ -86,7 +89,7 @@
             try {
                 // eager assume we are done
                 done = true;
-                if (isRunAllowed()) {
+                if (isRunAllowed() && !isSuspended()) {
 
                     if (retryCounter == -1) {
                         if (LOG.isTraceEnabled()) {
@@ -105,6 +108,10 @@
                     poll();
                     pollStrategy.commit(this, getEndpoint());
                 }
+
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Finished polling: " + this.getEndpoint());
+                }
             } catch (Exception e) {
                 try {
                     boolean retry = pollStrategy.rollback(this, getEndpoint(), retryCounter, e);
@@ -116,10 +123,6 @@
                 }
             }
         }
-
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Finished polling: " + this.getEndpoint());
-        }
     }
 
     // Properties

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java?rev=831812&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java Mon Nov  2 07:32:02 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.RoutePolicySupport;
+
+/**
+ * @version $Revision$
+ */
+public class FileConsumerSuspendAndResumeTest extends ContextTestSupport {
+
+    private MyPolicy myPolicy = new MyPolicy();
+
+    public void testConsumeSuspendAndResumeFile() throws Exception {
+        deleteDirectory("target/suspended");
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Bye World");
+
+        template.sendBodyAndHeader("file://target/suspended", "Bye World", Exchange.FILE_NAME, "bye.txt");
+        template.sendBodyAndHeader("file://target/suspended", "Hello World", Exchange.FILE_NAME, "hello.txt");
+
+        assertMockEndpointsSatisfied();
+
+        Thread.sleep(1000);
+
+        // the route is suspended by the policy so we should only receive one
+        File file = new File("target/suspended/hello.txt").getAbsoluteFile();
+        assertEquals("The file should exists", true, file.exists());
+
+        // reset mock
+        mock.reset();
+        mock.expectedBodiesReceived("Hello World");
+
+        // now resume it
+        myPolicy.resumeConsumer();
+
+        assertMockEndpointsSatisfied();
+
+        Thread.sleep(500);
+
+        // and the file is now moved
+        assertEquals("The file should not exists", false, file.exists());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("file://target/suspended?maxMessagesPerPoll=1&sortBy=file:name")
+                    .routePolicy(myPolicy).id("myRoute")
+                    .to("mock:result");
+            }
+        };
+    }
+
+    private class MyPolicy extends RoutePolicySupport {
+
+        private Consumer consumer;
+
+        public void onExchangeDone(Route route, Exchange exchange) {
+            this.consumer = route.getConsumer();
+            try {
+                super.stopConsumer(consumer);
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+
+        public void resumeConsumer() throws Exception {
+            super.startConsumer(consumer);
+        }
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendTest.java?rev=831812&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendTest.java Mon Nov  2 07:32:02 2009
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.RoutePolicySupport;
+
+/**
+ * @version $Revision$
+ */
+public class FileConsumerSuspendTest extends ContextTestSupport {
+
+    public void testConsumeSuspendFile() throws Exception {
+        deleteDirectory("target/suspended");
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Bye World");
+
+        template.sendBodyAndHeader("file://target/suspended", "Bye World", Exchange.FILE_NAME, "bye.txt");
+        template.sendBodyAndHeader("file://target/suspended", "Hello World", Exchange.FILE_NAME, "hello.txt");
+
+        assertMockEndpointsSatisfied();
+
+        Thread.sleep(1000);
+
+        // the route is suspended by the policy so we should only receive one
+        File file = new File("target/suspended/hello.txt").getAbsoluteFile();
+        assertEquals("The file should exists", true, file.exists());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                MyPolicy myPolicy = new MyPolicy();
+                from("file://target/suspended?maxMessagesPerPoll=1&sortBy=file:name")
+                    .routePolicy(myPolicy).id("myRoute")
+                    .to("mock:result");
+            }
+        };
+    }
+
+    private class MyPolicy extends RoutePolicySupport {
+
+        public void onExchangeDone(Route route, Exchange exchange) {
+            try {
+                super.stopConsumer(route.getConsumer());
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+    }
+
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message