Repository: nifi
Updated Branches:
refs/heads/master 8fa35294e -> cca5c4209
Incorporated review comments.
- Added description on what session maintenance does.
- Added calling deregister when initial connection attempt fails so that a processor can retry
connecting at next onTrigger.
This closes #1597
Signed-off-by: Jeremy Dyer <jeremydyer@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cca5c420
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cca5c420
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cca5c420
Branch: refs/heads/master
Commit: cca5c420954527d44cadf08b7d23edfdfb5314e9
Parents: 0a014b4
Author: Koji Kawamura <ijokarumawak@apache.org>
Authored: Tue May 2 17:12:34 2017 +0900
Committer: Jeremy Dyer <jeremydyer@apache.org>
Committed: Tue May 2 10:02:55 2017 -0400
----------------------------------------------------------------------
.../websocket/AbstractWebSocketGatewayProcessor.java | 9 ++++++++-
.../apache/nifi/websocket/jetty/JettyWebSocketClient.java | 10 ++++++++--
2 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/cca5c420/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
index 0262d98..c749456 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
@@ -147,6 +147,10 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
@OnStopped
public void onStopped(final ProcessContext context) throws IOException {
+ deregister();
+ }
+
+ private void deregister() {
if (webSocketService == null) {
return;
}
@@ -170,11 +174,14 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
try {
registerProcessorToService(context, webSocketService -> onWebSocketServiceReady(webSocketService));
} catch (IOException|WebSocketConfigurationException e) {
+ // Deregister processor if it failed so that it can retry next onTrigger.
+ deregister();
+ context.yield();
throw new ProcessException("Failed to register processor to WebSocket service
due to: " + e, e);
}
}
- context.yield();//nothing really to do here since threading managed by smtp server
sessions
+ context.yield();//nothing really to do here since handling WebSocket messages is
done at ControllerService.
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cca5c420/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
index 3d19eac..281b016 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
@@ -90,7 +90,12 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService
implemen
public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder()
.name("session-maintenance-interval")
.displayName("Session Maintenance Interval")
- .description("The interval between session maintenance activities.")
+ .description("The interval between session maintenance activities." +
+ " A WebSocket session established with a WebSocket server can be terminated
due to different reasons" +
+ " including restarting the WebSocket server or timing out inactive sessions."
+
+ " This session maintenance activity is periodically executed in order
to reconnect those lost sessions," +
+ " so that a WebSocket client can reuse the same session id transparently
after it reconnects successfully. " +
+ " The maintenance activity is executed until corresponding processors
or this controller service is stopped.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
@@ -238,10 +243,11 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService
implemen
}
final String sessionId = activeSessions.get(clientId);
- // If this session is stil alive, do nothing.
+ // If this session is still alive, do nothing.
if (!router.containsSession(sessionId)) {
// This session is no longer active, reconnect it.
// If it fails, the sessionId will remain in activeSessions, and retries
later.
+ // This reconnect attempt is continued until user explicitly stops a
processor or this controller service.
connect(clientId, sessionId);
}
}
|