pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #2070: Run functions runtime test with thread mode
Date Tue, 03 Jul 2018 00:14:23 GMT
sijie closed pull request #2070: Run functions runtime test with thread mode
URL: https://github.com/apache/incubator-pulsar/pull/2070
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docker/pulsar/scripts/gen-yml-from-env.py b/docker/pulsar/scripts/gen-yml-from-env.py
index 28599a54b9..45fe2a5464 100755
--- a/docker/pulsar/scripts/gen-yml-from-env.py
+++ b/docker/pulsar/scripts/gen-yml-from-env.py
@@ -27,6 +27,18 @@
 import os, sys
 import yaml
 
+INT_KEYS = [
+    'workerPort',
+    'numFunctionPackageReplicas',
+    'failureCheckFreqMs',
+    'rescheduleTimeoutMs',
+    'initialBrokerReconnectMaxRetries',
+    'assignmentWriteMaxRetries',
+    'instanceLivenessCheckFreqMs'
+]
+
+PF_ENV_PREFIX = 'PF_'
+
 if len(sys.argv) < 2:
     print 'Usage: %s' % (sys.argv[0])
     sys.exit(1)
@@ -39,25 +51,29 @@
     # update the config
     modified = False
     for k in sorted(os.environ.keys()):
-        key_parts = k.split('_')
+        if not k.startswith(PF_ENV_PREFIX):
+            continue
+
         v = os.environ[k]
 
+        k = k[len(PF_ENV_PREFIX):]
+        key_parts = k.split('_')
+
         i = 0
         conf_to_modify = conf
         while i < len(key_parts):
             key_part = key_parts[i]
-            if not key_part in conf_to_modify:
-                break
-
             if i == (len(key_parts) - 1):
-                if key_part == 'workerPort':
+                if key_part in INT_KEYS:
                     conf_to_modify[key_part] = int(v)
                 else:
                     conf_to_modify[key_part] = v
-
                 modified = True
             else:
+                if not key_part in conf_to_modify:
+                    conf_to_modify[key_part] = {}
                 conf_to_modify = conf_to_modify[key_part]
+                modified = True
             i += 1
     # Store back the updated config in the same file
     f = open(conf_filename , 'w')
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index efbd0fdd84..3e8f4b30c8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -54,7 +54,6 @@
     private String pulsarFunctionsCluster;
     private int numFunctionPackageReplicas;
     private String downloadDirectory;
-    private long snapshotFreqMs;
     private String stateStorageServiceUrl;
     private String functionAssignmentTopicName;
     private String schedulerClassName;
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index a721915d62..c7bcf621db 100644
--- a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -183,7 +183,7 @@ public void stop() {
         }
     }
 
-    public void startFunctionWorkers(int numFunctionWorkers) {
+    public void startFunctionWorkersWithProcessContainerFactory(int numFunctionWorkers) {
         String serviceUrl = "pulsar://pulsar-broker-0:" + PulsarContainer.BROKER_PORT;
         String httpServiceUrl = "http://pulsar-broker-0:" + PulsarContainer.BROKER_HTTP_PORT;
         workerContainers.putAll(runNumContainers(
@@ -193,14 +193,39 @@ public void startFunctionWorkers(int numFunctionWorkers) {
                 .withNetwork(network)
                 .withNetworkAliases(name)
                 // worker settings
-                .withEnv("workerId", name)
-                .withEnv("workerHostname", name)
-                .withEnv("workerPort", "" + PulsarContainer.BROKER_HTTP_PORT)
-                .withEnv("pulsarFunctionsCluster", clusterName)
-                .withEnv("pulsarServiceUrl", serviceUrl)
-                .withEnv("pulsarWebServiceUrl", httpServiceUrl)
+                .withEnv("PF_workerId", name)
+                .withEnv("PF_workerHostname", name)
+                .withEnv("PF_workerPort", "" + PulsarContainer.BROKER_HTTP_PORT)
+                .withEnv("PF_pulsarFunctionsCluster", clusterName)
+                .withEnv("PF_pulsarServiceUrl", serviceUrl)
+                .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
+                // script
                 .withEnv("clusterName", clusterName)
+                .withEnv("zookeeperServers", ZKContainer.NAME)
+                // bookkeeper tools
+                .withEnv("zkServers", ZKContainer.NAME)
+        ));
+    }
+
+    public void startFunctionWorkersWithThreadContainerFactory(int numFunctionWorkers) {
+        String serviceUrl = "pulsar://pulsar-broker-0:" + PulsarContainer.BROKER_PORT;
+        String httpServiceUrl = "http://pulsar-broker-0:" + PulsarContainer.BROKER_HTTP_PORT;
+        workerContainers.putAll(runNumContainers(
+            "functions-worker",
+            numFunctionWorkers,
+            (name) -> new WorkerContainer(clusterName, name)
+                .withNetwork(network)
+                .withNetworkAliases(name)
+                // worker settings
+                .withEnv("PF_workerId", name)
+                .withEnv("PF_workerHostname", name)
+                .withEnv("PF_workerPort", "" + PulsarContainer.BROKER_HTTP_PORT)
+                .withEnv("PF_pulsarFunctionsCluster", clusterName)
+                .withEnv("PF_pulsarServiceUrl", serviceUrl)
+                .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
+                .withEnv("PF_threadContainerFactory_threadGroupName", "pf-container-group")
                 // script
+                .withEnv("clusterName", clusterName)
                 .withEnv("zookeeperServers", ZKContainer.NAME)
                 // bookkeeper tools
                 .withEnv("zkServers", ZKContainer.NAME)
diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index a35d4662b2..625d800d54 100644
--- a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -18,20 +18,10 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
-import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
-import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.tests.containers.WorkerContainer;
-import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
-import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
 import org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
 import org.apache.pulsar.tests.topologies.PulsarCluster;
 import org.testcontainers.containers.Container.ExecResult;
@@ -94,126 +84,6 @@ public void checkDownload() throws Exception {
         assertTrue(output.getStderr().isEmpty());
     }
 
-    //
-    // Test CRUD functions on different runtimes.
-    //
-
-    @Test(dataProvider = "FunctionRuntimes")
-    public void testExclamationFunction(Runtime runtime) throws Exception {
-        String inputTopicName = "test-exclamation-" + runtime + "-input-" + randomName(8);
-        String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8);
-        String functionName = "test-exclamation-fn-" + randomName(8);
-        final int numMessages = 10;
-
-        // submit the exclamation function
-        submitExclamationFunction(
-            inputTopicName, outputTopicName, functionName);
-
-        // get function info
-        getFunctionInfoSuccess(functionName);
-
-        // publish and consume result
-        publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
-
-        // get function status
-        getFunctionStatus(functionName, numMessages);
-
-        // delete function
-        deleteFunction(functionName);
 
-        // get function info
-        getFunctionInfoNotFound(functionName);
-    }
-
-    private static void submitExclamationFunction(String inputTopicName,
-                                                  String outputTopicName,
-                                                  String functionName) throws Exception {
-        CommandGenerator generator = CommandGenerator.createDefaultGenerator(inputTopicName,
EXCLAMATION_FUNC_CLASS);
-        generator.setSinkTopic(outputTopicName);
-        generator.setFunctionName(functionName);
-        String command = generator.generateCreateFunctionCommand();
-        String[] commands = {
-            "sh", "-c", command
-        };
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
-            commands);
-        assertTrue(result.getStdout().contains("\"Created successfully\""));
-    }
-
-    private static void getFunctionInfoSuccess(String functionName) throws Exception {
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "get",
-            "--tenant", "public",
-            "--namespace", "default",
-            "--name", functionName
-        );
-        assertTrue(result.getStdout().contains("\"name\": \"" + functionName + "\""));
-    }
-
-    private static void getFunctionInfoNotFound(String functionName) throws Exception {
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "get",
-            "--tenant", "public",
-            "--namespace", "default",
-            "--name", functionName
-        );
-        assertTrue(result.getStderr().contains("Reason: Function " + functionName + " doesn't
exist"));
-    }
-
-    private static void getFunctionStatus(String functionName, int numMessages) throws Exception
{
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "getstatus",
-            "--tenant", "public",
-            "--namespace", "default",
-            "--name", functionName
-        );
-        assertTrue(result.getStdout().contains("\"running\": true"));
-        assertTrue(result.getStdout().contains("\"numProcessed\": \"" + numMessages + "\""));
-        assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\": \"" + numMessages
+ "\""));
-    }
-
-    private static void publishAndConsumeMessages(String inputTopic,
-                                                  String outputTopic,
-                                                  int numMessages) throws Exception {
-        @Cleanup PulsarClient client = PulsarClient.builder()
-            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
-            .build();
-        @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
-            .topic(outputTopic)
-            .subscriptionType(SubscriptionType.Exclusive)
-            .subscriptionName("test-sub")
-            .subscribe();
-        @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
-            .topic(inputTopic)
-            .create();
-
-        for (int i = 0; i < numMessages; i++) {
-            producer.send("message-" + i);
-        }
-
-        for (int i = 0; i < numMessages; i++) {
-            Message<String> msg = consumer.receive();
-            assertEquals("message-" + i + "!", msg.getValue());
-        }
-    }
-
-    private static void deleteFunction(String functionName) throws Exception {
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "delete",
-            "--tenant", "public",
-            "--namespace", "default",
-            "--name", functionName
-        );
-        assertTrue(result.getStdout().contains("Deleted successfully"));
-        assertTrue(result.getStderr().isEmpty());
-    }
 
 }
diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index c0acd1de54..dc2e290e5a 100644
--- a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -18,15 +18,18 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
 import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.Container.ExecResult;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 
 /**
  * A cluster to run pulsar functions for testing functions related features.
  */
-public class PulsarFunctionsTestBase extends PulsarClusterTestBase  {
+@Slf4j
+public abstract class PulsarFunctionsTestBase extends PulsarClusterTestBase  {
 
     public static final String EXCLAMATION_FUNC_CLASS =
         "org.apache.pulsar.functions.api.examples.ExclamationFunction";
@@ -35,7 +38,10 @@
     public static void setupCluster() throws Exception {
         PulsarClusterTestBase.setupCluster();
 
-        pulsarCluster.startFunctionWorkers(1);
+        pulsarCluster.startFunctionWorkersWithProcessContainerFactory(1);
+
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd("cat", "/pulsar/conf/functions_worker.yml");
+        log.info("Functions Worker Config : \n{}", result.getStdout());
     }
 
     @DataProvider(name = "FunctionRuntimes")
diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsProcessRuntimeTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsProcessRuntimeTest.java
new file mode 100644
index 0000000000..7c4033f07c
--- /dev/null
+++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsProcessRuntimeTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions.runtime;
+
+/**
+ * Run runtime tests in process mode.
+ */
+public class PulsarFunctionsProcessRuntimeTest extends PulsarFunctionsRuntimeTest {
+}
diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
new file mode 100644
index 0000000000..e816f70c80
--- /dev/null
+++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions.runtime;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
+import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
+import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testng.annotations.Test;
+
+/**
+ * The tests that run over different container mode.
+ */
+public abstract class PulsarFunctionsRuntimeTest extends PulsarFunctionsTestBase {
+
+    //
+    // Test CRUD functions on different runtimes.
+    //
+
+    @Test(dataProvider = "FunctionRuntimes")
+    public void testExclamationFunction(Runtime runtime) throws Exception {
+        String inputTopicName = "test-exclamation-" + runtime + "-input-" + randomName(8);
+        String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8);
+        String functionName = "test-exclamation-fn-" + randomName(8);
+        final int numMessages = 10;
+
+        // submit the exclamation function
+        submitExclamationFunction(
+            inputTopicName, outputTopicName, functionName);
+
+        // get function info
+        getFunctionInfoSuccess(functionName);
+
+        // publish and consume result
+        publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
+
+        // get function status
+        getFunctionStatus(functionName, numMessages);
+
+        // delete function
+        deleteFunction(functionName);
+
+        // get function info
+        getFunctionInfoNotFound(functionName);
+    }
+
+    private static void submitExclamationFunction(String inputTopicName,
+                                                  String outputTopicName,
+                                                  String functionName) throws Exception {
+        CommandGenerator generator = CommandGenerator.createDefaultGenerator(inputTopicName,
EXCLAMATION_FUNC_CLASS);
+        generator.setSinkTopic(outputTopicName);
+        generator.setFunctionName(functionName);
+        String command = generator.generateCreateFunctionCommand();
+        String[] commands = {
+            "sh", "-c", command
+        };
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+            commands);
+        assertTrue(result.getStdout().contains("\"Created successfully\""));
+    }
+
+    private static void getFunctionInfoSuccess(String functionName) throws Exception {
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "get",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        );
+        assertTrue(result.getStdout().contains("\"name\": \"" + functionName + "\""));
+    }
+
+    private static void getFunctionInfoNotFound(String functionName) throws Exception {
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "get",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        );
+        assertTrue(result.getStderr().contains("Reason: Function " + functionName + " doesn't
exist"));
+    }
+
+    private static void getFunctionStatus(String functionName, int numMessages) throws Exception
{
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "getstatus",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        );
+        assertTrue(result.getStdout().contains("\"running\": true"));
+        assertTrue(result.getStdout().contains("\"numProcessed\": \"" + numMessages + "\""));
+        assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\": \"" + numMessages
+ "\""));
+    }
+
+    private static void publishAndConsumeMessages(String inputTopic,
+                                                  String outputTopic,
+                                                  int numMessages) throws Exception {
+        @Cleanup PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+        @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
+            .topic(outputTopic)
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscriptionName("test-sub")
+            .subscribe();
+        @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+            .topic(inputTopic)
+            .create();
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.send("message-" + i);
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<String> msg = consumer.receive();
+            assertEquals("message-" + i + "!", msg.getValue());
+        }
+    }
+
+    private static void deleteFunction(String functionName) throws Exception {
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "delete",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        );
+        assertTrue(result.getStdout().contains("Deleted successfully"));
+        assertTrue(result.getStderr().isEmpty());
+    }
+
+}
diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsThreadRuntimeTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsThreadRuntimeTest.java
new file mode 100644
index 0000000000..61ddba3704
--- /dev/null
+++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsThreadRuntimeTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions.runtime;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testng.annotations.BeforeClass;
+
+/**
+ * Run the runtime test cases in thread mode.
+ */
+@Slf4j
+public class PulsarFunctionsThreadRuntimeTest extends PulsarFunctionsRuntimeTest {
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        PulsarClusterTestBase.setupCluster();
+
+        pulsarCluster.startFunctionWorkersWithThreadContainerFactory(1);
+
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd("cat", "/pulsar/conf/functions_worker.yml");
+        log.info("Functions Worker Config : \n{}", result.getStdout());
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message