pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [incubator-pulsar] branch master updated: Add tls support to authenticate client to access function admin-api (#2214)
Date Thu, 02 Aug 2018 21:34:51 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 87fa838  Add tls support to authenticate client to access function admin-api (#2214)
87fa838 is described below

commit 87fa8381516c5b591e3d2888d46d47e94034a72a
Author: Rajan Dhabalia <rdhabalia@apache.org>
AuthorDate: Thu Aug 2 14:34:48 2018 -0700

    Add tls support to authenticate client to access function admin-api (#2214)
    
    ### Motivation
    
    Add TLS support to the function web service so that the function authentication introduced in #2213 can leverage TLS authentication.
    
    ### Modifications
    
    Function server can now support TLS.
    
    ### Note
    I will add a test once #2213 is merged, so that we can add a TLS-authentication test on top of it.
---
 conf/functions_worker.yml                          |  13 ++
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    | 230 +++++++++++++++++++++
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    |   3 +-
 .../pulsar/functions/worker/WorkerConfig.java      |  14 +-
 .../pulsar/functions/worker/rest/WorkerServer.java |  68 +++++-
 5 files changed, 316 insertions(+), 12 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 4194337..70ae311 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -20,6 +20,7 @@
 workerId: standalone
 workerHostname: localhost
 workerPort: 6750
+workerPortTls: 6751
 
 connectorsDirectory: ./connectors
 
@@ -53,3 +54,15 @@ authorizationEnabled: false
 authenticationProviders: 
 # Set of role names that are treated as "super-user", meaning they will be able to access any admin-api
 superUserRoles: 
+
+#### tls configuration
+# Enable TLS
+tlsEnabled: false
+# Path for the TLS certificate file
+tlsCertificateFilePath: 
+# Path for the TLS private key file
+tlsKeyFilePath: 
+# Path for the trusted TLS certificate file 
+tlsTrustCertsFilePath:
+# Accept untrusted TLS certificate from client 
+tlsAllowInsecureConnection: false
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
new file mode 100644
index 0000000..8f8b7ff
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.WorkerServer;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Test Pulsar function TLS authentication
+ *
+ */
+public class PulsarFunctionTlsTest {
+    LocalBookkeeperEnsemble bkEnsemble;
+
+    ServiceConfiguration config;
+    WorkerConfig workerConfig;
+    URL urlTls;
+    WorkerService functionsWorkerService;
+    final String tenant = "external-repl-prop";
+    String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+    String workerId;
+    WorkerServer workerServer;
+    Thread serverThread;
+    PulsarAdmin functionAdmin;
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    private final int workerServicePort = PortManager.nextFreePort();
+    private final int workerServicePortTls = PortManager.nextFreePort();
+
+    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
+    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
+    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
+    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
+
+    private static final Logger log = LoggerFactory.getLogger(PulsarFunctionTlsTest.class);
+
+    @BeforeMethod
+    void setup(Method method) throws Exception {
+
+        log.info("--- Setting up method {} ---", method.getName());
+
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+        bkEnsemble.start();
+
+        config = spy(new ServiceConfiguration());
+        config.setClusterName("use");
+        Set<String> superUsers = Sets.newHashSet("superUser");
+        config.setSuperUserRoles(superUsers);
+        config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        config.setAuthenticationEnabled(true);
+        config.setAuthenticationProviders(providers);
+        config.setTlsEnabled(true);
+        config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        config.setTlsAllowInsecureConnection(true);
+        functionsWorkerService = spy(createPulsarFunctionWorker(config));
+        AuthenticationService authenticationService = new AuthenticationService(config);
+        when(functionsWorkerService.getAuthenticationService()).thenReturn(authenticationService);
+        when(functionsWorkerService.isInitialized()).thenReturn(true);
+
+        PulsarAdmin admin = mock(PulsarAdmin.class);
+        Tenants tenants = mock(Tenants.class);
+        when(admin.tenants()).thenReturn(tenants);
+        when(functionsWorkerService.getAdmin()).thenReturn(admin);
+        Set<String> admins = Sets.newHashSet("superUser");
+        TenantInfo tenantInfo = new TenantInfo(admins, null);
+        when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);
+
+        // mock: once authentication passes, function should return response: function already exists
+        FunctionMetaDataManager dataManager = mock(FunctionMetaDataManager.class);
+        when(dataManager.containsFunction(any(), any(), any())).thenReturn(true);
+        when(functionsWorkerService.getFunctionMetaDataManager()).thenReturn(dataManager);
+
+        workerServer = new WorkerServer(functionsWorkerService);
+        serverThread = new Thread(workerServer, workerServer.getThreadName());
+        serverThread.start();
+        Thread.sleep(2000);
+        String functionTlsUrl = String.format("https://%s:%s",
+                functionsWorkerService.getWorkerConfig().getWorkerHostname(), workerServicePortTls);
+
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+
+        functionAdmin = PulsarAdmin.builder().serviceHttpUrl(functionTlsUrl)
+                .tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH).allowTlsInsecureConnection(true)
+                .authentication(authTls).build();
+
+        Thread.sleep(100);
+    }
+
+    @AfterMethod
+    void shutdown() throws Exception {
+        log.info("--- Shutting down ---");
+        functionAdmin.close();
+        bkEnsemble.stop();
+        workerServer.stop();
+        if (null != serverThread) {
+            serverThread.interrupt();
+            try {
+                serverThread.join();
+            } catch (InterruptedException e) {
+                log.warn("Worker server thread is interrupted", e);
+            }
+        }
+        functionsWorkerService.stop();
+    }
+
+    private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
+        workerConfig = new WorkerConfig();
+        workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
+        workerConfig.setSchedulerClassName(
+                org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
+        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+        // worker talks to local broker
+        workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls());
+        workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls());
+        workerConfig.setFailureCheckFreqMs(100);
+        workerConfig.setNumFunctionPackageReplicas(1);
+        workerConfig.setClusterCoordinationTopicName("coordinate");
+        workerConfig.setFunctionAssignmentTopicName("assignment");
+        workerConfig.setFunctionMetadataTopicName("metadata");
+        workerConfig.setInstanceLivenessCheckFreqMs(100);
+        workerConfig.setWorkerPort(workerServicePort);
+        workerConfig.setPulsarFunctionsCluster(config.getClusterName());
+        String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+        this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort();
+        workerConfig.setWorkerHostname(hostname);
+        workerConfig.setWorkerId(workerId);
+
+        workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        workerConfig.setClientAuthenticationParameters(
+                String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH,
TLS_CLIENT_KEY_FILE_PATH));
+        workerConfig.setUseTls(true);
+        workerConfig.setTlsAllowInsecureConnection(true);
+        workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
+
+        workerConfig.setWorkerPortTls(workerServicePortTls);
+        workerConfig.setTlsEnabled(true);
+        workerConfig.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        workerConfig.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+
+        workerConfig.setAuthenticationEnabled(true);
+        workerConfig.setAuthorizationEnabled(true);
+
+        return new WorkerService(workerConfig);
+    }
+
+    @Test
+    public void testAuthorization() throws Exception {
+
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String functionName = "PulsarSink-test";
+        final String subscriptionName = "test-sub";
+
+        String jarFilePathUrl = String.format("%s:%s", Utils.FILE,
+                PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath());
+        FunctionDetails functionDetails = PulsarSinkE2ETest.createSinkConfig(jarFilePathUrl,
tenant, namespacePortion,
+                functionName, sinkTopic, subscriptionName);
+
+        try {
+            functionAdmin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);
+            fail("Authentication should pass but call should fail with function already exist");
+        } catch (PulsarAdminException e) {
+            assertTrue(e.getMessage().contains("already exists"));
+        }
+
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 5398bc9..abc4735 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -381,8 +381,7 @@ public class PulsarSinkE2ETest {
         assertEquals(ownerWorkerId, workerId);
     }
 
-    protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace,
String functionName,
-            String sinkTopic, String subscriptionName) {
+    protected static FunctionDetails createSinkConfig(String jarFile, String tenant, String
namespace, String functionName, String sinkTopic, String subscriptionName) {
 
         File file = new File(jarFile);
         try {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index eda9b15..83a20bf 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -53,6 +53,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     private String workerId;
     private String workerHostname;
     private int workerPort;
+    private int workerPortTls;
     private String connectorsDirectory = "./connectors";
     private String functionMetadataTopicName;
     private String pulsarServiceUrl;
@@ -73,10 +74,21 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     private long instanceLivenessCheckFreqMs;
     private String clientAuthenticationPlugin;
     private String clientAuthenticationParameters;
-    private boolean useTls = false;
+    /***** --- TLS --- ****/
+    // Enable TLS
+    private boolean tlsEnabled = false;
+    // Path for the TLS certificate file
+    private String tlsCertificateFilePath;
+    // Path for the TLS private key file
+    private String tlsKeyFilePath;
+    // Path for the trusted TLS certificate file
     private String tlsTrustCertsFilePath = "";
+    // Accept untrusted TLS certificate from client
     private boolean tlsAllowInsecureConnection = false;
+    private boolean tlsRequireTrustedClientCertOnConnect = false;
+    private boolean useTls = false;
     private boolean tlsHostnameVerificationEnable = false;
+    
     private int metricsSamplingPeriodSec = 60;
     // Enforce authentication
     private boolean authenticationEnabled = false;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index a57a952..f6e148e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -21,36 +21,47 @@ package org.apache.pulsar.functions.worker.rest;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Map;
-
 import javax.servlet.DispatcherType;
-
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.pulsar.broker.web.AuthenticationFilter;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
 
 import java.net.BindException;
 import java.net.URI;
-
+import java.security.GeneralSecurityException;
 import org.eclipse.jetty.server.handler.ContextHandlerCollection;
 import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
 import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
 @Slf4j
 public class WorkerServer implements Runnable {
 
     private final WorkerConfig workerConfig;
     private final WorkerService workerService;
     private static final String MATCH_ALL = "/*";
+    private static final int NUM_ACCEPTORS = 16;
+    private static final int MAX_CONCURRENT_REQUESTS = 1024;
+    private final ExecutorService webServerExecutor;
+    private Server server;
 
     private static String getErrorMessage(Server server, int port, Exception ex) {
         if (ex instanceof BindException) {
@@ -64,13 +75,21 @@ public class WorkerServer implements Runnable {
     public WorkerServer(WorkerService workerService) {
         this.workerConfig = workerService.getWorkerConfig();
         this.workerService = workerService;
+        this.webServerExecutor = Executors.newFixedThreadPool(NUM_ACCEPTORS, new DefaultThreadFactory("function-web"));
     }
 
     @Override
     public void run() {
-        final Server server = new Server(this.workerConfig.getWorkerPort());
-
-        List<Handler> handlers = new ArrayList<>(2);
+        server = new Server(new ExecutorThreadPool(webServerExecutor));
+
+        
+        List<ServerConnector> connectors = new ArrayList<>();
+        ServerConnector connector = new ServerConnector(server, 1, 1);
+        connector.setPort(this.workerConfig.getWorkerPort());
+        connector.setHost(this.workerConfig.getWorkerHostname());
+        connectors.add(connector);
+        
+        List<Handler> handlers = new ArrayList<>(3);
         handlers.add(newServletContextHandler("/admin",
                 new ResourceConfig(Resources.getApiResources()), workerService));
         handlers.add(newServletContextHandler("/admin/v2",
@@ -85,6 +104,27 @@ public class WorkerServer implements Runnable {
             contexts, new DefaultHandler()
         });
         server.setHandler(handlerCollection);
+        
+        if (this.workerConfig.isTlsEnabled()) {
+            try {
+                SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory(
+                        this.workerConfig.isTlsAllowInsecureConnection(),
+                        this.workerConfig.getTlsTrustCertsFilePath(),
+                        this.workerConfig.getTlsCertificateFilePath(),
+                        this.workerConfig.getTlsKeyFilePath(),
+                        this.workerConfig.isTlsRequireTrustedClientCertOnConnect());
+                ServerConnector tlsConnector = new ServerConnector(server, 1, 1, sslCtxFactory);
+                tlsConnector.setPort(this.workerConfig.getWorkerPortTls());
+                tlsConnector.setHost(this.workerConfig.getWorkerHostname());
+                connectors.add(tlsConnector);
+            } catch (GeneralSecurityException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        // Limit number of concurrent HTTP connections to avoid getting out of file descriptors
+        connectors.forEach(c -> c.setAcceptQueueSize(MAX_CONCURRENT_REQUESTS / connectors.size()));
+        server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
 
         try {
             server.start();
@@ -96,7 +136,6 @@ public class WorkerServer implements Runnable {
             log.error("ex: {}", ex, ex);
             final String message = getErrorMessage(server, this.workerConfig.getWorkerPort(),
ex);
             log.error(message);
-            System.exit(1);
         } finally {
             server.destroy();
         }
@@ -124,4 +163,15 @@ public class WorkerServer implements Runnable {
         return contextHandler;
     }
     
+    @VisibleForTesting
+    public void stop() {
+        if (this.server != null) {
+            try {
+                this.server.stop();
+            } catch (Exception e) {
+                log.error("Failed to stop function web-server ", e);
+            }
+        }
+    }
+    
 }


Mime
View raw message