pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sanjee...@apache.org
Subject [pulsar] branch master updated: Make sure to create client before doing admission check (#2945)
Date Wed, 07 Nov 2018 05:09:16 GMT
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 144ee74  Make sure to create client before doing admission check (#2945)
144ee74 is described below

commit 144ee74eeb72d5636d57017839ee01b6f4f4cbd7
Author: Sanjeev Kulkarni <sanjeevrk@gmail.com>
AuthorDate: Tue Nov 6 21:09:10 2018 -0800

    Make sure to create client before doing admission check (#2945)
    
    * Make sure to create client before doing admission check
    
    * Added unittest
---
 .../runtime/KubernetesRuntimeFactory.java          |   5 +
 .../runtime/KubernetesRuntimeFactoryTest.java      | 172 +++++++++++++++++++++
 2 files changed, 177 insertions(+)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index 78eea16..3e06c03 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -201,6 +201,11 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
     @Override
     public void doAdmissionChecks(Function.FunctionDetails functionDetails) {
         KubernetesRuntime.doChecks(functionDetails);
+        try {
+            setupClient();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
         secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient, kubernetesInfo.getJobNamespace(),
functionDetails);
     }
 
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
new file mode 100644
index 0000000..832286e
--- /dev/null
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.runtime;
+
+import com.google.protobuf.util.JsonFormat;
+import io.kubernetes.client.apis.AppsV1Api;
+import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.models.V1PodSpec;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit test of {@link ThreadRuntime}.
+ */
+public class KubernetesRuntimeFactoryTest {
+
+    class TestSecretProviderConfigurator implements SecretsProviderConfigurator {
+
+        @Override
+        public void init(Map<String, String> config) {
+
+        }
+
+        @Override
+        public String getSecretsProviderClassName(FunctionDetails functionDetails) {
+            if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) {
+                if (functionDetails.getRuntime() == FunctionDetails.Runtime.JAVA) {
+                    return ClearTextSecretsProvider.class.getName();
+                } else {
+                    return "secretsprovider.ClearTextSecretsProvider";
+                }
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        public Map<String, String> getSecretsProviderConfig(FunctionDetails functionDetails)
{
+            HashMap<String, String> map = new HashMap<>();
+            map.put("Somevalue", "myvalue");
+            return map;
+        }
+
+        @Override
+        public void configureKubernetesRuntimeSecretsProvider(V1PodSpec podSpec, String functionsContainerName,
FunctionDetails functionDetails) {
+
+        }
+
+        @Override
+        public void configureProcessRuntimeSecretsProvider(ProcessBuilder processBuilder,
FunctionDetails functionDetails) {
+
+        }
+
+        @Override
+        public Type getSecretObjectType() {
+            return null;
+        }
+
+        @Override
+        public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace,
FunctionDetails functionDetails) {
+
+        }
+    }
+
+    private KubernetesRuntimeFactory factory;
+    private final String userJarFile;
+    private final String pulsarRootDir;
+    private final String javaInstanceJarFile;
+    private final String pythonInstanceFile;
+    private final String pulsarServiceUrl;
+    private final String pulsarAdminUrl;
+    private final String stateStorageServiceUrl;
+    private final String logDirectory;
+
+    public KubernetesRuntimeFactoryTest() throws Exception {
+        this.userJarFile = "UserJar.jar";
+        this.pulsarRootDir = "/pulsar";
+        this.javaInstanceJarFile = "/pulsar/instances/java-instance.jar";
+        this.pythonInstanceFile = "/pulsar/instances/python-instance/python_instance_main.py";
+        this.pulsarServiceUrl = "pulsar://localhost:6670";
+        this.pulsarAdminUrl = "http://localhost:8080";
+        this.stateStorageServiceUrl = "bk://localhost:4181";
+        this.logDirectory = "logs/functions";
+    }
+
+    @AfterMethod
+    public void tearDown() {
+        if (null != this.factory) {
+            this.factory.close();
+        }
+    }
+
+    KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir) throws Exception
{
+        KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory(
+            null,
+            null,
+            null,
+            pulsarRootDir,
+            false,
+            true,
+            "myrepo",
+            "anotherrepo",
+            extraDepsDir,
+            null,
+            pulsarServiceUrl,
+            pulsarAdminUrl,
+            stateStorageServiceUrl,
+            null,
+            null,
+            null,
+            null,
+            new TestSecretProviderConfigurator()));
+        doNothing().when(factory).setupClient();
+        return factory;
+    }
+
+    FunctionDetails createFunctionDetails() {
+        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+        functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
+        functionDetailsBuilder.setTenant("public");
+        functionDetailsBuilder.setNamespace("default");
+        functionDetailsBuilder.setName("function");
+        functionDetailsBuilder.setSecretsMap("SomeMap");
+        return functionDetailsBuilder.build();
+    }
+
+    @Test
+    public void testAdmissionChecks() throws Exception {
+        factory = createKubernetesRuntimeFactory(null);
+        FunctionDetails functionDetails = createFunctionDetails();
+        factory.doAdmissionChecks(functionDetails);
+        verify(factory, times(1)).setupClient();
+
+    }
+
+}


Mime
View raw message