pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #2214: Add tls support to authenticate client to access function admin-api
Date Thu, 02 Aug 2018 21:34:50 GMT
sijie closed pull request #2214: Add tls support to authenticate client to access function admin-api
URL: https://github.com/apache/incubator-pulsar/pull/2214
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 4194337e4a..70ae311819 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -20,6 +20,7 @@
 workerId: standalone
 workerHostname: localhost
 workerPort: 6750
+workerPortTls: 6751
 
 connectorsDirectory: ./connectors
 
@@ -53,3 +54,15 @@ authorizationEnabled: false
 authenticationProviders: 
 # Set of role names that are treated as "super-user", meaning they will be able to access any admin-api
 superUserRoles: 
+
+#### tls configuration
+# Enable TLS
+tlsEnabled: false
+# Path for the TLS certificate file
+tlsCertificateFilePath: 
+# Path for the TLS private key file
+tlsKeyFilePath: 
+# Path for the trusted TLS certificate file 
+tlsTrustCertsFilePath:
+# Accept untrusted TLS certificate from client 
+tlsAllowInsecureConnection: false
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
new file mode 100644
index 0000000000..8f8b7ffd05
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.WorkerServer;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Test Pulsar function TLS authentication
+ *
+ */
+public class PulsarFunctionTlsTest {
+    LocalBookkeeperEnsemble bkEnsemble;
+
+    ServiceConfiguration config;
+    WorkerConfig workerConfig;
+    URL urlTls;
+    WorkerService functionsWorkerService;
+    final String tenant = "external-repl-prop";
+    String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+    String workerId;
+    WorkerServer workerServer;
+    Thread serverThread;
+    PulsarAdmin functionAdmin;
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    private final int workerServicePort = PortManager.nextFreePort();
+    private final int workerServicePortTls = PortManager.nextFreePort();
+
+    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
+    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
+    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
+    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
+
+    private static final Logger log = LoggerFactory.getLogger(PulsarFunctionTlsTest.class);
+
+    @BeforeMethod
+    void setup(Method method) throws Exception {
+
+        log.info("--- Setting up method {} ---", method.getName());
+
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+        bkEnsemble.start();
+
+        config = spy(new ServiceConfiguration());
+        config.setClusterName("use");
+        Set<String> superUsers = Sets.newHashSet("superUser");
+        config.setSuperUserRoles(superUsers);
+        config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        config.setAuthenticationEnabled(true);
+        config.setAuthenticationProviders(providers);
+        config.setTlsEnabled(true);
+        config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        config.setTlsAllowInsecureConnection(true);
+        functionsWorkerService = spy(createPulsarFunctionWorker(config));
+        AuthenticationService authenticationService = new AuthenticationService(config);
+        when(functionsWorkerService.getAuthenticationService()).thenReturn(authenticationService);
+        when(functionsWorkerService.isInitialized()).thenReturn(true);
+
+        PulsarAdmin admin = mock(PulsarAdmin.class);
+        Tenants tenants = mock(Tenants.class);
+        when(admin.tenants()).thenReturn(tenants);
+        when(functionsWorkerService.getAdmin()).thenReturn(admin);
+        Set<String> admins = Sets.newHashSet("superUser");
+        TenantInfo tenantInfo = new TenantInfo(admins, null);
+        when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);
+
+        // mock: once authentication passes, function should return response: function already exist
+        FunctionMetaDataManager dataManager = mock(FunctionMetaDataManager.class);
+        when(dataManager.containsFunction(any(), any(), any())).thenReturn(true);
+        when(functionsWorkerService.getFunctionMetaDataManager()).thenReturn(dataManager);
+
+        workerServer = new WorkerServer(functionsWorkerService);
+        serverThread = new Thread(workerServer, workerServer.getThreadName());
+        serverThread.start();
+        Thread.sleep(2000);
+        String functionTlsUrl = String.format("https://%s:%s",
+                functionsWorkerService.getWorkerConfig().getWorkerHostname(), workerServicePortTls);
+
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+
+        functionAdmin = PulsarAdmin.builder().serviceHttpUrl(functionTlsUrl)
+                .tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH).allowTlsInsecureConnection(true)
+                .authentication(authTls).build();
+
+        Thread.sleep(100);
+    }
+
+    @AfterMethod
+    void shutdown() throws Exception {
+        log.info("--- Shutting down ---");
+        functionAdmin.close();
+        bkEnsemble.stop();
+        workerServer.stop();
+        if (null != serverThread) {
+            serverThread.interrupt();
+            try {
+                serverThread.join();
+            } catch (InterruptedException e) {
+                log.warn("Worker server thread is interrupted", e);
+            }
+        }
+        functionsWorkerService.stop();
+    }
+
+    private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
+        workerConfig = new WorkerConfig();
+        workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
+        workerConfig.setSchedulerClassName(
+                org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
+        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+        // worker talks to local broker
+        workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls());
+        workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls());
+        workerConfig.setFailureCheckFreqMs(100);
+        workerConfig.setNumFunctionPackageReplicas(1);
+        workerConfig.setClusterCoordinationTopicName("coordinate");
+        workerConfig.setFunctionAssignmentTopicName("assignment");
+        workerConfig.setFunctionMetadataTopicName("metadata");
+        workerConfig.setInstanceLivenessCheckFreqMs(100);
+        workerConfig.setWorkerPort(workerServicePort);
+        workerConfig.setPulsarFunctionsCluster(config.getClusterName());
+        String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+        this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort();
+        workerConfig.setWorkerHostname(hostname);
+        workerConfig.setWorkerId(workerId);
+
+        workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        workerConfig.setClientAuthenticationParameters(
+                String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
+        workerConfig.setUseTls(true);
+        workerConfig.setTlsAllowInsecureConnection(true);
+        workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
+
+        workerConfig.setWorkerPortTls(workerServicePortTls);
+        workerConfig.setTlsEnabled(true);
+        workerConfig.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        workerConfig.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+
+        workerConfig.setAuthenticationEnabled(true);
+        workerConfig.setAuthorizationEnabled(true);
+
+        return new WorkerService(workerConfig);
+    }
+
+    @Test
+    public void testAuthorization() throws Exception {
+
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String functionName = "PulsarSink-test";
+        final String subscriptionName = "test-sub";
+
+        String jarFilePathUrl = String.format("%s:%s", Utils.FILE,
+                PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath());
+        FunctionDetails functionDetails = PulsarSinkE2ETest.createSinkConfig(jarFilePathUrl, tenant, namespacePortion,
+                functionName, sinkTopic, subscriptionName);
+
+        try {
+            functionAdmin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);
+            fail("Authentication should pass but call should fail with function already exist");
+        } catch (PulsarAdminException e) {
+            assertTrue(e.getMessage().contains("already exists"));
+        }
+
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 5398bc9b15..abc4735ea1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -381,8 +381,7 @@ public void testPulsarSinkStats() throws Exception {
         assertEquals(ownerWorkerId, workerId);
     }
 
-    protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName,
-            String sinkTopic, String subscriptionName) {
+    protected static FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sinkTopic, String subscriptionName) {
 
         File file = new File(jarFile);
         try {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index eda9b1515f..83a20bf9ce 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -53,6 +53,7 @@
     private String workerId;
     private String workerHostname;
     private int workerPort;
+    private int workerPortTls;
     private String connectorsDirectory = "./connectors";
     private String functionMetadataTopicName;
     private String pulsarServiceUrl;
@@ -73,10 +74,21 @@
     private long instanceLivenessCheckFreqMs;
     private String clientAuthenticationPlugin;
     private String clientAuthenticationParameters;
-    private boolean useTls = false;
+    /***** --- TLS --- ****/
+    // Enable TLS
+    private boolean tlsEnabled = false;
+    // Path for the TLS certificate file
+    private String tlsCertificateFilePath;
+    // Path for the TLS private key file
+    private String tlsKeyFilePath;
+    // Path for the trusted TLS certificate file
     private String tlsTrustCertsFilePath = "";
+    // Accept untrusted TLS certificate from client
     private boolean tlsAllowInsecureConnection = false;
+    private boolean tlsRequireTrustedClientCertOnConnect = false;
+    private boolean useTls = false;
     private boolean tlsHostnameVerificationEnable = false;
+    
     private int metricsSamplingPeriodSec = 60;
     // Enforce authentication
     private boolean authenticationEnabled = false;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index a57a9529fa..f6e148e719 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -21,36 +21,47 @@
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Map;
-
 import javax.servlet.DispatcherType;
-
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.pulsar.broker.web.AuthenticationFilter;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
 
 import java.net.BindException;
 import java.net.URI;
-
+import java.security.GeneralSecurityException;
 import org.eclipse.jetty.server.handler.ContextHandlerCollection;
 import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
 import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
 @Slf4j
 public class WorkerServer implements Runnable {
 
     private final WorkerConfig workerConfig;
     private final WorkerService workerService;
     private static final String MATCH_ALL = "/*";
+    private static final int NUM_ACCEPTORS = 16;
+    private static final int MAX_CONCURRENT_REQUESTS = 1024;
+    private final ExecutorService webServerExecutor;
+    private Server server;
 
     private static String getErrorMessage(Server server, int port, Exception ex) {
         if (ex instanceof BindException) {
@@ -64,13 +75,21 @@ private static String getErrorMessage(Server server, int port, Exception ex) {
     public WorkerServer(WorkerService workerService) {
         this.workerConfig = workerService.getWorkerConfig();
         this.workerService = workerService;
+        this.webServerExecutor = Executors.newFixedThreadPool(NUM_ACCEPTORS, new DefaultThreadFactory("function-web"));
     }
 
     @Override
     public void run() {
-        final Server server = new Server(this.workerConfig.getWorkerPort());
-
-        List<Handler> handlers = new ArrayList<>(2);
+        server = new Server(new ExecutorThreadPool(webServerExecutor));
+
+        
+        List<ServerConnector> connectors = new ArrayList<>();
+        ServerConnector connector = new ServerConnector(server, 1, 1);
+        connector.setPort(this.workerConfig.getWorkerPort());
+        connector.setHost(this.workerConfig.getWorkerHostname());
+        connectors.add(connector);
+        
+        List<Handler> handlers = new ArrayList<>(3);
         handlers.add(newServletContextHandler("/admin",
                 new ResourceConfig(Resources.getApiResources()), workerService));
         handlers.add(newServletContextHandler("/admin/v2",
@@ -85,6 +104,27 @@ public void run() {
             contexts, new DefaultHandler()
         });
         server.setHandler(handlerCollection);
+        
+        if (this.workerConfig.isTlsEnabled()) {
+            try {
+                SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory(
+                        this.workerConfig.isTlsAllowInsecureConnection(),
+                        this.workerConfig.getTlsTrustCertsFilePath(),
+                        this.workerConfig.getTlsCertificateFilePath(),
+                        this.workerConfig.getTlsKeyFilePath(),
+                        this.workerConfig.isTlsRequireTrustedClientCertOnConnect());
+                ServerConnector tlsConnector = new ServerConnector(server, 1, 1, sslCtxFactory);
+                tlsConnector.setPort(this.workerConfig.getWorkerPortTls());
+                tlsConnector.setHost(this.workerConfig.getWorkerHostname());
+                connectors.add(tlsConnector);
+            } catch (GeneralSecurityException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        // Limit number of concurrent HTTP connections to avoid getting out of file descriptors
+        connectors.forEach(c -> c.setAcceptQueueSize(MAX_CONCURRENT_REQUESTS / connectors.size()));
+        server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
 
         try {
             server.start();
@@ -96,7 +136,6 @@ public void run() {
             log.error("ex: {}", ex, ex);
             final String message = getErrorMessage(server, this.workerConfig.getWorkerPort(), ex);
             log.error(message);
-            System.exit(1);
         } finally {
             server.destroy();
         }
@@ -124,4 +163,15 @@ public static ServletContextHandler newServletContextHandler(String contextPath,
         return contextHandler;
     }
     
+    @VisibleForTesting
+    public void stop() {
+        if (this.server != null) {
+            try {
+                this.server.stop();
+            } catch (Exception e) {
+                log.error("Failed to stop function web-server ", e);
+            }
+        }
+    }
+    
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message