pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jerryp...@apache.org
Subject [pulsar] branch master updated: Fix bug for not setting resources (#3005)
Date Mon, 19 Nov 2018 02:44:20 GMT
This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a22842e   Fix bug for not setting resources (#3005)
a22842e is described below

commit a22842ea34ceffaf53b67df8ce60486f9975fa71
Author: Boyang Jerry Peng <jerry.boyang.peng@gmail.com>
AuthorDate: Sun Nov 18 18:44:15 2018 -0800

     Fix bug for not setting resources (#3005)
    
    * Fix bug for not setting resources
    
    * remove log
---
 .../functions/utils/FunctionConfigUtils.java       |  1 +
 .../functions/utils/FunctionConfigUtilsTest.java   | 61 ++++++++++++++++++++++
 2 files changed, 62 insertions(+)

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 5a8ac5b..7fcf5bb 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -300,6 +300,7 @@ public class FunctionConfigUtils {
             resources.setCpu(functionDetails.getResources().getCpu());
             resources.setRam(functionDetails.getResources().getRam());
             resources.setDisk(functionDetails.getResources().getDisk());
+            functionConfig.setResources(resources);
         }
 
         return functionConfig;
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 951e6f7..6d71d45 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.utils;
 
 import com.google.gson.Gson;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.WindowConfig;
@@ -90,4 +91,64 @@ public class FunctionConfigUtilsTest {
                 new Gson().toJson(convertedConfig)
         );
     }
+
+    @Test
+    public void testFunctionConfigConvertFromDetails() {
+        String name = "test1";
+        String namespace = "ns1";
+        String tenant = "tenant1";
+        String classname = getClass().getName();
+        int parallelism = 3;
+        Map<String, String> userConfig = new HashMap<>();
+        userConfig.put("key1", "val1");
+        Function.ProcessingGuarantees processingGuarantees = Function.ProcessingGuarantees.EFFECTIVELY_ONCE;
+        Function.FunctionDetails.Runtime runtime = Function.FunctionDetails.Runtime.JAVA;
+        Function.SinkSpec sinkSpec = Function.SinkSpec.newBuilder().setTopic("sinkTopic1").build();
+        Map<String, Function.ConsumerSpec> consumerSpecMap = new HashMap<>();
+        consumerSpecMap.put("sourceTopic1", Function.ConsumerSpec.newBuilder()
+                .setSchemaType(JSONSchema.class.getName()).build());
+        Function.SourceSpec sourceSpec = Function.SourceSpec.newBuilder()
+                .putAllInputSpecs(consumerSpecMap)
+                .setSubscriptionType(Function.SubscriptionType.FAILOVER).build();
+        boolean autoAck = true;
+        String logTopic = "log-topic1";
+        Function.Resources resources = Function.Resources.newBuilder().setCpu(1.5).setDisk(1024
* 20).setRam(1024 * 10).build();
+        String packageUrl = "http://package.url";
+        Map<String, String> secretsMap = new HashMap<>();
+        secretsMap.put("secretConfigKey1", "secretConfigVal1");
+        Function.RetryDetails retryDetails = Function.RetryDetails.newBuilder().setDeadLetterTopic("dead-letter-1").build();
+
+        Function.FunctionDetails functionDetails = Function.FunctionDetails
+                .newBuilder()
+                .setNamespace(namespace)
+                .setTenant(tenant)
+                .setName(name)
+                .setClassName(classname)
+                .setParallelism(parallelism)
+                .setUserConfig(new Gson().toJson(userConfig))
+                .setProcessingGuarantees(processingGuarantees)
+                .setRuntime(runtime)
+                .setSink(sinkSpec)
+                .setSource(sourceSpec)
+                .setAutoAck(autoAck)
+                .setLogTopic(logTopic)
+                .setResources(resources)
+                .setPackageUrl(packageUrl)
+                .setSecretsMap(new Gson().toJson(secretsMap))
+                .setRetryDetails(retryDetails)
+                .build();
+
+        FunctionConfig functionConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
+
+        assertEquals(functionConfig.getTenant(), tenant);
+        assertEquals(functionConfig.getNamespace(), namespace);
+        assertEquals(functionConfig.getName(), name);
+        assertEquals(functionConfig.getClassName(), classname);
+        assertEquals(functionConfig.getLogTopic(), logTopic);
+        assertEquals(functionConfig.getResources().getCpu(), resources.getCpu());
+        assertEquals(functionConfig.getResources().getDisk().longValue(), resources.getDisk());
+        assertEquals(functionConfig.getResources().getRam().longValue(), resources.getRam());
+        assertEquals(functionConfig.getOutput(), sinkSpec.getTopic());
+        assertEquals(functionConfig.getInputSpecs().keySet(), sourceSpec.getInputSpecsMap().keySet());
+    }
 }


Mime
View raw message