pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #2570: Fix BC issue in functions trigger function submitted by old CLI
Date Wed, 28 Nov 2018 20:36:09 GMT
sijie closed pull request #2570: Fix BC issue in functions trigger function submitted by old
CLI
URL: https://github.com/apache/pulsar/pull/2570
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 9de92269e4..d297a7899c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -27,6 +28,8 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Request;
 import org.apache.pulsar.functions.worker.request.RequestResult;
@@ -111,6 +114,26 @@ public void initialize() {
         }
     }
 
+    static FunctionMetaData normalizeFunctionMetaData(FunctionMetaData fmd) {
+        if (!fmd.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().isEmpty())
{
+            FunctionDetails.Builder fdb = FunctionDetails.newBuilder(fmd.getFunctionDetails());
+            for (Map.Entry<String, String> topicEntry : fmd.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().entrySet())
{
+                fdb.getSourceBuilder().putInputSpecs(
+                    topicEntry.getKey(),
+                    ConsumerSpec.newBuilder()
+                        .setSerdeClassName(topicEntry.getValue())
+                        .setIsRegexPattern(topicEntry.getKey() == fmd.getFunctionDetails().getSource().getTopicsPattern())
+                        .build());
+            }
+            fdb.getSourceBuilder().clearTopicsToSerDeClassName();
+            return FunctionMetaData.newBuilder(fmd)
+                .setFunctionDetails(fdb)
+                .build();
+        } else {
+            return fmd;
+        }
+    }
+
     /**
      * Get the function metadata for a function
      * @param tenant the tenant the function belongs to
@@ -119,7 +142,7 @@ public void initialize() {
      * @return FunctionMetaData that contains the function metadata
      */
     public synchronized FunctionMetaData getFunctionMetaData(String tenant, String namespace,
String functionName) {
-        return this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
+        return normalizeFunctionMetaData(this.functionMetaDataMap.get(tenant).get(namespace).get(functionName));
     }
 
     /**
@@ -130,7 +153,7 @@ public synchronized FunctionMetaData getFunctionMetaData(String tenant,
String n
         List<FunctionMetaData> ret = new LinkedList<>();
         for (Map<String, Map<String, FunctionMetaData>> i : this.functionMetaDataMap.values())
{
             for (Map<String, FunctionMetaData> j : i.values()) {
-                ret.addAll(j.values());
+                ret.addAll(j.values().stream().map(FunctionMetaDataManager::normalizeFunctionMetaData).collect(Collectors.toList()));
             }
         }
         return ret;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index df82c0d25c..f15d4b2c73 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -574,7 +574,6 @@ public Response triggerFunction(final String tenant, final String namespace,
fin
             return getUnavailableResponse();
         }
 
-        FunctionDetails functionDetails;
         // validate parameters
         try {
             validateTriggerRequestParams(tenant, namespace, functionName, topic, input, uploadedInputStream);
@@ -591,6 +590,8 @@ public Response triggerFunction(final String tenant, final String namespace,
fin
                     .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
         }
 
+        // function metadata will be normalized by function metadata manager to ensure validation
logic
+        // only handle with latest format.
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant,
namespace,
                 functionName);
 
@@ -598,15 +599,17 @@ public Response triggerFunction(final String tenant, final String namespace,
fin
         if (topic != null) {
             inputTopicToWrite = topic;
         } else if (functionMetaData.getFunctionDetails().getSource().getInputSpecsCount()
== 1) {
+            // the function was submitted by a newer CLI which is using input specs
             inputTopicToWrite = functionMetaData.getFunctionDetails().getSource().getInputSpecsMap()
                     .keySet().iterator().next();
         } else {
             log.error("Function in trigger function has more than 1 input topics @ /{}/{}/{}",
tenant, namespace, functionName);
             return Response.status(Status.BAD_REQUEST).build();
         }
-        if (functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() == 0
-                || !functionMetaData.getFunctionDetails().getSource().getInputSpecsMap()
-                        .containsKey(inputTopicToWrite)) {
+        boolean topicIdentified =
+            functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() > 0
+                && functionMetaData.getFunctionDetails().getSource().getInputSpecsMap().containsKey(inputTopicToWrite);
+        if (!topicIdentified) {
             log.error("Function in trigger function has unidentified topic @ /{}/{}/{} {}",
tenant, namespace, functionName, inputTopicToWrite);
 
             return Response.status(Status.BAD_REQUEST).build();
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index ac99f9a6a4..561ec5bf4e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -26,6 +26,10 @@
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -37,6 +41,10 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Request;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
@@ -58,6 +66,39 @@ private static PulsarClient mockPulsarClient() throws PulsarClientException
{
         return client;
     }
 
+    @Test
+    public void testNormalizeFunctionMetadata() throws Exception {
+        FunctionMetaData fmd = FunctionMetaData.newBuilder()
+            .setFunctionDetails(FunctionDetails.newBuilder()
+                .setSource(SourceSpec.newBuilder()
+                    .setTopicsPattern("test-pattern")
+                    .putTopicsToSerDeClassName("test-pattern", "class-pattern")
+                    .putTopicsToSerDeClassName("test-topic-1", "class1")
+                    .putTopicsToSerDeClassName("test-topic-2", "class2")
+                    .build())
+                .build())
+            .build();
+
+        FunctionMetaData normalizedFmd = FunctionMetaDataManager.normalizeFunctionMetaData(fmd);
+        SourceSpec ss = normalizedFmd.getFunctionDetails().getSource();
+        assertEquals(0, ss.getTopicsToSerDeClassNameCount());
+        assertEquals(3, ss.getInputSpecsCount());
+        ConsumerSpec cs = ss.getInputSpecsOrThrow("test-pattern");
+        assertNotNull(cs);
+        assertEquals("class-pattern", cs.getSerdeClassName());
+        assertTrue(cs.getIsRegexPattern());
+
+        cs = ss.getInputSpecsOrThrow("test-topic-1");
+        assertNotNull(cs);
+        assertEquals("class1", cs.getSerdeClassName());
+        assertFalse(cs.getIsRegexPattern());
+
+        cs = ss.getInputSpecsOrThrow("test-topic-2");
+        assertNotNull(cs);
+        assertEquals("class2", cs.getSerdeClassName());
+        assertFalse(cs.getIsRegexPattern());
+    }
+
     @Test
     public void testListFunctions() throws PulsarClientException {
         FunctionMetaDataManager functionMetaDataManager = spy(
@@ -81,15 +122,15 @@ public void testListFunctions() throws PulsarClientException {
         functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-1", functionMetaDataMap1);
         functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-2", functionMetaDataInfoMap2);
 
-        Assert.assertEquals(0, functionMetaDataManager.listFunctions(
+        assertEquals(0, functionMetaDataManager.listFunctions(
                 "tenant", "namespace").size());
-        Assert.assertEquals(2, functionMetaDataManager.listFunctions(
+        assertEquals(2, functionMetaDataManager.listFunctions(
                 "tenant-1", "namespace-1").size());
         Assert.assertTrue(functionMetaDataManager.listFunctions(
                 "tenant-1", "namespace-1").contains("func-1"));
         Assert.assertTrue(functionMetaDataManager.listFunctions(
                 "tenant-1", "namespace-1").contains("func-2"));
-        Assert.assertEquals(1, functionMetaDataManager.listFunctions(
+        assertEquals(1, functionMetaDataManager.listFunctions(
                 "tenant-1", "namespace-2").size());
         Assert.assertTrue(functionMetaDataManager.listFunctions(
                 "tenant-1", "namespace-2").contains("func-3"));
@@ -279,9 +320,9 @@ public void processUpdateTest() throws PulsarClientException {
         verify(functionMetaDataManager, times(1))
                 .setFunctionMetaData(any(Function.FunctionMetaData.class));
         verify(schedulerManager, times(1)).schedule();
-        Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-1"));
-        Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
 
         // worker has record of function
@@ -308,7 +349,7 @@ public void processUpdateTest() throws PulsarClientException {
                 .build();
         functionMetaDataManager.processUpdate(serviceRequest);
 
-        Assert.assertEquals(m3, functionMetaDataManager.getFunctionMetaData(
+        assertEquals(m3, functionMetaDataManager.getFunctionMetaData(
                 "tenant-1", "namespace-1", "func-1"));
         verify(functionMetaDataManager, times(1))
                 .setFunctionMetaData(any(Function.FunctionMetaData.class));
@@ -324,15 +365,15 @@ public void processUpdateTest() throws PulsarClientException {
                 .setWorkerId("worker-2")
                 .build();
         functionMetaDataManager.processUpdate(serviceRequest);
-        Assert.assertEquals(m3, functionMetaDataManager.getFunctionMetaData(
+        assertEquals(m3, functionMetaDataManager.getFunctionMetaData(
                 "tenant-1", "namespace-1", "func-1"));
         verify(functionMetaDataManager, times(1))
                 .setFunctionMetaData(any(Function.FunctionMetaData.class));
         verify(schedulerManager, times(0)).schedule();
 
-        Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-1"));
-        Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
 
         // schedule
@@ -361,10 +402,10 @@ public void processUpdateTest() throws PulsarClientException {
                 .setFunctionMetaData(any(Function.FunctionMetaData.class));
         verify(schedulerManager, times(1)).schedule();
 
-        Assert.assertEquals(m1.toBuilder().setVersion(version + 1).build(),
+        assertEquals(m1.toBuilder().setVersion(version + 1).build(),
                 functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-1"));
-        Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
     }
 
@@ -394,9 +435,9 @@ public void processDeregister() throws PulsarClientException {
         functionMetaDataManager.proccessDeregister(serviceRequest);
 
         verify(schedulerManager, times(0)).schedule();
-        Assert.assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-2"));
-        Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
 
         // function exists but request outdated
@@ -419,11 +460,11 @@ public void processDeregister() throws PulsarClientException {
         functionMetaDataManager.proccessDeregister(serviceRequest);
         verify(schedulerManager, times(0)).schedule();
 
-        Assert.assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-2"));
-        Assert.assertEquals(m2, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(m2, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-1"));
-        Assert.assertEquals(2, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(2, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
 
         // function deleted
@@ -451,9 +492,9 @@ public void processDeregister() throws PulsarClientException {
         functionMetaDataManager.proccessDeregister(serviceRequest);
         verify(schedulerManager, times(1)).schedule();
 
-        Assert.assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-2"));
-        Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
     }
 }
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message