pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #1696: Allow functions to be triggered without specifying topic name
Date Tue, 01 May 2018 01:56:27 GMT
sijie closed pull request #1696: Allow functions to be triggered without specifying topic name
URL: https://github.com/apache/incubator-pulsar/pull/1696
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 80e98af1a5..c22b6117b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -237,12 +237,12 @@ public Response getAssignments() {
     public Response triggerFunction(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("functionName") String functionName,
-                                    final @PathParam("topic") String topic,
                                     final @FormDataParam("data") String triggerValue,
-                                    final @FormDataParam("dataStream") InputStream triggerStream)
{
+                                    final @FormDataParam("dataStream") InputStream triggerStream,
+                                    final @FormDataParam("topic") String topic) {
 
         return functions.triggerFunction(
-                tenant, namespace, functionName, topic, triggerValue, triggerStream);
+                tenant, namespace, functionName, triggerValue, triggerStream, topic);
 
     }
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 8f058c7530..9a774956b2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -157,7 +157,7 @@
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    String triggerFunction(String tenant, String namespace, String function, String triggerValue,
String triggerFile) throws PulsarAdminException;
+    String triggerFunction(String tenant, String namespace, String function, String topic,
String triggerValue, String triggerFile) throws PulsarAdminException;
 
     /**
      * Upload Data.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 30e6bd9d52..9d5b8eec5f 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -149,7 +149,7 @@ public void updateFunction(FunctionDetails functionDetails, String fileName)
thr
     }
 
     @Override
-    public String triggerFunction(String tenant, String namespace, String functionName, String
triggerValue, String triggerFile) throws PulsarAdminException {
+    public String triggerFunction(String tenant, String namespace, String functionName, String
topic, String triggerValue, String triggerFile) throws PulsarAdminException {
         try {
             final FormDataMultiPart mp = new FormDataMultiPart();
             if (triggerFile != null) {
@@ -160,9 +160,11 @@ public String triggerFunction(String tenant, String namespace, String
functionNa
             if (triggerValue != null) {
                 mp.bodyPart(new FormDataBodyPart("data", triggerValue, MediaType.TEXT_PLAIN_TYPE));
             }
-            String response = request(functions.path(tenant).path(namespace).path(functionName).path("trigger"))
+            if (topic != null && !topic.isEmpty()) {
+                mp.bodyPart(new FormDataBodyPart("topic", topic, MediaType.TEXT_PLAIN_TYPE));
+            }
+            return request(functions.path(tenant).path(namespace).path(functionName).path("trigger"))
                     .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), String.class);
-            return response;
         } catch (Exception e) {
             throw getApiException(e);
         }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index e2f0a7172a..8cc1e2308f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -776,12 +776,14 @@ void runCmd() throws Exception {
         protected String triggerValue;
         @Parameter(names = "--triggerFile", description = "The path to the file that contains
the data with which you'd like to trigger the function")
         protected String triggerFile;
+        @Parameter(names = "--topic", description = "The specific topic name that the function
consumes from that you want to inject the data to")
+        protected String topic;
         @Override
         void runCmd() throws Exception {
             if (triggerFile == null && triggerValue == null) {
                 throw new RuntimeException("Either a trigger value or a trigger filepath
needs to be specified");
             }
-            String retval = admin.functions().triggerFunction(tenant, namespace, functionName,
triggerValue, triggerFile);
+            String retval = admin.functions().triggerFunction(tenant, namespace, functionName,
topic, triggerValue, triggerFile);
             System.out.println(retval);
         }
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 04b6c706f0..e95f620d9c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -448,14 +448,14 @@ public Response getAssignments() {
     }
 
     @POST
-    @Path("/{tenant}/{namespace}/{functionName}/{topic}/trigger")
+    @Path("/{tenant}/{namespace}/{functionName}/trigger")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
     public Response triggerFunction(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("name") String functionName,
-                                    final @PathParam("topic") String topic,
                                     final @FormDataParam("data") String input,
-                                    final @FormDataParam("dataStream") InputStream uploadedInputStream)
{
+                                    final @FormDataParam("dataStream") InputStream uploadedInputStream,
+                                    final @FormDataParam("topic") String topic) {
         FunctionDetails functionDetails;
         // validate parameters
         try {
@@ -480,9 +480,15 @@ public Response triggerFunction(final @PathParam("tenant") String tenant,
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant,
namespace, functionName);
 
         String inputTopicToWrite;
-        // only if the source is PulsarSource
-        if (functionMetaData.getFunctionDetails().getSource().getClassName().equals(PulsarSource.class.getName()))
{
-            inputTopicToWrite =  topic;
+        // only if the source is PulsarSource and if the function consumes only one topic
+        if (!functionMetaData.getFunctionDetails().getSource().getClassName().equals(PulsarSource.class.getName()))
{
+            return Response.status(Status.BAD_REQUEST).build();
+        }
+        if (topic != null) {
+            inputTopicToWrite = topic;
+        } else if (functionMetaData.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().size()
== 1) {
+            inputTopicToWrite =
+                    functionMetaData.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().keySet().iterator().next();
         } else {
             return Response.status(Status.BAD_REQUEST).build();
         }
@@ -707,9 +713,6 @@ private void validateTriggerRequestParams(String tenant,
         if (functionName == null) {
             throw new IllegalArgumentException("Function Name is not provided");
         }
-        if (topic == null) {
-            throw new IllegalArgumentException("Topic Name is not provided");
-        }
         if (uploadedInputStream == null && input == null) {
             throw new IllegalArgumentException("Trigger Data is not provided");
         }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 12068d2f5c..00c2a2836b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -136,10 +136,10 @@ public Response getAssignments() {
     public Response triggerFunction(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("name") String functionName,
-                                    final @PathParam("topic") String topic,
                                     final @FormDataParam("data") String input,
-                                    final @FormDataParam("dataStream") InputStream uploadedInputStream)
{
-        return functions.triggerFunction(tenant, namespace, functionName, topic, input, uploadedInputStream);
+                                    final @FormDataParam("dataStream") InputStream uploadedInputStream,
+                                    final @FormDataParam("topic") String topic) {
+        return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream,
topic);
     }
 
     @POST


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message