nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danbr...@apache.org
Subject [04/29] incubator-nifi git commit: NIFI-454: Use random ports instead of specific ports for running unit tests; updated abstract class and interface to expose the port being used
Date Sun, 21 Jun 2015 17:23:05 GMT
NIFI-454: Use random ports instead of specific ports for running unit tests; updated abstract
class and interface to expose the port being used


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/315af02c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/315af02c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/315af02c

Branch: refs/heads/NIFI-632
Commit: 315af02c595fdf62881b839f44227d121cbda46d
Parents: 739baa2
Author: Mark Payne <markap14@hotmail.com>
Authored: Fri Jun 5 13:56:45 2015 -0400
Committer: Mark Payne <markap14@hotmail.com>
Committed: Wed Jun 17 08:15:10 2015 -0400

----------------------------------------------------------------------
 .../cache/server/AbstractCacheServer.java       |   8 +-
 .../distributed/cache/server/CacheServer.java   |   2 +
 .../cache/server/DistributedCacheServer.java    |  65 +++--
 .../cache/server/TestServerAndClient.java       | 292 +++++++++----------
 4 files changed, 185 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/315af02c/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
index 10f53b2..5c5a9cb 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
@@ -52,7 +52,6 @@ public abstract class AbstractCacheServer implements CacheServer {
     private final SSLContext sslContext;
     protected volatile boolean stopped = false;
     private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>();
-    ;
 
     private volatile ServerSocketChannel serverSocketChannel;
 
@@ -63,6 +62,11 @@ public abstract class AbstractCacheServer implements CacheServer {
     }
 
     @Override
+    public int getPort() {
+        return serverSocketChannel == null ? this.port : serverSocketChannel.socket().getLocalPort();
+    }
+
+    @Override
     public void start() throws IOException {
         serverSocketChannel = ServerSocketChannel.open();
         serverSocketChannel.configureBlocking(true);
@@ -117,7 +121,7 @@ public abstract class AbstractCacheServer implements CacheServer {
                                 return;
                             }
                             try (final InputStream in = new BufferedInputStream(rawInputStream);
-                                    final OutputStream out = new BufferedOutputStream(rawOutputStream))
{
+                                final OutputStream out = new BufferedOutputStream(rawOutputStream))
{
 
                                 final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/315af02c/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
index fab8f13..d97c519 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
@@ -24,4 +24,6 @@ public interface CacheServer {
 
     void stop() throws IOException;
 
+    int getPort();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/315af02c/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
index 5907f50..44419b5 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
@@ -35,39 +35,39 @@ public abstract class DistributedCacheServer extends AbstractControllerService
{
     public static final String EVICTION_STRATEGY_FIFO = "First In, First Out";
 
     public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
-            .name("Port")
-            .description("The port to listen on for incoming connections")
-            .required(true)
-            .addValidator(StandardValidators.PORT_VALIDATOR)
-            .defaultValue("4557")
-            .build();
+        .name("Port")
+        .description("The port to listen on for incoming connections")
+        .required(true)
+        .addValidator(StandardValidators.PORT_VALIDATOR)
+        .defaultValue("4557")
+        .build();
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
-            .name("SSL Context Service")
-            .description("If specified, this service will be used to create an SSL Context
that will be used "
-                    + "to secure communications; if not specified, communications will not
be secure")
-            .required(false)
-            .identifiesControllerService(SSLContextService.class)
-            .build();
+        .name("SSL Context Service")
+        .description("If specified, this service will be used to create an SSL Context that
will be used "
+            + "to secure communications; if not specified, communications will not be secure")
+        .required(false)
+        .identifiesControllerService(SSLContextService.class)
+        .build();
     public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder()
-            .name("Maximum Cache Entries")
-            .description("The maximum number of cache entries that the cache can hold")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .defaultValue("10000")
-            .build();
+        .name("Maximum Cache Entries")
+        .description("The maximum number of cache entries that the cache can hold")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("10000")
+        .build();
     public static final PropertyDescriptor EVICTION_POLICY = new PropertyDescriptor.Builder()
-            .name("Eviction Strategy")
-            .description("Determines which strategy should be used to evict values from the
cache to make room for new entries")
-            .required(true)
-            .allowableValues(EVICTION_STRATEGY_LFU, EVICTION_STRATEGY_LRU, EVICTION_STRATEGY_FIFO)
-            .defaultValue(EVICTION_STRATEGY_LFU)
-            .build();
+        .name("Eviction Strategy")
+        .description("Determines which strategy should be used to evict values from the cache
to make room for new entries")
+        .required(true)
+        .allowableValues(EVICTION_STRATEGY_LFU, EVICTION_STRATEGY_LRU, EVICTION_STRATEGY_FIFO)
+        .defaultValue(EVICTION_STRATEGY_LFU)
+        .build();
     public static final PropertyDescriptor PERSISTENCE_PATH = new PropertyDescriptor.Builder()
-            .name("Persistence Directory")
-            .description("If specified, the cache will be persisted in the given directory;
if not specified, the cache will be in-memory only")
-            .required(false)
-            .addValidator(StandardValidators.createDirectoryExistsValidator(true, true))
-            .build();
+        .name("Persistence Directory")
+        .description("If specified, the cache will be persisted in the given directory; if
not specified, the cache will be in-memory only")
+        .required(false)
+        .addValidator(StandardValidators.createDirectoryExistsValidator(true, true))
+        .build();
 
     private volatile CacheServer cacheServer;
 
@@ -103,5 +103,12 @@ public abstract class DistributedCacheServer extends AbstractControllerService
{
         shutdownServer();
     }
 
+    /**
+     * @return the port that the server is listening on, or -1 if the server has not been
started
+     */
+    public int getPort() {
+        return cacheServer == null ? -1 : cacheServer.getPort();
+    }
+
     protected abstract CacheServer createCacheServer(ConfigurationContext context);
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/315af02c/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
index 42698b8..82e4a99 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.distributed.cache.server;
 
-import org.apache.commons.lang3.SystemUtils;
-import org.apache.nifi.distributed.cache.server.DistributedSetCacheServer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -29,8 +27,11 @@ import java.io.OutputStream;
 import java.net.ConnectException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
@@ -38,16 +39,16 @@ import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService
 import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
 import org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.ssl.SSLContextService.ClientAuth;
-import org.apache.nifi.ssl.StandardSSLContextService;
 import org.apache.nifi.util.MockConfigurationContext;
 import org.apache.nifi.util.MockControllerServiceInitializationContext;
-
-import org.apache.commons.lang3.SerializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
 import org.junit.Assume;
-import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,20 +66,24 @@ public class TestServerAndClient {
         LOGGER = LoggerFactory.getLogger(TestServerAndClient.class);
     }
 
-    @Ignore("Test fails when in a maven parallel build due to address/port already taken
- need to vary these so tests can run in parallel")
     @Test
     public void testNonPersistentSetServerAndClient() throws InitializationException, IOException
{
+
+        /**
+         * This bypasses the test for build environments in OS X running Java 1.8 due to
a JVM bug
+         * See:  https://issues.apache.org/jira/browse/NIFI-437
+         */
+        Assume.assumeFalse("test is skipped due to build environment being OS X with JDK
1.8. See https://issues.apache.org/jira/browse/NIFI-437",
+            SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
+
         LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
         // Create server
-        final DistributedSetCacheServer server = new DistributedSetCacheServer();
-        MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server,
"server");
-        server.initialize(serverInitContext);
-
-        final Map<PropertyDescriptor, String> serverProperties = new HashMap<>();
-        final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties,
serverInitContext.getControllerServiceLookup());
-        server.startServer(serverContext);
+        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
+        final DistributedSetCacheServer server = new SetServer();
+        runner.addControllerService("server", server);
+        runner.enableControllerService(server);
 
-        final DistributedSetCacheClientService client = createClient();
+        final DistributedSetCacheClientService client = createClient(server.getPort());
         final Serializer<String> serializer = new StringSerializer();
         final boolean added = client.addIfAbsent("test", serializer);
         assertTrue(added);
@@ -98,24 +103,28 @@ public class TestServerAndClient {
         server.shutdownServer();
     }
 
-    @Ignore("Test fails when in a maven parallel build due to address/port already taken
- need to vary these so tests can run in parallel")
     @Test
     public void testPersistentSetServerAndClient() throws InitializationException, IOException
{
+        /**
+         * This bypasses the test for build environments in OS X running Java 1.8 due to
a JVM bug
+         * See:  https://issues.apache.org/jira/browse/NIFI-437
+         */
+        Assume.assumeFalse("test is skipped due to build environment being OS X with JDK
1.8. See https://issues.apache.org/jira/browse/NIFI-437",
+            SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
+
         LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-        // Create server
-        final DistributedSetCacheServer server = new DistributedSetCacheServer();
-        MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server,
"server");
-        server.initialize(serverInitContext);
 
         final File dataFile = new File("target/cache-data");
         deleteRecursively(dataFile);
 
-        final Map<PropertyDescriptor, String> serverProperties = new HashMap<>();
-        serverProperties.put(DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
-        final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties,
serverInitContext.getControllerServiceLookup());
-        server.startServer(serverContext);
+        // Create server
+        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
+        final DistributedSetCacheServer server = new SetServer();
+        runner.addControllerService("server", server);
+        runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
+        runner.enableControllerService(server);
 
-        final DistributedSetCacheClientService client = createClient();
+        DistributedSetCacheClientService client = createClient(server.getPort());
         final Serializer<String> serializer = new StringSerializer();
         final boolean added = client.addIfAbsent("test", serializer);
         final boolean added2 = client.addIfAbsent("test2", serializer);
@@ -137,41 +146,45 @@ public class TestServerAndClient {
         assertFalse(containedAfterRemove);
 
         server.shutdownServer();
+        client.close();
 
-        final DistributedSetCacheServer newServer = new DistributedSetCacheServer();
-        MockControllerServiceInitializationContext newServerInitContext = new MockControllerServiceInitializationContext(newServer,
"server2");
-        newServer.initialize(newServerInitContext);
-
-        final MockConfigurationContext newServerContext = new MockConfigurationContext(serverProperties,
-                newServerInitContext.getControllerServiceLookup());
-        newServer.startServer(newServerContext);
+        final DistributedSetCacheServer newServer = new SetServer();
+        runner.addControllerService("server2", newServer);
+        runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
+        runner.enableControllerService(newServer);
+        client = createClient(newServer.getPort());
 
         assertFalse(client.contains("test", serializer));
         assertTrue(client.contains("test2", serializer));
 
         newServer.shutdownServer();
+        client.close();
     }
 
-    @Ignore("Test fails when in a maven parallel build due to address/port already taken
- need to vary these so tests can run in parallel")
     @Test
     public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException,
IOException {
+        /**
+         * This bypasses the test for build environments in OS X running Java 1.8 due to
a JVM bug
+         * See:  https://issues.apache.org/jira/browse/NIFI-437
+         */
+        Assume.assumeFalse("test is skipped due to build environment being OS X with JDK
1.8. See https://issues.apache.org/jira/browse/NIFI-437",
+            SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
+
         LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
         // Create server
-        final DistributedSetCacheServer server = new DistributedSetCacheServer();
-        MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server,
"server");
-        server.initialize(serverInitContext);
-
         final File dataFile = new File("target/cache-data");
         deleteRecursively(dataFile);
 
-        final Map<PropertyDescriptor, String> serverProperties = new HashMap<>();
-        serverProperties.put(DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
-        serverProperties.put(DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
-
-        final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties,
serverInitContext.getControllerServiceLookup());
-        server.startServer(serverContext);
-
-        final DistributedSetCacheClientService client = createClient();
+        // Create server
+        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
+        final DistributedSetCacheServer server = new SetServer();
+        runner.addControllerService("server", server);
+        runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
+        runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
+        runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_LFU);
+        runner.enableControllerService(server);
+
+        DistributedSetCacheClientService client = createClient(server.getPort());
         final Serializer<String> serializer = new StringSerializer();
         final boolean added = client.addIfAbsent("test", serializer);
         waitABit();
@@ -199,13 +212,13 @@ public class TestServerAndClient {
 
         server.shutdownServer();
 
-        final DistributedSetCacheServer newServer = new DistributedSetCacheServer();
-        MockControllerServiceInitializationContext newServerInitContext = new MockControllerServiceInitializationContext(newServer,
"server2");
-        newServer.initialize(newServerInitContext);
 
-        final MockConfigurationContext newServerContext = new MockConfigurationContext(serverProperties,
-                newServerInitContext.getControllerServiceLookup());
-        newServer.startServer(newServerContext);
+        final DistributedSetCacheServer newServer = new SetServer();
+        runner.addControllerService("server2", newServer);
+        runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
+        runner.enableControllerService(newServer);
+        client.close();
+        client = createClient(newServer.getPort());
 
         assertTrue(client.contains("test", serializer));
         assertTrue(client.contains("test2", serializer));
@@ -213,29 +226,33 @@ public class TestServerAndClient {
         assertTrue(client.contains("test4", serializer));
 
         newServer.shutdownServer();
+        client.close();
     }
 
-    @Ignore("Test fails when in a maven parallel build due to address/port already taken
- need to vary these so tests can run in parallel")
     @Test
     public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException,
IOException {
+        /**
+         * This bypasses the test for build environments in OS X running Java 1.8 due to
a JVM bug
+         * See:  https://issues.apache.org/jira/browse/NIFI-437
+         */
+        Assume.assumeFalse("test is skipped due to build environment being OS X with JDK
1.8. See https://issues.apache.org/jira/browse/NIFI-437",
+            SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
+
         LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-        // Create server
-        final DistributedSetCacheServer server = new DistributedSetCacheServer();
-        MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server,
"server");
-        server.initialize(serverInitContext);
 
         final File dataFile = new File("target/cache-data");
         deleteRecursively(dataFile);
 
-        final Map<PropertyDescriptor, String> serverProperties = new HashMap<>();
-        serverProperties.put(DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
-        serverProperties.put(DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
-        serverProperties.put(DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO);
-
-        final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties,
serverInitContext.getControllerServiceLookup());
-        server.startServer(serverContext);
-
-        final DistributedSetCacheClientService client = createClient();
+        // Create server
+        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
+        final DistributedSetCacheServer server = new SetServer();
+        runner.addControllerService("server", server);
+        runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
+        runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
+        runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO);
+        runner.enableControllerService(server);
+
+        DistributedSetCacheClientService client = createClient(server.getPort());
         final Serializer<String> serializer = new StringSerializer();
 
         // add 3 entries to the cache. But, if we add too fast, we'll have the same millisecond
@@ -267,35 +284,42 @@ public class TestServerAndClient {
         assertTrue(client.contains("test3", serializer));
 
         server.shutdownServer();
+        client.close();
 
-        final DistributedSetCacheServer newServer = new DistributedSetCacheServer();
-        MockControllerServiceInitializationContext newServerInitContext = new MockControllerServiceInitializationContext(newServer,
"server2");
-        newServer.initialize(newServerInitContext);
 
-        final MockConfigurationContext newServerContext = new MockConfigurationContext(serverProperties,
-                newServerInitContext.getControllerServiceLookup());
-        newServer.startServer(newServerContext);
+        final DistributedSetCacheServer newServer = new SetServer();
+        runner.addControllerService("server2", newServer);
+        runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
+        runner.setProperty(newServer, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
+        runner.setProperty(newServer, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO);
+        runner.enableControllerService(newServer);
 
+        client = createClient(newServer.getPort());
         assertFalse(client.contains("test", serializer));
         assertTrue(client.contains("test2", serializer));
         assertTrue(client.contains("test3", serializer));
         assertTrue(client.contains("test4", serializer));
 
         newServer.shutdownServer();
+        client.close();
     }
 
-    @Ignore("Test fails when in a maven parallel build due to address/port already taken
- need to vary these so tests can run in parallel")
     @Test
     public void testNonPersistentMapServerAndClient() throws InitializationException, IOException,
InterruptedException {
+        /**
+         * This bypasses the test for build environments in OS X running Java 1.8 due to
a JVM bug
+         * See:  https://issues.apache.org/jira/browse/NIFI-437
+         */
+        Assume.assumeFalse("test is skipped due to build environment being OS X with JDK
1.8. See https://issues.apache.org/jira/browse/NIFI-437",
+            SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
+
         LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-        // Create server
-        final DistributedMapCacheServer server = new DistributedMapCacheServer();
-        MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server,
"server");
-        server.initialize(serverInitContext);
 
-        final Map<PropertyDescriptor, String> serverProperties = new HashMap<>();
-        final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties,
serverInitContext.getControllerServiceLookup());
-        server.startServer(serverContext);
+        // Create server
+        final DistributedMapCacheServer server = new MapServer();
+        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
+        runner.addControllerService("server", server);
+        runner.enableControllerService(server);
 
         DistributedMapCacheClientService client = new DistributedMapCacheClientService();
         MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client,
"client");
@@ -303,6 +327,7 @@ public class TestServerAndClient {
 
         final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
         clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
+        clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
         clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360
secs");
         MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties,
clientInitContext.getControllerServiceLookup());
         client.cacheConfig(clientContext);
@@ -338,7 +363,7 @@ public class TestServerAndClient {
         try {
             client.containsKey("testKey", keySerializer);
             fail("Should be closed and not accessible");
-        } catch (Exception e) {
+        } catch (final Exception e) {
 
         }
         client = null;
@@ -346,12 +371,11 @@ public class TestServerAndClient {
         clientContext = null;
 
         DistributedMapCacheClientService client2 = new DistributedMapCacheClientService();
-
         MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2,
"client2");
         client2.initialize(clientInitContext2);
 
         MockConfigurationContext clientContext2 = new MockConfigurationContext(clientProperties,
-                clientInitContext2.getControllerServiceLookup());
+            clientInitContext2.getControllerServiceLookup());
         client2.cacheConfig(clientContext2);
         assertFalse(client2.putIfAbsent("testKey", "test", keySerializer, valueSerializer));
         assertTrue(client2.containsKey("testKey", keySerializer));
@@ -360,13 +384,11 @@ public class TestServerAndClient {
         try {
             client2.containsKey("testKey", keySerializer);
             fail("Should have blown exception!");
-        } catch (ConnectException e) {
+        } catch (final ConnectException e) {
             client2 = null;
             clientContext2 = null;
             clientInitContext2 = null;
         }
-        Thread.sleep(2000);
-        System.gc();
         LOGGER.debug("end testNonPersistentMapServerAndClient");
     }
 
@@ -377,12 +399,12 @@ public class TestServerAndClient {
          * This bypasses the test for build environments in OS X running Java 1.8 due to
a JVM bug See: https://issues.apache.org/jira/browse/NIFI-437
          */
         Assume.assumeFalse("testClientTermination is skipped due to build environment being
OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
-                SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
+            SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
 
         LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
         // Create server
-        final DistributedMapCacheServer server = new DistributedMapCacheServer();
-        MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server,
"server");
+        final DistributedMapCacheServer server = new MapServer();
+        final MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server,
"server");
         server.initialize(serverInitContext);
 
         final Map<PropertyDescriptor, String> serverProperties = new HashMap<>();
@@ -428,65 +450,6 @@ public class TestServerAndClient {
         server.shutdownServer();
     }
 
-    @Ignore
-    @Test
-    public void testSSLWith2RequestsWithServerTimeout() throws InitializationException, IOException,
InterruptedException {
-        LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-        // Create SSLContext Service
-        final StandardSSLContextService sslService = new StandardSSLContextService();
-        final MockControllerServiceInitializationContext sslServerInitContext = new MockControllerServiceInitializationContext(sslService,
-                "ssl-context");
-        sslService.initialize(sslServerInitContext);
-
-        final Map<PropertyDescriptor, String> sslServerProps = new HashMap<>();
-        sslServerProps.put(StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
-        sslServerProps.put(StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
-        sslServerProps.put(StandardSSLContextService.KEYSTORE_TYPE, "JKS");
-        sslServerProps.put(StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
-        sslServerProps.put(StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
-        sslServerProps.put(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
-        MockConfigurationContext sslServerContext = new MockConfigurationContext(sslServerProps,
sslServerInitContext);
-        sslService.onConfigured(sslServerContext);
-        sslService.createSSLContext(ClientAuth.REQUIRED);
-        // Create server
-        final DistributedMapCacheServer server = new DistributedMapCacheServer();
-        final MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server,
"server");
-        server.initialize(serverInitContext);
-
-        final Map<PropertyDescriptor, String> serverProperties = new HashMap<>();
-        serverProperties.put(DistributedMapCacheServer.SSL_CONTEXT_SERVICE, "ssl-context");
-        final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties,
serverInitContext.getControllerServiceLookup());
-        server.startServer(serverContext);
-
-        DistributedMapCacheClientService client = new DistributedMapCacheClientService();
-        MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client,
"client");
-        client.initialize(clientInitContext);
-
-        final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
-        clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
-        clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360
secs");
-        clientProperties.put(DistributedMapCacheClientService.SSL_CONTEXT_SERVICE, "ssl-context");
-        MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties,
clientInitContext.getControllerServiceLookup());
-        client.cacheConfig(clientContext);
-        final Serializer<String> valueSerializer = new StringSerializer();
-        final Serializer<String> keySerializer = new StringSerializer();
-        final Deserializer<String> deserializer = new StringDeserializer();
-
-        final String original = client.getAndPutIfAbsent("testKey", "test", keySerializer,
valueSerializer, deserializer);
-        assertEquals(null, original);
-
-        Thread.sleep(30000);
-        try {
-            final boolean contains = client.containsKey("testKey", keySerializer);
-            assertTrue(contains);
-        } catch (IOException e) {
-            // this is due to the server timing out in the middle of this request
-            assertTrue(e.getMessage().contains("Channel is closed"));
-        }
-
-        server.shutdownServer();
-    }
-
     private void waitABit() {
         try {
             Thread.sleep(10L);
@@ -494,13 +457,14 @@ public class TestServerAndClient {
         }
     }
 
-    private DistributedSetCacheClientService createClient() throws InitializationException
{
+    private DistributedSetCacheClientService createClient(final int port) throws InitializationException
{
         final DistributedSetCacheClientService client = new DistributedSetCacheClientService();
-        MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client,
"client");
+        final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client,
"client");
         client.initialize(clientInitContext);
 
         final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
         clientProperties.put(DistributedSetCacheClientService.HOSTNAME, "localhost");
+        clientProperties.put(DistributedSetCacheClientService.PORT, String.valueOf(port));
         final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties,
clientInitContext.getControllerServiceLookup());
         client.onConfigured(clientContext);
 
@@ -519,7 +483,7 @@ public class TestServerAndClient {
 
         @Override
         public String deserialize(final byte[] input) throws DeserializationException, IOException
{
-            return (input.length == 0) ? null : new String(input, StandardCharsets.UTF_8);
+            return input.length == 0 ? null : new String(input, StandardCharsets.UTF_8);
         }
     }
 
@@ -543,4 +507,30 @@ public class TestServerAndClient {
             }
         }
     }
+
+    private static List<PropertyDescriptor> replacePortDescriptor(final List<PropertyDescriptor>
descriptors) {
+        descriptors.remove(DistributedCacheServer.PORT);
+        descriptors.add(new PropertyDescriptor.Builder()
+            .name("Port")
+            .description("The port to listen on for incoming connections")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(0L, 65535L, true))
+            .defaultValue("0")
+            .build());
+        return descriptors;
+    }
+
+    private static class SetServer extends DistributedSetCacheServer {
+        @Override
+        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+            return replacePortDescriptor(super.getSupportedPropertyDescriptors());
+        }
+    }
+
+    private static class MapServer extends DistributedMapCacheServer {
+        @Override
+        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+            return replacePortDescriptor(super.getSupportedPropertyDescriptors());
+        }
+    }
 }



Mime
View raw message