nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject nifi git commit: NIFI-4673 changed offending tests to be integration tests and fixed travis config to run the build only once and during appropriate script phase instead of install. Reviewed by Bende.
Date Wed, 06 Dec 2017 21:13:59 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 7e61c6333 -> a774f1df6


NIFI-4673 changed offending tests to be integration tests and fixed travis config to run the build only once and during appropriate script phase instead of install. Reviewed by Bende.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a774f1df
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a774f1df
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a774f1df

Branch: refs/heads/master
Commit: a774f1df69b1d7fd6c3fda2acf7e0adf883def25
Parents: 7e61c63
Author: joewitt <joewitt@apache.org>
Authored: Wed Dec 6 12:03:16 2017 -0500
Committer: joewitt <joewitt@apache.org>
Committed: Wed Dec 6 16:13:42 2017 -0500

----------------------------------------------------------------------
 .travis.yml                                     |   7 +-
 .../standard/ITListenAndPutSyslog.java          | 175 +++++++++++
 .../standard/TestListenAndPutSyslog.java        | 175 -----------
 .../jetty/ITJettyWebSocketCommunication.java    | 306 +++++++++++++++++++
 .../ITJettyWebSocketSecureCommunication.java    |  68 +++++
 .../jetty/TestJettyWebSocketCommunication.java  | 306 -------------------
 .../TestJettyWebSocketSecureCommunication.java  |  68 -----
 7 files changed, 554 insertions(+), 551 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index e5b0ccb..6bb35a1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -31,6 +31,7 @@ cache:
   directories:
     - $HOME/.m2
     - $HOME/.npm
+
 before_cache:
   # Remove nifi repo again to save travis from caching it
   - rm -rf $HOME/.m2/repository/org/apache/nifi/
@@ -47,9 +48,11 @@ before_install:
   # Remove nifi repo again to save travis from caching it
   - rm -rf $HOME/.m2/repository/org/apache/nifi/
 
-install:
+install: true
+    
+script:
   # Replace variables seems to be the only option to pass proper values to surefire
   # Note: The reason the sed is done as part of script is to ensure the pom hack 
   # won't affect the 'clean install' above
   - bash .travis.sh
-  - mvn -T 2C -Pcontrib-check -Ddir-only clean install
+  - mvn -T 2 clean install -Pcontrib-check -Ddir-only

http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenAndPutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenAndPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenAndPutSyslog.java
new file mode 100644
index 0000000..5d0562d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenAndPutSyslog.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+/**
+ * Tests PutSyslog sending messages to ListenSyslog to simulate a syslog server forwarding
+ * to ListenSyslog, or PutSyslog sending to a syslog server.
+ */
+public class ITListenAndPutSyslog {
+
+    static final Logger LOGGER = LoggerFactory.getLogger(ITListenAndPutSyslog.class);
+
+    private ListenSyslog listenSyslog;
+    private TestRunner listenSyslogRunner;
+
+    private PutSyslog putSyslog;
+    private TestRunner putSyslogRunner;
+
+    @Before
+    public void setup() {
+        this.listenSyslog = new ListenSyslog();
+        this.listenSyslogRunner = TestRunners.newTestRunner(listenSyslog);
+
+        this.putSyslog = new PutSyslog();
+        this.putSyslogRunner = TestRunners.newTestRunner(putSyslog);
+    }
+
+    @After
+    public void teardown() {
+        try {
+            putSyslog.onStopped();
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage(), e);
+        }
+        try {
+            listenSyslog.onUnscheduled();
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage(), e);
+        }
+    }
+
+    @Test
+    public void testUDP() throws IOException, InterruptedException {
+        run(ListenSyslog.UDP_VALUE.getValue(), 5, 5);
+    }
+
+    @Test
+    public void testTCP() throws IOException, InterruptedException {
+        run(ListenSyslog.TCP_VALUE.getValue(), 5, 5);
+    }
+
+    @Test
+    public void testTLS() throws InitializationException, IOException, InterruptedException {
+        configureSSLContextService(listenSyslogRunner);
+        listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
+
+        configureSSLContextService(putSyslogRunner);
+        putSyslogRunner.setProperty(PutSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
+
+        run(ListenSyslog.TCP_VALUE.getValue(), 7, 7);
+    }
+
+    @Test
+    public void testTLSListenerNoTLSPut() throws InitializationException, IOException, InterruptedException {
+        configureSSLContextService(listenSyslogRunner);
+        listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
+
+        // send 7 but expect 0 because sender didn't use TLS
+        run(ListenSyslog.TCP_VALUE.getValue(), 7, 0);
+    }
+
+    private SSLContextService configureSSLContextService(TestRunner runner) throws InitializationException {
+        final SSLContextService sslContextService = new StandardSSLContextService();
+        runner.addControllerService("ssl-context", sslContextService);
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
+        runner.enableControllerService(sslContextService);
+        return sslContextService;
+    }
+
+    /**
+     * Sends numMessages from PutSyslog to ListenSyslog.
+     */
+    private void run(String protocol, int numMessages, int expectedMessages) throws IOException, InterruptedException {
+        // set the same protocol on both processors
+        putSyslogRunner.setProperty(PutSyslog.PROTOCOL, protocol);
+        listenSyslogRunner.setProperty(ListenSyslog.PROTOCOL, protocol);
+
+        // set a listening port of 0 to get a random available port
+        listenSyslogRunner.setProperty(ListenSyslog.PORT, "0");
+
+        // call onScheduled to start ListenSyslog listening
+        final ProcessSessionFactory processSessionFactory = listenSyslogRunner.getProcessSessionFactory();
+        final ProcessContext context = listenSyslogRunner.getProcessContext();
+        listenSyslog.onScheduled(context);
+
+        // get the real port it is listening on and set that in PutSyslog
+        final int listeningPort = listenSyslog.getPort();
+        putSyslogRunner.setProperty(PutSyslog.PORT, String.valueOf(listeningPort));
+
+        // configure the message properties on PutSyslog
+        final String pri = "34";
+        final String version = "1";
+        final String stamp = "2016-02-05T22:14:15.003Z";
+        final String host = "localhost";
+        final String body = "some message";
+        final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
+
+        putSyslogRunner.setProperty(PutSyslog.MSG_PRIORITY, pri);
+        putSyslogRunner.setProperty(PutSyslog.MSG_VERSION, version);
+        putSyslogRunner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
+        putSyslogRunner.setProperty(PutSyslog.MSG_HOSTNAME, host);
+        putSyslogRunner.setProperty(PutSyslog.MSG_BODY, body);
+
+        // send the messages
+        for (int i=0; i < numMessages; i++) {
+            putSyslogRunner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+        }
+        putSyslogRunner.run(numMessages, false);
+
+        // trigger ListenSyslog until we've seen all the messages
+        int numTransfered = 0;
+        long timeout = System.currentTimeMillis() + 30000;
+
+        while (numTransfered < expectedMessages && System.currentTimeMillis() < timeout) {
+            Thread.sleep(10);
+            listenSyslog.onTrigger(context, processSessionFactory);
+            numTransfered = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
+        }
+        Assert.assertEquals("Did not process all the messages", expectedMessages, numTransfered);
+
+        if (expectedMessages > 0) {
+            // check that one of flow files has the expected content
+            MockFlowFile mockFlowFile = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
+            mockFlowFile.assertContentEquals(expectedMessage);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java
deleted file mode 100644
index 29fa690..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.ssl.StandardSSLContextService;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-
-/**
- * Tests PutSyslog sending messages to ListenSyslog to simulate a syslog server forwarding
- * to ListenSyslog, or PutSyslog sending to a syslog server.
- */
-public class TestListenAndPutSyslog {
-
-    static final Logger LOGGER = LoggerFactory.getLogger(TestListenAndPutSyslog.class);
-
-    private ListenSyslog listenSyslog;
-    private TestRunner listenSyslogRunner;
-
-    private PutSyslog putSyslog;
-    private TestRunner putSyslogRunner;
-
-    @Before
-    public void setup() {
-        this.listenSyslog = new ListenSyslog();
-        this.listenSyslogRunner = TestRunners.newTestRunner(listenSyslog);
-
-        this.putSyslog = new PutSyslog();
-        this.putSyslogRunner = TestRunners.newTestRunner(putSyslog);
-    }
-
-    @After
-    public void teardown() {
-        try {
-            putSyslog.onStopped();
-        } catch (Exception e) {
-            LOGGER.error(e.getMessage(), e);
-        }
-        try {
-            listenSyslog.onUnscheduled();
-        } catch (Exception e) {
-            LOGGER.error(e.getMessage(), e);
-        }
-    }
-
-    @Test
-    public void testUDP() throws IOException, InterruptedException {
-        run(ListenSyslog.UDP_VALUE.getValue(), 5, 5);
-    }
-
-    @Test
-    public void testTCP() throws IOException, InterruptedException {
-        run(ListenSyslog.TCP_VALUE.getValue(), 5, 5);
-    }
-
-    @Test
-    public void testTLS() throws InitializationException, IOException, InterruptedException {
-        configureSSLContextService(listenSyslogRunner);
-        listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
-
-        configureSSLContextService(putSyslogRunner);
-        putSyslogRunner.setProperty(PutSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
-
-        run(ListenSyslog.TCP_VALUE.getValue(), 7, 7);
-    }
-
-    @Test
-    public void testTLSListenerNoTLSPut() throws InitializationException, IOException, InterruptedException {
-        configureSSLContextService(listenSyslogRunner);
-        listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
-
-        // send 7 but expect 0 because sender didn't use TLS
-        run(ListenSyslog.TCP_VALUE.getValue(), 7, 0);
-    }
-
-    private SSLContextService configureSSLContextService(TestRunner runner) throws InitializationException {
-        final SSLContextService sslContextService = new StandardSSLContextService();
-        runner.addControllerService("ssl-context", sslContextService);
-        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
-        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
-        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
-        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
-        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
-        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
-        runner.enableControllerService(sslContextService);
-        return sslContextService;
-    }
-
-    /**
-     * Sends numMessages from PutSyslog to ListenSyslog.
-     */
-    private void run(String protocol, int numMessages, int expectedMessages) throws IOException, InterruptedException {
-        // set the same protocol on both processors
-        putSyslogRunner.setProperty(PutSyslog.PROTOCOL, protocol);
-        listenSyslogRunner.setProperty(ListenSyslog.PROTOCOL, protocol);
-
-        // set a listening port of 0 to get a random available port
-        listenSyslogRunner.setProperty(ListenSyslog.PORT, "0");
-
-        // call onScheduled to start ListenSyslog listening
-        final ProcessSessionFactory processSessionFactory = listenSyslogRunner.getProcessSessionFactory();
-        final ProcessContext context = listenSyslogRunner.getProcessContext();
-        listenSyslog.onScheduled(context);
-
-        // get the real port it is listening on and set that in PutSyslog
-        final int listeningPort = listenSyslog.getPort();
-        putSyslogRunner.setProperty(PutSyslog.PORT, String.valueOf(listeningPort));
-
-        // configure the message properties on PutSyslog
-        final String pri = "34";
-        final String version = "1";
-        final String stamp = "2016-02-05T22:14:15.003Z";
-        final String host = "localhost";
-        final String body = "some message";
-        final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
-
-        putSyslogRunner.setProperty(PutSyslog.MSG_PRIORITY, pri);
-        putSyslogRunner.setProperty(PutSyslog.MSG_VERSION, version);
-        putSyslogRunner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
-        putSyslogRunner.setProperty(PutSyslog.MSG_HOSTNAME, host);
-        putSyslogRunner.setProperty(PutSyslog.MSG_BODY, body);
-
-        // send the messages
-        for (int i=0; i < numMessages; i++) {
-            putSyslogRunner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
-        }
-        putSyslogRunner.run(numMessages, false);
-
-        // trigger ListenSyslog until we've seen all the messages
-        int numTransfered = 0;
-        long timeout = System.currentTimeMillis() + 30000;
-
-        while (numTransfered < expectedMessages && System.currentTimeMillis() < timeout) {
-            Thread.sleep(10);
-            listenSyslog.onTrigger(context, processSessionFactory);
-            numTransfered = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
-        }
-        Assert.assertEquals("Did not process all the messages", expectedMessages, numTransfered);
-
-        if (expectedMessages > 0) {
-            // check that one of flow files has the expected content
-            MockFlowFile mockFlowFile = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            mockFlowFile.assertContentEquals(expectedMessage);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketCommunication.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketCommunication.java
new file mode 100644
index 0000000..6d3f063
--- /dev/null
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketCommunication.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.websocket.BinaryMessageConsumer;
+import org.apache.nifi.websocket.ConnectedListener;
+import org.apache.nifi.websocket.TextMessageConsumer;
+import org.apache.nifi.websocket.WebSocketClientService;
+import org.apache.nifi.websocket.WebSocketServerService;
+import org.apache.nifi.websocket.WebSocketSessionInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+
+public class ITJettyWebSocketCommunication {
+
+    protected int serverPort;
+    protected String serverPath = "/test";
+    protected WebSocketServerService serverService;
+    protected ControllerServiceTestContext serverServiceContext;
+    protected WebSocketClientService clientService;
+    protected ControllerServiceTestContext clientServiceContext;
+
+    protected boolean isSecure() {
+        return false;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        setupServer();
+
+        setupClient();
+    }
+
+    private void setupServer() throws Exception {
+        // Find an open port.
+        try (final ServerSocket serverSocket = new ServerSocket(0)) {
+            serverPort = serverSocket.getLocalPort();
+        }
+        serverService = new JettyWebSocketServer();
+        serverServiceContext = new ControllerServiceTestContext(serverService, "JettyWebSocketServer1");
+        serverServiceContext.setCustomValue(JettyWebSocketServer.LISTEN_PORT, String.valueOf(serverPort));
+
+        customizeServer();
+
+        serverService.initialize(serverServiceContext.getInitializationContext());
+        serverService.startServer(serverServiceContext.getConfigurationContext());
+    }
+
+    protected void customizeServer() {
+    }
+
+    private void setupClient() throws Exception {
+        clientService = new JettyWebSocketClient();
+        clientServiceContext = new ControllerServiceTestContext(clientService, "JettyWebSocketClient1");
+        clientServiceContext.setCustomValue(JettyWebSocketClient.WS_URI, (isSecure() ? "wss" : "ws") + "://localhost:" + serverPort + serverPath);
+
+        customizeClient();
+
+        clientService.initialize(clientServiceContext.getInitializationContext());
+        clientService.startClient(clientServiceContext.getConfigurationContext());
+    }
+
+    protected void customizeClient() {
+    }
+
+    @After
+    public void teardown() throws Exception {
+        clientService.stopClient();
+        serverService.stopServer();
+    }
+
+    protected interface MockWebSocketProcessor extends Processor, ConnectedListener, TextMessageConsumer, BinaryMessageConsumer {
+    }
+
+    private boolean isWindowsEnvironment() {
+        return System.getProperty("os.name").toLowerCase().startsWith("windows");
+    }
+
+    @Test
+    public void testClientServerCommunication() throws Exception {
+        assumeFalse(isWindowsEnvironment());
+        // Expectations.
+        final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
+        final CountDownLatch clientConnectedServer = new CountDownLatch(1);
+        final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1);
+        final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1);
+        final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1);
+        final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1);
+
+        final String textMessageFromClient = "Message from client.";
+        final String textMessageFromServer = "Message from server.";
+
+        final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class);
+        doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
+        final AtomicReference<String> serverSessionIdRef = new AtomicReference<>();
+
+        doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation))
+                .when(serverProcessor).connected(any(WebSocketSessionInfo.class));
+
+        doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation))
+                .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+        doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation))
+                .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
+
+        serverService.registerProcessor(serverPath, serverProcessor);
+
+        final String clientId = "client1";
+
+        final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class);
+        doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
+        final AtomicReference<String> clientSessionIdRef = new AtomicReference<>();
+
+
+        doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation))
+                .when(clientProcessor).connected(any(WebSocketSessionInfo.class));
+
+        doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation))
+                .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+        doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation))
+                .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
+
+        clientService.registerProcessor(clientId, clientProcessor);
+
+        clientService.connect(clientId);
+
+        assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
+
+        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient));
+        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
+
+
+        assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
+
+        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer));
+        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
+
+        assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
+
+        clientService.deregisterProcessor(clientId, clientProcessor);
+        serverService.deregisterProcessor(serverPath, serverProcessor);
+    }
+
+    @Test
+    public void testClientServerCommunicationRecovery() throws Exception {
+        assumeFalse(isWindowsEnvironment());
+        // Expectations.
+        final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
+        final CountDownLatch clientConnectedServer = new CountDownLatch(1);
+        final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1);
+        final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1);
+        final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1);
+        final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1);
+
+        final String textMessageFromClient = "Message from client.";
+        final String textMessageFromServer = "Message from server.";
+
+        final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class);
+        doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
+        final AtomicReference<String> serverSessionIdRef = new AtomicReference<>();
+
+        doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation))
+                .when(serverProcessor).connected(any(WebSocketSessionInfo.class));
+
+        doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation))
+                .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+        doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation))
+                .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
+
+        serverService.registerProcessor(serverPath, serverProcessor);
+
+        final String clientId = "client1";
+
+        final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class);
+        doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
+        final AtomicReference<String> clientSessionIdRef = new AtomicReference<>();
+
+
+        doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation))
+                .when(clientProcessor).connected(any(WebSocketSessionInfo.class));
+
+        doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation))
+                .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+        doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation))
+                .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
+
+        clientService.registerProcessor(clientId, clientProcessor);
+
+        clientService.connect(clientId);
+
+        assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
+
+        // Nothing happens if maintenance is executed while sessions are alive.
+        ((JettyWebSocketClient) clientService).maintainSessions();
+
+        // Restart server.
+        serverService.stopServer();
+        serverService.startServer(serverServiceContext.getConfigurationContext());
+
+        // Sessions will be recreated with the same session ids.
+        ((JettyWebSocketClient) clientService).maintainSessions();
+
+        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient));
+        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
+
+        assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
+
+        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer));
+        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
+
+        assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
+
+        clientService.deregisterProcessor(clientId, clientProcessor);
+        serverService.deregisterProcessor(serverPath, serverProcessor);
+    }
+
+    protected Object assertConnectedEvent(CountDownLatch latch, AtomicReference<String> sessionIdRef, InvocationOnMock invocation) {
+        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
+        assertNotNull(sessionInfo.getLocalAddress());
+        assertNotNull(sessionInfo.getRemoteAddress());
+        assertNotNull(sessionInfo.getSessionId());
+        assertEquals(isSecure(), sessionInfo.isSecure());
+        sessionIdRef.set(sessionInfo.getSessionId());
+        latch.countDown();
+        return null;
+    }
+
+    protected Object assertConsumeTextMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
+        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
+        assertNotNull(sessionInfo.getLocalAddress());
+        assertNotNull(sessionInfo.getRemoteAddress());
+        assertNotNull(sessionInfo.getSessionId());
+        assertEquals(isSecure(), sessionInfo.isSecure());
+
+        final String receivedMessage = invocation.getArgumentAt(1, String.class);
+        assertNotNull(receivedMessage);
+        assertEquals(expectedMessage, receivedMessage);
+        latch.countDown();
+        return null;
+    }
+
+    protected Object assertConsumeBinaryMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
+        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
+        assertNotNull(sessionInfo.getLocalAddress());
+        assertNotNull(sessionInfo.getRemoteAddress());
+        assertNotNull(sessionInfo.getSessionId());
+        assertEquals(isSecure(), sessionInfo.isSecure());
+
+        final byte[] receivedMessage = invocation.getArgumentAt(1, byte[].class);
+        final byte[] expectedBinary = expectedMessage.getBytes();
+        final int offset = invocation.getArgumentAt(2, Integer.class);
+        final int length = invocation.getArgumentAt(3, Integer.class);
+        assertNotNull(receivedMessage);
+        assertEquals(expectedBinary.length, receivedMessage.length);
+        assertEquals(expectedMessage, new String(receivedMessage));
+        assertEquals(0, offset);
+        assertEquals(expectedBinary.length, length);
+        latch.countDown();
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketSecureCommunication.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketSecureCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketSecureCommunication.java
new file mode 100644
index 0000000..249af7a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketSecureCommunication.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.websocket.WebSocketService;
+import org.junit.Test;
+
+
+public class ITJettyWebSocketSecureCommunication extends ITJettyWebSocketCommunication{
+
+    private final StandardSSLContextService sslContextService = new StandardSSLContextService();
+    private final ControllerServiceTestContext sslTestContext = new ControllerServiceTestContext(sslContextService, "SSLContextService");
+
+    public ITJettyWebSocketSecureCommunication() {
+        try {
+            sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE, "src/test/resources/certs/localhost-ks.jks");
+            sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
+            sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_TYPE, "JKS");
+            sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE, "src/test/resources/certs/localhost-ks.jks");
+            sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
+            sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
+
+            sslContextService.initialize(sslTestContext.getInitializationContext());
+            sslContextService.onConfigured(sslTestContext.getConfigurationContext());
+        } catch (InitializationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected boolean isSecure() {
+        return true;
+    }
+
+    @Override
+    protected void customizeServer() {
+        serverServiceContext.getInitializationContext().addControllerService(sslContextService);
+        serverServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
+    }
+
+    @Override
+    protected void customizeClient() {
+        clientServiceContext.getInitializationContext().addControllerService(sslContextService);
+        clientServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
+    }
+
+    @Test
+    public void testClientServerCommunication() throws Exception {
+        super.testClientServerCommunication();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java
deleted file mode 100644
index a225447..0000000
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket.jetty;
-
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.websocket.BinaryMessageConsumer;
-import org.apache.nifi.websocket.ConnectedListener;
-import org.apache.nifi.websocket.TextMessageConsumer;
-import org.apache.nifi.websocket.WebSocketClientService;
-import org.apache.nifi.websocket.WebSocketServerService;
-import org.apache.nifi.websocket.WebSocketSessionInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-
-import java.net.ServerSocket;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeFalse;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
-
-public class TestJettyWebSocketCommunication {
-
-    protected int serverPort;
-    protected String serverPath = "/test";
-    protected WebSocketServerService serverService;
-    protected ControllerServiceTestContext serverServiceContext;
-    protected WebSocketClientService clientService;
-    protected ControllerServiceTestContext clientServiceContext;
-
-    protected boolean isSecure() {
-        return false;
-    }
-
-    @Before
-    public void setup() throws Exception {
-        setupServer();
-
-        setupClient();
-    }
-
-    private void setupServer() throws Exception {
-        // Find an open port.
-        try (final ServerSocket serverSocket = new ServerSocket(0)) {
-            serverPort = serverSocket.getLocalPort();
-        }
-        serverService = new JettyWebSocketServer();
-        serverServiceContext = new ControllerServiceTestContext(serverService, "JettyWebSocketServer1");
-        serverServiceContext.setCustomValue(JettyWebSocketServer.LISTEN_PORT, String.valueOf(serverPort));
-
-        customizeServer();
-
-        serverService.initialize(serverServiceContext.getInitializationContext());
-        serverService.startServer(serverServiceContext.getConfigurationContext());
-    }
-
-    protected void customizeServer() {
-    }
-
-    private void setupClient() throws Exception {
-        clientService = new JettyWebSocketClient();
-        clientServiceContext = new ControllerServiceTestContext(clientService, "JettyWebSocketClient1");
-        clientServiceContext.setCustomValue(JettyWebSocketClient.WS_URI, (isSecure() ? "wss" : "ws") + "://localhost:" + serverPort + serverPath);
-
-        customizeClient();
-
-        clientService.initialize(clientServiceContext.getInitializationContext());
-        clientService.startClient(clientServiceContext.getConfigurationContext());
-    }
-
-    protected void customizeClient() {
-    }
-
-    @After
-    public void teardown() throws Exception {
-        clientService.stopClient();
-        serverService.stopServer();
-    }
-
-    protected interface MockWebSocketProcessor extends Processor, ConnectedListener, TextMessageConsumer, BinaryMessageConsumer {
-    }
-
-    private boolean isWindowsEnvironment() {
-        return System.getProperty("os.name").toLowerCase().startsWith("windows");
-    }
-
-    @Test
-    public void testClientServerCommunication() throws Exception {
-        assumeFalse(isWindowsEnvironment());
-        // Expectations.
-        final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
-        final CountDownLatch clientConnectedServer = new CountDownLatch(1);
-        final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1);
-        final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1);
-        final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1);
-        final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1);
-
-        final String textMessageFromClient = "Message from client.";
-        final String textMessageFromServer = "Message from server.";
-
-        final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class);
-        doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
-        final AtomicReference<String> serverSessionIdRef = new AtomicReference<>();
-
-        doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation))
-                .when(serverProcessor).connected(any(WebSocketSessionInfo.class));
-
-        doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation))
-                .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
-
-        doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation))
-                .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
-
-        serverService.registerProcessor(serverPath, serverProcessor);
-
-        final String clientId = "client1";
-
-        final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class);
-        doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
-        final AtomicReference<String> clientSessionIdRef = new AtomicReference<>();
-
-
-        doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation))
-                .when(clientProcessor).connected(any(WebSocketSessionInfo.class));
-
-        doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation))
-                .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
-
-        doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation))
-                .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
-
-        clientService.registerProcessor(clientId, clientProcessor);
-
-        clientService.connect(clientId);
-
-        assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS));
-        assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
-
-        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient));
-        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
-
-
-        assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
-        assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
-
-        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer));
-        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
-
-        assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
-        assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
-
-        clientService.deregisterProcessor(clientId, clientProcessor);
-        serverService.deregisterProcessor(serverPath, serverProcessor);
-    }
-
-    @Test
-    public void testClientServerCommunicationRecovery() throws Exception {
-        assumeFalse(isWindowsEnvironment());
-        // Expectations.
-        final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
-        final CountDownLatch clientConnectedServer = new CountDownLatch(1);
-        final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1);
-        final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1);
-        final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1);
-        final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1);
-
-        final String textMessageFromClient = "Message from client.";
-        final String textMessageFromServer = "Message from server.";
-
-        final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class);
-        doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
-        final AtomicReference<String> serverSessionIdRef = new AtomicReference<>();
-
-        doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation))
-                .when(serverProcessor).connected(any(WebSocketSessionInfo.class));
-
-        doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation))
-                .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
-
-        doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation))
-                .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
-
-        serverService.registerProcessor(serverPath, serverProcessor);
-
-        final String clientId = "client1";
-
-        final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class);
-        doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
-        final AtomicReference<String> clientSessionIdRef = new AtomicReference<>();
-
-
-        doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation))
-                .when(clientProcessor).connected(any(WebSocketSessionInfo.class));
-
-        doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation))
-                .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
-
-        doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation))
-                .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
-
-        clientService.registerProcessor(clientId, clientProcessor);
-
-        clientService.connect(clientId);
-
-        assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS));
-        assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
-
-        // Nothing happens if maintenance is executed while sessions are alive.
-        ((JettyWebSocketClient) clientService).maintainSessions();
-
-        // Restart server.
-        serverService.stopServer();
-        serverService.startServer(serverServiceContext.getConfigurationContext());
-
-        // Sessions will be recreated with the same session ids.
-        ((JettyWebSocketClient) clientService).maintainSessions();
-
-        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient));
-        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
-
-        assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
-        assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
-
-        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer));
-        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
-
-        assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
-        assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
-
-        clientService.deregisterProcessor(clientId, clientProcessor);
-        serverService.deregisterProcessor(serverPath, serverProcessor);
-    }
-
-    protected Object assertConnectedEvent(CountDownLatch latch, AtomicReference<String> sessionIdRef, InvocationOnMock invocation) {
-        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
-        assertNotNull(sessionInfo.getLocalAddress());
-        assertNotNull(sessionInfo.getRemoteAddress());
-        assertNotNull(sessionInfo.getSessionId());
-        assertEquals(isSecure(), sessionInfo.isSecure());
-        sessionIdRef.set(sessionInfo.getSessionId());
-        latch.countDown();
-        return null;
-    }
-
-    protected Object assertConsumeTextMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
-        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
-        assertNotNull(sessionInfo.getLocalAddress());
-        assertNotNull(sessionInfo.getRemoteAddress());
-        assertNotNull(sessionInfo.getSessionId());
-        assertEquals(isSecure(), sessionInfo.isSecure());
-
-        final String receivedMessage = invocation.getArgumentAt(1, String.class);
-        assertNotNull(receivedMessage);
-        assertEquals(expectedMessage, receivedMessage);
-        latch.countDown();
-        return null;
-    }
-
-    protected Object assertConsumeBinaryMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
-        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
-        assertNotNull(sessionInfo.getLocalAddress());
-        assertNotNull(sessionInfo.getRemoteAddress());
-        assertNotNull(sessionInfo.getSessionId());
-        assertEquals(isSecure(), sessionInfo.isSecure());
-
-        final byte[] receivedMessage = invocation.getArgumentAt(1, byte[].class);
-        final byte[] expectedBinary = expectedMessage.getBytes();
-        final int offset = invocation.getArgumentAt(2, Integer.class);
-        final int length = invocation.getArgumentAt(3, Integer.class);
-        assertNotNull(receivedMessage);
-        assertEquals(expectedBinary.length, receivedMessage.length);
-        assertEquals(expectedMessage, new String(receivedMessage));
-        assertEquals(0, offset);
-        assertEquals(expectedBinary.length, length);
-        latch.countDown();
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java
deleted file mode 100644
index f5c96c2..0000000
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket.jetty;
-
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.ssl.StandardSSLContextService;
-import org.apache.nifi.websocket.WebSocketService;
-import org.junit.Test;
-
-
-public class TestJettyWebSocketSecureCommunication extends TestJettyWebSocketCommunication{
-
-    private final StandardSSLContextService sslContextService = new StandardSSLContextService();
-    private final ControllerServiceTestContext sslTestContext = new ControllerServiceTestContext(sslContextService, "SSLContextService");
-
-    public TestJettyWebSocketSecureCommunication() {
-        try {
-            sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE, "src/test/resources/certs/localhost-ks.jks");
-            sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
-            sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_TYPE, "JKS");
-            sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE, "src/test/resources/certs/localhost-ks.jks");
-            sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
-            sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
-
-            sslContextService.initialize(sslTestContext.getInitializationContext());
-            sslContextService.onConfigured(sslTestContext.getConfigurationContext());
-        } catch (InitializationException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    protected boolean isSecure() {
-        return true;
-    }
-
-    @Override
-    protected void customizeServer() {
-        serverServiceContext.getInitializationContext().addControllerService(sslContextService);
-        serverServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
-    }
-
-    @Override
-    protected void customizeClient() {
-        clientServiceContext.getInitializationContext().addControllerService(sslContextService);
-        clientServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
-    }
-
-    @Test
-    public void testClientServerCommunication() throws Exception {
-        super.testClientServerCommunication();
-    }
-
-}


Mime
View raw message