nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeremyd...@apache.org
Subject [1/2] nifi git commit: Incorporated review comments.
Date Tue, 02 May 2017 14:07:21 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 8fa35294e -> cca5c4209


Incorporated review comments.

- Added description on what session maintenance does.
- Added calling deregister when initial connection attempt fails so that a processor can retry
connecting at next onTrigger.

This closes #1597

Signed-off-by: Jeremy Dyer <jeremydyer@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cca5c420
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cca5c420
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cca5c420

Branch: refs/heads/master
Commit: cca5c420954527d44cadf08b7d23edfdfb5314e9
Parents: 0a014b4
Author: Koji Kawamura <ijokarumawak@apache.org>
Authored: Tue May 2 17:12:34 2017 +0900
Committer: Jeremy Dyer <jeremydyer@apache.org>
Committed: Tue May 2 10:02:55 2017 -0400

----------------------------------------------------------------------
 .../websocket/AbstractWebSocketGatewayProcessor.java      |  9 ++++++++-
 .../apache/nifi/websocket/jetty/JettyWebSocketClient.java | 10 ++++++++--
 2 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/cca5c420/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
index 0262d98..c749456 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
@@ -147,6 +147,10 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
 
     @OnStopped
     public void onStopped(final ProcessContext context) throws IOException {
+        deregister();
+    }
+
+    private void deregister() {
         if (webSocketService == null) {
             return;
         }
@@ -170,11 +174,14 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
             try {
                 registerProcessorToService(context, webSocketService -> onWebSocketServiceReady(webSocketService));
             } catch (IOException|WebSocketConfigurationException e) {
+                // Deregister processor if it failed so that it can retry next onTrigger.
+                deregister();
+                context.yield();
                 throw new ProcessException("Failed to register processor to WebSocket service due to: " + e, e);
             }
         }
 
-        context.yield();//nothing really to do here since threading managed by smtp server sessions
+        context.yield();//nothing really to do here since handling WebSocket messages is done at ControllerService.
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/cca5c420/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
index 3d19eac..281b016 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
@@ -90,7 +90,12 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
     public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder()
             .name("session-maintenance-interval")
             .displayName("Session Maintenance Interval")
-            .description("The interval between session maintenance activities.")
+            .description("The interval between session maintenance activities." +
+                    " A WebSocket session established with a WebSocket server can be terminated due to different reasons" +
+                    " including restarting the WebSocket server or timing out inactive sessions." +
+                    " This session maintenance activity is periodically executed in order to reconnect those lost sessions," +
+                    " so that a WebSocket client can reuse the same session id transparently after it reconnects successfully. " +
+                    " The maintenance activity is executed until corresponding processors or this controller service is stopped.")
             .required(true)
             .expressionLanguageSupported(true)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
@@ -238,10 +243,11 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
                 }
 
                 final String sessionId = activeSessions.get(clientId);
-                // If this session is stil alive, do nothing.
+                // If this session is still alive, do nothing.
                 if (!router.containsSession(sessionId)) {
                     // This session is no longer active, reconnect it.
                     // If it fails, the sessionId will remain in activeSessions, and retries later.
+                    // This reconnect attempt is continued until user explicitly stops a processor or this controller service.
                     connect(clientId, sessionId);
                 }
             }


Mime
View raw message