camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1058659 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/impl/ components/camel-jt400/ components/camel-jt400/src/main/java/org/apache/camel/component/jt400/ components/camel-jt400/src/test/java/org/apache/camel/component/jt400/
Date Thu, 13 Jan 2011 16:59:16 GMT
Author: davsclaus
Date: Thu Jan 13 16:59:15 2011
New Revision: 1058659

URL: http://svn.apache.org/viewvc?rev=1058659&view=rev
Log:
CAMEL-3540: Fixed camel-jt400 receive with a timeout: the timeout passed to the data queue read should be in seconds.
Thanks to Joao Loureiro for the patch.

Added:
    camel/trunk/components/camel-jt400/src/test/java/org/apache/camel/component/jt400/Jt400DataQueueConsumerTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java
    camel/trunk/components/camel-jt400/   (props changed)
    camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
    camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueProducer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java?rev=1058659&r1=1058658&r2=1058659&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java
Thu Jan 13 16:59:15 2011
@@ -19,6 +19,8 @@ package org.apache.camel.impl;
 import org.apache.camel.Endpoint;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.spi.ExceptionHandler;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * A useful base class for implementations of {@link PollingConsumer}
@@ -26,6 +28,7 @@ import org.apache.camel.spi.ExceptionHan
  * @version $Revision$
  */
 public abstract class PollingConsumerSupport extends ServiceSupport implements PollingConsumer {
+    protected final transient Log log = LogFactory.getLog(getClass());
     private final Endpoint endpoint;
     private ExceptionHandler exceptionHandler;
 

Propchange: camel/trunk/components/camel-jt400/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Jan 13 16:59:15 2011
@@ -5,3 +5,5 @@
 target
 .settings
 eclipse-classes
+*.i??
+classes

Modified: camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java?rev=1058659&r1=1058658&r2=1058659&view=diff
==============================================================================
--- camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
(original)
+++ camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
Thu Jan 13 16:59:15 2011
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jt400;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import com.ibm.as400.access.AS400;
 import com.ibm.as400.access.AS400SecurityException;
@@ -34,7 +35,7 @@ import org.apache.camel.impl.PollingCons
 
 
 /**
- * {@link PollingConsumer} that polls a data queue for data
+ * {@link org.apache.camel.PollingConsumer} that polls a data queue for data
  */
 public class Jt400DataQueueConsumer extends PollingConsumerSupport {
 
@@ -51,6 +52,7 @@ public class Jt400DataQueueConsumer exte
     @Override
     protected void doStart() throws Exception {
         if (!endpoint.getSystem().isConnected()) {
+            log.info("Connecting to " + endpoint);
             endpoint.getSystem().connectService(AS400.DATAQUEUE);
         }
     }
@@ -58,21 +60,16 @@ public class Jt400DataQueueConsumer exte
     @Override
     protected void doStop() throws Exception {
         if (endpoint.getSystem().isConnected()) {
+            log.info("Disconnecting from " + endpoint);
             endpoint.getSystem().disconnectAllServices();
         }
     }
 
-    /**
-     * {@link Jt400DataQueueConsumer#receive(long)}
-     */
     public Exchange receive() {
         // -1 to indicate a blocking read from data queue
         return receive(-1);
     }
 
-    /**
-     * {@link Jt400DataQueueConsumer#receive(long)}
-     */
     public Exchange receiveNoWait() {
         return receive(0);
     }
@@ -93,10 +90,18 @@ public class Jt400DataQueueConsumer exte
         try {
             DataQueueEntry entry;
             if (timeout >= 0) {
-                entry = queue.read((int)timeout);
+                int seconds = (int)timeout / 1000;
+                if (log.isTraceEnabled()) {
+                    log.trace("Reading from data queue: " + queue.getName() + " with " + seconds + " seconds timeout");
+                }
+                entry = queue.read(seconds);
             } else {
-                entry = queue.read();
+                if (log.isTraceEnabled()) {
+                    log.trace("Reading from data queue: " + queue.getName() + " with no timeout");
+                }
+                entry = queue.read(-1);
             }
+
             Exchange exchange = new DefaultExchange(endpoint.getCamelContext());
             if (entry != null) {
                 if (endpoint.getFormat() == Format.binary) {
@@ -107,18 +112,19 @@ public class Jt400DataQueueConsumer exte
                 return exchange;
             }
         } catch (AS400SecurityException e) {
-            throw new RuntimeCamelException("Unable to read from data queue: " + e.getMessage(), e);
+            throw new RuntimeCamelException("Unable to read from data queue: " + queue.getName(), e);
         } catch (ErrorCompletingRequestException e) {
-            throw new RuntimeCamelException("Unable to read from data queue: " + e.getMessage(), e);
+            throw new RuntimeCamelException("Unable to read from data queue: " + queue.getName(), e);
         } catch (IOException e) {
-            throw new RuntimeCamelException("Unable to read from data queue: " + e.getMessage(), e);
+            throw new RuntimeCamelException("Unable to read from data queue: " + queue.getName(), e);
         } catch (IllegalObjectTypeException e) {
-            throw new RuntimeCamelException("Unable to read from data queue: " + e.getMessage(), e);
+            throw new RuntimeCamelException("Unable to read from data queue: " + queue.getName(), e);
         } catch (InterruptedException e) {
-            throw new RuntimeCamelException("Unable to read from data queue: " + e.getMessage(), e);
+            throw new RuntimeCamelException("Unable to read from data queue: " + queue.getName(), e);
         } catch (ObjectDoesNotExistException e) {
-            throw new RuntimeCamelException("Unable to read from data queue: " + e.getMessage(), e);
+            throw new RuntimeCamelException("Unable to read from data queue: " + queue.getName(), e);
         }
         return null;
     }
+
 }

Modified: camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueProducer.java?rev=1058659&r1=1058658&r2=1058659&view=diff
==============================================================================
--- camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueProducer.java
(original)
+++ camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueProducer.java
Thu Jan 13 16:59:15 2011
@@ -54,6 +54,7 @@ public class Jt400DataQueueProducer exte
     @Override
     protected void doStart() throws Exception {
         if (!endpoint.getSystem().isConnected()) {
+            log.info("Connecting to " + endpoint);
             endpoint.getSystem().connectService(AS400.DATAQUEUE);
         }
     }
@@ -61,6 +62,7 @@ public class Jt400DataQueueProducer exte
     @Override
     protected void doStop() throws Exception {
         if (endpoint.getSystem().isConnected()) {
+            log.info("Disconnecting from " + endpoint);
             endpoint.getSystem().disconnectAllServices();
         }
     }

Added: camel/trunk/components/camel-jt400/src/test/java/org/apache/camel/component/jt400/Jt400DataQueueConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jt400/src/test/java/org/apache/camel/component/jt400/Jt400DataQueueConsumerTest.java?rev=1058659&view=auto
==============================================================================
--- camel/trunk/components/camel-jt400/src/test/java/org/apache/camel/component/jt400/Jt400DataQueueConsumerTest.java
(added)
+++ camel/trunk/components/camel-jt400/src/test/java/org/apache/camel/component/jt400/Jt400DataQueueConsumerTest.java
Thu Jan 13 16:59:15 2011
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jt400;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test case for {@link Jt400DataQueueConsumer}.
+ * <p>
+ * So that timeout semantics can be tested, an URI to an empty data queue on an
+ * AS400 system should be provided (in a resource named
+ * <code>"jt400test.properties"</code>, in a property with key
+ * <code>"org.apache.camel.component.jt400.emptydtaq.uri"</code>).
+ * </p>
+ *
+ * @version $Revision$
+ */
+@Ignore("Test manual")
+public class Jt400DataQueueConsumerTest extends TestCase {
+
+    /**
+     * The deviation of the actual timeout value that we permit in our timeout
+     * tests.
+     */
+    private static final long TIMEOUT_TOLERANCE = 300L;
+
+    /**
+     * Timeout value in milliseconds used to test <code>receive(long)</code>.
+     */
+    private static final long TIMEOUT_VALUE = 3999L;
+
+    /**
+     * The amount of time in milliseconds to pass so that a call is assumed to
+     * be a blocking call.
+     */
+    private static final long BLOCKING_THRESHOLD = 5000L;
+
+    /**
+     * The consumer instance used in the tests.
+     */
+    private Jt400DataQueueConsumer consumer;
+
+    /**
+     * Flag that indicates whether <code>receive()</code> has returned from
+     * call.
+     */
+    private boolean receiveFlag;
+
+    @Before
+    public void setUp() throws Exception {
+        // Load endpoint URI
+        InputStream is = getClass().getResourceAsStream("jt400test.properties");
+        Properties props = new Properties();
+        String endpointURI;
+
+        props.load(is);
+        endpointURI = props.getProperty("org.apache.camel.component.jt400.emptydtaq.uri");
+
+        // Instantiate consumer
+        CamelContext camel = new DefaultCamelContext();
+        Jt400Component component = new Jt400Component();
+
+        component.setCamelContext(camel);
+        consumer = (Jt400DataQueueConsumer) component.createEndpoint(endpointURI).createPollingConsumer();
+        camel.start();
+    }
+
+    /**
+     * Tests whether <code>receive(long)</code> honours the <code>timeout</code> parameter.
+     */
+    @Test(timeout = TIMEOUT_VALUE + TIMEOUT_TOLERANCE)
+    public void testReceiveLong() {
+        consumer.receive(TIMEOUT_VALUE);
+    }
+
+    /**
+     * Tests whether receive() blocks indefinitely.
+     */
+    @Test
+    public void testReceive() throws InterruptedException {
+        new Thread(new Runnable() {
+            public void run() {
+                consumer.receive();
+                receiveFlag = true;
+            }
+        }).start();
+
+        final long startTime = System.currentTimeMillis();
+        while (!receiveFlag) {
+            if ((System.currentTimeMillis() - startTime) > BLOCKING_THRESHOLD) {
+                /* Passed test. */
+                return;
+            }
+            Thread.sleep(50L);
+        }
+        assertTrue("Method receive() has returned from call.", false);
+    }
+
+}



Mime
View raw message