camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject camel git commit: CAMEL-9566: Improved camel-ahc-ws to better re-connect in case of ws failures.
Date Thu, 17 Mar 2016 17:03:12 GMT
Repository: camel
Updated Branches:
  refs/heads/camel-2.16.x 1cc925e76 -> a3e4949d3


CAMEL-9566: Improved camel-ahc-ws to better re-connect in case of ws failures.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a3e4949d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a3e4949d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a3e4949d

Branch: refs/heads/camel-2.16.x
Commit: a3e4949d3a2bc220784be15a45a20a7b9a1393a5
Parents: 1cc925e
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Thu Mar 17 08:40:08 2016 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Thu Mar 17 18:02:56 2016 +0100

----------------------------------------------------------------------
 .../camel/component/ahc/ws/WsConsumer.java      | 11 ++-
 .../camel/component/ahc/ws/WsEndpoint.java      | 81 +++++++++++++-------
 .../ahc/ws/WsProducerConsumerTest.java          | 47 +++++++++++-
 3 files changed, 111 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a3e4949d/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
index ab8b5a5..808d76d 100644
--- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
+++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
@@ -51,6 +51,10 @@ public class WsConsumer extends DefaultConsumer {
         sendMessageInternal(message);
     }
 
+    public void sendMessage(Throwable throwable) {
+        sendMessageInternal(throwable);
+    }
+
     public void sendMessage(byte[] message) {
         sendMessageInternal(message);
     }
@@ -68,7 +72,12 @@ public class WsConsumer extends DefaultConsumer {
 
         //TODO may set some headers with some meta info (e.g., socket info, unique-id for
correlation purpose, etc0 
         // set the body
-        exchange.getIn().setBody(message);
+
+        if (message instanceof Throwable) {
+            exchange.setException((Throwable) message);
+        } else {
+            exchange.getIn().setBody(message);
+        }
 
         // send exchange using the async routing engine
         getAsyncProcessor().process(exchange, new AsyncCallback() {

http://git-wip-us.apache.org/repos/asf/camel/blob/a3e4949d/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
index 86c1fd0..88e1760 100644
--- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
+++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
@@ -23,9 +23,8 @@ import com.ning.http.client.AsyncHttpClient;
 import com.ning.http.client.AsyncHttpClientConfig;
 import com.ning.http.client.AsyncHttpProvider;
 import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider;
+import com.ning.http.client.ws.DefaultWebSocketListener;
 import com.ning.http.client.ws.WebSocket;
-import com.ning.http.client.ws.WebSocketByteListener;
-import com.ning.http.client.ws.WebSocketTextListener;
 import com.ning.http.client.ws.WebSocketUpgradeHandler;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
@@ -45,25 +44,19 @@ public class WsEndpoint extends AhcEndpoint {
     private static final boolean GRIZZLY_AVAILABLE =
         probeClass("com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider");
 
-    private final Set<WsConsumer> consumers  = new HashSet<WsConsumer>();
+    private final Set<WsConsumer> consumers = new HashSet<WsConsumer>();
+    private final WsListener listener = new WsListener();
+    private transient WebSocket websocket;
 
-    private WebSocket websocket;
-    @UriParam
+    @UriParam(label = "producer")
     private boolean useStreaming;
+    @UriParam(label = "consumer")
+    private boolean sendMessageOnError;
 
     public WsEndpoint(String endpointUri, WsComponent component) {
         super(endpointUri, component, null);
     }
 
-    private static boolean probeClass(String name) {
-        try {
-            Class.forName(name, true, WsEndpoint.class.getClassLoader());
-            return true;
-        } catch (Throwable t) {
-            return false;
-        }
-    }
-
     @Override
     public WsComponent getComponent() {
         return (WsComponent) super.getComponent();
@@ -81,9 +74,8 @@ public class WsEndpoint extends AhcEndpoint {
 
     WebSocket getWebSocket() throws Exception {
         synchronized (this) {
-            if (websocket == null) {
-                connect();
-            }
+            // ensure we are connected
+            reConnect();
         }
         return websocket;
     }
@@ -103,6 +95,17 @@ public class WsEndpoint extends AhcEndpoint {
         this.useStreaming = useStreaming;
     }
 
+    public boolean isSendMessageOnError() {
+        return sendMessageOnError;
+    }
+
+    /**
+     * Whether to send a message if the web-socket listener received an error.
+     */
+    public void setSendMessageOnError(boolean sendMessageOnError) {
+        this.sendMessageOnError = sendMessageOnError;
+    }
+
     @Override
     protected AsyncHttpClient createClient(AsyncHttpClientConfig config) {
         AsyncHttpClient client;
@@ -124,7 +127,7 @@ public class WsEndpoint extends AhcEndpoint {
         LOG.debug("Connecting to {}", uri);
         websocket = getClient().prepareGet(uri).execute(
             new WebSocketUpgradeHandler.Builder()
-                .addWebSocketListener(new WsListener()).build()).get();
+                .addWebSocketListener(listener).build()).get();
     }
 
     @Override
@@ -133,6 +136,7 @@ public class WsEndpoint extends AhcEndpoint {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Disconnecting from {}", getHttpUri().toASCIIString());
             }
+            websocket.removeWebSocketListener(listener);
             websocket.close();
             websocket = null;
         }
@@ -141,31 +145,46 @@ public class WsEndpoint extends AhcEndpoint {
 
     void connect(WsConsumer wsConsumer) throws Exception {
         consumers.add(wsConsumer);
-
-        if (websocket == null || !websocket.isOpen()) {
-            connect();
-        }
+        reConnect();
     }
 
     void disconnect(WsConsumer wsConsumer) {
         consumers.remove(wsConsumer);
     }
 
-    class WsListener implements WebSocketTextListener, WebSocketByteListener {
+    void reConnect() throws Exception {
+        if (websocket == null || !websocket.isOpen()) {
+            String uri = getHttpUri().toASCIIString();
+            LOG.info("Reconnecting websocket: {}", uri);
+            connect();
+        }
+    }
+
+    class WsListener extends DefaultWebSocketListener {
 
         @Override
         public void onOpen(WebSocket websocket) {
-            LOG.debug("websocket opened");
+            LOG.debug("Websocket opened");
         }
 
         @Override
         public void onClose(WebSocket websocket) {
-            LOG.debug("websocket closed");
+            LOG.debug("websocket closed - reconnecting");
+            try {
+                reConnect();
+            } catch (Exception e) {
+                LOG.warn("Error re-connecting to websocket", e);
+            }
         }
 
         @Override
         public void onError(Throwable t) {
-            LOG.error("websocket on error", t);
+            LOG.debug("websocket on error", t);
+            if (isSendMessageOnError()) {
+                for (WsConsumer consumer : consumers) {
+                    consumer.sendMessage(t);
+                }
+            }
         }
 
         @Override
@@ -192,4 +211,14 @@ public class WsEndpoint extends AhcEndpoint {
         }
         return null;
     }
+
+    private static boolean probeClass(String name) {
+        try {
+            Class.forName(name, true, WsEndpoint.class.getClassLoader());
+            return true;
+        } catch (Throwable t) {
+            return false;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a3e4949d/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java
b/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java
index 54d8b09..e3d35d6 100644
--- a/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java
+++ b/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java
@@ -87,7 +87,52 @@ public class WsProducerConsumerTest extends CamelTestSupport {
         mock.assertIsSatisfied();
     }
 
-    
+    @Test
+    public void testTwoRoutesRestartConsumer() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived(TEST_MESSAGE);
+
+        template.sendBody("direct:input", TEST_MESSAGE);
+
+        mock.assertIsSatisfied();
+
+        resetMocks();
+
+        log.info("Restarting bar route");
+        context.stopRoute("bar");
+        Thread.sleep(500);
+        context.startRoute("bar");
+
+        mock.expectedBodiesReceived(TEST_MESSAGE);
+
+        template.sendBody("direct:input", TEST_MESSAGE);
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void testTwoRoutesRestartProducer() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived(TEST_MESSAGE);
+
+        template.sendBody("direct:input", TEST_MESSAGE);
+
+        mock.assertIsSatisfied();
+
+        resetMocks();
+
+        log.info("Restarting foo route");
+        context.stopRoute("foo");
+        Thread.sleep(500);
+        context.startRoute("foo");
+
+        mock.expectedBodiesReceived(TEST_MESSAGE);
+
+        template.sendBody("direct:input", TEST_MESSAGE);
+
+        mock.assertIsSatisfied();
+    }
+
     @Override
     protected RouteBuilder[] createRouteBuilders() throws Exception {
         RouteBuilder[] rbs = new RouteBuilder[2];


Mime
View raw message