Repository: nifi
Updated Branches:
refs/heads/master 7e61c6333 -> a774f1df6
NIFI-4673 changed offending tests to be integration tests and fixed travis config to run the build only once and during appropriate script phase instead of install. Reviewed by Bende.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a774f1df
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a774f1df
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a774f1df
Branch: refs/heads/master
Commit: a774f1df69b1d7fd6c3fda2acf7e0adf883def25
Parents: 7e61c63
Author: joewitt <joewitt@apache.org>
Authored: Wed Dec 6 12:03:16 2017 -0500
Committer: joewitt <joewitt@apache.org>
Committed: Wed Dec 6 16:13:42 2017 -0500
----------------------------------------------------------------------
.travis.yml | 7 +-
.../standard/ITListenAndPutSyslog.java | 175 +++++++++++
.../standard/TestListenAndPutSyslog.java | 175 -----------
.../jetty/ITJettyWebSocketCommunication.java | 306 +++++++++++++++++++
.../ITJettyWebSocketSecureCommunication.java | 68 +++++
.../jetty/TestJettyWebSocketCommunication.java | 306 -------------------
.../TestJettyWebSocketSecureCommunication.java | 68 -----
7 files changed, 554 insertions(+), 551 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index e5b0ccb..6bb35a1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -31,6 +31,7 @@ cache:
directories:
- $HOME/.m2
- $HOME/.npm
+
before_cache:
# Remove nifi repo again to save travis from caching it
- rm -rf $HOME/.m2/repository/org/apache/nifi/
@@ -47,9 +48,11 @@ before_install:
# Remove nifi repo again to save travis from caching it
- rm -rf $HOME/.m2/repository/org/apache/nifi/
-install:
+install: true
+
+script:
# Replace variables seems to be the only option to pass proper values to surefire
# Note: The reason the sed is done as part of script is to ensure the pom hack
# won't affect the 'clean install' above
- bash .travis.sh
- - mvn -T 2C -Pcontrib-check -Ddir-only clean install
+ - mvn -T 2 clean install -Pcontrib-check -Ddir-only
http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenAndPutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenAndPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenAndPutSyslog.java
new file mode 100644
index 0000000..5d0562d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenAndPutSyslog.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+/**
+ * Tests PutSyslog sending messages to ListenSyslog to simulate a syslog server forwarding
+ * to ListenSyslog, or PutSyslog sending to a syslog server.
+ */
+public class ITListenAndPutSyslog {
+
+ static final Logger LOGGER = LoggerFactory.getLogger(ITListenAndPutSyslog.class);
+
+ private ListenSyslog listenSyslog;
+ private TestRunner listenSyslogRunner;
+
+ private PutSyslog putSyslog;
+ private TestRunner putSyslogRunner;
+
+ @Before
+ public void setup() {
+ this.listenSyslog = new ListenSyslog();
+ this.listenSyslogRunner = TestRunners.newTestRunner(listenSyslog);
+
+ this.putSyslog = new PutSyslog();
+ this.putSyslogRunner = TestRunners.newTestRunner(putSyslog);
+ }
+
+ @After
+ public void teardown() {
+ try {
+ putSyslog.onStopped();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ try {
+ listenSyslog.onUnscheduled();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+
+ @Test
+ public void testUDP() throws IOException, InterruptedException {
+ run(ListenSyslog.UDP_VALUE.getValue(), 5, 5);
+ }
+
+ @Test
+ public void testTCP() throws IOException, InterruptedException {
+ run(ListenSyslog.TCP_VALUE.getValue(), 5, 5);
+ }
+
+ @Test
+ public void testTLS() throws InitializationException, IOException, InterruptedException {
+ configureSSLContextService(listenSyslogRunner);
+ listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
+
+ configureSSLContextService(putSyslogRunner);
+ putSyslogRunner.setProperty(PutSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
+
+ run(ListenSyslog.TCP_VALUE.getValue(), 7, 7);
+ }
+
+ @Test
+ public void testTLSListenerNoTLSPut() throws InitializationException, IOException, InterruptedException {
+ configureSSLContextService(listenSyslogRunner);
+ listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
+
+ // send 7 but expect 0 because sender didn't use TLS
+ run(ListenSyslog.TCP_VALUE.getValue(), 7, 0);
+ }
+
+ private SSLContextService configureSSLContextService(TestRunner runner) throws InitializationException {
+ final SSLContextService sslContextService = new StandardSSLContextService();
+ runner.addControllerService("ssl-context", sslContextService);
+ runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
+ runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
+ runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
+ runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
+ runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
+ runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
+ runner.enableControllerService(sslContextService);
+ return sslContextService;
+ }
+
+ /**
+ * Sends numMessages from PutSyslog to ListenSyslog.
+ */
+ private void run(String protocol, int numMessages, int expectedMessages) throws IOException, InterruptedException {
+ // set the same protocol on both processors
+ putSyslogRunner.setProperty(PutSyslog.PROTOCOL, protocol);
+ listenSyslogRunner.setProperty(ListenSyslog.PROTOCOL, protocol);
+
+ // set a listening port of 0 to get a random available port
+ listenSyslogRunner.setProperty(ListenSyslog.PORT, "0");
+
+ // call onScheduled to start ListenSyslog listening
+ final ProcessSessionFactory processSessionFactory = listenSyslogRunner.getProcessSessionFactory();
+ final ProcessContext context = listenSyslogRunner.getProcessContext();
+ listenSyslog.onScheduled(context);
+
+ // get the real port it is listening on and set that in PutSyslog
+ final int listeningPort = listenSyslog.getPort();
+ putSyslogRunner.setProperty(PutSyslog.PORT, String.valueOf(listeningPort));
+
+ // configure the message properties on PutSyslog
+ final String pri = "34";
+ final String version = "1";
+ final String stamp = "2016-02-05T22:14:15.003Z";
+ final String host = "localhost";
+ final String body = "some message";
+ final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
+
+ putSyslogRunner.setProperty(PutSyslog.MSG_PRIORITY, pri);
+ putSyslogRunner.setProperty(PutSyslog.MSG_VERSION, version);
+ putSyslogRunner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
+ putSyslogRunner.setProperty(PutSyslog.MSG_HOSTNAME, host);
+ putSyslogRunner.setProperty(PutSyslog.MSG_BODY, body);
+
+ // send the messages
+ for (int i=0; i < numMessages; i++) {
+ putSyslogRunner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+ }
+ putSyslogRunner.run(numMessages, false);
+
+ // trigger ListenSyslog until we've seen all the messages
+ int numTransfered = 0;
+ long timeout = System.currentTimeMillis() + 30000;
+
+ while (numTransfered < expectedMessages && System.currentTimeMillis() < timeout) {
+ Thread.sleep(10);
+ listenSyslog.onTrigger(context, processSessionFactory);
+ numTransfered = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
+ }
+ Assert.assertEquals("Did not process all the messages", expectedMessages, numTransfered);
+
+ if (expectedMessages > 0) {
+ // check that one of flow files has the expected content
+ MockFlowFile mockFlowFile = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
+ mockFlowFile.assertContentEquals(expectedMessage);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java
deleted file mode 100644
index 29fa690..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.ssl.StandardSSLContextService;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-
-/**
- * Tests PutSyslog sending messages to ListenSyslog to simulate a syslog server forwarding
- * to ListenSyslog, or PutSyslog sending to a syslog server.
- */
-public class TestListenAndPutSyslog {
-
- static final Logger LOGGER = LoggerFactory.getLogger(TestListenAndPutSyslog.class);
-
- private ListenSyslog listenSyslog;
- private TestRunner listenSyslogRunner;
-
- private PutSyslog putSyslog;
- private TestRunner putSyslogRunner;
-
- @Before
- public void setup() {
- this.listenSyslog = new ListenSyslog();
- this.listenSyslogRunner = TestRunners.newTestRunner(listenSyslog);
-
- this.putSyslog = new PutSyslog();
- this.putSyslogRunner = TestRunners.newTestRunner(putSyslog);
- }
-
- @After
- public void teardown() {
- try {
- putSyslog.onStopped();
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- try {
- listenSyslog.onUnscheduled();
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
-
- @Test
- public void testUDP() throws IOException, InterruptedException {
- run(ListenSyslog.UDP_VALUE.getValue(), 5, 5);
- }
-
- @Test
- public void testTCP() throws IOException, InterruptedException {
- run(ListenSyslog.TCP_VALUE.getValue(), 5, 5);
- }
-
- @Test
- public void testTLS() throws InitializationException, IOException, InterruptedException {
- configureSSLContextService(listenSyslogRunner);
- listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
-
- configureSSLContextService(putSyslogRunner);
- putSyslogRunner.setProperty(PutSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
-
- run(ListenSyslog.TCP_VALUE.getValue(), 7, 7);
- }
-
- @Test
- public void testTLSListenerNoTLSPut() throws InitializationException, IOException, InterruptedException {
- configureSSLContextService(listenSyslogRunner);
- listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
-
- // send 7 but expect 0 because sender didn't use TLS
- run(ListenSyslog.TCP_VALUE.getValue(), 7, 0);
- }
-
- private SSLContextService configureSSLContextService(TestRunner runner) throws InitializationException {
- final SSLContextService sslContextService = new StandardSSLContextService();
- runner.addControllerService("ssl-context", sslContextService);
- runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
- runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
- runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
- runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
- runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
- runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
- runner.enableControllerService(sslContextService);
- return sslContextService;
- }
-
- /**
- * Sends numMessages from PutSyslog to ListenSyslog.
- */
- private void run(String protocol, int numMessages, int expectedMessages) throws IOException, InterruptedException {
- // set the same protocol on both processors
- putSyslogRunner.setProperty(PutSyslog.PROTOCOL, protocol);
- listenSyslogRunner.setProperty(ListenSyslog.PROTOCOL, protocol);
-
- // set a listening port of 0 to get a random available port
- listenSyslogRunner.setProperty(ListenSyslog.PORT, "0");
-
- // call onScheduled to start ListenSyslog listening
- final ProcessSessionFactory processSessionFactory = listenSyslogRunner.getProcessSessionFactory();
- final ProcessContext context = listenSyslogRunner.getProcessContext();
- listenSyslog.onScheduled(context);
-
- // get the real port it is listening on and set that in PutSyslog
- final int listeningPort = listenSyslog.getPort();
- putSyslogRunner.setProperty(PutSyslog.PORT, String.valueOf(listeningPort));
-
- // configure the message properties on PutSyslog
- final String pri = "34";
- final String version = "1";
- final String stamp = "2016-02-05T22:14:15.003Z";
- final String host = "localhost";
- final String body = "some message";
- final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
-
- putSyslogRunner.setProperty(PutSyslog.MSG_PRIORITY, pri);
- putSyslogRunner.setProperty(PutSyslog.MSG_VERSION, version);
- putSyslogRunner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
- putSyslogRunner.setProperty(PutSyslog.MSG_HOSTNAME, host);
- putSyslogRunner.setProperty(PutSyslog.MSG_BODY, body);
-
- // send the messages
- for (int i=0; i < numMessages; i++) {
- putSyslogRunner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
- }
- putSyslogRunner.run(numMessages, false);
-
- // trigger ListenSyslog until we've seen all the messages
- int numTransfered = 0;
- long timeout = System.currentTimeMillis() + 30000;
-
- while (numTransfered < expectedMessages && System.currentTimeMillis() < timeout) {
- Thread.sleep(10);
- listenSyslog.onTrigger(context, processSessionFactory);
- numTransfered = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
- }
- Assert.assertEquals("Did not process all the messages", expectedMessages, numTransfered);
-
- if (expectedMessages > 0) {
- // check that one of flow files has the expected content
- MockFlowFile mockFlowFile = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
- mockFlowFile.assertContentEquals(expectedMessage);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketCommunication.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketCommunication.java
new file mode 100644
index 0000000..6d3f063
--- /dev/null
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketCommunication.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.websocket.BinaryMessageConsumer;
+import org.apache.nifi.websocket.ConnectedListener;
+import org.apache.nifi.websocket.TextMessageConsumer;
+import org.apache.nifi.websocket.WebSocketClientService;
+import org.apache.nifi.websocket.WebSocketServerService;
+import org.apache.nifi.websocket.WebSocketSessionInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+
+public class ITJettyWebSocketCommunication {
+
+ protected int serverPort;
+ protected String serverPath = "/test";
+ protected WebSocketServerService serverService;
+ protected ControllerServiceTestContext serverServiceContext;
+ protected WebSocketClientService clientService;
+ protected ControllerServiceTestContext clientServiceContext;
+
+ protected boolean isSecure() {
+ return false;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ setupServer();
+
+ setupClient();
+ }
+
+ private void setupServer() throws Exception {
+ // Find an open port.
+ try (final ServerSocket serverSocket = new ServerSocket(0)) {
+ serverPort = serverSocket.getLocalPort();
+ }
+ serverService = new JettyWebSocketServer();
+ serverServiceContext = new ControllerServiceTestContext(serverService, "JettyWebSocketServer1");
+ serverServiceContext.setCustomValue(JettyWebSocketServer.LISTEN_PORT, String.valueOf(serverPort));
+
+ customizeServer();
+
+ serverService.initialize(serverServiceContext.getInitializationContext());
+ serverService.startServer(serverServiceContext.getConfigurationContext());
+ }
+
+ protected void customizeServer() {
+ }
+
+ private void setupClient() throws Exception {
+ clientService = new JettyWebSocketClient();
+ clientServiceContext = new ControllerServiceTestContext(clientService, "JettyWebSocketClient1");
+ clientServiceContext.setCustomValue(JettyWebSocketClient.WS_URI, (isSecure() ? "wss" : "ws") + "://localhost:" + serverPort + serverPath);
+
+ customizeClient();
+
+ clientService.initialize(clientServiceContext.getInitializationContext());
+ clientService.startClient(clientServiceContext.getConfigurationContext());
+ }
+
+ protected void customizeClient() {
+ }
+
+ @After
+ public void teardown() throws Exception {
+ clientService.stopClient();
+ serverService.stopServer();
+ }
+
+ protected interface MockWebSocketProcessor extends Processor, ConnectedListener, TextMessageConsumer, BinaryMessageConsumer {
+ }
+
+ private boolean isWindowsEnvironment() {
+ return System.getProperty("os.name").toLowerCase().startsWith("windows");
+ }
+
+ @Test
+ public void testClientServerCommunication() throws Exception {
+ assumeFalse(isWindowsEnvironment());
+ // Expectations.
+ final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
+ final CountDownLatch clientConnectedServer = new CountDownLatch(1);
+ final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1);
+ final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1);
+ final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1);
+ final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1);
+
+ final String textMessageFromClient = "Message from client.";
+ final String textMessageFromServer = "Message from server.";
+
+ final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class);
+ doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
+ final AtomicReference<String> serverSessionIdRef = new AtomicReference<>();
+
+ doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation))
+ .when(serverProcessor).connected(any(WebSocketSessionInfo.class));
+
+ doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation))
+ .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+ doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation))
+ .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
+
+ serverService.registerProcessor(serverPath, serverProcessor);
+
+ final String clientId = "client1";
+
+ final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class);
+ doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
+ final AtomicReference<String> clientSessionIdRef = new AtomicReference<>();
+
+
+ doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation))
+ .when(clientProcessor).connected(any(WebSocketSessionInfo.class));
+
+ doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation))
+ .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+ doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation))
+ .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
+
+ clientService.registerProcessor(clientId, clientProcessor);
+
+ clientService.connect(clientId);
+
+ assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS));
+ assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
+
+ clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient));
+ clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
+
+
+ assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
+ assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
+
+ serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer));
+ serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
+
+ assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
+ assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
+
+ clientService.deregisterProcessor(clientId, clientProcessor);
+ serverService.deregisterProcessor(serverPath, serverProcessor);
+ }
+
+ @Test
+ public void testClientServerCommunicationRecovery() throws Exception {
+ assumeFalse(isWindowsEnvironment());
+ // Expectations.
+ final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
+ final CountDownLatch clientConnectedServer = new CountDownLatch(1);
+ final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1);
+ final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1);
+ final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1);
+ final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1);
+
+ final String textMessageFromClient = "Message from client.";
+ final String textMessageFromServer = "Message from server.";
+
+ final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class);
+ doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
+ final AtomicReference<String> serverSessionIdRef = new AtomicReference<>();
+
+ doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation))
+ .when(serverProcessor).connected(any(WebSocketSessionInfo.class));
+
+ doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation))
+ .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+ doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation))
+ .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
+
+ serverService.registerProcessor(serverPath, serverProcessor);
+
+ final String clientId = "client1";
+
+ final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class);
+ doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
+ final AtomicReference<String> clientSessionIdRef = new AtomicReference<>();
+
+
+ doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation))
+ .when(clientProcessor).connected(any(WebSocketSessionInfo.class));
+
+ doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation))
+ .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+ doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation))
+ .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
+
+ clientService.registerProcessor(clientId, clientProcessor);
+
+ clientService.connect(clientId);
+
+ assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS));
+ assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
+
+ // Nothing happens if maintenance is executed while sessions are alive.
+ ((JettyWebSocketClient) clientService).maintainSessions();
+
+ // Restart server.
+ serverService.stopServer();
+ serverService.startServer(serverServiceContext.getConfigurationContext());
+
+ // Sessions will be recreated with the same session ids.
+ ((JettyWebSocketClient) clientService).maintainSessions();
+
+ clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient));
+ clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
+
+ assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
+ assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
+
+ serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer));
+ serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
+
+ assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
+ assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
+
+ clientService.deregisterProcessor(clientId, clientProcessor);
+ serverService.deregisterProcessor(serverPath, serverProcessor);
+ }
+
+ protected Object assertConnectedEvent(CountDownLatch latch, AtomicReference<String> sessionIdRef, InvocationOnMock invocation) {
+ final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
+ assertNotNull(sessionInfo.getLocalAddress());
+ assertNotNull(sessionInfo.getRemoteAddress());
+ assertNotNull(sessionInfo.getSessionId());
+ assertEquals(isSecure(), sessionInfo.isSecure());
+ sessionIdRef.set(sessionInfo.getSessionId());
+ latch.countDown();
+ return null;
+ }
+
+ protected Object assertConsumeTextMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
+ final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
+ assertNotNull(sessionInfo.getLocalAddress());
+ assertNotNull(sessionInfo.getRemoteAddress());
+ assertNotNull(sessionInfo.getSessionId());
+ assertEquals(isSecure(), sessionInfo.isSecure());
+
+ final String receivedMessage = invocation.getArgumentAt(1, String.class);
+ assertNotNull(receivedMessage);
+ assertEquals(expectedMessage, receivedMessage);
+ latch.countDown();
+ return null;
+ }
+
+ protected Object assertConsumeBinaryMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
+ final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
+ assertNotNull(sessionInfo.getLocalAddress());
+ assertNotNull(sessionInfo.getRemoteAddress());
+ assertNotNull(sessionInfo.getSessionId());
+ assertEquals(isSecure(), sessionInfo.isSecure());
+
+ final byte[] receivedMessage = invocation.getArgumentAt(1, byte[].class);
+ final byte[] expectedBinary = expectedMessage.getBytes();
+ final int offset = invocation.getArgumentAt(2, Integer.class);
+ final int length = invocation.getArgumentAt(3, Integer.class);
+ assertNotNull(receivedMessage);
+ assertEquals(expectedBinary.length, receivedMessage.length);
+ assertEquals(expectedMessage, new String(receivedMessage));
+ assertEquals(0, offset);
+ assertEquals(expectedBinary.length, length);
+ latch.countDown();
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketSecureCommunication.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketSecureCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketSecureCommunication.java
new file mode 100644
index 0000000..249af7a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketSecureCommunication.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.websocket.WebSocketService;
+import org.junit.Test;
+
+
+public class ITJettyWebSocketSecureCommunication extends ITJettyWebSocketCommunication{
+
+ private final StandardSSLContextService sslContextService = new StandardSSLContextService();
+ private final ControllerServiceTestContext sslTestContext = new ControllerServiceTestContext(sslContextService, "SSLContextService");
+
+ public ITJettyWebSocketSecureCommunication() {
+ try {
+ sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE, "src/test/resources/certs/localhost-ks.jks");
+ sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
+ sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_TYPE, "JKS");
+ sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE, "src/test/resources/certs/localhost-ks.jks");
+ sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
+ sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
+
+ sslContextService.initialize(sslTestContext.getInitializationContext());
+ sslContextService.onConfigured(sslTestContext.getConfigurationContext());
+ } catch (InitializationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected boolean isSecure() {
+ return true;
+ }
+
+ @Override
+ protected void customizeServer() {
+ serverServiceContext.getInitializationContext().addControllerService(sslContextService);
+ serverServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
+ }
+
+ @Override
+ protected void customizeClient() {
+ clientServiceContext.getInitializationContext().addControllerService(sslContextService);
+ clientServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
+ }
+
+ @Test
+ public void testClientServerCommunication() throws Exception {
+ super.testClientServerCommunication();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java
deleted file mode 100644
index a225447..0000000
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket.jetty;
-
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.websocket.BinaryMessageConsumer;
-import org.apache.nifi.websocket.ConnectedListener;
-import org.apache.nifi.websocket.TextMessageConsumer;
-import org.apache.nifi.websocket.WebSocketClientService;
-import org.apache.nifi.websocket.WebSocketServerService;
-import org.apache.nifi.websocket.WebSocketSessionInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-
-import java.net.ServerSocket;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeFalse;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
-
-public class TestJettyWebSocketCommunication {
-
- protected int serverPort;
- protected String serverPath = "/test";
- protected WebSocketServerService serverService;
- protected ControllerServiceTestContext serverServiceContext;
- protected WebSocketClientService clientService;
- protected ControllerServiceTestContext clientServiceContext;
-
- protected boolean isSecure() {
- return false;
- }
-
- @Before
- public void setup() throws Exception {
- setupServer();
-
- setupClient();
- }
-
- private void setupServer() throws Exception {
- // Find an open port.
- try (final ServerSocket serverSocket = new ServerSocket(0)) {
- serverPort = serverSocket.getLocalPort();
- }
- serverService = new JettyWebSocketServer();
- serverServiceContext = new ControllerServiceTestContext(serverService, "JettyWebSocketServer1");
- serverServiceContext.setCustomValue(JettyWebSocketServer.LISTEN_PORT, String.valueOf(serverPort));
-
- customizeServer();
-
- serverService.initialize(serverServiceContext.getInitializationContext());
- serverService.startServer(serverServiceContext.getConfigurationContext());
- }
-
- protected void customizeServer() {
- }
-
- private void setupClient() throws Exception {
- clientService = new JettyWebSocketClient();
- clientServiceContext = new ControllerServiceTestContext(clientService, "JettyWebSocketClient1");
- clientServiceContext.setCustomValue(JettyWebSocketClient.WS_URI, (isSecure() ? "wss" : "ws") + "://localhost:" + serverPort + serverPath);
-
- customizeClient();
-
- clientService.initialize(clientServiceContext.getInitializationContext());
- clientService.startClient(clientServiceContext.getConfigurationContext());
- }
-
- protected void customizeClient() {
- }
-
- @After
- public void teardown() throws Exception {
- clientService.stopClient();
- serverService.stopServer();
- }
-
- protected interface MockWebSocketProcessor extends Processor, ConnectedListener, TextMessageConsumer, BinaryMessageConsumer {
- }
-
- private boolean isWindowsEnvironment() {
- return System.getProperty("os.name").toLowerCase().startsWith("windows");
- }
-
- @Test
- public void testClientServerCommunication() throws Exception {
- assumeFalse(isWindowsEnvironment());
- // Expectations.
- final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
- final CountDownLatch clientConnectedServer = new CountDownLatch(1);
- final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1);
- final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1);
- final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1);
- final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1);
-
- final String textMessageFromClient = "Message from client.";
- final String textMessageFromServer = "Message from server.";
-
- final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class);
- doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
- final AtomicReference<String> serverSessionIdRef = new AtomicReference<>();
-
- doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation))
- .when(serverProcessor).connected(any(WebSocketSessionInfo.class));
-
- doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation))
- .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
-
- doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation))
- .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
-
- serverService.registerProcessor(serverPath, serverProcessor);
-
- final String clientId = "client1";
-
- final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class);
- doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
- final AtomicReference<String> clientSessionIdRef = new AtomicReference<>();
-
-
- doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation))
- .when(clientProcessor).connected(any(WebSocketSessionInfo.class));
-
- doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation))
- .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
-
- doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation))
- .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
-
- clientService.registerProcessor(clientId, clientProcessor);
-
- clientService.connect(clientId);
-
- assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS));
- assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
-
- clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient));
- clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
-
-
- assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
- assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
-
- serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer));
- serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
-
- assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
- assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
-
- clientService.deregisterProcessor(clientId, clientProcessor);
- serverService.deregisterProcessor(serverPath, serverProcessor);
- }
-
- @Test
- public void testClientServerCommunicationRecovery() throws Exception {
- assumeFalse(isWindowsEnvironment());
- // Expectations.
- final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
- final CountDownLatch clientConnectedServer = new CountDownLatch(1);
- final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1);
- final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1);
- final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1);
- final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1);
-
- final String textMessageFromClient = "Message from client.";
- final String textMessageFromServer = "Message from server.";
-
- final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class);
- doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
- final AtomicReference<String> serverSessionIdRef = new AtomicReference<>();
-
- doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation))
- .when(serverProcessor).connected(any(WebSocketSessionInfo.class));
-
- doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation))
- .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
-
- doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation))
- .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
-
- serverService.registerProcessor(serverPath, serverProcessor);
-
- final String clientId = "client1";
-
- final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class);
- doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
- final AtomicReference<String> clientSessionIdRef = new AtomicReference<>();
-
-
- doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation))
- .when(clientProcessor).connected(any(WebSocketSessionInfo.class));
-
- doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation))
- .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
-
- doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation))
- .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
-
- clientService.registerProcessor(clientId, clientProcessor);
-
- clientService.connect(clientId);
-
- assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS));
- assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
-
- // Nothing happens if maintenance is executed while sessions are alive.
- ((JettyWebSocketClient) clientService).maintainSessions();
-
- // Restart server.
- serverService.stopServer();
- serverService.startServer(serverServiceContext.getConfigurationContext());
-
- // Sessions will be recreated with the same session ids.
- ((JettyWebSocketClient) clientService).maintainSessions();
-
- clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient));
- clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
-
- assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
- assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
-
- serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer));
- serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
-
- assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
- assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
-
- clientService.deregisterProcessor(clientId, clientProcessor);
- serverService.deregisterProcessor(serverPath, serverProcessor);
- }
-
- protected Object assertConnectedEvent(CountDownLatch latch, AtomicReference<String> sessionIdRef, InvocationOnMock invocation) {
- final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
- assertNotNull(sessionInfo.getLocalAddress());
- assertNotNull(sessionInfo.getRemoteAddress());
- assertNotNull(sessionInfo.getSessionId());
- assertEquals(isSecure(), sessionInfo.isSecure());
- sessionIdRef.set(sessionInfo.getSessionId());
- latch.countDown();
- return null;
- }
-
- protected Object assertConsumeTextMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
- final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
- assertNotNull(sessionInfo.getLocalAddress());
- assertNotNull(sessionInfo.getRemoteAddress());
- assertNotNull(sessionInfo.getSessionId());
- assertEquals(isSecure(), sessionInfo.isSecure());
-
- final String receivedMessage = invocation.getArgumentAt(1, String.class);
- assertNotNull(receivedMessage);
- assertEquals(expectedMessage, receivedMessage);
- latch.countDown();
- return null;
- }
-
- protected Object assertConsumeBinaryMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
- final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
- assertNotNull(sessionInfo.getLocalAddress());
- assertNotNull(sessionInfo.getRemoteAddress());
- assertNotNull(sessionInfo.getSessionId());
- assertEquals(isSecure(), sessionInfo.isSecure());
-
- final byte[] receivedMessage = invocation.getArgumentAt(1, byte[].class);
- final byte[] expectedBinary = expectedMessage.getBytes();
- final int offset = invocation.getArgumentAt(2, Integer.class);
- final int length = invocation.getArgumentAt(3, Integer.class);
- assertNotNull(receivedMessage);
- assertEquals(expectedBinary.length, receivedMessage.length);
- assertEquals(expectedMessage, new String(receivedMessage));
- assertEquals(0, offset);
- assertEquals(expectedBinary.length, length);
- latch.countDown();
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java
deleted file mode 100644
index f5c96c2..0000000
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket.jetty;
-
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.ssl.StandardSSLContextService;
-import org.apache.nifi.websocket.WebSocketService;
-import org.junit.Test;
-
-
-public class TestJettyWebSocketSecureCommunication extends TestJettyWebSocketCommunication{
-
- private final StandardSSLContextService sslContextService = new StandardSSLContextService();
- private final ControllerServiceTestContext sslTestContext = new ControllerServiceTestContext(sslContextService, "SSLContextService");
-
- public TestJettyWebSocketSecureCommunication() {
- try {
- sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE, "src/test/resources/certs/localhost-ks.jks");
- sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
- sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_TYPE, "JKS");
- sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE, "src/test/resources/certs/localhost-ks.jks");
- sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
- sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
-
- sslContextService.initialize(sslTestContext.getInitializationContext());
- sslContextService.onConfigured(sslTestContext.getConfigurationContext());
- } catch (InitializationException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- protected boolean isSecure() {
- return true;
- }
-
- @Override
- protected void customizeServer() {
- serverServiceContext.getInitializationContext().addControllerService(sslContextService);
- serverServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
- }
-
- @Override
- protected void customizeClient() {
- clientServiceContext.getInitializationContext().addControllerService(sslContextService);
- clientServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
- }
-
- @Test
- public void testClientServerCommunication() throws Exception {
- super.testClientServerCommunication();
- }
-
-}
|