pulsar-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [pulsar] Jennifer88huang commented on a change in pull request #4725: [docs] Add functions-develop file for Pulsar Functions(new)
Date Thu, 18 Jul 2019 04:21:37 GMT
Jennifer88huang commented on a change in pull request #4725: [docs] Add functions-develop file for Pulsar Functions(new)
URL: https://github.com/apache/pulsar/pull/4725#discussion_r304729431
 
 

 ##########
 File path: site2/docs/functions-develop.md
 ##########
 @@ -0,0 +1,594 @@
+---
+id: functions-develop
+title: Develop Pulsar Functions
+sidebar_label: Develop functions
+---
+
+This tutorial walks you through how to develop Pulsar Functions.
+
+## Available APIs
+In Java and Python, you have two options for writing Pulsar Functions. In Go, you can use the Pulsar Functions SDK for Go.
+
+Interface | Description | Use cases
+:---------|:------------|:---------
+Language-native interface | No Pulsar-specific libraries or special dependencies required (only core libraries from Java/Python). | Functions that do not require access to the function [context](#context).
+Pulsar Functions SDK for Java/Python/Go | Pulsar-specific libraries that provide a range of functionality not provided by "native" interfaces. | Functions that require access to the function [context](#context).
+
+The following example is a language-native function that adds an exclamation point to all incoming strings and publishes the resulting string to a topic. It has no external dependencies.
+
+<!--DOCUSAURUS_CODE_TABS-->
+<!--Java-->
+```Java
+import java.util.function.Function;
+
+public class JavaNativeExclamationFunction implements Function<String, String> {
+    @Override
+    public String apply(String input) {
+        return String.format("%s!", input);
+    }
+}
+```
+For complete code, see [here](https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeExclamationFunction.java).
+
+<!--Python-->
+```python
+def process(input):
+    return "{}!".format(input)
+```
+For complete code, see [here](https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/native_exclamation_function.py).
+
+<!--END_DOCUSAURUS_CODE_TABS-->
+
+The following example uses the Pulsar Functions SDK.
+<!--DOCUSAURUS_CODE_TABS-->
+<!--Java-->
+```Java
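+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+public class ExclamationFunction implements Function<String, String> {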
+    @Override
+    public String process(String input, Context context) {
+        return String.format("%s!", input);
+    }
+}
+```
+For complete code, see [here](https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java).
+
+<!--Python-->
+```python
+from pulsar import Function
+
+class ExclamationFunction(Function):
+  def __init__(self):
+    pass
+
+  def process(self, input, context):
+    return input + '!'
+```
+For complete code, see [here](https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/exclamation_function.py).
+
+<!--Go-->
+```Go
+package main
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/apache/pulsar/pulsar-function-go/pf"
+)
+
+func HandleRequest(ctx context.Context, in []byte) error {
+	fmt.Println(string(in) + "!")
+	return nil
+}
+
+func main() {
+	pf.Start(HandleRequest)
+}
+```
+For complete code, see [here](https://github.com/apache/pulsar/blob/master/pulsar-function-go/examples/inputFunc.go#L20-L36).
+
+<!--END_DOCUSAURUS_CODE_TABS-->
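Both styles take a value and return a value, so you can exercise them locally before deploying. The following is a minimal sketch of a hypothetical test harness and is not part of any Pulsar SDK. `FakeContext`, `SdkExclamationFunction`, and `invoke` are illustrative names. The SDK-style class deliberately avoids importing `pulsar`, so the snippet runs without the client installed. It mirrors the distinction in the table above: native functions receive only the input, while SDK functions also receive a context.

```python
class FakeContext(object):
    """Hypothetical stand-in for the Context object that the SDK passes in."""


def native_process(input):
    # Language-native style: a plain function with no Pulsar imports and no context.
    return "{}!".format(input)


class SdkExclamationFunction(object):
    # SDK style: a real deployment would subclass pulsar.Function instead of object.
    def process(self, input, context):
        return input + "!"


def invoke(func, value, context):
    """Illustrative dispatcher: only SDK-style functions receive the context."""
    if hasattr(func, "process"):
        return func.process(value, context)
    return func(value)


ctx = FakeContext()
print(invoke(native_process, "hello", ctx))            # hello!
print(invoke(SdkExclamationFunction(), "hello", ctx))  # hello!
```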
+
+## Schema registry
+Pulsar has a built-in [Schema Registry](concepts-schema-registry) and comes bundled with a variety of popular schema types (Avro, JSON, and Protobuf). Pulsar Functions can leverage existing schema information from input topics to derive the input type. The schema registry applies to output topics as well.
+
+## SerDe
+SerDe stands for **Ser**ialization and **De**serialization. Pulsar Functions uses SerDe when publishing data to and consuming data from Pulsar topics. How SerDe works by default depends on the language you use for a particular function.
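Conceptually, each message passes through the input SerDe, then your function, and then the output SerDe. The sketch below models one such step under that assumption. `Utf8StringSerDe` and `run_once` are hypothetical names used for illustration and are not Pulsar's actual runtime code.

```python
class Utf8StringSerDe(object):
    """Hypothetical SerDe that maps between str and UTF-8 bytes."""

    def serialize(self, value):
        return value.encode("utf-8")

    def deserialize(self, raw):
        return raw.decode("utf-8")


def run_once(raw_message, input_serde, func, output_serde):
    """Illustrative single step of a function instance (not Pulsar's runtime code)."""
    value = input_serde.deserialize(raw_message)  # bytes consumed from the input topic
    result = func(value)                          # user logic works on a typed value
    return output_serde.serialize(result)         # bytes to publish to the output topic


serde = Utf8StringSerDe()
print(run_once(b"hello", serde, lambda s: s + "!", serde))  # b'hello!'
```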
+
+<!--DOCUSAURUS_CODE_TABS-->
+<!--Java-->
+When you write Pulsar Functions in Java, the following basic Java types are built in and supported by default:
+
+* `String`
+* `Double`
+* `Integer`
+* `Float`
+* `Long`
+* `Short`
+* `Byte`
+
+To use custom Java types, you need to implement the following `SerDe` interface.
+
+```java
+public interface SerDe<T> {
+    T deserialize(byte[] input);
+    byte[] serialize(T input);
+}
+```
+
+### Example
+Imagine that you're writing Pulsar Functions in Java to process tweet objects. You can refer to the following example of a `Tweet` class.
+
+```java
+public class Tweet {
+    private String username;
+    private String tweetContent;
+
+    public Tweet(String username, String tweetContent) {
+        this.username = username;
+        this.tweetContent = tweetContent;
+    }
+
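+    public String getUsername() {
+        return username;
+    }
+
+    public String getTweetContent() {
+        return tweetContent;
+    }
+
+    // Standard setters omitted for brevity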
+}
+```
+
+To pass `Tweet` objects directly between Pulsar Functions, you need to provide a custom SerDe class. In the example below, `Tweet` objects are serialized as strings in which the username and tweet content are separated by a `|`.
+
+```java
+package com.example.serde;
+
+import org.apache.pulsar.functions.api.SerDe;
+
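+import java.nio.charset.StandardCharsets;
+import java.util.regex.Pattern;
+
+public class TweetSerde implements SerDe<Tweet> {
+    @Override
+    public Tweet deserialize(byte[] input) {
+        String s = new String(input, StandardCharsets.UTF_8);
+        // Split on the first "|" only, so the tweet content itself may contain "|"
+        String[] fields = s.split(Pattern.quote("|"), 2);
+        return new Tweet(fields[0], fields[1]);
+    }
+
+    @Override
+    public byte[] serialize(Tweet input) {
+        // Encode the Tweet as "username|tweetContent"
+        return String.format("%s|%s", input.getUsername(), input.getTweetContent())
+                .getBytes(StandardCharsets.UTF_8);
+    }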
+}
+```
+
+To apply this customized SerDe to a particular Pulsar Function, you need to:
+
+* Package the `Tweet` and `TweetSerde` classes into a JAR.
+* Specify a path to the JAR and SerDe class name when deploying the function.
+
+The following is an example of the [`create`](reference-pulsar-admin.md#create-1) operation.
+
+```bash
+$ bin/pulsar-admin functions create \
+  --jar /path/to/your.jar \
+  --output-serde-classname com.example.serde.TweetSerde \
+  # Other function attributes
+```
+
+> #### Custom SerDe classes must be packaged with your function JARs
+> Pulsar does not store your custom SerDe classes separately from your Pulsar Functions, so you need to include your SerDe classes in your function JARs. Otherwise, Pulsar returns an error.
+
+<!--Python-->
+In Python, the default SerDe is the identity SerDe, meaning that data is serialized as whatever type the producer function returns.
+
+You can specify the SerDe when [creating](functions-deploying.md#cluster-mode) or [running](functions-deploying.md#local-run-mode) functions.
+
+```bash
+$ bin/pulsar-admin functions create \
+  --tenant public \
+  --namespace default \
+  --name my_function \
+  --py my_function.py \
+  --classname my_function.MyFunction \
+  --custom-serde-inputs '{"input-topic-1":"Serde1","input-topic-2":"Serde2"}' \
+  --output-serde-classname Serde3 \
+  --output output-topic-1
+```
+
+This case contains two input topics: `input-topic-1` and `input-topic-2`, each of which is mapped to a different SerDe class (the map must be specified as a JSON string). The output topic, `output-topic-1`, uses the `Serde3` class for SerDe. At the moment, all Pulsar Functions logic, including the processing function and SerDe classes, must be contained within a single Python file.
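Everything must live in one Python file, so a `my_function.py` for the command above could look roughly like the following sketch. The command does not say what `Serde1`, `Serde2`, and `Serde3` do. The UTF-8 and JSON encodings below are assumptions for illustration, as is the body of `MyFunction.process`. The `try`/`except` fallback is there only so the file also runs without the Pulsar client installed.

```python
import json

try:
    from pulsar import Function, SerDe
except ImportError:
    # Minimal stand-ins so the sketch also runs without the Pulsar client installed.
    class Function(object):
        pass

    class SerDe(object):
        pass


class Serde1(SerDe):
    # Hypothetical: input-topic-1 carries UTF-8 text.
    def serialize(self, input):
        return input.encode("utf-8")

    def deserialize(self, input_bytes):
        return input_bytes.decode("utf-8")


class Serde2(SerDe):
    # Hypothetical: input-topic-2 carries JSON documents.
    def serialize(self, input):
        return json.dumps(input).encode("utf-8")

    def deserialize(self, input_bytes):
        return json.loads(input_bytes.decode("utf-8"))


class Serde3(SerDe):
    # Hypothetical: results published to output-topic-1 are encoded as JSON.
    def serialize(self, input):
        return json.dumps(input, sort_keys=True).encode("utf-8")

    def deserialize(self, input_bytes):
        return json.loads(input_bytes.decode("utf-8"))


class MyFunction(Function):
    # Receives input already deserialized by Serde1 or Serde2; the returned
    # dict is serialized by Serde3 before being published.
    def process(self, input, context):
        return {"value": input}


# Local check: a JSON message from input-topic-2 flowing through to output-topic-1.
raw = Serde2().serialize({"id": 7})
result = MyFunction().process(Serde2().deserialize(raw), None)
print(Serde3().serialize(result))  # b'{"value": {"id": 7}}'
```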
+
+When using Pulsar Functions for Python, you have three SerDe options:
+
+1. You can use the [`IdentitySerDe`](https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L70), which leaves the data unchanged. The `IdentitySerDe` is the **default**. Creating or running a function without explicitly specifying SerDe means that this option is used.
+2. You can use the [`PickleSerDe`](https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L62), which uses Python [`pickle`](https://docs.python.org/3/library/pickle.html) for SerDe.
+3. You can create a custom SerDe class by implementing the baseline [`SerDe`](https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L50) class, which has just two methods: [`serialize`](https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L53) for converting the object into bytes, and [`deserialize`](https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L58) for converting bytes into an object of the required application-specific type.
+
+The table below shows when you should use each SerDe.
+
+SerDe option | When to use
+:------------|:-----------
+`IdentitySerDe` | When you work with simple types such as strings, Booleans, and integers.
+`PickleSerDe` | When you work with complex, application-specific types and are comfortable with the "best effort" approach of `pickle`.
+Custom SerDe | When you require explicit control over SerDe, potentially for performance or data compatibility purposes.
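Two simplified stand-ins make the trade-off in the table concrete. This is a sketch, not Pulsar's code. The classes below only reproduce the behavior described in the list above (identity pass-through and `pickle`), and the real `IdentitySerDe` in `serde.py` may handle types differently.

```python
import pickle


class IdentitySerDe(object):
    """Simplified stand-in: hands data back unchanged, as described above.
    The real class in serde.py may perform additional type handling."""

    def serialize(self, input):
        return input

    def deserialize(self, input_bytes):
        return input_bytes


class PickleSerDe(object):
    """Stand-in that, like the real PickleSerDe, delegates to the standard pickle module."""

    def serialize(self, input):
        return pickle.dumps(input)

    def deserialize(self, input_bytes):
        return pickle.loads(input_bytes)


# A nested, application-specific structure survives a pickle round trip intact.
order = {"id": 42, "items": [("apple", 3), ("pear", 1)]}
serde = PickleSerDe()
print(serde.deserialize(serde.serialize(order)) == order)  # True

# The identity stand-in returns exactly the bytes it was given.
print(IdentitySerDe().deserialize(b"raw-bytes"))  # b'raw-bytes'
```

The Python documentation warns against unpickling data from untrusted sources. That is another reason to prefer a custom SerDe when a topic may carry data you do not control.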
+
+### Example
+Imagine that you're writing Pulsar Functions in Python to process tweet objects. You can refer to the following example of a `Tweet` class.
+
+```python
+class Tweet(object):
+    def __init__(self, username, tweet_content):
+        self.username = username
+        self.tweet_content = tweet_content
+```
+
+To use this class in Pulsar Functions, you have two options:
+
+1. You can specify `PickleSerDe`, which applies the [`pickle`](https://docs.python.org/3/library/pickle.html) library SerDe.
+2. You can create your own SerDe class. The following is an example.
+
+   ```python
+   from pulsar import SerDe
+
+   class TweetSerDe(SerDe):
+       def serialize(self, input):
+           # Encode the Tweet as "username|tweet_content"
+           return "{0}|{1}".format(input.username, input.tweet_content).encode("utf-8")
+
+       def deserialize(self, input_bytes):
+           # Split on the first "|" only, so the tweet content itself may contain "|"
+           tweet_components = input_bytes.decode("utf-8").split("|", 1)
+           return Tweet(tweet_components[0], tweet_components[1])
+   ```
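You can check a custom SerDe outside Pulsar before deploying it. The following minimal sketch defines `Tweet` and a `TweetSerDe` that use the `username|tweet_content` format in one file, then round-trips a tweet through them. The `try`/`except` fallback for `SerDe` is an assumption added only so the sketch runs without the Pulsar client installed. The sample username and text are made up.

```python
try:
    from pulsar import SerDe
except ImportError:
    # Minimal stand-in so the sketch also runs without the Pulsar client installed.
    class SerDe(object):
        pass


class Tweet(object):
    def __init__(self, username, tweet_content):
        self.username = username
        self.tweet_content = tweet_content


class TweetSerDe(SerDe):
    def serialize(self, input):
        # Encode the Tweet as "username|tweet_content".
        return "{0}|{1}".format(input.username, input.tweet_content).encode("utf-8")

    def deserialize(self, input_bytes):
        # Split on the first "|" only, so the tweet content itself may contain "|".
        username, tweet_content = input_bytes.decode("utf-8").split("|", 1)
        return Tweet(username, tweet_content)


tweet = Tweet("pulsar_fan", "Functions are fun | really")
wire = TweetSerDe().serialize(tweet)
print(wire)  # b'pulsar_fan|Functions are fun | really'

copy = TweetSerDe().deserialize(wire)
print(copy.username, "/", copy.tweet_content)
```

Splitting on the first `|` keeps the format unambiguous only while usernames never contain `|`. If that cannot be guaranteed, a structured encoding such as JSON is a safer choice.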
+<!--Go-->
 
 Review comment:
   For issue, see https://github.com/apache/pulsar/issues/4752

----------------------------------------------------------------
This is an automated message from the Apache Git Service.


With regards,
Apache Git Services
