Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C4E95200D4F for ; Wed, 6 Dec 2017 22:14:02 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id C1AAA160C0A; Wed, 6 Dec 2017 21:14:02 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C00F6160BF3 for ; Wed, 6 Dec 2017 22:14:00 +0100 (CET) Received: (qmail 22479 invoked by uid 500); 6 Dec 2017 21:14:00 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 22470 invoked by uid 99); 6 Dec 2017 21:13:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Dec 2017 21:13:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CEF47E0433; Wed, 6 Dec 2017 21:13:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: joewitt@apache.org To: commits@nifi.apache.org Message-Id: <4963ea1ea7e04735b28955631d92a6d9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: nifi git commit: NIFI-4673 changed offending tests to be integration tests and fixed travis config to run the build only once and during appropriate script phase instead of install. Reviewed by Bende. Date: Wed, 6 Dec 2017 21:13:59 +0000 (UTC) archived-at: Wed, 06 Dec 2017 21:14:03 -0000 Repository: nifi Updated Branches: refs/heads/master 7e61c6333 -> a774f1df6 NIFI-4673 changed offending tests to be integration tests and fixed travis config to run the build only once and during appropriate script phase instead of install. Reviewed by Bende. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a774f1df Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a774f1df Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a774f1df Branch: refs/heads/master Commit: a774f1df69b1d7fd6c3fda2acf7e0adf883def25 Parents: 7e61c63 Author: joewitt Authored: Wed Dec 6 12:03:16 2017 -0500 Committer: joewitt Committed: Wed Dec 6 16:13:42 2017 -0500 ---------------------------------------------------------------------- .travis.yml | 7 +- .../standard/ITListenAndPutSyslog.java | 175 +++++++++++ .../standard/TestListenAndPutSyslog.java | 175 ----------- .../jetty/ITJettyWebSocketCommunication.java | 306 +++++++++++++++++++ .../ITJettyWebSocketSecureCommunication.java | 68 +++++ .../jetty/TestJettyWebSocketCommunication.java | 306 ------------------- .../TestJettyWebSocketSecureCommunication.java | 68 ----- 7 files changed, 554 insertions(+), 551 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index e5b0ccb..6bb35a1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -31,6 +31,7 @@ cache: directories: - $HOME/.m2 - $HOME/.npm + before_cache: # Remove nifi repo again to save travis from caching it - rm -rf $HOME/.m2/repository/org/apache/nifi/ @@ -47,9 +48,11 @@ before_install: # Remove nifi repo again to save travis from caching it - rm -rf $HOME/.m2/repository/org/apache/nifi/ -install: +install: true + +script: # Replace variables seems to be the only option to pass proper values to surefire # Note: The reason the sed is done as part of script is to ensure the pom hack # won't affect the 'clean install' above - bash .travis.sh - - mvn -T 2C -Pcontrib-check -Ddir-only clean install + - mvn -T 2 clean install -Pcontrib-check -Ddir-only http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenAndPutSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenAndPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenAndPutSyslog.java new file mode 100644 index 0000000..5d0562d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenAndPutSyslog.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.Charset; + +/** + * Tests PutSyslog sending messages to ListenSyslog to simulate a syslog server forwarding + * to ListenSyslog, or PutSyslog sending to a syslog server. + */ +public class ITListenAndPutSyslog { + + static final Logger LOGGER = LoggerFactory.getLogger(ITListenAndPutSyslog.class); + + private ListenSyslog listenSyslog; + private TestRunner listenSyslogRunner; + + private PutSyslog putSyslog; + private TestRunner putSyslogRunner; + + @Before + public void setup() { + this.listenSyslog = new ListenSyslog(); + this.listenSyslogRunner = TestRunners.newTestRunner(listenSyslog); + + this.putSyslog = new PutSyslog(); + this.putSyslogRunner = TestRunners.newTestRunner(putSyslog); + } + + @After + public void teardown() { + try { + putSyslog.onStopped(); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + try { + listenSyslog.onUnscheduled(); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + } + + @Test + public void testUDP() throws IOException, InterruptedException { + run(ListenSyslog.UDP_VALUE.getValue(), 5, 5); + } + + @Test + public void testTCP() throws IOException, InterruptedException { + run(ListenSyslog.TCP_VALUE.getValue(), 5, 5); + } + + @Test + public void testTLS() throws InitializationException, IOException, InterruptedException { + configureSSLContextService(listenSyslogRunner); + listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context"); + + configureSSLContextService(putSyslogRunner); + putSyslogRunner.setProperty(PutSyslog.SSL_CONTEXT_SERVICE, "ssl-context"); + + run(ListenSyslog.TCP_VALUE.getValue(), 7, 7); + } + + @Test + public void testTLSListenerNoTLSPut() throws InitializationException, IOException, InterruptedException { + configureSSLContextService(listenSyslogRunner); + listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context"); + + // send 7 but expect 0 because sender didn't use TLS + run(ListenSyslog.TCP_VALUE.getValue(), 7, 0); + } + + private SSLContextService configureSSLContextService(TestRunner runner) throws InitializationException { + final SSLContextService sslContextService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslContextService); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); + runner.enableControllerService(sslContextService); + return sslContextService; + } + + /** + * Sends numMessages from PutSyslog to ListenSyslog. + */ + private void run(String protocol, int numMessages, int expectedMessages) throws IOException, InterruptedException { + // set the same protocol on both processors + putSyslogRunner.setProperty(PutSyslog.PROTOCOL, protocol); + listenSyslogRunner.setProperty(ListenSyslog.PROTOCOL, protocol); + + // set a listening port of 0 to get a random available port + listenSyslogRunner.setProperty(ListenSyslog.PORT, "0"); + + // call onScheduled to start ListenSyslog listening + final ProcessSessionFactory processSessionFactory = listenSyslogRunner.getProcessSessionFactory(); + final ProcessContext context = listenSyslogRunner.getProcessContext(); + listenSyslog.onScheduled(context); + + // get the real port it is listening on and set that in PutSyslog + final int listeningPort = listenSyslog.getPort(); + putSyslogRunner.setProperty(PutSyslog.PORT, String.valueOf(listeningPort)); + + // configure the message properties on PutSyslog + final String pri = "34"; + final String version = "1"; + final String stamp = "2016-02-05T22:14:15.003Z"; + final String host = "localhost"; + final String body = "some message"; + final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body; + + putSyslogRunner.setProperty(PutSyslog.MSG_PRIORITY, pri); + putSyslogRunner.setProperty(PutSyslog.MSG_VERSION, version); + putSyslogRunner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp); + putSyslogRunner.setProperty(PutSyslog.MSG_HOSTNAME, host); + putSyslogRunner.setProperty(PutSyslog.MSG_BODY, body); + + // send the messages + for (int i=0; i < numMessages; i++) { + putSyslogRunner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); + } + putSyslogRunner.run(numMessages, false); + + // trigger ListenSyslog until we've seen all the messages + int numTransfered = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (numTransfered < expectedMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(10); + listenSyslog.onTrigger(context, processSessionFactory); + numTransfered = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); + } + Assert.assertEquals("Did not process all the messages", expectedMessages, numTransfered); + + if (expectedMessages > 0) { + // check that one of flow files has the expected content + MockFlowFile mockFlowFile = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); + mockFlowFile.assertContentEquals(expectedMessage); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java deleted file mode 100644 index 29fa690..0000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard; - -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.ssl.SSLContextService; -import org.apache.nifi.ssl.StandardSSLContextService; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.charset.Charset; - -/** - * Tests PutSyslog sending messages to ListenSyslog to simulate a syslog server forwarding - * to ListenSyslog, or PutSyslog sending to a syslog server. - */ -public class TestListenAndPutSyslog { - - static final Logger LOGGER = LoggerFactory.getLogger(TestListenAndPutSyslog.class); - - private ListenSyslog listenSyslog; - private TestRunner listenSyslogRunner; - - private PutSyslog putSyslog; - private TestRunner putSyslogRunner; - - @Before - public void setup() { - this.listenSyslog = new ListenSyslog(); - this.listenSyslogRunner = TestRunners.newTestRunner(listenSyslog); - - this.putSyslog = new PutSyslog(); - this.putSyslogRunner = TestRunners.newTestRunner(putSyslog); - } - - @After - public void teardown() { - try { - putSyslog.onStopped(); - } catch (Exception e) { - LOGGER.error(e.getMessage(), e); - } - try { - listenSyslog.onUnscheduled(); - } catch (Exception e) { - LOGGER.error(e.getMessage(), e); - } - } - - @Test - public void testUDP() throws IOException, InterruptedException { - run(ListenSyslog.UDP_VALUE.getValue(), 5, 5); - } - - @Test - public void testTCP() throws IOException, InterruptedException { - run(ListenSyslog.TCP_VALUE.getValue(), 5, 5); - } - - @Test - public void testTLS() throws InitializationException, IOException, InterruptedException { - configureSSLContextService(listenSyslogRunner); - listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context"); - - configureSSLContextService(putSyslogRunner); - putSyslogRunner.setProperty(PutSyslog.SSL_CONTEXT_SERVICE, "ssl-context"); - - run(ListenSyslog.TCP_VALUE.getValue(), 7, 7); - } - - @Test - public void testTLSListenerNoTLSPut() throws InitializationException, IOException, InterruptedException { - configureSSLContextService(listenSyslogRunner); - listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context"); - - // send 7 but expect 0 because sender didn't use TLS - run(ListenSyslog.TCP_VALUE.getValue(), 7, 0); - } - - private SSLContextService configureSSLContextService(TestRunner runner) throws InitializationException { - final SSLContextService sslContextService = new StandardSSLContextService(); - runner.addControllerService("ssl-context", sslContextService); - runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); - runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); - runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); - runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks"); - runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); - runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); - runner.enableControllerService(sslContextService); - return sslContextService; - } - - /** - * Sends numMessages from PutSyslog to ListenSyslog. - */ - private void run(String protocol, int numMessages, int expectedMessages) throws IOException, InterruptedException { - // set the same protocol on both processors - putSyslogRunner.setProperty(PutSyslog.PROTOCOL, protocol); - listenSyslogRunner.setProperty(ListenSyslog.PROTOCOL, protocol); - - // set a listening port of 0 to get a random available port - listenSyslogRunner.setProperty(ListenSyslog.PORT, "0"); - - // call onScheduled to start ListenSyslog listening - final ProcessSessionFactory processSessionFactory = listenSyslogRunner.getProcessSessionFactory(); - final ProcessContext context = listenSyslogRunner.getProcessContext(); - listenSyslog.onScheduled(context); - - // get the real port it is listening on and set that in PutSyslog - final int listeningPort = listenSyslog.getPort(); - putSyslogRunner.setProperty(PutSyslog.PORT, String.valueOf(listeningPort)); - - // configure the message properties on PutSyslog - final String pri = "34"; - final String version = "1"; - final String stamp = "2016-02-05T22:14:15.003Z"; - final String host = "localhost"; - final String body = "some message"; - final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body; - - putSyslogRunner.setProperty(PutSyslog.MSG_PRIORITY, pri); - putSyslogRunner.setProperty(PutSyslog.MSG_VERSION, version); - putSyslogRunner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp); - putSyslogRunner.setProperty(PutSyslog.MSG_HOSTNAME, host); - putSyslogRunner.setProperty(PutSyslog.MSG_BODY, body); - - // send the messages - for (int i=0; i < numMessages; i++) { - putSyslogRunner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); - } - putSyslogRunner.run(numMessages, false); - - // trigger ListenSyslog until we've seen all the messages - int numTransfered = 0; - long timeout = System.currentTimeMillis() + 30000; - - while (numTransfered < expectedMessages && System.currentTimeMillis() < timeout) { - Thread.sleep(10); - listenSyslog.onTrigger(context, processSessionFactory); - numTransfered = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); - } - Assert.assertEquals("Did not process all the messages", expectedMessages, numTransfered); - - if (expectedMessages > 0) { - // check that one of flow files has the expected content - MockFlowFile mockFlowFile = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); - mockFlowFile.assertContentEquals(expectedMessage); - } - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketCommunication.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketCommunication.java new file mode 100644 index 0000000..6d3f063 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketCommunication.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket.jetty; + +import org.apache.nifi.processor.Processor; +import org.apache.nifi.websocket.BinaryMessageConsumer; +import org.apache.nifi.websocket.ConnectedListener; +import org.apache.nifi.websocket.TextMessageConsumer; +import org.apache.nifi.websocket.WebSocketClientService; +import org.apache.nifi.websocket.WebSocketServerService; +import org.apache.nifi.websocket.WebSocketSessionInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; + +import java.net.ServerSocket; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeFalse; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + + +public class ITJettyWebSocketCommunication { + + protected int serverPort; + protected String serverPath = "/test"; + protected WebSocketServerService serverService; + protected ControllerServiceTestContext serverServiceContext; + protected WebSocketClientService clientService; + protected ControllerServiceTestContext clientServiceContext; + + protected boolean isSecure() { + return false; + } + + @Before + public void setup() throws Exception { + setupServer(); + + setupClient(); + } + + private void setupServer() throws Exception { + // Find an open port. + try (final ServerSocket serverSocket = new ServerSocket(0)) { + serverPort = serverSocket.getLocalPort(); + } + serverService = new JettyWebSocketServer(); + serverServiceContext = new ControllerServiceTestContext(serverService, "JettyWebSocketServer1"); + serverServiceContext.setCustomValue(JettyWebSocketServer.LISTEN_PORT, String.valueOf(serverPort)); + + customizeServer(); + + serverService.initialize(serverServiceContext.getInitializationContext()); + serverService.startServer(serverServiceContext.getConfigurationContext()); + } + + protected void customizeServer() { + } + + private void setupClient() throws Exception { + clientService = new JettyWebSocketClient(); + clientServiceContext = new ControllerServiceTestContext(clientService, "JettyWebSocketClient1"); + clientServiceContext.setCustomValue(JettyWebSocketClient.WS_URI, (isSecure() ? "wss" : "ws") + "://localhost:" + serverPort + serverPath); + + customizeClient(); + + clientService.initialize(clientServiceContext.getInitializationContext()); + clientService.startClient(clientServiceContext.getConfigurationContext()); + } + + protected void customizeClient() { + } + + @After + public void teardown() throws Exception { + clientService.stopClient(); + serverService.stopServer(); + } + + protected interface MockWebSocketProcessor extends Processor, ConnectedListener, TextMessageConsumer, BinaryMessageConsumer { + } + + private boolean isWindowsEnvironment() { + return System.getProperty("os.name").toLowerCase().startsWith("windows"); + } + + @Test + public void testClientServerCommunication() throws Exception { + assumeFalse(isWindowsEnvironment()); + // Expectations. + final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1); + final CountDownLatch clientConnectedServer = new CountDownLatch(1); + final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1); + final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1); + final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1); + final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1); + + final String textMessageFromClient = "Message from client."; + final String textMessageFromServer = "Message from server."; + + final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class); + doReturn("serverProcessor1").when(serverProcessor).getIdentifier(); + final AtomicReference serverSessionIdRef = new AtomicReference<>(); + + doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation)) + .when(serverProcessor).connected(any(WebSocketSessionInfo.class)); + + doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation)) + .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString()); + + doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation)) + .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt()); + + serverService.registerProcessor(serverPath, serverProcessor); + + final String clientId = "client1"; + + final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class); + doReturn("clientProcessor1").when(clientProcessor).getIdentifier(); + final AtomicReference clientSessionIdRef = new AtomicReference<>(); + + + doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation)) + .when(clientProcessor).connected(any(WebSocketSessionInfo.class)); + + doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation)) + .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString()); + + doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation)) + .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt()); + + clientService.registerProcessor(clientId, clientProcessor); + + clientService.connect(clientId); + + assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS)); + assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS)); + + clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient)); + clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes()))); + + + assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS)); + assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS)); + + serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer)); + serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes()))); + + assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS)); + assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS)); + + clientService.deregisterProcessor(clientId, clientProcessor); + serverService.deregisterProcessor(serverPath, serverProcessor); + } + + @Test + public void testClientServerCommunicationRecovery() throws Exception { + assumeFalse(isWindowsEnvironment()); + // Expectations. + final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1); + final CountDownLatch clientConnectedServer = new CountDownLatch(1); + final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1); + final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1); + final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1); + final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1); + + final String textMessageFromClient = "Message from client."; + final String textMessageFromServer = "Message from server."; + + final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class); + doReturn("serverProcessor1").when(serverProcessor).getIdentifier(); + final AtomicReference serverSessionIdRef = new AtomicReference<>(); + + doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation)) + .when(serverProcessor).connected(any(WebSocketSessionInfo.class)); + + doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation)) + .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString()); + + doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation)) + .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt()); + + serverService.registerProcessor(serverPath, serverProcessor); + + final String clientId = "client1"; + + final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class); + doReturn("clientProcessor1").when(clientProcessor).getIdentifier(); + final AtomicReference clientSessionIdRef = new AtomicReference<>(); + + + doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation)) + .when(clientProcessor).connected(any(WebSocketSessionInfo.class)); + + doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation)) + .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString()); + + doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation)) + .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt()); + + clientService.registerProcessor(clientId, clientProcessor); + + clientService.connect(clientId); + + assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS)); + assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS)); + + // Nothing happens if maintenance is executed while sessions are alive. + ((JettyWebSocketClient) clientService).maintainSessions(); + + // Restart server. + serverService.stopServer(); + serverService.startServer(serverServiceContext.getConfigurationContext()); + + // Sessions will be recreated with the same session ids. + ((JettyWebSocketClient) clientService).maintainSessions(); + + clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient)); + clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes()))); + + assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS)); + assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS)); + + serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer)); + serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes()))); + + assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS)); + assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS)); + + clientService.deregisterProcessor(clientId, clientProcessor); + serverService.deregisterProcessor(serverPath, serverProcessor); + } + + protected Object assertConnectedEvent(CountDownLatch latch, AtomicReference sessionIdRef, InvocationOnMock invocation) { + final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class); + assertNotNull(sessionInfo.getLocalAddress()); + assertNotNull(sessionInfo.getRemoteAddress()); + assertNotNull(sessionInfo.getSessionId()); + assertEquals(isSecure(), sessionInfo.isSecure()); + sessionIdRef.set(sessionInfo.getSessionId()); + latch.countDown(); + return null; + } + + protected Object assertConsumeTextMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) { + final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class); + assertNotNull(sessionInfo.getLocalAddress()); + assertNotNull(sessionInfo.getRemoteAddress()); + assertNotNull(sessionInfo.getSessionId()); + assertEquals(isSecure(), sessionInfo.isSecure()); + + final String receivedMessage = invocation.getArgumentAt(1, String.class); + assertNotNull(receivedMessage); + assertEquals(expectedMessage, receivedMessage); + latch.countDown(); + return null; + } + + protected Object assertConsumeBinaryMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) { + final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class); + assertNotNull(sessionInfo.getLocalAddress()); + assertNotNull(sessionInfo.getRemoteAddress()); + assertNotNull(sessionInfo.getSessionId()); + assertEquals(isSecure(), sessionInfo.isSecure()); + + final byte[] receivedMessage = invocation.getArgumentAt(1, byte[].class); + final byte[] expectedBinary = expectedMessage.getBytes(); + final int offset = invocation.getArgumentAt(2, Integer.class); + final int length = invocation.getArgumentAt(3, Integer.class); + assertNotNull(receivedMessage); + assertEquals(expectedBinary.length, receivedMessage.length); + assertEquals(expectedMessage, new String(receivedMessage)); + assertEquals(0, offset); + assertEquals(expectedBinary.length, length); + latch.countDown(); + return null; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketSecureCommunication.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketSecureCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketSecureCommunication.java new file mode 100644 index 0000000..249af7a --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketSecureCommunication.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket.jetty; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.websocket.WebSocketService; +import org.junit.Test; + + +public class ITJettyWebSocketSecureCommunication extends ITJettyWebSocketCommunication{ + + private final StandardSSLContextService sslContextService = new StandardSSLContextService(); + private final ControllerServiceTestContext sslTestContext = new ControllerServiceTestContext(sslContextService, "SSLContextService"); + + public ITJettyWebSocketSecureCommunication() { + try { + sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE, "src/test/resources/certs/localhost-ks.jks"); + sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); + sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_TYPE, "JKS"); + sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE, "src/test/resources/certs/localhost-ks.jks"); + sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + + sslContextService.initialize(sslTestContext.getInitializationContext()); + sslContextService.onConfigured(sslTestContext.getConfigurationContext()); + } catch (InitializationException e) { + throw new RuntimeException(e); + } + } + + @Override + protected boolean isSecure() { + return true; + } + + @Override + protected void customizeServer() { + serverServiceContext.getInitializationContext().addControllerService(sslContextService); + serverServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier()); + } + + @Override + protected void customizeClient() { + clientServiceContext.getInitializationContext().addControllerService(sslContextService); + clientServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier()); + } + + @Test + public void testClientServerCommunication() throws Exception { + super.testClientServerCommunication(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java deleted file mode 100644 index a225447..0000000 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.websocket.jetty; - -import org.apache.nifi.processor.Processor; -import org.apache.nifi.websocket.BinaryMessageConsumer; -import org.apache.nifi.websocket.ConnectedListener; -import org.apache.nifi.websocket.TextMessageConsumer; -import org.apache.nifi.websocket.WebSocketClientService; -import org.apache.nifi.websocket.WebSocketServerService; -import org.apache.nifi.websocket.WebSocketSessionInfo; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; - -import java.net.ServerSocket; -import java.nio.ByteBuffer; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeFalse; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; - - -public class TestJettyWebSocketCommunication { - - protected int serverPort; - protected String serverPath = "/test"; - protected WebSocketServerService serverService; - protected ControllerServiceTestContext serverServiceContext; - protected WebSocketClientService clientService; - protected ControllerServiceTestContext clientServiceContext; - - protected boolean isSecure() { - return false; - } - - @Before - public void setup() throws Exception { - setupServer(); - - setupClient(); - } - - private void setupServer() throws Exception { - // Find an open port. - try (final ServerSocket serverSocket = new ServerSocket(0)) { - serverPort = serverSocket.getLocalPort(); - } - serverService = new JettyWebSocketServer(); - serverServiceContext = new ControllerServiceTestContext(serverService, "JettyWebSocketServer1"); - serverServiceContext.setCustomValue(JettyWebSocketServer.LISTEN_PORT, String.valueOf(serverPort)); - - customizeServer(); - - serverService.initialize(serverServiceContext.getInitializationContext()); - serverService.startServer(serverServiceContext.getConfigurationContext()); - } - - protected void customizeServer() { - } - - private void setupClient() throws Exception { - clientService = new JettyWebSocketClient(); - clientServiceContext = new ControllerServiceTestContext(clientService, "JettyWebSocketClient1"); - clientServiceContext.setCustomValue(JettyWebSocketClient.WS_URI, (isSecure() ? "wss" : "ws") + "://localhost:" + serverPort + serverPath); - - customizeClient(); - - clientService.initialize(clientServiceContext.getInitializationContext()); - clientService.startClient(clientServiceContext.getConfigurationContext()); - } - - protected void customizeClient() { - } - - @After - public void teardown() throws Exception { - clientService.stopClient(); - serverService.stopServer(); - } - - protected interface MockWebSocketProcessor extends Processor, ConnectedListener, TextMessageConsumer, BinaryMessageConsumer { - } - - private boolean isWindowsEnvironment() { - return System.getProperty("os.name").toLowerCase().startsWith("windows"); - } - - @Test - public void testClientServerCommunication() throws Exception { - assumeFalse(isWindowsEnvironment()); - // Expectations. - final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1); - final CountDownLatch clientConnectedServer = new CountDownLatch(1); - final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1); - final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1); - final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1); - final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1); - - final String textMessageFromClient = "Message from client."; - final String textMessageFromServer = "Message from server."; - - final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class); - doReturn("serverProcessor1").when(serverProcessor).getIdentifier(); - final AtomicReference serverSessionIdRef = new AtomicReference<>(); - - doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation)) - .when(serverProcessor).connected(any(WebSocketSessionInfo.class)); - - doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation)) - .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString()); - - doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation)) - .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt()); - - serverService.registerProcessor(serverPath, serverProcessor); - - final String clientId = "client1"; - - final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class); - doReturn("clientProcessor1").when(clientProcessor).getIdentifier(); - final AtomicReference clientSessionIdRef = new AtomicReference<>(); - - - doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation)) - .when(clientProcessor).connected(any(WebSocketSessionInfo.class)); - - doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation)) - .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString()); - - doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation)) - .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt()); - - clientService.registerProcessor(clientId, clientProcessor); - - clientService.connect(clientId); - - assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS)); - assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS)); - - clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient)); - clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes()))); - - - assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS)); - assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS)); - - serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer)); - serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes()))); - - assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS)); - assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS)); - - clientService.deregisterProcessor(clientId, clientProcessor); - serverService.deregisterProcessor(serverPath, serverProcessor); - } - - @Test - public void testClientServerCommunicationRecovery() throws Exception { - assumeFalse(isWindowsEnvironment()); - // Expectations. - final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1); - final CountDownLatch clientConnectedServer = new CountDownLatch(1); - final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1); - final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1); - final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1); - final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1); - - final String textMessageFromClient = "Message from client."; - final String textMessageFromServer = "Message from server."; - - final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class); - doReturn("serverProcessor1").when(serverProcessor).getIdentifier(); - final AtomicReference serverSessionIdRef = new AtomicReference<>(); - - doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation)) - .when(serverProcessor).connected(any(WebSocketSessionInfo.class)); - - doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation)) - .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString()); - - doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation)) - .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt()); - - serverService.registerProcessor(serverPath, serverProcessor); - - final String clientId = "client1"; - - final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class); - doReturn("clientProcessor1").when(clientProcessor).getIdentifier(); - final AtomicReference clientSessionIdRef = new AtomicReference<>(); - - - doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation)) - .when(clientProcessor).connected(any(WebSocketSessionInfo.class)); - - doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation)) - .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString()); - - doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation)) - .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt()); - - clientService.registerProcessor(clientId, clientProcessor); - - clientService.connect(clientId); - - assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS)); - assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS)); - - // Nothing happens if maintenance is executed while sessions are alive. - ((JettyWebSocketClient) clientService).maintainSessions(); - - // Restart server. - serverService.stopServer(); - serverService.startServer(serverServiceContext.getConfigurationContext()); - - // Sessions will be recreated with the same session ids. - ((JettyWebSocketClient) clientService).maintainSessions(); - - clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient)); - clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes()))); - - assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS)); - assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS)); - - serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer)); - serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes()))); - - assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS)); - assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS)); - - clientService.deregisterProcessor(clientId, clientProcessor); - serverService.deregisterProcessor(serverPath, serverProcessor); - } - - protected Object assertConnectedEvent(CountDownLatch latch, AtomicReference sessionIdRef, InvocationOnMock invocation) { - final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class); - assertNotNull(sessionInfo.getLocalAddress()); - assertNotNull(sessionInfo.getRemoteAddress()); - assertNotNull(sessionInfo.getSessionId()); - assertEquals(isSecure(), sessionInfo.isSecure()); - sessionIdRef.set(sessionInfo.getSessionId()); - latch.countDown(); - return null; - } - - protected Object assertConsumeTextMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) { - final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class); - assertNotNull(sessionInfo.getLocalAddress()); - assertNotNull(sessionInfo.getRemoteAddress()); - assertNotNull(sessionInfo.getSessionId()); - assertEquals(isSecure(), sessionInfo.isSecure()); - - final String receivedMessage = invocation.getArgumentAt(1, String.class); - assertNotNull(receivedMessage); - assertEquals(expectedMessage, receivedMessage); - latch.countDown(); - return null; - } - - protected Object assertConsumeBinaryMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) { - final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class); - assertNotNull(sessionInfo.getLocalAddress()); - assertNotNull(sessionInfo.getRemoteAddress()); - assertNotNull(sessionInfo.getSessionId()); - assertEquals(isSecure(), sessionInfo.isSecure()); - - final byte[] receivedMessage = invocation.getArgumentAt(1, byte[].class); - final byte[] expectedBinary = expectedMessage.getBytes(); - final int offset = invocation.getArgumentAt(2, Integer.class); - final int length = invocation.getArgumentAt(3, Integer.class); - assertNotNull(receivedMessage); - assertEquals(expectedBinary.length, receivedMessage.length); - assertEquals(expectedMessage, new String(receivedMessage)); - assertEquals(0, offset); - assertEquals(expectedBinary.length, length); - latch.countDown(); - return null; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java deleted file mode 100644 index f5c96c2..0000000 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.websocket.jetty; - -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.ssl.StandardSSLContextService; -import org.apache.nifi.websocket.WebSocketService; -import org.junit.Test; - - -public class TestJettyWebSocketSecureCommunication extends TestJettyWebSocketCommunication{ - - private final StandardSSLContextService sslContextService = new StandardSSLContextService(); - private final ControllerServiceTestContext sslTestContext = new ControllerServiceTestContext(sslContextService, "SSLContextService"); - - public TestJettyWebSocketSecureCommunication() { - try { - sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE, "src/test/resources/certs/localhost-ks.jks"); - sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); - sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_TYPE, "JKS"); - sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE, "src/test/resources/certs/localhost-ks.jks"); - sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); - sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); - - sslContextService.initialize(sslTestContext.getInitializationContext()); - sslContextService.onConfigured(sslTestContext.getConfigurationContext()); - } catch (InitializationException e) { - throw new RuntimeException(e); - } - } - - @Override - protected boolean isSecure() { - return true; - } - - @Override - protected void customizeServer() { - serverServiceContext.getInitializationContext().addControllerService(sslContextService); - serverServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier()); - } - - @Override - protected void customizeClient() { - clientServiceContext.getInitializationContext().addControllerService(sslContextService); - clientServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier()); - } - - @Test - public void testClientServerCommunication() throws Exception { - super.testClientServerCommunication(); - } - -}