pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] srkukarni closed pull request #2266: Integrate functions and io with schema registry
Date Fri, 17 Aug 2018 19:48:35 GMT
srkukarni closed pull request #2266: Integrate functions and io with schema registry
URL: https://github.com/apache/incubator-pulsar/pull/2266
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not keep its diff
visible after the merge, so the full diff is reproduced below:

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 1306f13335..d13cd5e51d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -57,7 +57,6 @@
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
 import org.apache.pulsar.functions.proto.Function;
@@ -69,6 +68,7 @@
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
 import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.DataDigest;
 import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -246,7 +246,7 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
 
     /**
      * Validates pulsar sink e2e functionality on functions.
-     * 
+     *
      * @throws Exception
      */
     @Test(timeOut = 20000)
@@ -409,7 +409,7 @@ protected static FunctionDetails createSinkConfig(String jarFile, String tenant,
         sourceSpecBuilder.setTypeClassName(typeArg.getName());
         sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
         sourceSpecBuilder.setSubscriptionName(subscriptionName);
-        sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, DefaultSerDe.class.getName());
+        sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, "");
         functionDetailsBuilder.setAutoAck(true);
         functionDetailsBuilder.setSource(sourceSpecBuilder);
 
@@ -487,7 +487,7 @@ public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception {
         // source spec classname should be empty so that the default pulsar source will be used
         SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
         sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
-        sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic, DefaultSerDe.class.getName());
+        sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic, TopicSchema.DEFAULT_SERDE);
         functionDetailsBuilder.setAutoAck(true);
         functionDetailsBuilder.setSource(sourceSpecBuilder);
 
@@ -507,7 +507,7 @@ public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception {
         assertEquals(functionMetadata.getSink().getTypeClassName(), typeArgs[1].getName());
 
     }
-    
+
     @Test(timeOut = 20000)
     public void testFunctionRestartApi() throws Exception {
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 4525c51b81..6bce495ff2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -286,11 +286,11 @@
      *
      */
     Set<String> getSinks() throws PulsarAdminException;
-    
+
     /**
      * Get list of workers present under a cluster
      * @return
-     * @throws PulsarAdminException 
+     * @throws PulsarAdminException
      */
     List<WorkerInfo> getCluster() throws PulsarAdminException;
 }
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
index a3f528a8ae..4eca216088 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
@@ -25,6 +25,9 @@
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 
+/**
+ * Schema definition for Strings encoded in UTF-8 format.
+ */
 public class StringSchema implements Schema<String> {
     private final Charset charset;
     private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 11a3c7a0b9..bea70c951e 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -40,7 +40,6 @@
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.utils.Reflections;
@@ -137,7 +136,6 @@ public void setup() throws Exception {
             .thenReturn(true);
         when(Reflections.classImplementsIface(anyString(), any())).thenReturn(true);
         when(Reflections.createInstance(eq(DummyFunction.class.getName()), any(File.class))).thenReturn(new DummyFunction());
-        when(Reflections.createInstance(eq(DefaultSerDe.class.getName()), any(File.class))).thenReturn(new DefaultSerDe(String.class));
         PowerMockito.stub(PowerMockito.method(Utils.class, "fileExists")).toReturn(true);
     }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 5f52313d57..81507cdc9e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -75,6 +75,7 @@
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.Resources;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
@@ -82,6 +83,7 @@
 import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.utils.ConsumerConfig;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
@@ -165,7 +167,7 @@ public void processArguments() {
 
         @Parameter(names = "--name", description = "The function's name")
         protected String functionName;
-        
+
         @Override
         void processArguments() throws Exception {
             super.processArguments();
@@ -227,14 +229,16 @@ void processArguments() throws Exception {
                 description = "Path to the main Python file for the function (if the function is written in Python)",
                 listConverter = StringConverter.class)
         protected String pyFile;
-        @Parameter(names = "--inputs", description = "The function's input topic or topics (multiple topics can be specified as a comma-separated list)")
+        @Parameter(names = { "-i",
+                "--inputs" }, description = "The function's input topic or topics (multiple topics can be specified as a comma-separated list)")
         protected String inputs;
         // for backwards compatibility purposes
         @Parameter(names = "--topicsPattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topic-pattern] are mutually exclusive. Add SerDe class name for a pattern in --custom-serde-inputs (supported for java fun only)", hidden = true)
         protected String DEPRECATED_topicsPattern;
         @Parameter(names = "--topics-pattern", description = "The topic pattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topic-pattern] are mutually exclusive. Add SerDe class name for a pattern in --custom-serde-inputs (supported for java fun only)")
         protected String topicsPattern;
-        @Parameter(names = "--output", description = "The function's output topic (use skipOutput flag to skip output topic)")
+
+        @Parameter(names = {"-o", "--output"}, description = "The function's output topic (use skipOutput flag to skip output topic)")
         protected String output;
         @Parameter(names = "--skip-output", description = "Skip publishing function output to output topic")
         protected boolean skipOutput;
@@ -243,16 +247,16 @@ void processArguments() throws Exception {
         protected String DEPRECATED_logTopic;
         @Parameter(names = "--log-topic", description = "The topic to which the function's logs are produced")
         protected String logTopic;
+
+        @Parameter(names = {"-st", "--schema-type"}, description = "The builtin schema type or custom schema class name to be used for messages output by the function")
+        protected String schemaTypeOrClassName = "";
+
         // for backwards compatibility purposes
         @Parameter(names = "--customSerdeInputs", description = "The map of input topics to SerDe class names (as a JSON string)", hidden = true)
         protected String DEPRECATED_customSerdeInputString;
-        @Parameter(names = "--custom-serde-inputs", description = "The map of input topics to SerDe class names (as a JSON string)")
-        protected String customSerdeInputString;
         // for backwards compatibility purposes
         @Parameter(names = "--outputSerdeClassName", description = "The SerDe class to be used for messages output by the function", hidden = true)
         protected String DEPRECATED_outputSerdeClassName;
-        @Parameter(names = "--output-serde-classname", description = "The SerDe class to be used for messages output by the function")
-        protected String outputSerdeClassName;
         // for backwards compatibility purposes
         @Parameter(names = "--functionConfigFile", description = "The path to a YAML config file specifying the function's configuration", hidden = true)
         protected String DEPRECATED_fnConfigFile;
@@ -316,8 +320,7 @@ private void mergeArgs() {
             if (!StringUtils.isBlank(DEPRECATED_className)) className = DEPRECATED_className;
             if (!StringUtils.isBlank(DEPRECATED_topicsPattern)) topicsPattern = DEPRECATED_topicsPattern;
             if (!StringUtils.isBlank(DEPRECATED_logTopic)) logTopic = DEPRECATED_logTopic;
-            if (!StringUtils.isBlank(DEPRECATED_outputSerdeClassName)) outputSerdeClassName = DEPRECATED_outputSerdeClassName;
-            if (!StringUtils.isBlank(DEPRECATED_customSerdeInputString)) customSerdeInputString = DEPRECATED_customSerdeInputString;
+
             if (!StringUtils.isBlank(DEPRECATED_fnConfigFile)) fnConfigFile = DEPRECATED_fnConfigFile;
             if (DEPRECATED_processingGuarantees != FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) processingGuarantees = DEPRECATED_processingGuarantees;
             if (!StringUtils.isBlank(DEPRECATED_userConfigString)) userConfigString = DEPRECATED_userConfigString;
@@ -357,16 +360,31 @@ void processArguments() throws Exception {
             }
 
             if (null != inputs) {
-                List<String> inputTopics = Arrays.asList(inputs.split(","));
-                functionConfig.setInputs(inputTopics);
+                Arrays.asList(inputs.split(",")).forEach(topic -> {
+                    functionConfig.getInputSpecs().put(topic, ConsumerConfig.builder()
+                            .schemaTypeOrClassName(schemaTypeOrClassName)
+                            .isRegexPattern(false)
+                            .build());
+                });
             }
-            if (null != customSerdeInputString) {
+            if (null != DEPRECATED_customSerdeInputString) {
                 Type type = new TypeToken<Map<String, String>>(){}.getType();
-                Map<String, String> customSerdeInputMap = new Gson().fromJson(customSerdeInputString, type);
-                functionConfig.setCustomSerdeInputs(customSerdeInputMap);
+                Map<String, String> customSerdeInputMap = new Gson().fromJson(DEPRECATED_customSerdeInputString, type);
+
+                customSerdeInputMap.forEach((topic, serde) -> {
+                    functionConfig.getInputSpecs().put(topic, ConsumerConfig.builder()
+                            .schemaTypeOrClassName(serde)
+                            .isRegexPattern(false)
+                            .build());
+                });
             }
             if (null != topicsPattern) {
-                functionConfig.setTopicsPattern(topicsPattern);
+                ConsumerConfig conf = functionConfig.getInputSpecs().get(topicsPattern);
+                String schema = (conf != null) ? conf.getSchemaTypeOrClassName() : "";
+                functionConfig.getInputSpecs().put(topicsPattern, ConsumerConfig.builder()
+                        .schemaTypeOrClassName(schema)
+                        .isRegexPattern(true)
+                        .build());
             }
             if (null != output) {
                 functionConfig.setOutput(output);
@@ -378,26 +396,24 @@ void processArguments() throws Exception {
             if (null != className) {
                 functionConfig.setClassName(className);
             }
-            if (null != outputSerdeClassName) {
-                functionConfig.setOutputSerdeClassName(outputSerdeClassName);
+            if (null != DEPRECATED_outputSerdeClassName) {
+                functionConfig.setOutputSchemaOrClassName(DEPRECATED_outputSerdeClassName);
+            }
+
+            if (null != schemaTypeOrClassName) {
+                functionConfig.setOutputSchemaOrClassName(schemaTypeOrClassName);
             }
             if (null != processingGuarantees) {
                 functionConfig.setProcessingGuarantees(processingGuarantees);
             }
-            
+
             functionConfig.setRetainOrdering(retainOrdering);
-            
+
             if (null != userConfigString) {
                 Type type = new TypeToken<Map<String, String>>(){}.getType();
                 Map<String, Object> userConfigMap = new Gson().fromJson(userConfigString, type);
                 functionConfig.setUserConfig(userConfigMap);
             }
-            if (functionConfig.getInputs() == null) {
-                functionConfig.setInputs(new LinkedList<>());
-            }
-            if (functionConfig.getCustomSerdeInputs() == null) {
-                functionConfig.setCustomSerdeInputs(new HashMap<>());
-            }
             if (functionConfig.getUserConfig() == null) {
                 functionConfig.setUserConfig(new HashMap<>());
             }
@@ -466,7 +482,7 @@ void processArguments() throws Exception {
         }
 
         protected void validateFunctionConfigs(FunctionConfig functionConfig) {
-            
+
             if (isBlank(functionConfig.getOutput()) && !functionConfig.isSkipOutput()) {
                 throw new ParameterException(
                         "output topic is not present (pass skipOutput flag to skip publish output on topic)");
@@ -589,14 +605,11 @@ private void inferMissingNamespace(FunctionConfig functionConfig) {
         }
 
         private String getUniqueInput(FunctionConfig functionConfig) {
-            if (functionConfig.getInputs().size() + functionConfig.getCustomSerdeInputs().size() != 1) {
+            if (functionConfig.getInputSpecs().size() != 1) {
                 throw new IllegalArgumentException();
             }
-            if (functionConfig.getInputs().size() == 1) {
-                return functionConfig.getInputs().iterator().next();
-            } else {
-                return functionConfig.getCustomSerdeInputs().keySet().iterator().next();
-            }
+
+            return functionConfig.getInputSpecs().keySet().iterator().next();
         }
 
         protected FunctionDetails convert(FunctionConfig functionConfig)
@@ -621,13 +634,13 @@ protected FunctionDetails convert(FunctionConfig functionConfig)
 
             // Setup source
             SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-            Map<String, String> topicToSerDeClassNameMap = new HashMap<>();
-            topicToSerDeClassNameMap.putAll(functionConfig.getCustomSerdeInputs());
-            functionConfig.getInputs().forEach(v -> topicToSerDeClassNameMap.put(v, ""));
-            sourceSpecBuilder.putAllTopicsToSerDeClassName(topicToSerDeClassNameMap);
-            if (StringUtils.isNotBlank(functionConfig.getTopicsPattern())) {
-                sourceSpecBuilder.setTopicsPattern(functionConfig.getTopicsPattern());
-            }
+            functionConfig.getInputSpecs().forEach((topic, conf) -> {
+                sourceSpecBuilder.putTopicsToSchema(topic,
+                        ConsumerSpec.newBuilder()
+                                .setSchemaTypeOrClassName(conf.getSchemaTypeOrClassName())
+                                .setIsRegexPattern(conf.isRegexPattern())
+                                .build());
+            });
 
             // Set subscription type based on ordering and EFFECTIVELY_ONCE semantics
             SubscriptionType subType = (functionConfig.isRetainOrdering()
@@ -635,7 +648,7 @@ protected FunctionDetails convert(FunctionConfig functionConfig)
                             ? SubscriptionType.FAILOVER
                             : SubscriptionType.SHARED;
             sourceSpecBuilder.setSubscriptionType(subType);
-            
+
             if (typeArgs != null) {
                 sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
             }
@@ -649,8 +662,8 @@ protected FunctionDetails convert(FunctionConfig functionConfig)
             if (functionConfig.getOutput() != null) {
                 sinkSpecBuilder.setTopic(functionConfig.getOutput());
             }
-            if (functionConfig.getOutputSerdeClassName() != null) {
-                sinkSpecBuilder.setSerDeClassName(functionConfig.getOutputSerdeClassName());
+            if (functionConfig.getOutputSchemaOrClassName() != null) {
+                sinkSpecBuilder.setSchemaTypeOrClassName(functionConfig.getOutputSchemaOrClassName());
             }
             if (typeArgs != null) {
                 sinkSpecBuilder.setTypeClassName(typeArgs[1].getName());
@@ -835,10 +848,10 @@ void runCmd() throws Exception {
 
     @Parameters(commandDescription = "Restart function instance")
     class RestartFunction extends FunctionCommand {
-        
+
         @Parameter(names = "--instance-id", description = "The function instanceId (restart all instances if instance-id is not provided")
         protected String instanceId;
-        
+
         @Override
         void runCmd() throws Exception {
             if (isNotBlank(instanceId)) {
@@ -853,7 +866,7 @@ void runCmd() throws Exception {
             System.out.println("Restarted successfully");
         }
     }
-    
+
     @Parameters(commandDescription = "Delete a Pulsar Function that's running on a Pulsar cluster")
     class DeleteFunction extends FunctionCommand {
         @Override
@@ -1162,6 +1175,7 @@ protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.F
         if (serviceUrl == null) {
             serviceUrl = DEFAULT_SERVICE_URL;
         }
+
         try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, stateStorageServiceUrl, authConfig, null, null,
                 null)) {
             List<RuntimeSpawner> spawners = new LinkedList<>();
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 8417874f38..adf6cfff36 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -18,6 +18,14 @@
  */
 package org.apache.pulsar.admin.cli;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
+import static org.apache.pulsar.functions.utils.Utils.fileExists;
+import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl;
+
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
@@ -39,8 +47,7 @@
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import static org.apache.commons.lang3.StringUtils.isBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
 import org.apache.commons.lang3.text.WordUtils;
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -51,6 +58,7 @@
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.Resources;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
@@ -59,17 +67,12 @@
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees;
 import org.apache.pulsar.functions.utils.SinkConfig;
+import org.apache.pulsar.functions.utils.ConsumerConfig;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.utils.io.Connectors;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 
-import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
-import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
-import static org.apache.pulsar.functions.utils.Utils.fileExists;
-import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl;
-
 @Getter
 @Parameters(commandDescription = "Interface for managing Pulsar IO sinks (egress data from Pulsar)")
 @Slf4j
@@ -204,14 +207,27 @@ void runCmd() throws Exception {
         @Parameter(names = { "-t", "--sink-type" }, description = "The sinks's connector provider")
         protected String sinkType;
 
-        @Parameter(names = "--inputs", description = "The sink's input topic or topics (multiple topics can be specified as a comma-separated list)")
+        @Parameter(names = { "-i",
+                "--inputs" }, description = "The sink's input topic or topics (multiple topics can be specified as a comma-separated list)")
         protected String inputs;
-        @Parameter(names = "--topicsPattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topicsPattern] are mutually exclusive. Add SerDe class name for a pattern in --customSerdeInputs  (supported for java fun only)")
+
+        @Parameter(names = "--topicsPattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topicsPattern] are mutually exclusive. Add SerDe class name for a pattern in --customSerdeInputs  (supported for java fun only)", hidden = true)
         protected String topicsPattern;
+
+        @Parameter(names = { "-st",
+                "--schema-type" }, description = "The builtin schema type (eg: 'avro', 'json', etc..) or the class name for a Schema or Serde implementation")
+        protected String schemaTypeOrClassName = "";
+
         @Parameter(names = "--subsName", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer")
         protected String subsName;
-        @Parameter(names = "--customSerdeInputs", description = "The map of input topics to SerDe class names (as a JSON string)")
+
+        @Parameter(names = "--customSerdeInputs", description = "The map of input topics to SerDe class names (as a JSON string)", hidden = true)
         protected String customSerdeInputString;
+
+        @Parameter(names = "--customSchemaInputs", description = "The map of input topics to Schema types or class names (as a JSON string)")
+        protected String customSchemaInputString;
+
+
         @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the sink")
         protected FunctionConfig.ProcessingGuarantees processingGuarantees;
         @Parameter(names = "--retainOrdering", description = "Sink consumes and sinks messages in order")
@@ -258,34 +274,42 @@ void processArguments() throws Exception {
             if (null != className) {
                 sinkConfig.setClassName(className);
             }
-            
+
             if (null != name) {
                 sinkConfig.setName(name);
             }
             if (null != processingGuarantees) {
                 sinkConfig.setProcessingGuarantees(processingGuarantees);
             }
-            
+
             sinkConfig.setRetainOrdering(retainOrdering);
 
-            Map<String, String> topicsToSerDeClassName = new HashMap<>();
+            Map<String, ConsumerConfig> topicsToSchema = new HashMap<>();
             if (null != inputs) {
-                parseInputs(inputs, topicsToSerDeClassName);
+                parseInputs(inputs, topicsToSchema);
             }
             if (null != customSerdeInputString) {
-                parseCustomSerdeInput(customSerdeInputString, topicsToSerDeClassName);
+                parseCustomSerdeInput(customSerdeInputString, topicsToSchema);
             }
 
-            if (!topicsToSerDeClassName.isEmpty()) {
-                sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
+            if (null != customSchemaInputString) {
+                parseCustomSerdeInput(customSchemaInputString, topicsToSchema);
             }
-            
+
+            if (!topicsToSchema.isEmpty()) {
+                sinkConfig.getTopicsToSchema().putAll(topicsToSchema);
+            }
+
             if (isNotBlank(subsName)) {
                 sinkConfig.setSourceSubscriptionName(subsName);
             }
-            
+
             if (null != topicsPattern) {
-                sinkConfig.setTopicsPattern(topicsPattern);
+                sinkConfig.getTopicsToSchema().put(topicsPattern,
+                        ConsumerConfig.builder()
+                                .schemaTypeOrClassName(schemaTypeOrClassName)
+                                .isRegexPattern(true)
+                                .build());
             }
 
             if (parallelism != null) {
@@ -333,17 +357,32 @@ void processArguments() throws Exception {
             return new Gson().fromJson(str, type);
         }
 
-        protected void parseCustomSerdeInput(String str, Map<String, String> topicsToSerDeClassName) {
+        protected void parseCustomSerdeInput(String str, Map<String, ConsumerConfig> topicsSchema) {
             Type type = new TypeToken<Map<String, String>>(){}.getType();
             Map<String, String> customSerdeInputMap = new Gson().fromJson(str, type);
             customSerdeInputMap.forEach((topic, serde) -> {
-                topicsToSerDeClassName.put(topic, serde);
+                topicsSchema.put(topic, ConsumerConfig.builder()
+                        .schemaTypeOrClassName(serde)
+                        .isRegexPattern(false)
+                        .build());
             });
         }
 
-        protected void parseInputs(String str, Map<String, String> topicsToSerDeClassName) {
+        protected void parseInputs(String str, Map<String, ConsumerConfig> topicsSchema) {
             List<String> inputTopics = Arrays.asList(str.split(","));
-            inputTopics.forEach(s -> topicsToSerDeClassName.put(s, ""));
+            inputTopics.forEach(s -> topicsSchema.put(s,
+                    ConsumerConfig.builder()
+                            .schemaTypeOrClassName(schemaTypeOrClassName)
+                            .isRegexPattern(false)
+                            .build()));
+        }
+
+        protected void addTopicPattern(String topicPattern, Map<String, ConsumerConfig> topicsSchema) {
+            topicsSchema.put(topicPattern,
+                    ConsumerConfig.builder()
+                            .schemaTypeOrClassName(schemaTypeOrClassName)
+                            .isRegexPattern(true)
+                            .build());
         }
 
         protected void inferMissingArguments(SinkConfig sinkConfig) {
@@ -469,20 +508,24 @@ protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) throws IOExcep
             // set source spec
             // source spec classname should be empty so that the default pulsar source will be used
             SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-            if (sinkConfig.getTopicToSerdeClassName() != null) {
-                sourceSpecBuilder.putAllTopicsToSerDeClassName(sinkConfig.getTopicToSerdeClassName());
+            sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
+            if (sinkConfig.getTopicsToSchema() != null) {
+                sinkConfig.getTopicsToSchema().forEach((topic, spec) -> {
+                    sourceSpecBuilder.putTopicsToSchema(topic,
+                            ConsumerSpec.newBuilder()
+                                    .setSchemaTypeOrClassName(spec.getSchemaTypeOrClassName())
+                                    .setIsRegexPattern(spec.isRegexPattern())
+                                    .build());
+                });
             }
 
-            if (sinkConfig.getTopicsPattern() != null) {
-                sourceSpecBuilder.setTopicsPattern(sinkConfig.getTopicsPattern());
-            }
             if (typeArg != null) {
                 sourceSpecBuilder.setTypeClassName(typeArg);
             }
             if (isNotBlank(sinkConfig.getSourceSubscriptionName())) {
                 sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
             }
-            
+
             SubscriptionType subType = (sinkConfig.isRetainOrdering()
                     || ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees()))
                             ? SubscriptionType.FAILOVER
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index d050ab515c..39c3260a26 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -18,6 +18,13 @@
  */
 package org.apache.pulsar.admin.cli;
 
+import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
+import static org.apache.pulsar.functions.utils.Utils.fileExists;
+import static org.apache.pulsar.functions.utils.Utils.getSourceType;
+import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl;
+
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
@@ -36,6 +43,7 @@
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.text.WordUtils;
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
@@ -57,20 +65,6 @@
 import org.apache.pulsar.functions.utils.io.Connectors;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.util.Collections;
-import java.util.Map;
-
-import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
-import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
-import static org.apache.pulsar.functions.utils.Utils.fileExists;
-import static org.apache.pulsar.functions.utils.Utils.getSourceType;
-import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl;
-
 @Getter
 @Parameters(commandDescription = "Interface for managing Pulsar IO Sources (ingress data into Pulsar)")
 @Slf4j
@@ -207,10 +201,18 @@ void runCmd() throws Exception {
 
         @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the Source")
         protected FunctionConfig.ProcessingGuarantees processingGuarantees;
-        @Parameter(names = "--destinationTopicName", description = "The Pulsar topic to which data is sent")
+
+        @Parameter(names = { "-o", "--destinationTopicName" }, description = "The Pulsar topic to which data is sent")
         protected String destinationTopicName;
-        @Parameter(names = "--deserializationClassName", description = "The SerDe classname for the source")
+
+        @Parameter(names = "--deserializationClassName", description = "The SerDe classname for the source", hidden = true)
         protected String deserializationClassName;
+
+        @Parameter(names = { "-st",
+                "--schema-type" }, description = "The schema type (either a builtin schema like 'avro', 'json', etc.."
+                        + " or custom Schema class name to be used to encode messages emitted from the source")
+        protected String schemaTypeOrClassName;
+
         @Parameter(names = "--parallelism", description = "The source's parallelism factor (i.e. the number of source instances to run)")
         protected Integer parallelism;
         @Parameter(names = { "-a", "--archive" },
@@ -257,8 +259,12 @@ void processArguments() throws Exception {
                 sourceConfig.setTopicName(destinationTopicName);
             }
             if (null != deserializationClassName) {
-                sourceConfig.setSerdeClassName(deserializationClassName);
+                sourceConfig.setSchemaTypeOrClassName(deserializationClassName);
             }
+            if (null != schemaTypeOrClassName) {
+                sourceConfig.setSchemaTypeOrClassName(schemaTypeOrClassName);
+            }
+
             if (null != processingGuarantees) {
                 sourceConfig.setProcessingGuarantees(processingGuarantees);
             }
@@ -452,9 +458,10 @@ protected FunctionDetails createSourceConfig(SourceConfig sourceConfig) throws I
             // set up sink spec.
             // Sink spec classname should be empty so that the default pulsar sink will be used
             SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-            if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) {
-                sinkSpecBuilder.setSerDeClassName(sourceConfig.getSerdeClassName());
+            if (!StringUtils.isEmpty(sourceConfig.getSchemaTypeOrClassName())) {
+                sinkSpecBuilder.setSchemaTypeOrClassName(sourceConfig.getSchemaTypeOrClassName());
             }
+
             sinkSpecBuilder.setTopic(sourceConfig.getTopicName());
 
             if (typeArg != null) {
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 92219782c4..a3fdda027b 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -18,9 +18,26 @@
  */
 package org.apache.pulsar.admin.cli;
 
+import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+
 import com.beust.jcommander.ParameterException;
 import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
+
+import java.io.File;
+import java.net.URL;
+import java.nio.file.Files;
+import java.util.Collections;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.Functions;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -39,21 +56,6 @@
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
-import java.io.File;
-import java.net.URL;
-import java.nio.file.Files;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
-import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-
 @Slf4j
 @PrepareForTest({CmdFunctions.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.core.*", "org.apache.pulsar.functions.api.*" })
@@ -121,12 +123,9 @@ public SinkConfig getSinkConfig() {
         sinkConfig.setNamespace(NAMESPACE);
         sinkConfig.setName(NAME);
 
-        Map<String, String> topicsToSerDeClassName = new HashMap<>();
-        createSink.parseInputs(INPUTS, topicsToSerDeClassName);
-        createSink.parseCustomSerdeInput(CUSTOM_SERDE_INPUT_STRING, topicsToSerDeClassName);
-        sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
-
-        sinkConfig.setTopicsPattern(TOPIC_PATTERN);
+        createSink.parseInputs(INPUTS, sinkConfig.getTopicsToSchema());
+        createSink.parseCustomSerdeInput(CUSTOM_SERDE_INPUT_STRING, sinkConfig.getTopicsToSchema());
+        createSink.addTopicPattern(TOPIC_PATTERN, sinkConfig.getTopicsToSchema());
         sinkConfig.setProcessingGuarantees(PROCESSING_GUARANTEES);
         sinkConfig.setParallelism(PARALLELISM);
         sinkConfig.setArchive(JAR_FILE_PATH);
@@ -225,9 +224,8 @@ public void testMissingName() throws Exception {
     @Test
     public void testMissingInput() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
-        Map<String, String> topicsToSerDeClassName = new HashMap<>();
-        createSink.parseCustomSerdeInput(CUSTOM_SERDE_INPUT_STRING, topicsToSerDeClassName);
-        sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
+        sinkConfig.getTopicsToSchema().clear();
+        createSink.parseCustomSerdeInput(CUSTOM_SERDE_INPUT_STRING, sinkConfig.getTopicsToSchema());
         testCmdSinkCliMissingArgs(
                 TENANT,
                 NAMESPACE,
@@ -249,9 +247,8 @@ public void testMissingInput() throws Exception {
     @Test
     public void testMissingCustomSerdeInput() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
-        Map<String, String> topicsToSerDeClassName = new HashMap<>();
-        createSink.parseInputs(INPUTS, topicsToSerDeClassName);
-        sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
+        sinkConfig.getTopicsToSchema().clear();
+        createSink.parseInputs(INPUTS, sinkConfig.getTopicsToSchema());
         testCmdSinkCliMissingArgs(
                 TENANT,
                 NAMESPACE,
@@ -273,7 +270,8 @@ public void testMissingCustomSerdeInput() throws Exception {
     @Test
     public void testMissingTopicPattern() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
-        sinkConfig.setTopicsPattern(null);
+        sinkConfig.getTopicsToSchema().clear();
+        createSink.addTopicPattern(null, sinkConfig.getTopicsToSchema());
         testCmdSinkCliMissingArgs(
                 TENANT,
                 NAMESPACE,
@@ -295,8 +293,6 @@ public void testMissingTopicPattern() throws Exception {
     @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Must specify at least one topic of input via inputs, customSerdeInputs, or topicPattern")
     public void testMissingAllInputTopics() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
-        sinkConfig.setTopicsPattern(null);
-        sinkConfig.setTopicToSerdeClassName(new HashMap<>());
         testCmdSinkCliMissingArgs(
                 TENANT,
                 NAMESPACE,
@@ -677,32 +673,25 @@ public void testCmdSinkConfigFileMissingName() throws Exception {
     @Test
     public void testCmdSinkConfigFileMissingTopicToSerdeClassName() throws Exception {
         SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setTopicToSerdeClassName(null);
 
         SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setTopicToSerdeClassName(null);
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
     @Test
     public void testCmdSinkConfigFileMissingTopicsPattern() throws Exception {
         SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setTopicsPattern(null);
 
         SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setTopicsPattern(null);
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
     @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Must specify at least one topic of input via inputs, customSerdeInputs, or topicPattern")
     public void testCmdSinkConfigFileMissingAllInput() throws Exception {
         SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setTopicsPattern(null);
-        testSinkConfig.setTopicToSerdeClassName(null);
+        testSinkConfig.getTopicsToSchema().clear();
 
         SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setTopicsPattern(null);
-        expectedSinkConfig.setTopicToSerdeClassName(null);
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
@@ -826,12 +815,10 @@ public void testCliOverwriteConfigFile() throws Exception {
         testSinkConfig.setNamespace(NAMESPACE + "-prime");
         testSinkConfig.setName(NAME + "-prime");
 
-        Map<String, String> topicsToSerDeClassName = new HashMap<>();
-        createSink.parseInputs(INPUTS + ",test-src-prime", topicsToSerDeClassName);
-        createSink.parseCustomSerdeInput("{\"test_src3-prime\": \"\"}", topicsToSerDeClassName);
-        testSinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
+        createSink.parseInputs(INPUTS + ",test-src-prime", testSinkConfig.getTopicsToSchema());
+        createSink.parseCustomSerdeInput("{\"test_src3-prime\": \"\"}", testSinkConfig.getTopicsToSchema());
+        createSink.addTopicPattern(TOPIC_PATTERN + "-prime", testSinkConfig.getTopicsToSchema());
 
-        testSinkConfig.setTopicsPattern(TOPIC_PATTERN + "-prime");
         testSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
         testSinkConfig.setParallelism(PARALLELISM + 1);
         testSinkConfig.setArchive(JAR_FILE_PATH + "-prime");
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index cd9605146a..5c049f7ba9 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -18,13 +18,26 @@
  */
 package org.apache.pulsar.admin.cli;
 
+import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+
 import com.beust.jcommander.ParameterException;
 import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
+
+import java.io.File;
+import java.nio.file.Files;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.Functions;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Resources;
@@ -39,20 +52,6 @@
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
-import java.io.File;
-import java.nio.file.Files;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
-import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-
 @Slf4j
 @PrepareForTest({CmdFunctions.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.core.*" })
@@ -67,7 +66,7 @@ public IObjectFactory getObjectFactory() {
     private static final String NAMESPACE = "test-namespace";
     private static final String NAME = "test";
     private static final String TOPIC_NAME = "src_topic_1";
-    private static final String SERDE_CLASS_NAME = DefaultSerDe.class.getName();
+    private static final String SERDE_CLASS_NAME = "";
     private static final FunctionConfig.ProcessingGuarantees PROCESSING_GUARANTEES
             = FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE;
     private static final Integer PARALLELISM = 1;
@@ -115,7 +114,7 @@ public SourceConfig getSourceConfig() {
         sourceConfig.setName(NAME);
 
         sourceConfig.setTopicName(TOPIC_NAME);
-        sourceConfig.setSerdeClassName(SERDE_CLASS_NAME);
+        sourceConfig.setSchemaTypeOrClassName(SERDE_CLASS_NAME);
         sourceConfig.setProcessingGuarantees(PROCESSING_GUARANTEES);
         sourceConfig.setParallelism(PARALLELISM);
         sourceConfig.setArchive(JAR_FILE_PATH);
@@ -223,7 +222,7 @@ public void testMissingTopicName() throws Exception {
     @Test
     public void testMissingSerdeClassName() throws Exception {
         SourceConfig sourceConfig = getSourceConfig();
-        sourceConfig.setSerdeClassName(null);
+        sourceConfig.setSchemaTypeOrClassName(null);
         testCmdSourceCliMissingArgs(
                 TENANT,
                 NAMESPACE,
@@ -574,10 +573,10 @@ public void testCmdSourceConfigFileMissingTopicName() throws Exception {
     @Test
     public void testCmdSourceConfigFileMissingSerdeClassname() throws Exception {
         SourceConfig testSourceConfig = getSourceConfig();
-        testSourceConfig.setSerdeClassName(null);
+        testSourceConfig.setSchemaTypeOrClassName(null);
 
         SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setSerdeClassName(null);
+        expectedSourceConfig.setSchemaTypeOrClassName(null);
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
@@ -701,7 +700,7 @@ public void testCliOverwriteConfigFile() throws Exception {
         testSourceConfig.setNamespace(NAMESPACE + "-prime");
         testSourceConfig.setName(NAME + "-prime");
         testSourceConfig.setTopicName(TOPIC_NAME + "-prime");
-        testSourceConfig.setSerdeClassName(SERDE_CLASS_NAME + "-prime");
+        testSourceConfig.setSchemaTypeOrClassName(SERDE_CLASS_NAME + "-prime");
         testSourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
         testSourceConfig.setParallelism(PARALLELISM + 1);
         testSourceConfig.setArchive(JAR_FILE_PATH + "-prime");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index bbfb3f3fb7..799bec8e4d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -21,6 +21,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -117,7 +118,7 @@
         this.namespaceName = conf.getTopicNames().stream().findFirst()
                 .flatMap(s -> Optional.of(TopicName.get(s).getNamespaceObject())).get();
 
-        List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t))
+        List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(this::subscribeAsync)
                 .collect(Collectors.toList());
         FutureUtil.waitForAll(futures)
             .thenAccept(finalFuture -> {
@@ -127,7 +128,7 @@
                     }
                     setState(State.Ready);
                     // We have successfully created N consumers, so we can start receiving messages now
-                    startReceivingMessages(consumers.values().stream().collect(Collectors.toList()));
+                    startReceivingMessages(new ArrayList<>(consumers.values()));
                     subscribeFuture().complete(MultiTopicsConsumerImpl.this);
                     log.info("[{}] [{}] Created topics consumer with {} sub-consumers",
                         topic, subscription, allTopicPartitionsNumber.get());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 199665be2a..ff391be3ee 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -43,9 +45,10 @@
 
     private final PulsarClientImpl client;
     private ProducerConfigurationData conf;
-    private final Schema<T> schema;
+    private Schema<T> schema;
 
-    ProducerBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
+    @VisibleForTesting
+    public ProducerBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
         this(client, new ProducerConfigurationData(), schema);
     }
 
@@ -55,6 +58,16 @@ private ProducerBuilderImpl(PulsarClientImpl client, ProducerConfigurationData c
         this.schema = schema;
     }
 
+
+    /**
+     * Allows overriding the schema in the builder implementation.
+     * @return this builder instance
+     */
+    public ProducerBuilder<T> schema(Schema<T> schema) {
+        this.schema = schema;
+        return this;
+    }
+
     @Override
     public ProducerBuilder<T> clone() {
         return new ProducerBuilderImpl<>(client, conf.clone(), schema);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index ca7b93f174..97214447d7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -39,8 +39,20 @@
     @EqualsAndHashCode.Exclude
     private String name;
 
+
+    /**
+     * The schema data in AVRO JSON format
+     */
     private byte[] schema;
+
+    /**
+     * The type of schema (AVRO, JSON, PROTOBUF, etc.)
+     */
     private SchemaType type;
+
+    /**
+     * Additional properties of the schema definition (implementation defined)
+     */
     private Map<String, String> properties = Collections.emptyMap();
 
     public SchemaInfo(String name, SchemaData data) {
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 98690674b2..fba82a7579 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -52,10 +52,10 @@
     String getOutputTopic();
 
     /**
-     * Get output Serde class
-     * @return output serde class
+     * Get output schema builtin type or custom class name
+     * @return output schema builtin type or custom class name
      */
-    String getOutputSerdeClassName();
+    String getOutputSchemaType();
 
     /**
      * The tenant this function belongs to
@@ -161,15 +161,19 @@
 
     /**
      * Publish an object using serDe for serializing to the topic
-     * @param topicName The name of the topic for publishing
-     * @param object The object that needs to be published
-     * @param serDeClassName The class name of the class that needs to be used to serialize the object before publishing
+     *
+     * @param topicName
+     *            The name of the topic for publishing
+     * @param object
+     *            The object that needs to be published
+     * @param schemaType
+     *            Either a builtin schema type (e.g. "avro", "json", "protobuf") or the class name of the custom schema class
      * @return A future that completes when the framework is done publishing the message
      */
-    <O> CompletableFuture<Void> publish(String topicName, O object, String serDeClassName);
+    <O> CompletableFuture<Void> publish(String topicName, O object, String schemaType);
 
     /**
-     * Publish an object using DefaultSerDe for serializing to the topic
+     * Publish an object to the topic using default schemas
      * @param topicName The name of the topic for publishing
      * @param object The object that needs to be published
      * @return A future that completes when the framework is done publishing the message
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/DefaultSerDe.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/DefaultSerDe.java
deleted file mode 100644
index 1fc9a492e0..0000000000
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/DefaultSerDe.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.functions.api.utils;
-
-import org.apache.pulsar.functions.api.SerDe;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Simplest form of SerDe.
- */
-public class DefaultSerDe implements SerDe<Object> {
-
-    private static final Set<Class> supportedInputTypes = new HashSet<>(Arrays.asList(
-            byte[].class,
-            Integer.class,
-            Double.class,
-            Long.class,
-            Boolean.class,
-            String.class,
-            Short.class,
-            Byte.class,
-            Float.class
-    ));
-    private final Class type;
-
-    public DefaultSerDe(Class type) {
-        this.type = type;
-    }
-
-    @Override
-    public Object deserialize(byte[] input) {
-        String data = new String(input, StandardCharsets.UTF_8);
-
-        if (type.equals(byte[].class)) {
-            return input;
-        } else if (type.equals(Integer.class)) {
-            return Integer.valueOf(data);
-        } else if (type.equals(Double.class)) {
-            return Double.valueOf(data);
-        } else if (type.equals(Long.class)) {
-            return Long.valueOf(data);
-        } else if (type.equals(Boolean.class)) {
-            return Boolean.valueOf(data);
-        } else if (type.equals(String.class)) {
-            return data;
-        } else if (type.equals(Short.class)) {
-            return Short.valueOf(data);
-        } else if (type.equals(Byte.class)) {
-            return Byte.decode(data);
-        } else if (type.equals(Float.class)) {
-            return Float.valueOf(data);
-        } else {
-            throw new RuntimeException("Unknown type " + type);
-        }
-    }
-
-    @Override
-    public byte[] serialize(Object input) {
-        if (type.equals(byte[].class)) {
-            return (byte[]) input;
-        } else if (type.equals(Integer.class)) {
-            return ((Integer) input).toString().getBytes(StandardCharsets.UTF_8);
-        } else if (type.equals(Double.class)) {
-            return ((Double) input).toString().getBytes(StandardCharsets.UTF_8);
-        } else if (type.equals(Long.class)) {
-            return ((Long) input).toString().getBytes(StandardCharsets.UTF_8);
-        } else if (type.equals(Boolean.class)) {
-            return ((Boolean) input).toString().getBytes(StandardCharsets.UTF_8);
-        } else if (type.equals(String.class)) {
-            return ((String) input).getBytes(StandardCharsets.UTF_8);
-        } else if (type.equals(Short.class)) {
-            return ((Short) input).toString().getBytes(StandardCharsets.UTF_8);
-        } else if (type.equals(Byte.class)) {
-            return ((Byte) input).toString().getBytes(StandardCharsets.UTF_8);
-        } else if (type.equals(Float.class)) {
-            return ((Float) input).toString().getBytes(StandardCharsets.UTF_8);
-        } else {
-            throw new RuntimeException("Unknown type " + type);
-        }
-    }
-
-    public static boolean IsSupportedType(Class typ) {
-        return supportedInputTypes.contains(typ);
-    }
-}
diff --git a/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/DefaultSerDeTest.java b/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/DefaultSerDeTest.java
deleted file mode 100644
index 6acb6a1625..0000000000
--- a/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/DefaultSerDeTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.functions.api.utils;
-
-import net.jodah.typetools.TypeResolver;
-import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.SerDe;
-import org.testng.annotations.Test;
-
-
-import static org.testng.Assert.*;
-
-/**
- * Unit test of {@link DefaultSerDe}.
- */
-public class DefaultSerDeTest {
-    @Test
-    public void testStringSerDe() {
-        DefaultSerDe serializer = new DefaultSerDe(String.class);
-        DefaultSerDe deserializer = new DefaultSerDe(String.class);
-        String input = new String("input");
-        byte[] output = serializer.serialize(input);
-        String result = (String) deserializer.deserialize(output);
-        assertEquals(result, input);
-    }
-
-    @Test
-    public void testLongSerDe() {
-        DefaultSerDe serializer = new DefaultSerDe(Long.class);
-        DefaultSerDe deserializer = new DefaultSerDe(Long.class);
-        Long input = new Long(648292);
-        byte[] output = serializer.serialize(input);
-        Long result = (Long) deserializer.deserialize(output);
-        assertEquals(result, input);
-    }
-
-    @Test
-    public void testBooleanSerDe() {
-        DefaultSerDe serializer = new DefaultSerDe(Boolean.class);
-        DefaultSerDe deserializer = new DefaultSerDe(Boolean.class);
-        Boolean input = Boolean.TRUE;
-        byte[] output = serializer.serialize(input);
-        Boolean result = (Boolean) deserializer.deserialize(output);
-        assertEquals(result, input);
-    }
-    
-    @Test
-    public void testDoubleSerDe() {
-        DefaultSerDe serializer = new DefaultSerDe(Double.class);
-        DefaultSerDe deserializer = new DefaultSerDe(Double.class);
-        Double input = new Double(648292.32432);
-        byte[] output = serializer.serialize(input);
-        Double result = (Double) deserializer.deserialize(output);
-        assertEquals(result, input);
-    }
-
-    @Test
-    public void testFloatSerDe() {
-        DefaultSerDe serializer = new DefaultSerDe(Float.class);
-        DefaultSerDe deserializer = new DefaultSerDe(Float.class);
-        Float input = new Float(354353.54654);
-        byte[] output = serializer.serialize(input);
-        Float result = (Float) deserializer.deserialize(output);
-        assertEquals(result, input);
-    }
-
-    @Test
-    public void testIntegerSerDe() {
-        DefaultSerDe serializer = new DefaultSerDe(Integer.class);
-        DefaultSerDe deserializer = new DefaultSerDe(Integer.class);
-        Integer input = new Integer(2542352);
-        byte[] output = serializer.serialize(input);
-        Integer result = (Integer) deserializer.deserialize(output);
-        assertEquals(result, input);
-    }
-
-    private class SimplePulsarFunction implements Function<String, String> {
-        @Override
-        public String process(String input, Context context) {
-            return null;
-        }
-    }
-
-    @Test
-    public void testPulsarFunction() {
-        SimplePulsarFunction pulsarFunction = new SimplePulsarFunction();
-        Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
-        SerDe serDe = new DefaultSerDe(String.class);
-        Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-        assertTrue(inputSerdeTypeArgs[0].isAssignableFrom(typeArgs[0]));
-    }
-
-}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 8e8f9665f0..8131f97891 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -37,19 +37,19 @@
 import lombok.Getter;
 import lombok.Setter;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.pulsar.client.api.MessageId;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
@@ -71,12 +71,14 @@
         private double sum;
         private double max;
         private double min;
+
         AccumulatedMetricDatum() {
             count = 0;
             sum = 0;
             max = Double.MIN_VALUE;
             min = Double.MAX_VALUE;
         }
+
         public void update(double value) {
             count++;
             sum += value;
@@ -92,41 +94,36 @@ public void update(double value) {
     private ConcurrentMap<String, AccumulatedMetricDatum> currentAccumulatedMetrics;
     private ConcurrentMap<String, AccumulatedMetricDatum> accumulatedMetrics;
 
-    private Map<String, Producer> publishProducers;
-    private Map<String, SerDe> publishSerializers;
-    private ProducerConfiguration producerConfiguration;
-    private PulsarClient pulsarClient;
-    private final ClassLoader classLoader;
+    private Map<String, Producer<?>> publishProducers;
+    private ProducerBuilderImpl<?> producerBuilder;
 
     private final List<String> inputTopics;
 
+    private final TopicSchema topicSchema;
 
     @Getter
     @Setter
     private StateContextImpl stateContext;
     private Map<String, Object> userConfigs;
 
-    public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
-                       ClassLoader classLoader, List<String> inputTopics) {
+    public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics) {
         this.config = config;
         this.logger = logger;
-        this.pulsarClient = client;
-        this.classLoader = classLoader;
         this.currentAccumulatedMetrics = new ConcurrentHashMap<>();
         this.accumulatedMetrics = new ConcurrentHashMap<>();
         this.publishProducers = new HashMap<>();
-        this.publishSerializers = new HashMap<>();
         this.inputTopics = inputTopics;
-        producerConfiguration = new ProducerConfiguration();
-        producerConfiguration.setBlockIfQueueFull(true);
-        producerConfiguration.setBatchingEnabled(true);
-        producerConfiguration.setBatchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
-        producerConfiguration.setMaxPendingMessages(1000000);
+        this.topicSchema = new TopicSchema(client);
+
+        this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
+
         if (config.getFunctionDetails().getUserConfig().isEmpty()) {
             userConfigs = new HashMap<>();
         } else {
             userConfigs = new Gson().fromJson(config.getFunctionDetails().getUserConfig(),
-                    new TypeToken<Map<String, Object>>(){}.getType());
+                    new TypeToken<Map<String, Object>>() {
+                    }.getType());
         }
     }
 
@@ -150,8 +147,13 @@ public String getOutputTopic() {
     }
 
     @Override
-    public String getOutputSerdeClassName() {
-        return config.getFunctionDetails().getSink().getSerDeClassName();
+    public String getOutputSchemaType() {
+        SinkSpec sink = config.getFunctionDetails().getSink();
+        if (!StringUtils.isEmpty(sink.getSchemaTypeOrClassName())) {
+            return sink.getSchemaTypeOrClassName();
+        } else {
+            return sink.getSerDeClassName();
+        }
     }
 
     @Override
@@ -205,7 +207,6 @@ public Object getUserConfigValueOrDefault(String key, Object defaultValue) {
         return userConfigs;
     }
 
-
     private void ensureStateEnabled() {
         checkState(null != stateContext, "State is not enabled.");
     }
@@ -250,49 +251,41 @@ public ByteBuffer getState(String key) {
         }
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public <O> CompletableFuture<Void> publish(String topicName, O object) {
-        return publish(topicName, object, DefaultSerDe.class.getName());
+        return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object));
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public <O> CompletableFuture<Void> publish(String topicName, O object, String serDeClassName) {
-        if (!publishProducers.containsKey(topicName)) {
+        return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, serDeClassName));
+    }
+
+    @SuppressWarnings("unchecked")
+    public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O> schema) {
+        Producer<O> producer = (Producer<O>) publishProducers.get(topicName);
+
+        if (producer == null) {
             try {
-                publishProducers.put(topicName, pulsarClient.createProducer(topicName, producerConfiguration));
-            } catch (PulsarClientException ex) {
-                CompletableFuture<Void> retval = new CompletableFuture<>();
-                retval.completeExceptionally(ex);
-                return retval;
-            }
-        }
-        if (StringUtils.isEmpty(serDeClassName)) {
-            serDeClassName = DefaultSerDe.class.getName();
-        }
-        if (!publishSerializers.containsKey(serDeClassName)) {
-            SerDe serDe;
-            if (serDeClassName.equals(DefaultSerDe.class.getName())) {
-                if (!DefaultSerDe.IsSupportedType(object.getClass())) {
-                    throw new RuntimeException("Default Serializer does not support " + object.getClass());
-                }
-                serDe = new DefaultSerDe(object.getClass());
-            } else {
-                try {
-                    Class<? extends SerDe> serDeClass = (Class<? extends SerDe>) Class.forName(serDeClassName);
-                    serDe = Reflections.createInstance(
-                            serDeClassName,
-                            serDeClass,
-                            classLoader);
-                } catch (ClassNotFoundException e) {
-                    throw new RuntimeException(e);
+                Producer<O> newProducer = ((ProducerBuilderImpl<O>) producerBuilder.clone()).schema(schema).create();
+
+                Producer<O> existingProducer = (Producer<O>) publishProducers.putIfAbsent(topicName, newProducer);
+                if (existingProducer != null) {
+                    // The value in the map was not updated after the concurrent put
+                    newProducer.close();
+                    producer = existingProducer;
+                } else {
+                    producer = newProducer;
                 }
+
+            } catch (PulsarClientException e) {
+                return FutureUtil.failedFuture(e);
             }
-            publishSerializers.put(serDeClassName, serDe);
         }
 
-        byte[] bytes = publishSerializers.get(serDeClassName).serialize(object);
-        return publishProducers.get(topicName).sendAsync(bytes)
-                .thenApply(msgId -> null);
+        return producer.sendAsync(object).thenApply(msgId -> null);
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 72336596ee..b4a9bf9cf3 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -18,29 +18,48 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
+import lombok.experimental.UtilityClass;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.utils.Reflections;
 
+import net.jodah.typetools.TypeResolver;
+
+@UtilityClass
 public class InstanceUtils {
-    public static SerDe initializeSerDe(String serdeClassName, ClassLoader clsLoader,
-                                        Class<?> type) {
-        if (null == serdeClassName || serdeClassName.isEmpty()) {
-            return null;
-        } else if (serdeClassName.equals(DefaultSerDe.class.getName())) {
-            return initializeDefaultSerDe(type);
-        } else {
-            return Reflections.createInstance(
-                    serdeClassName,
-                    SerDe.class,
-                    clsLoader);
-        }
+    public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader clsLoader, Class<?> typeArg) {
+        SerDe<?> serDe = createInstance(serdeClassName, clsLoader, SerDe.class);
+
+        Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
+        checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
+                "Inconsistent types found between function input type and input serde type: "
+                        + " function type = " + typeArg + " should be assignable from "
+                        + inputSerdeTypeArgs[0]);
+
+        return serDe;
     }
 
-    public static SerDe initializeDefaultSerDe(Class<?> type) {
-        if (!DefaultSerDe.IsSupportedType(type)) {
-            throw new RuntimeException("Default Serializer does not support " + type);
+    public static Schema<?> initializeCustomSchema(String schemaClassName, ClassLoader clsLoader, Class<?> typeArg) {
+        Schema<?> schema = createInstance(schemaClassName, clsLoader, Schema.class);
+
+        Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(Schema.class, schema.getClass());
+        checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
+                "Inconsistent types found between function input type and input schema type: "
+                        + " function type = " + typeArg + " should be assignable from "
+                        + inputSerdeTypeArgs[0]);
+
+        return schema;
+    }
+
+    private static <T> T createInstance(String className, ClassLoader clsLoader, Class<T> baseClass) {
+        if (StringUtils.isEmpty(className)) {
+            return null;
+        } else {
+            return Reflections.createInstance(className, baseClass, clsLoader);
         }
-        return new DefaultSerDe(type);
     }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index fe94b179f8..fc366c7c55 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -67,6 +67,7 @@
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
+import org.apache.pulsar.functions.utils.ConsumerConfig;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
@@ -109,7 +110,7 @@
     // function stats
     private final FunctionStats stats;
 
-    private Record currentRecord;
+    private Record<?> currentRecord;
 
     private Source source;
     private Sink sink;
@@ -176,8 +177,7 @@ ContextImpl setupContext() {
         }
         Logger instanceLog = LoggerFactory.getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
-        return new ContextImpl(instanceConfig, instanceLog, client,
-                Thread.currentThread().getContextClassLoader(), inputTopics);
+        return new ContextImpl(instanceConfig, instanceLog, client, inputTopics);
     }
 
     /**
@@ -510,8 +510,26 @@ public void setupInput(ContextImpl contextImpl) throws Exception {
         // If source classname is not set, we default pulsar source
         if (sourceSpec.getClassName().isEmpty()) {
             PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig();
-            pulsarSourceConfig.setTopicSerdeClassNameMap(sourceSpec.getTopicsToSerDeClassNameMap());
-            pulsarSourceConfig.setTopicsPattern(sourceSpec.getTopicsPattern());
+            sourceSpec.getTopicsToSchemaMap().forEach((topic, conf) -> {
+                pulsarSourceConfig.getTopicSchema().put(topic,
+                        ConsumerConfig.builder()
+                            .schemaTypeOrClassName(conf.getSchemaTypeOrClassName())
+                            .isRegexPattern(conf.getIsRegexPattern())
+                            .build());
+            });
+
+            sourceSpec.getTopicsToSerDeClassNameMap().forEach((topic, serde) -> {
+                pulsarSourceConfig.getTopicSchema().put(topic,
+                        ConsumerConfig.builder()
+                                .schemaTypeOrClassName(serde)
+                                .isRegexPattern(false)
+                                .build());
+            });
+
+            if (!StringUtils.isEmpty(sourceSpec.getTopicsPattern())) {
+                pulsarSourceConfig.getTopicSchema().get(sourceSpec.getTopicsPattern()).setRegexPattern(true);
+            }
+
             pulsarSourceConfig.setSubscriptionName(
                     StringUtils.isNotBlank(sourceSpec.getSubscriptionName()) ? sourceSpec.getSubscriptionName()
                             : FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
@@ -547,7 +565,7 @@ public void setupInput(ContextImpl contextImpl) throws Exception {
         } else {
             throw new RuntimeException("Source does not implement correct interface");
         }
-        this.source = (Source) object;
+        this.source = (Source<?>) object;
 
         if (sourceSpec.getConfigs().isEmpty()) {
             this.source.open(new HashMap<>(), contextImpl);
@@ -563,19 +581,24 @@ public void setupOutput(ContextImpl contextImpl) throws Exception {
         Object object;
         // If sink classname is not set, we default pulsar sink
         if (sinkSpec.getClassName().isEmpty()) {
-
             if (StringUtils.isEmpty(sinkSpec.getTopic())) {
                 object = PulsarSinkDisable.INSTANCE;
             } else {
                 PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig();
-                pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees
-                        .valueOf(this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
+                pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf(
+                        this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
                 pulsarSinkConfig.setTopic(sinkSpec.getTopic());
-                pulsarSinkConfig.setSerDeClassName(sinkSpec.getSerDeClassName());
+
+                if (!StringUtils.isEmpty(sinkSpec.getSchemaTypeOrClassName())) {
+                    pulsarSinkConfig.setSchemaTypeOrClassName(sinkSpec.getSchemaTypeOrClassName());
+                } else if (!StringUtils.isEmpty(sinkSpec.getSerDeClassName())) {
+                    pulsarSinkConfig.setSchemaTypeOrClassName(sinkSpec.getSerDeClassName());
+                }
+
                 pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
+
                 object = new PulsarSink(this.client, pulsarSinkConfig);
             }
-
         } else {
             object = Reflections.createInstance(
                     sinkSpec.getClassName(),
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
index 7a561a17d6..fa47252d1d 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
@@ -27,9 +27,10 @@
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.functions.instance.FunctionResultRouter;
 
-public abstract class AbstractOneOuputTopicProducers implements Producers {
+public abstract class AbstractOneOuputTopicProducers<T> implements Producers<T> {
 
     protected final PulsarClient client;
     protected final String outputTopic;
@@ -41,9 +42,9 @@
         this.outputTopic = outputTopic;
     }
 
-    static ProducerBuilder<byte[]> newProducerBuilder(PulsarClient client) {
+    static <T> ProducerBuilder<T> newProducerBuilder(PulsarClient client, Schema<T> schema) {
         // use function result router to deal with different processing guarantees.
-        return client.newProducer() //
+        return client.newProducer(schema) //
                 .blockIfQueueFull(true) //
                 .enableBatching(true) //
                 .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) //
@@ -53,23 +54,23 @@
                 .messageRouter(FunctionResultRouter.of());
     }
 
-    protected Producer<byte[]> createProducer(String topic)
+    protected Producer<T> createProducer(String topic, Schema<T> schema)
             throws PulsarClientException {
-        return createProducer(client, topic);
+        return createProducer(client, topic, schema);
     }
 
-    public static Producer<byte[]> createProducer(PulsarClient client, String topic)
+    public static <T> Producer<T> createProducer(PulsarClient client, String topic, Schema<T> schema)
             throws PulsarClientException {
-        return newProducerBuilder(client).topic(topic).create();
+        return newProducerBuilder(client, schema).topic(topic).create();
     }
 
-    protected Producer<byte[]> createProducer(String topic, String producerName)
+    protected Producer<T> createProducer(String topic, String producerName, Schema<T> schema)
             throws PulsarClientException {
-        return createProducer(client, topic, producerName);
+        return createProducer(client, topic, schema, producerName);
     }
 
-    public static Producer<byte[]> createProducer(PulsarClient client, String topic, String producerName)
+    public static <T> Producer<T> createProducer(PulsarClient client, String topic, Schema<T> schema, String producerName)
             throws PulsarClientException {
-        return newProducerBuilder(client).topic(topic).producerName(producerName).create();
+        return newProducerBuilder(client, schema).topic(topic).producerName(producerName).create();
     }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
index 12a639e574..f15df42e5e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
@@ -19,32 +19,37 @@
 package org.apache.pulsar.functions.instance.producers;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 
 @Slf4j
-public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicProducers {
+public class MultiConsumersOneOuputTopicProducers<T> extends AbstractOneOuputTopicProducers<T> {
 
     @Getter(AccessLevel.PACKAGE)
     // PartitionId -> producer
-    private final Map<String, Producer<byte[]>> producers;
+    private final Map<String, Producer<T>> producers;
+
+    private final Schema<T> schema;
 
 
     public MultiConsumersOneOuputTopicProducers(PulsarClient client,
-                                                String outputTopic)
+                                                String outputTopic, Schema<T> schema)
             throws PulsarClientException {
         super(client, outputTopic);
         this.producers = new ConcurrentHashMap<>();
+        this.schema = schema;
     }
 
     @Override
@@ -57,10 +62,10 @@ static String makeProducerName(String srcTopicName, String srcTopicPartition) {
     }
 
     @Override
-    public synchronized Producer<byte[]> getProducer(String srcPartitionId) throws PulsarClientException {
-        Producer<byte[]> producer = producers.get(srcPartitionId);
+    public synchronized Producer<T> getProducer(String srcPartitionId) throws PulsarClientException {
+        Producer<T> producer = producers.get(srcPartitionId);
         if (null == producer) {
-            producer = createProducer(outputTopic, srcPartitionId);
+            producer = createProducer(outputTopic, srcPartitionId, schema);
             producers.put(srcPartitionId, producer);
         }
         return producer;
@@ -68,7 +73,7 @@ static String makeProducerName(String srcTopicName, String srcTopicPartition) {
 
     @Override
     public synchronized void closeProducer(String srcPartitionId) {
-        Producer<byte[]> producer = producers.get(srcPartitionId);
+        Producer<T> producer = producers.get(srcPartitionId);
         if (null != producer) {
             producer.closeAsync();
             producers.remove(srcPartitionId);
@@ -78,7 +83,7 @@ public synchronized void closeProducer(String srcPartitionId) {
     @Override
     public synchronized void close() {
         List<CompletableFuture<Void>> closeFutures = new ArrayList<>(producers.size());
-        for (Producer<byte[]> producer: producers.values()) {
+        for (Producer<T> producer: producers.values()) {
             closeFutures.add(producer.closeAsync());
         }
         try {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
index 4d026ee4dd..7892876c1b 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
@@ -24,7 +24,7 @@
 /**
  * An interface for managing publishers within a java instance.
  */
-public interface Producers extends AutoCloseable {
+public interface Producers<T> extends AutoCloseable {
 
     /**
      * Initialize all the producers.
@@ -40,7 +40,7 @@
      *          src partition Id
      * @return the producer instance to produce messages
      */
-    Producer<byte[]> getProducer(String srcPartitionId) throws PulsarClientException;
+    Producer<T> getProducer(String srcPartitionId) throws PulsarClientException;
 
     /**
      * Close a producer specified by <tt>srcPartitionId</tt>.
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 4be73018d3..03776c3b36 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -30,59 +30,60 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerEventListener;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.instance.SinkRecord;
 import org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
 import org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
 import org.apache.pulsar.functions.instance.producers.Producers;
 import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 
-import net.jodah.typetools.TypeResolver;
-
 @Slf4j
 public class PulsarSink<T> implements Sink<T> {
 
-    private PulsarClient client;
-    private PulsarSinkConfig pulsarSinkConfig;
-    private SerDe<T> outputSerDe;
+    private final PulsarClient client;
+    private final PulsarSinkConfig pulsarSinkConfig;
+
+    private PulsarSinkProcessor<T> pulsarSinkProcessor;
 
-    private PulsarSinkProcessor pulsarSinkProcessor;
+    private final TopicSchema topicSchema;
 
     private interface PulsarSinkProcessor<T> {
-        void initializeOutputProducer(String outputTopic) throws Exception;
+        void initializeOutputProducer(String outputTopic, Schema<T> schema) throws Exception;
+
+        TypedMessageBuilder<T> newMessage(Record<T> record) throws Exception;
 
-        void sendOutputMessage(MessageBuilder outputMsgBuilder,
-                               Record<T> recordContext) throws Exception;
+        void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) throws Exception;
 
-        void close() throws Exception;
+        abstract void close() throws Exception;
     }
 
     private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor<T> {
-        private Producer<byte[]> producer;
+        private Producer<T> producer;
 
         @Override
-        public void initializeOutputProducer(String outputTopic) throws Exception {
+        public void initializeOutputProducer(String outputTopic, Schema<T> schema) throws Exception {
             this.producer = AbstractOneOuputTopicProducers.createProducer(
-                    client, pulsarSinkConfig.getTopic());
+                    client, pulsarSinkConfig.getTopic(), schema);
+        }
+
+        @Override
+        public TypedMessageBuilder<T> newMessage(Record<T> record) {
+            return producer.newMessage();
         }
 
         @Override
-        public void sendOutputMessage(MessageBuilder outputMsgBuilder,
-                                      Record<T> recordContext) throws Exception {
-            Message<byte[]> outputMsg = outputMsgBuilder.build();
-            this.producer.sendAsync(outputMsg);
+        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) throws Exception {
+            msg.sendAsync();
         }
 
         @Override
@@ -98,19 +99,22 @@ public void close() throws Exception {
     }
 
     private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor<T> {
-        private Producer<byte[]> producer;
+        private Producer<T> producer;
 
         @Override
-        public void initializeOutputProducer(String outputTopic) throws Exception {
+        public void initializeOutputProducer(String outputTopic, Schema<T> schema) throws Exception {
             this.producer = AbstractOneOuputTopicProducers.createProducer(
-                    client, pulsarSinkConfig.getTopic());
+                    client, pulsarSinkConfig.getTopic(), schema);
         }
 
         @Override
-        public void sendOutputMessage(MessageBuilder outputMsgBuilder,
-                                      Record<T> recordContext) throws Exception {
-            Message<byte[]> outputMsg = outputMsgBuilder.build();
-            this.producer.sendAsync(outputMsg).thenAccept(messageId -> recordContext.ack());
+        public TypedMessageBuilder<T> newMessage(Record<T> record) {
+            return producer.newMessage();
+        }
+
+        @Override
+        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) throws Exception {
+            msg.sendAsync().thenAccept(messageId -> record.ack());
         }
 
         @Override
@@ -128,29 +132,31 @@ public void close() throws Exception {
     private class PulsarSinkEffectivelyOnceProcessor implements PulsarSinkProcessor<T>, ConsumerEventListener {
 
         @Getter(AccessLevel.PACKAGE)
-        protected Producers outputProducer;
+        protected Producers<T> outputProducer;
 
         @Override
-        public void initializeOutputProducer(String outputTopic) throws Exception {
-            outputProducer = new MultiConsumersOneOuputTopicProducers(client, outputTopic);
+        public void initializeOutputProducer(String outputTopic, Schema<T> schema) throws Exception {
+            outputProducer = new MultiConsumersOneOuputTopicProducers<T>(client, outputTopic, schema);
             outputProducer.initialize();
         }
 
         @Override
-        public void sendOutputMessage(MessageBuilder outputMsgBuilder, Record<T> recordContext)
+        public TypedMessageBuilder<T> newMessage(Record<T> record) throws Exception {
+            // Route message to appropriate partition producer
+            return outputProducer.getProducer(record.getPartitionId().get()).newMessage();
+        }
+
+        @Override
+        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record)
                 throws Exception {
 
             // assign sequence id to output message for idempotent producing
-            if (recordContext.getRecordSequence().isPresent()) {
-                outputMsgBuilder.setSequenceId(recordContext.getRecordSequence().get());
+            if (record.getRecordSequence().isPresent()) {
+                msg.sequenceId(record.getRecordSequence().get());
             }
 
-            // currently on PulsarRecord
-            Producer producer = outputProducer.getProducer(recordContext.getPartitionId().get());
-
-            org.apache.pulsar.client.api.Message outputMsg = outputMsgBuilder.build();
-            producer.sendAsync(outputMsg)
-                    .thenAccept(messageId -> recordContext.ack())
+            msg.sendAsync()
+                    .thenAccept(messageId -> record.ack())
                     .join();
         }
 
@@ -191,13 +197,12 @@ public void becameInactive(Consumer<?> consumer, int partitionId) {
     public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig) {
         this.client = client;
         this.pulsarSinkConfig = pulsarSinkConfig;
+        this.topicSchema = new TopicSchema(client);
     }
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-
-        // Setup Serialization/Deserialization
-        setupSerDe();
+        Schema<T> schema = initializeSchema();
 
         FunctionConfig.ProcessingGuarantees processingGuarantees = this.pulsarSinkConfig.getProcessingGuarantees();
         switch (processingGuarantees) {
@@ -211,41 +216,32 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
                 this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor();
                 break;
         }
-        this.pulsarSinkProcessor.initializeOutputProducer(this.pulsarSinkConfig.getTopic());
+        this.pulsarSinkProcessor.initializeOutputProducer(this.pulsarSinkConfig.getTopic(), schema);
     }
 
     @Override
     public void write(Record<T> record) throws Exception {
-
-        byte[] output;
-        try {
-            output = this.outputSerDe.serialize(record.getValue());
-        } catch (Exception e) {
-            //TODO Add serialization exception stats
-            throw new RuntimeException("Error occured when attempting to serialize output:", e);
-        }
-
-        MessageBuilder msgBuilder = MessageBuilder.create();
+        TypedMessageBuilder<T> msg = pulsarSinkProcessor.newMessage(record);
         if (record.getKey().isPresent()) {
-            msgBuilder.setKey(record.getKey().get());
+            msg.key(record.getKey().get());
         }
 
-        msgBuilder.setContent(output);
+        msg.value(record.getValue());
 
         if (!record.getProperties().isEmpty()) {
-            msgBuilder.setProperties(record.getProperties());
+            msg.properties(record.getProperties());
         }
 
         SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
         if (sinkRecord.getSourceRecord() instanceof PulsarRecord) {
             PulsarRecord<T> pulsarRecord = (PulsarRecord<T>) sinkRecord.getSourceRecord();
             // forward user properties to sink-topic
-            msgBuilder.setProperty("__pfn_input_topic__", pulsarRecord.getTopicName().get()).setProperty(
-                    "__pfn_input_msg_id__",
-                    new String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
+            msg.property("__pfn_input_topic__", pulsarRecord.getTopicName().get())
+               .property("__pfn_input_msg_id__",
+                         new String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
         }
 
-        this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, record);
+        pulsarSinkProcessor.sendOutputMessage(msg, record);
     }
 
     @Override
@@ -255,34 +251,23 @@ public void close() throws Exception {
         }
     }
 
+    @SuppressWarnings("unchecked")
     @VisibleForTesting
-    void setupSerDe() throws ClassNotFoundException {
+    Schema<T> initializeSchema() throws ClassNotFoundException {
         if (StringUtils.isEmpty(this.pulsarSinkConfig.getTypeClassName())) {
-            this.outputSerDe = InstanceUtils.initializeDefaultSerDe(byte[].class);
-            return;
+            return (Schema<T>) Schema.BYTES;
         }
 
         Class<?> typeArg = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(),
                 Thread.currentThread().getContextClassLoader());
 
-        if (!Void.class.equals(typeArg)) { // return type is not `Void.class`
-            if (this.pulsarSinkConfig.getSerDeClassName() == null
-                    || this.pulsarSinkConfig.getSerDeClassName().isEmpty()
-                    || this.pulsarSinkConfig.getSerDeClassName().equals(DefaultSerDe.class.getName())) {
-                this.outputSerDe = InstanceUtils.initializeDefaultSerDe(typeArg);
-            } else {
-                this.outputSerDe = InstanceUtils.initializeSerDe(this.pulsarSinkConfig.getSerDeClassName(),
-                        Thread.currentThread().getContextClassLoader(), typeArg);
-            }
-            Class<?>[] outputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass());
-            if (outputSerDe.getClass().getName().equals(DefaultSerDe.class.getName())) {
-                if (!DefaultSerDe.IsSupportedType(typeArg)) {
-                    throw new RuntimeException("Default Serde does not support type " + typeArg);
-                }
-            } else if (!outputSerdeTypeArgs[0].isAssignableFrom(typeArg)) {
-                throw new RuntimeException("Inconsistent types found between function output type and output serde type: "
-                        + " function type = " + typeArg + "should be assignable from " + outputSerdeTypeArgs[0]);
-            }
-        }
+        if (Void.class.equals(typeArg)) {
+            // return type is 'void', so there's no schema to check
+            return (Schema<T>) Schema.BYTES;
+         }
+
+
+        return (Schema<T>) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
+                pulsarSinkConfig.getSchemaTypeOrClassName());
     }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
index 6f0385aeeb..b6cff78001 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
@@ -29,6 +29,6 @@
 public class PulsarSinkConfig {
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
     private String topic;
-    private String serDeClassName;
+    private String schemaTypeOrClassName;
     private String typeClassName;
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index c662ad63c4..46c9213b83 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -40,9 +40,7 @@
     private final String topicName;
     private final int partition;
 
-    // TODO: When we switch to schema for functions, we should just rely on the message object value
-    private final T value;
-    private final Message<byte[]> message;
+    private final Message<T> message;
 
     private final Runnable failFunction;
     private final Runnable ackFunction;
@@ -71,6 +69,11 @@
         return Optional.of(Utils.getSequenceId(message.getMessageId()));
     }
 
+    @Override
+    public T getValue() {
+        return message.getValue();
+    }
+
     @Override
     public Optional<EncryptionContext> getEncryptionCtx() {
         return message.getEncryptionCtx();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 1b3c177a56..10e4afd44a 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -18,86 +18,91 @@
  */
 package org.apache.pulsar.functions.source;
 
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
+import lombok.Builder;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
 
-import com.google.common.annotations.VisibleForTesting;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
-
 @Slf4j
-public class PulsarSource<T> implements Source<T> {
+public class PulsarSource<T> extends PushSource<T> implements MessageListener<T> {
 
-    private PulsarClient pulsarClient;
-    private PulsarSourceConfig pulsarSourceConfig;
-    private Map<String, SerDe> topicToSerDeMap = new HashMap<>();
-    private boolean isTopicsPattern;
+    private final PulsarClient pulsarClient;
+    private final PulsarSourceConfig pulsarSourceConfig;
     private List<String> inputTopics;
-
-    @Getter
-    private org.apache.pulsar.client.api.Consumer<byte[]> inputConsumer;
+    private List<Consumer<T>> inputConsumers;
+    private final TopicSchema topicSchema;
 
     public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig) {
         this.pulsarClient = pulsarClient;
         this.pulsarSourceConfig = pulsarConfig;
+        this.topicSchema = new TopicSchema(pulsarClient);
     }
 
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
-        // Setup Serialization/Deserialization
-        setupSerDe();
-
-        // Setup pulsar consumer
-        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer()
-                //consume message even if can't decrypt and deliver it along with encryption-ctx
-                .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
-                .subscriptionName(this.pulsarSourceConfig.getSubscriptionName())
-                .subscriptionType(this.pulsarSourceConfig.getSubscriptionType());
-
-        if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) {
-            consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());
-            isTopicsPattern = true;
-        }else {
-            consumerBuilder.topics(new ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));
-        }
+        // Setup schemas
+        log.info("Opening pulsar source with config: {}", pulsarSourceConfig);
+        Map<String, ConsumerConfig<T>> configs = setupConsumerConfigs();
+
+        inputConsumers = configs.entrySet().stream().map(e -> {
+            String topic = e.getKey();
+            ConsumerConfig<T> conf = e.getValue();
+            log.info("Creating consumers for topic : {}",  topic);
+            ConsumerBuilder<T> cb = pulsarClient.newConsumer(conf.getSchema())
+                    // consume message even if can't decrypt and deliver it along with encryption-ctx
+                    .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+                    .subscriptionName(pulsarSourceConfig.getSubscriptionName())
+                    .subscriptionType(pulsarSourceConfig.getSubscriptionType())
+                    .messageListener(this);
+
+            if (conf.isRegexPattern) {
+                cb.topicsPattern(topic);
+            } else {
+                cb.topic(topic);
+            }
 
-        if (pulsarSourceConfig.getTimeoutMs() != null) {
-            consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
-        }
-        this.inputConsumer = consumerBuilder.subscribe();
-        if (inputConsumer instanceof MultiTopicsConsumerImpl) {
-            inputTopics = ((MultiTopicsConsumerImpl<?>) inputConsumer).getTopics();
-        } else {
-            inputTopics = Collections.singletonList(inputConsumer.getTopic());
-        }
+            if (pulsarSourceConfig.getTimeoutMs() != null) {
+                cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
+            }
+
+            return cb.subscribeAsync();
+        }).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
+
+        inputTopics = inputConsumers.stream().flatMap(c -> {
+            return (c instanceof MultiTopicsConsumerImpl) ? ((MultiTopicsConsumerImpl<?>) c).getTopics().stream()
+                    : Collections.singletonList(c.getTopic()).stream();
+        }).collect(Collectors.toList());
     }
 
     @Override
-    public Record<T> read() throws Exception {
-        org.apache.pulsar.client.api.Message<byte[]> message = this.inputConsumer.receive();
-
+    public void received(Consumer<T> consumer, Message<T> message) {
         String topicName;
 
         // If more than one topics are being read than the Message return by the consumer will be TopicMessageImpl
@@ -105,44 +110,18 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
         if (message instanceof TopicMessageImpl) {
             topicName = ((TopicMessageImpl<?>) message).getTopicName();
         } else {
-            topicName = this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet().iterator().next();
+            topicName = consumer.getTopic();
         }
 
-        Object object;
-        try {
-            SerDe deserializer = null;
-            if (this.topicToSerDeMap.containsKey(topicName)) {
-                deserializer = this.topicToSerDeMap.get(topicName);
-            } else if (isTopicsPattern) {
-                deserializer = this.topicToSerDeMap.get(this.pulsarSourceConfig.getTopicsPattern());
-            }
-            if (deserializer != null) {
-                object = deserializer.deserialize(message.getData());
-            } else {
-                throw new IllegalStateException("Topic deserializer not configured : " + topicName);
-            }
-        } catch (Exception e) {
-            // TODO Add deserialization exception stats
-            throw new RuntimeException("Error occured when attempting to deserialize input:", e);
-        }
-
-        T input;
-        try {
-            input = (T) object;
-        } catch (ClassCastException e) {
-            throw new RuntimeException("Error in casting input to expected type:", e);
-        }
-
-        return PulsarRecord.<T>builder()
-                .value(input)
+        Record<T> record = PulsarRecord.<T>builder()
                 .message(message)
                 .topicName(topicName)
                 .ackFunction(() -> {
                     if (pulsarSourceConfig
                             .getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                        inputConsumer.acknowledgeCumulativeAsync(message);
+                        consumer.acknowledgeCumulativeAsync(message);
                     } else {
-                        inputConsumer.acknowledgeAsync(message);
+                        consumer.acknowledgeAsync(message);
                     }
                 }).failFunction(() -> {
                     if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
@@ -150,52 +129,49 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
                     }
                 })
                 .build();
+
+        consume(record);
     }
 
     @Override
     public void close() throws Exception {
-        if (this.inputConsumer != null) {
-            this.inputConsumer.close();
-        }
+        inputConsumers.forEach(consumer -> {
+            try {
+                consumer.close();
+            } catch (PulsarClientException e) {
+            }
+        });
     }
 
+    @SuppressWarnings("unchecked")
     @VisibleForTesting
-    void setupSerDe() throws ClassNotFoundException {
+    Map<String, ConsumerConfig<T>> setupConsumerConfigs() throws ClassNotFoundException {
+        Map<String, ConsumerConfig<T>> configs = new TreeMap<>();
 
         Class<?> typeArg = Reflections.loadClass(this.pulsarSourceConfig.getTypeClassName(),
                 Thread.currentThread().getContextClassLoader());
 
-        if (Void.class.equals(typeArg)) {
-            throw new RuntimeException("Input type of Pulsar Function cannot be Void");
-        }
+        checkArgument(!Void.class.equals(typeArg), "Input type of Pulsar Function cannot be Void");
 
-        for (Map.Entry<String, String> entry : this.pulsarSourceConfig.getTopicSerdeClassNameMap().entrySet()) {
-            String topic = entry.getKey();
-            String serDeClassname = entry.getValue();
-            if (serDeClassname == null || serDeClassname.isEmpty()) {
-                serDeClassname = DefaultSerDe.class.getName();
-            }
-            SerDe serDe = InstanceUtils.initializeSerDe(serDeClassname,
-                    Thread.currentThread().getContextClassLoader(), typeArg);
-            this.topicToSerDeMap.put(topic, serDe);
-        }
+        // Check new config with schema types or classnames
+        pulsarSourceConfig.getTopicSchema().forEach((topic, conf) -> {
+            Schema<T> schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSchemaTypeOrClassName());
+            configs.put(topic,
+                    ConsumerConfig.<T> builder().schema(schema).isRegexPattern(conf.isRegexPattern()).build());
+        });
 
-        for (SerDe serDe : this.topicToSerDeMap.values()) {
-            if (serDe.getClass().getName().equals(DefaultSerDe.class.getName())) {
-                if (!DefaultSerDe.IsSupportedType(typeArg)) {
-                    throw new RuntimeException("Default Serde does not support " + typeArg);
-                }
-            } else {
-                Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-                if (!typeArg.isAssignableFrom(inputSerdeTypeArgs[0])) {
-                    throw new RuntimeException("Inconsistent types found between function input type and input serde type: "
-                            + " function type = " + typeArg + " should be assignable from " + inputSerdeTypeArgs[0]);
-                }
-            }
-        }
+        return configs;
     }
 
     public List<String> getInputTopics() {
         return inputTopics;
     }
+
+    @Data
+    @Builder
+    private static class ConsumerConfig<T> {
+        private Schema<T> schema;
+        private boolean isRegexPattern;
+    }
+
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index 8f815f1c5b..f1cb09b494 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -19,31 +19,32 @@
 package org.apache.pulsar.functions.source;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.utils.FunctionConfig;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.TreeMap;
+
+import lombok.Data;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.utils.ConsumerConfig;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 
-@Getter
-@Setter
-@ToString
+@Data
 public class PulsarSourceConfig {
 
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
     SubscriptionType subscriptionType;
     private String subscriptionName;
-    private Map<String, String> topicSerdeClassNameMap;
-    private String topicsPattern;
+
+    private Map<String, ConsumerConfig> topicSchema = new TreeMap<>();
+
     private String typeClassName;
     private Long timeoutMs;
 
     public static PulsarSourceConfig load(Map<String, Object> map) throws IOException {
-        ObjectMapper mapper = new ObjectMapper();
+        ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), PulsarSourceConfig.class);
     }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SerDeSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SerDeSchema.java
new file mode 100644
index 0000000000..8d4bf1ff39
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SerDeSchema.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.source;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.functions.api.SerDe;
+
+/**
+ * Adapts a Pulsar Functions {@link SerDe} to the client {@link Schema} interface: encoding and decoding are
+ * delegated to the wrapped SerDe, and no {@link SchemaInfo} is exposed.
+ */
+@AllArgsConstructor
+@EqualsAndHashCode
+@ToString
+public class SerDeSchema<T> implements Schema<T> {
+
+    // SerDe that performs the actual conversion; supplied through the Lombok-generated constructor
+    private final SerDe<T> serDe;
+
+    @Override
+    public byte[] encode(T value) {
+        return serDe.serialize(value);
+    }
+
+    @Override
+    public T decode(byte[] bytes) {
+        return serDe.deserialize(bytes);
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        // Do not persist schema information
+        // NOTE(review): callers receive null here -- confirm that every consumer of Schema tolerates it
+        return null;
+    }
+
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
new file mode 100644
index 0000000000..05c57ab974
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.source;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.ProtobufSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.instance.InstanceUtils;
+
+/**
+ * Resolves and caches the {@link Schema} to use for each topic. The cache is keyed by topic name only, so the
+ * first schema resolved for a topic is returned for every later lookup of that topic.
+ */
+public class TopicSchema {
+
+    private final Map<String, Schema<?>> cachedSchemas = new HashMap<>();
+    private final PulsarClient client;
+
+    public TopicSchema(PulsarClient client) {
+        this.client = client;
+    }
+
+    /**
+     * If there is no other information available, use JSON as default schema type
+     */
+    private static final SchemaType DEFAULT_SCHEMA_TYPE = SchemaType.JSON;
+
+    public static final String DEFAULT_SERDE = "org.apache.pulsar.functions.api.utils.DefaultSerDe";
+
+    /**
+     * Returns the schema for {@code topic}, deriving it from the runtime class of {@code object}.
+     */
+    public Schema<?> getSchema(String topic, Object object) {
+        // Pass an explicit empty schema type: a two-argument call would bind back to this very overload
+        // (a Class is an Object) and recurse until the stack overflows.
+        return getSchema(topic, object.getClass(), Optional.<SchemaType>empty());
+    }
+
+    /** Same as {@link #getSchema(String, Class, String)}, using the runtime class of {@code object}. */
+    public Schema<?> getSchema(String topic, Object object, String schemaTypeOrClassName) {
+        return getSchema(topic, object.getClass(), schemaTypeOrClassName);
+    }
+
+    /**
+     * Returns the cached schema for {@code topic}, creating it on first use.
+     *
+     * @param schemaTypeOrClassName builtin schema type name, or class name of a custom Schema or SerDe; an empty
+     *            value or {@link #DEFAULT_SERDE} means auto-discover
+     */
+    public Schema<?> getSchema(String topic, Class<?> clazz, String schemaTypeOrClassName) {
+        return cachedSchemas.computeIfAbsent(topic, t -> newSchemaInstance(topic, clazz, schemaTypeOrClassName));
+    }
+
+    /**
+     * Returns the cached schema for {@code topic}, creating it on first use. When {@code schemaType} is empty, the
+     * type comes from the schema registered for the topic, or else from a default based on {@code clazz}.
+     */
+    public Schema<?> getSchema(String topic, Class<?> clazz, Optional<SchemaType> schemaType) {
+        return cachedSchemas.computeIfAbsent(topic, key -> {
+            // orElseGet: run the blocking (join()) registry lookup only when no schema type was provided
+            SchemaType type = schemaType.orElseGet(() -> getSchemaTypeOrDefault(topic, clazz));
+            return newSchemaInstance(clazz, type);
+        });
+    }
+
+    /** Returns the cached schema for {@code topic}, creating one of the given builtin type on first use. */
+    public Schema<?> getSchema(String topic, Class<?> clazz, SchemaType schemaType) {
+        return cachedSchemas.computeIfAbsent(topic, t -> newSchemaInstance(clazz, schemaType));
+    }
+
+    /**
+     * If the topic is already created, we should be able to fetch the schema type (avro, json, ...)
+     */
+    private SchemaType getSchemaTypeOrDefault(String topic, Class<?> clazz) {
+        Optional<SchemaInfo> schema = ((PulsarClientImpl) client).getSchema(topic).join();
+        if (schema.isPresent()) {
+            return schema.get().getType();
+        } else {
+            return getDefaultSchemaType(clazz);
+        }
+    }
+
+    private static SchemaType getDefaultSchemaType(Class<?> clazz) {
+        if (byte[].class.equals(clazz)) {
+            return SchemaType.NONE;
+        } else if (String.class.equals(clazz)) {
+            // If type is String, then we use schema type string, otherwise we fallback on default schema
+            return SchemaType.STRING;
+        } else if (isProtobufClass(clazz)) {
+            return SchemaType.PROTOBUF;
+        } else {
+            return DEFAULT_SCHEMA_TYPE;
+        }
+    }
+
+    /**
+     * Creates the builtin schema of the given type for {@code clazz}.
+     *
+     * @throws RuntimeException if {@code type} is not NONE, STRING, AVRO, JSON or PROTOBUF
+     */
+    @SuppressWarnings("unchecked")
+    private static <T> Schema<T> newSchemaInstance(Class<T> clazz, SchemaType type) {
+        switch (type) {
+        case NONE:
+            return (Schema<T>) Schema.BYTES;
+        case STRING:
+            return (Schema<T>) Schema.STRING;
+        case AVRO:
+            return AvroSchema.of(clazz);
+        case JSON:
+            return JSONSchema.of(clazz);
+        case PROTOBUF:
+            return ProtobufSchema.ofGenericClass(clazz, Collections.emptyMap());
+        default:
+            throw new RuntimeException("Unsupported schema type " + type);
+        }
+    }
+
+    private static boolean isProtobufClass(Class<?> pojoClazz) {
+        try {
+            Class<?> protobufBaseClass = Class.forName("com.google.protobuf.GeneratedMessageV3");
+            return protobufBaseClass.isAssignableFrom(pojoClazz);
+        } catch (ClassNotFoundException e) {
+            // If function does not have protobuf in classpath then it cannot be protobuf
+            return false;
+        }
+    }
+
+    /**
+     * Creates a schema from a hint that may name a builtin schema type, a custom Schema class or a SerDe class.
+     * An empty hint, or {@link #DEFAULT_SERDE}, falls back to auto-discovery.
+     */
+    @SuppressWarnings("unchecked")
+    private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName) {
+        if (StringUtils.isEmpty(schemaTypeOrClassName) || DEFAULT_SERDE.equals(schemaTypeOrClassName)) {
+            // No preferred schema was provided, auto-discover schema or fallback to defaults
+            return newSchemaInstance(clazz, getSchemaTypeOrDefault(topic, clazz));
+        }
+
+        SchemaType schemaType = null;
+        try {
+            // Locale.ROOT keeps the upper-casing independent of the JVM default locale
+            schemaType = SchemaType.valueOf(schemaTypeOrClassName.toUpperCase(java.util.Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            // schemaType is not referring to builtin type
+        }
+
+        if (schemaType != null) {
+            // The parameter passed was indeed a valid builtin schema type
+            return newSchemaInstance(clazz, schemaType);
+        }
+
+        // At this point, the string can represent either a schema or serde class name. Create an instance and
+        // check if it complies with either interface
+
+        // First try with Schema
+        try {
+            return (Schema<T>) InstanceUtils.initializeCustomSchema(schemaTypeOrClassName,
+                    Thread.currentThread().getContextClassLoader(), clazz);
+        } catch (Throwable t) {
+            // Now try with Serde or just fail
+            SerDe<T> serDe = (SerDe<T>) InstanceUtils.initializeSerDe(schemaTypeOrClassName,
+                    Thread.currentThread().getContextClassLoader(), clazz);
+            return new SerDeSchema<>(serDe);
+        }
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index eb3026da57..c6262d3c33 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -239,7 +239,7 @@ private void processWindow(Context context, List<I> tuples, List<I> newTuples, L
             throw new RuntimeException(e);
         }
         if (output != null) {
-            context.publish(context.getOutputTopic(), output, context.getOutputSerdeClassName());
+            context.publish(context.getOutputTopic(), output, context.getOutputSchemaType());
         }
     }
 
@@ -288,7 +288,7 @@ public O process(I input, Context context) throws Exception {
                 this.windowManager.add(input, ts, record);
             } else {
                 if (this.windowConfig.getLateDataTopic() != null) {
-                    context.publish(this.windowConfig.getLateDataTopic(), input, context.getOutputSerdeClassName());
+                    context.publish(this.windowConfig.getLateDataTopic(), input);
                 } else {
                     log.info(String.format(
                             "Received a late tuple %s with ts %d. This will not be " + "processed"
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 710a127354..4c8393d042 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -24,11 +24,15 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.junit.Before;
@@ -42,8 +46,7 @@
 
     private InstanceConfig config;
     private Logger logger;
-    private PulsarClient client;
-    private ClassLoader classLoader;
+    private PulsarClientImpl client;
     private ContextImpl context;
 
     @Before
@@ -54,13 +57,13 @@ public void setup() {
             .build();
         config.setFunctionDetails(functionDetails);
         logger = mock(Logger.class);
-        client = mock(PulsarClient.class);
-        classLoader = getClass().getClassLoader();
+        client = mock(PulsarClientImpl.class);
+        when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, Schema.BYTES));
+
         context = new ContextImpl(
             config,
             logger,
             client,
-            classLoader,
             new ArrayList<>()
         );
     }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index ce67442b4c..e5af97d060 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -18,12 +18,10 @@
  */
 package org.apache.pulsar.functions.instance.producers;
 
-import static org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers.makeProducerName;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
+import static org.mockito.Mockito.*;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
@@ -32,6 +30,7 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.CryptoKeyReader;
@@ -43,6 +42,7 @@
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -55,7 +55,7 @@
 
     private PulsarClient mockClient;
     private final Map<String, Producer<byte[]>> mockProducers = new HashMap<>();
-    private MultiConsumersOneOuputTopicProducers producers;
+    private MultiConsumersOneOuputTopicProducers<byte[]> producers;
 
     private class MockProducerBuilder implements ProducerBuilder<byte[]> {
 
@@ -196,10 +196,10 @@
     public void setup() throws Exception {
         this.mockClient = mock(PulsarClient.class);
 
-        when(mockClient.newProducer())
+        when(mockClient.newProducer(any(Schema.class)))
             .thenReturn(new MockProducerBuilder());
 
-        producers = new MultiConsumersOneOuputTopicProducers(mockClient, TEST_OUTPUT_TOPIC);
+        producers = new MultiConsumersOneOuputTopicProducers<byte[]>(mockClient, TEST_OUTPUT_TOPIC, Schema.BYTES);
         producers.initialize();
     }
 
@@ -224,13 +224,13 @@ public void testGetCloseProducer() throws Exception {
 
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
-            .newProducer();
+            .newProducer(Schema.BYTES);
         assertTrue(producers.getProducers().containsKey(producerName));
 
         // second get will not create a new producer
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
-            .newProducer();
+            .newProducer(Schema.BYTES);
         assertTrue(producers.getProducers().containsKey(producerName));
 
         // close
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index df4e83b28b..f44d300c29 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -18,20 +18,6 @@
  */
 package org.apache.pulsar.functions.sink;
 
-import lombok.Getter;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyLong;
@@ -42,11 +28,27 @@
 import static org.testng.AssertJUnit.assertTrue;
 import static org.testng.AssertJUnit.fail;
 
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.source.TopicSchema;
+import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.testng.annotations.Test;
+
 @Slf4j
 public class PulsarSinkTest {
 
     private static final String TOPIC = "persistent://sample/standalone/ns1/test_result";
-    private static final String serDeClassName = DefaultSerDe.class.getName();
 
     public static class TestSerDe implements SerDe<String> {
 
@@ -65,8 +67,8 @@ public String deserialize(byte[] input) {
      * Verify that JavaInstance does not support functions that take Void type as input
      */
 
-    private static PulsarClient getPulsarClient() throws PulsarClientException {
-        PulsarClient pulsarClient = mock(PulsarClient.class);
+    private static PulsarClientImpl getPulsarClient() throws PulsarClientException {
+        PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
         ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
         doReturn(consumerBuilder).when(consumerBuilder).topics(anyList());
         doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString());
@@ -74,7 +76,8 @@ private static PulsarClient getPulsarClient() throws PulsarClientException {
         doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), any());
         Consumer consumer = mock(Consumer.class);
         doReturn(consumer).when(consumerBuilder).subscribe();
-        doReturn(consumerBuilder).when(pulsarClient).newConsumer();
+        doReturn(consumerBuilder).when(pulsarClient).newConsumer(any());
+        doReturn(CompletableFuture.completedFuture(Optional.empty())).when(pulsarClient).getSchema(anyString());
         return pulsarClient;
     }
 
@@ -82,7 +85,7 @@ private static PulsarSinkConfig getPulsarConfigs() {
         PulsarSinkConfig pulsarConfig = new PulsarSinkConfig();
         pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         pulsarConfig.setTopic(TOPIC);
-        pulsarConfig.setSerDeClassName(serDeClassName);
+        pulsarConfig.setSchemaTypeOrClassName(TopicSchema.DEFAULT_SERDE);
         pulsarConfig.setTypeClassName(String.class.getName());
         return pulsarConfig;
     }
@@ -117,7 +120,7 @@ public void testVoidOutputClasses() throws Exception {
         PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig);
 
         try {
-            pulsarSink.setupSerDe();
+            pulsarSink.initializeSchema();
         } catch (Exception ex) {
             ex.printStackTrace();
             assertEquals(ex, null);
@@ -130,14 +133,14 @@ public void testInconsistentOutputType() throws IOException {
         PulsarSinkConfig pulsarConfig = getPulsarConfigs();
         // set type to be inconsistent to that of SerDe
         pulsarConfig.setTypeClassName(Integer.class.getName());
-        pulsarConfig.setSerDeClassName(TestSerDe.class.getName());
+        pulsarConfig.setSchemaTypeOrClassName(TestSerDe.class.getName());
         PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig);
         try {
-            pulsarSink.setupSerDe();
+            pulsarSink.initializeSchema();
             fail("Should fail constructing java instance if function type is inconsistent with serde type");
         } catch (RuntimeException ex) {
             log.error("RuntimeException: {}", ex, ex);
-            assertTrue(ex.getMessage().startsWith("Inconsistent types found between function output type and output serde type:"));
+            assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and input serde type:"));
         } catch (Exception ex) {
             log.error("Exception: {}", ex, ex);
             assertTrue(false);
@@ -153,11 +156,10 @@ public void testDefaultSerDe() throws PulsarClientException {
         PulsarSinkConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(String.class.getName());
-        pulsarConfig.setSerDeClassName(null);
         PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig);
 
         try {
-            pulsarSink.setupSerDe();
+            pulsarSink.initializeSchema();
         } catch (Exception ex) {
             ex.printStackTrace();
             fail();
@@ -172,11 +174,11 @@ public void testExplicitDefaultSerDe() throws PulsarClientException {
         PulsarSinkConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(String.class.getName());
-        pulsarConfig.setSerDeClassName(DefaultSerDe.class.getName());
+        pulsarConfig.setSchemaTypeOrClassName(TopicSchema.DEFAULT_SERDE);
         PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig);
 
         try {
-            pulsarSink.setupSerDe();
+            pulsarSink.initializeSchema();
         } catch (Exception ex) {
             ex.printStackTrace();
             fail();
@@ -188,11 +190,11 @@ public void testComplexOuputType() throws PulsarClientException {
         PulsarSinkConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName());
-        pulsarConfig.setSerDeClassName(ComplexSerDe.class.getName());
+        pulsarConfig.setSchemaTypeOrClassName(ComplexSerDe.class.getName());
         PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig);
 
         try {
-            pulsarSink.setupSerDe();
+            pulsarSink.initializeSchema();
         } catch (Exception ex) {
             ex.printStackTrace();
             fail();
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index baad625f83..d74c689ded 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -18,23 +18,6 @@
  */
 package org.apache.pulsar.functions.source;
 
-import lombok.Getter;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.io.core.SourceContext;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyLong;
@@ -46,13 +29,35 @@
 import static org.testng.AssertJUnit.assertTrue;
 import static org.testng.AssertJUnit.fail;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import javax.xml.validation.Schema;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.utils.ConsumerConfig;
+import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.apache.pulsar.io.core.SourceContext;
+import org.testng.annotations.Test;
+
 @Slf4j
 public class PulsarSourceTest {
 
-    private static final String SUBSCRIPTION_NAME = "test/test-namespace/example";
-    private static Map<String, String> topicSerdeClassNameMap = new HashMap<>();
+    private static Map<String, ConsumerConfig> consumerConfigs = new HashMap<>();
     static {
-        topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", DefaultSerDe.class.getName());
+        consumerConfigs.put("persistent://sample/standalone/ns1/test_result", ConsumerConfig.builder()
+                .schemaTypeOrClassName(TopicSchema.DEFAULT_SERDE).isRegexPattern(false).build());
     }
 
     public static class TestSerDe implements SerDe<String> {
@@ -72,24 +77,27 @@ public String deserialize(byte[] input) {
      * Verify that JavaInstance does not support functions that take Void type as input
      */
 
-    private static PulsarClient getPulsarClient() throws PulsarClientException {
-        PulsarClient pulsarClient = mock(PulsarClient.class);
+    private static PulsarClientImpl getPulsarClient() throws PulsarClientException {
+        PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
         ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
         doReturn(consumerBuilder).when(consumerBuilder).topics(anyList());
         doReturn(consumerBuilder).when(consumerBuilder).cryptoFailureAction(any());
         doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString());
         doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any());
         doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), any());
+        doReturn(consumerBuilder).when(consumerBuilder).messageListener(any());
         Consumer consumer = mock(Consumer.class);
         doReturn(consumer).when(consumerBuilder).subscribe();
-        doReturn(consumerBuilder).when(pulsarClient).newConsumer();
+        doReturn(consumerBuilder).when(pulsarClient).newConsumer(any());
+        doReturn(CompletableFuture.completedFuture(consumer)).when(consumerBuilder).subscribeAsync();
+        doReturn(CompletableFuture.completedFuture(Optional.empty())).when(pulsarClient).getSchema(anyString());
         return pulsarClient;
     }
 
     private static PulsarSourceConfig getPulsarConfigs() {
         PulsarSourceConfig pulsarConfig = new PulsarSourceConfig();
         pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
-        pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
+        pulsarConfig.setTopicSchema(consumerConfigs);
         pulsarConfig.setTypeClassName(String.class.getName());
         return pulsarConfig;
     }
@@ -141,9 +149,10 @@ public void testInconsistentInputType() throws IOException {
         PulsarSourceConfig pulsarConfig = getPulsarConfigs();
         // set type to be inconsistent to that of SerDe
         pulsarConfig.setTypeClassName(Integer.class.getName());
-        Map<String, String> topicSerdeClassNameMap = new HashMap<>();
-        topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", TestSerDe.class.getName());
-        pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
+        Map<String, ConsumerConfig> topicSerdeClassNameMap = new HashMap<>();
+        topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result",
+                ConsumerConfig.builder().schemaTypeOrClassName(TestSerDe.class.getName()).build());
+        pulsarConfig.setTopicSchema(topicSerdeClassNameMap);
         PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig);
         try {
             pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
@@ -166,8 +175,9 @@ public void testDefaultSerDe() throws Exception {
         PulsarSourceConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(String.class.getName());
-        topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", null);
-        pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
+        consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
+                ConsumerConfig.builder().schemaTypeOrClassName(TopicSchema.DEFAULT_SERDE).build());
+        pulsarConfig.setTopicSchema(consumerConfigs);
         PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig);
 
         pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
@@ -181,26 +191,24 @@ public void testExplicitDefaultSerDe() throws Exception {
         PulsarSourceConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(String.class.getName());
-        topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", DefaultSerDe.class.getName());
-        pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
+        consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
+                ConsumerConfig.builder().schemaTypeOrClassName(TopicSchema.DEFAULT_SERDE).build());
+        pulsarConfig.setTopicSchema(consumerConfigs);
         PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig);
 
         pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
     }
 
     @Test
-    public void testComplexOuputType() throws PulsarClientException {
+    public void testComplexOuputType() throws Exception {
         PulsarSourceConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName());
-        topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result",ComplexSerDe.class.getName());
-        pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
+        consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
+                ConsumerConfig.builder().schemaTypeOrClassName(ComplexSerDe.class.getName()).build());
+        pulsarConfig.setTopicSchema(consumerConfigs);
         PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig);
 
-        try {
-            pulsarSource.setupSerDe();
-        } catch (Exception ex) {
-            fail();
-        }
+        pulsarSource.setupConsumerConfigs();
     }
 }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
index 4d56239f1e..da066cc0af 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
@@ -22,7 +22,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+
 import org.apache.pulsar.functions.utils.WindowConfig;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -116,7 +116,6 @@ public void setUp() {
         windowConfig.setActualWindowFunctionClassName(TestFunction.class.getName());
         Mockito.doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class))).when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
 
-        Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
         Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
         Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
     }
@@ -174,7 +173,6 @@ public void testPrepareLateTupleStreamWithoutTs() throws Exception {
         Mockito.doReturn("test-function").when(context).getFunctionName();
         Mockito.doReturn("test-namespace").when(context).getNamespace();
         Mockito.doReturn("test-tenant").when(context).getTenant();
-        Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
         Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
         Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
         WindowConfig windowConfig = new WindowConfig();
@@ -215,7 +213,6 @@ public void testExecuteWithLateTupleStream() throws Exception {
         }
         System.out.println(testWindowedPulsarFunction.windows);
         long event = events.get(events.size() - 1);
-        Mockito.verify(context).publish("$late", event, DefaultSerDe.class.getName());
     }
 
     @Test
@@ -259,7 +256,6 @@ public void testSettingSlidingCountWindow() throws Exception {
                 Mockito.doReturn("test-function").when(context).getFunctionName();
                 Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
                 Record<?> record = Mockito.mock(Record.class);
@@ -352,7 +348,6 @@ public void testSettingSlidingTimeWindow() throws Exception {
                 Mockito.doReturn("test-function").when(context).getFunctionName();
                 Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
                 Record<?> record = Mockito.mock(Record.class);
@@ -420,7 +415,6 @@ public void testSettingTumblingCountWindow() throws Exception {
                 Mockito.doReturn("test-function").when(context).getFunctionName();
                 Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
                 Record<?> record = Mockito.mock(Record.class);
@@ -474,7 +468,6 @@ public void testSettingTumblingTimeWindow() throws Exception {
                 Mockito.doReturn("test-function").when(context).getFunctionName();
                 Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
                 Record<?> record = Mockito.mock(Record.class);
@@ -528,7 +521,6 @@ public void testSettingLagTime() throws Exception {
                 Mockito.doReturn("test-function").when(context).getFunctionName();
                 Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
                 Record<?> record = Mockito.mock(Record.class);
@@ -580,7 +572,6 @@ public void testSettingWaterMarkInterval() throws Exception {
                 Mockito.doReturn("test-function").when(context).getFunctionName();
                 Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
                 Record<?> record = Mockito.mock(Record.class);
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
index 02d461636c..45cbd72703 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
@@ -20,7 +20,6 @@
 
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 
 /**
  * Example function that uses the built in publish function in the context
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
index 1d31c476ee..5bf04be99e 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
@@ -18,11 +18,10 @@
  */
 package org.apache.pulsar.functions.api.examples;
 
+import java.util.Optional;
+
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-
-import java.util.Optional;
 
 /**
  * An example demonstrate publishing messages through Context
@@ -33,7 +32,7 @@
     public Void process(String input, Context context) {
         Optional<Object> topicToWrite = context.getUserConfigValue("topic");
         if (topicToWrite.get() != null) {
-            context.publish((String)topicToWrite.get(), input, DefaultSerDe.class.getName());
+            context.publish((String) topicToWrite.get(), input);
         }
         return null;
     }
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 77a9c2c0b6..46661af26f 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -61,6 +61,11 @@ message FunctionDetails {
     string packageUrl = 14; //present only if function submitted with package-url
 }
 
+// Consumer settings for a single input topic; used as the value type of
+// SourceSpec.topicsToSchema.
+message ConsumerSpec {
+	// Builtin schema type, or the class name of a custom schema (or serde) implementation
+	string schemaTypeOrClassName   = 1;
+	// NOTE(review): presumably marks the associated topic name as a regex pattern
+	// rather than a literal topic name -- confirm against the code that reads this field
+	bool isRegexPattern = 2;
+}
+
 message SourceSpec {
     string className = 1;
     // map in json format
@@ -69,7 +74,15 @@ message SourceSpec {
 
     // configs used only when source feeds into functions
     SubscriptionType subscriptionType = 3;
-    map<string,string> topicsToSerDeClassName = 4;
+
+	// @deprecated -- use topicsToSchema
+    map<string,string> topicsToSerDeClassName = 4 [deprecated = true];
+
+	/**
+	 * Consumer settings (schema type or class name, regex flag) keyed by input topic
+	 */
+    map<string, ConsumerSpec> topicsToSchema = 10;
+
     uint64 timeoutMs = 6;
     string topicsPattern = 7;
 
@@ -87,11 +100,18 @@ message SinkSpec {
 
     // configs used only when functions output to sink
     string topic = 3;
+
+    // @deprecated -- use schemaTypeOrClassName
     string serDeClassName = 4;
 
     /* If specified, this will refer to an archive that is
      * already present in the server */
     string builtin = 6;
+
+	/**
+	 * Builtin schema type or custom schema class name
+	 */
+    string schemaTypeOrClassName = 7;
 }
 
 message PackageLocationMetaData {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 966740da98..c573cf335b 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -22,7 +22,9 @@
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.converters.StringConverter;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import com.google.protobuf.Empty;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
@@ -31,16 +33,21 @@
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.utils.ConsumerConfig;
+import org.inferred.freebuilder.shaded.org.apache.commons.lang3.StringUtils;
 
+import java.lang.reflect.Type;
 import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
@@ -83,25 +90,25 @@
 
     @Parameter(names = "--pulsar_serviceurl", description = "Pulsar Service Url\n", required = true)
     protected String pulsarServiceUrl;
-    
+
     @Parameter(names = "--client_auth_plugin", description = "Client auth plugin name\n")
     protected String clientAuthenticationPlugin;
-    
+
     @Parameter(names = "--client_auth_params", description = "Client auth param\n")
     protected String clientAuthenticationParameters;
-    
+
     @Parameter(names = "--use_tls", description = "Use tls connection\n")
     protected String useTls = Boolean.FALSE.toString();
-    
+
     @Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n")
     protected String tlsAllowInsecureConnection = Boolean.TRUE.toString();
-    
+
     @Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification")
     protected String tlsHostNameVerificationEnabled = Boolean.FALSE.toString();
-    
+
     @Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path")
     protected String tlsTrustCertFilePath;
-    
+
     @Parameter(names = "--state_storage_serviceurl", description = "State Storage Service Url\n", required= false)
     protected String stateStorageServiceUrl;
 
@@ -129,11 +136,8 @@
     @Parameter(names = "--source_subscription_type", description = "The source subscription type", required = true)
     protected String sourceSubscriptionType;
 
-    @Parameter(names = "--source_topics_serde_classname", description = "A map of topics to SerDe for the source")
-    protected String sourceTopicsSerdeClassName;
-    
-    @Parameter(names = "--topics_pattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topicsPattern] are mutually exclusive. Add SerDe class name for a pattern in --customSerdeInputs")
-    protected String topicsPattern;
+    @Parameter(names = "--source_topics_schema", description = "A map of topics to Schema for the source")
+    protected String sourceTopicsSchemaString;
 
     @Parameter(names = "--source_timeout_ms", description = "Source message timeout in milliseconds")
     protected Long sourceTimeoutMs;
@@ -150,6 +154,9 @@
     @Parameter(names = "--sink_topic", description = "The sink Topic Name\n")
     protected String sinkTopic;
 
+    @Parameter(names = "--sink_topic_schema_type", description = "The sink Topic schema\n")
+    protected String sinkSchemaTypeOrClassName;
+
     @Parameter(names = "--sink_serde_classname", description = "Sink SerDe\n")
     protected String sinkSerdeClassName;
 
@@ -192,10 +199,12 @@ public void start() throws Exception {
             sourceDetailsBuilder.setConfigs(sourceConfigs);
         }
         sourceDetailsBuilder.setSubscriptionType(Function.SubscriptionType.valueOf(sourceSubscriptionType));
-        sourceDetailsBuilder.putAllTopicsToSerDeClassName(new Gson().fromJson(sourceTopicsSerdeClassName, Map.class));
-        if (isNotBlank(topicsPattern)) {
-            sourceDetailsBuilder.setTopicsPattern(topicsPattern);
-        }
+
+        Type type = new TypeToken<Map<String, ConsumerSpec>>(){}.getType();
+
+        Map<String, ConsumerSpec> topicsSchema = new Gson().fromJson(sourceTopicsSchemaString, type);
+
+        sourceDetailsBuilder.putAllTopicsToSchema(topicsSchema);
         sourceDetailsBuilder.setTypeClassName(sourceTypeClassName);
         if (sourceTimeoutMs != null) {
             sourceDetailsBuilder.setTimeoutMs(sourceTimeoutMs);
@@ -217,6 +226,11 @@ public void start() throws Exception {
         if (sinkTopic != null && !sinkTopic.isEmpty()) {
             sinkSpecBuilder.setTopic(sinkTopic);
         }
+
+        if (!StringUtils.isEmpty(sinkSchemaTypeOrClassName)) {
+            sinkSpecBuilder.setSchemaTypeOrClassName(sinkSchemaTypeOrClassName);
+        }
+
         functionDetailsBuilder.setSink(sinkSpecBuilder);
 
         FunctionDetails functionDetails = functionDetailsBuilder.build();
@@ -378,7 +392,7 @@ public void resetMetrics(com.google.protobuf.Empty request,
                 }
             }
         }
-        
+
         @Override
         public void healthCheck(com.google.protobuf.Empty request,
                                 io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.HealthCheckResult> responseObserver) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index f3ed170600..7b63bb7738 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -46,6 +46,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A function container implemented using java thread.
@@ -201,11 +202,31 @@
         }
         args.add("--source_subscription_type");
         args.add(instanceConfig.getFunctionDetails().getSource().getSubscriptionType().toString());
-        args.add("--source_topics_serde_classname");
-        args.add(new Gson().toJson(instanceConfig.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap()));
-        if (isNotBlank(instanceConfig.getFunctionDetails().getSource().getTopicsPattern())) {
-            args.add("--topics_pattern");
-            args.add(instanceConfig.getFunctionDetails().getSource().getTopicsPattern());
+
+        if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
+            args.add("--source_topics_schema");
+            args.add(new Gson().toJson(instanceConfig.getFunctionDetails().getSource().getTopicsToSchemaMap()));
+        } else {
+            // Python instance is still using the previous serde map
+            args.add("--source_topics_serde_classname");
+
+            // Transform schema map into serde map
+            Map<String, String> serdeMap = new TreeMap<>();
+            AtomicReference<String> topicPattern = new AtomicReference<>();
+            instanceConfig.getFunctionDetails().getSource().getTopicsToSchemaMap().forEach((topic, conf) -> {
+                serdeMap.put(topic, conf.getSchemaTypeOrClassName());
+
+                if (conf.getIsRegexPattern()) {
+                    topicPattern.set(topic);
+                }
+             });
+            args.add(new Gson().toJson(serdeMap));
+
+            // Check if there was topic pattern defined in the schema map
+            if (topicPattern.get() != null) {
+                args.add("--topics_pattern");
+                args.add(topicPattern.get());
+            }
         }
 
         // sink related configs
@@ -229,6 +250,12 @@
                 && !instanceConfig.getFunctionDetails().getSink().getTopic().isEmpty()) {
             args.add("--sink_topic");
             args.add(instanceConfig.getFunctionDetails().getSink().getTopic());
+
+            if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA
+                    && !StringUtils.isEmpty(instanceConfig.getFunctionDetails().getSink().getSchemaTypeOrClassName())) {
+                args.add("--sink_topic_schema_type");
+                args.add(instanceConfig.getFunctionDetails().getSink().getSchemaTypeOrClassName());
+            }
         }
         if (instanceConfig.getFunctionDetails().getSink().getSerDeClassName() != null
                 && !instanceConfig.getFunctionDetails().getSink().getSerDeClassName().isEmpty()) {
@@ -390,7 +417,7 @@ public void onSuccess(InstanceCommunication.MetricsData t) {
         });
         return retval;
     }
-    
+
     public CompletableFuture<InstanceCommunication.HealthCheckResult> healthCheck() {
         CompletableFuture<InstanceCommunication.HealthCheckResult> retval = new CompletableFuture<>();
         if (stub == null) {
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index d9cef64829..968665f8ab 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -19,38 +19,35 @@
 
 package org.apache.pulsar.functions.runtime;
 
+import static org.testng.Assert.assertEquals;
+
 import com.google.gson.Gson;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.Test;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.testng.Assert.assertEquals;
+import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
 
 /**
  * Unit test of {@link ThreadRuntime}.
  */
-@Slf4j
 public class ProcessRuntimeTest {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ProcessRuntimeTest.class);
-
     private static final String TEST_TENANT = "test-function-tenant";
     private static final String TEST_NAMESPACE = "test-function-namespace";
     private static final String TEST_NAME = "test-function-container";
     private static final Map<String, String> topicsToSerDeClassName = new HashMap<>();
+    private static final Map<String, ConsumerSpec> topicsToSchema = new HashMap<>();
     static {
-        topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", DefaultSerDe.class.getName());
+        topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", "");
+        topicsToSchema.put("persistent://sample/standalone/ns1/test_src",
+                ConsumerSpec.newBuilder().setSchemaTypeOrClassName("").setIsRegexPattern(false).build());
     }
 
     private final ProcessRuntimeFactory factory;
@@ -93,8 +90,7 @@ FunctionDetails createFunctionDetails(FunctionDetails.Runtime runtime) {
         functionDetailsBuilder.setLogTopic(TEST_NAME + "-log");
         functionDetailsBuilder.setSource(Function.SourceSpec.newBuilder()
                 .setSubscriptionType(Function.SubscriptionType.FAILOVER)
-                .putAllTopicsToSerDeClassName(topicsToSerDeClassName)
-                .setTopicsPattern("persistent://tenant/ns/.*")
+                .putAllTopicsToSchema(topicsToSchema)
                 .setClassName("org.pulsar.pulsar.TestSource")
                 .setTypeClassName(String.class.getName()));
         return functionDetailsBuilder.build();
@@ -118,7 +114,7 @@ public void testJavaConstructor() {
 
         ProcessRuntime container = factory.createContainer(config, userJarFile);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 56);
+        assertEquals(args.size(), 54);
         String expectedArgs = "java -cp " + javaInstanceJarFile
                 + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
                 + " -Dlog4j.configurationFile=java_instance_log4j2.yml "
@@ -138,14 +134,13 @@ public void testJavaConstructor() {
                 + " --source_classname " + config.getFunctionDetails().getSource().getClassName()
                 + " --source_type_classname \"" + config.getFunctionDetails().getSource().getTypeClassName() + "\""
                 + " --source_subscription_type " + config.getFunctionDetails().getSource().getSubscriptionType().name()
-                + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName)
-                + " --topics_pattern " + config.getFunctionDetails().getSource().getTopicsPattern()
+                + " --source_topics_schema " + new Gson().toJson(topicsToSchema)
                 + " --sink_classname " + config.getFunctionDetails().getSink().getClassName()
                 + " --sink_type_classname \"" + config.getFunctionDetails().getSink().getTypeClassName() + "\""
                 + " --sink_topic " + config.getFunctionDetails().getSink().getTopic()
                 + " --sink_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName()
                 + " --state_storage_serviceurl " + stateStorageServiceUrl;
-        assertEquals(expectedArgs, String.join(" ", args));
+        assertEquals(String.join(" ", args), expectedArgs);
     }
 
     @Test
@@ -154,7 +149,7 @@ public void testPythonConstructor() {
 
         ProcessRuntime container = factory.createContainer(config, userJarFile);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 44);
+        assertEquals(args.size(), 42);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + userJarFile + " --logging_directory "
                 + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id "
@@ -170,10 +165,9 @@ public void testPythonConstructor() {
                 + " --max_buffered_tuples 1024 --port " + args.get(33)
                 + " --source_subscription_type " + config.getFunctionDetails().getSource().getSubscriptionType().name()
                 + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName)
-                + " --topics_pattern " + config.getFunctionDetails().getSource().getTopicsPattern()
                 + " --sink_topic " + config.getFunctionDetails().getSink().getTopic()
                 + " --sink_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName();
-        assertEquals(expectedArgs, String.join(" ", args));
+        assertEquals(String.join(" ", args), expectedArgs);
     }
 
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ConsumerConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ConsumerConfig.java
new file mode 100644
index 0000000000..aaa9bec660
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ConsumerConfig.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ConsumerConfig {
+    private String schemaTypeOrClassName; // builtin schema type, or a Schema/SerDe class name
+    private boolean isRegexPattern; // presumably true when the topic key is a regex pattern -- confirm
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index abeaea84e3..810c1dbe41 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -19,28 +19,23 @@
 package org.apache.pulsar.functions.utils;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+import java.util.Map;
+import java.util.TreeMap;
+
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.utils.validation.ConfigValidation;
+
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidFunctionConfig;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidResources;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidWindowConfig;
-import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
-
-import java.util.Collection;
-import java.util.Map;
 
 @Getter
 @Setter
@@ -56,7 +51,7 @@
         ATMOST_ONCE,
         EFFECTIVELY_ONCE
     }
-    
+
     public enum Runtime {
         JAVA,
         PYTHON
@@ -71,19 +66,19 @@
     private String name;
     @NotNull
     private String className;
-    @isListEntryCustom(entryValidatorClasses = {ValidatorImpls.TopicNameValidator.class})
-    private Collection<String> inputs;
-    @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class },
-            valueValidatorClasses = { ValidatorImpls.SerdeValidator.class }, targetRuntime = ConfigValidation.Runtime.JAVA)
-    @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, targetRuntime = ConfigValidation.Runtime.PYTHON)
-    private Map<String, String> customSerdeInputs;
-    @isValidTopicName
-    private String topicsPattern;
+
+    private Map<String, ConsumerConfig> inputSpecs = new TreeMap<>();
+
     @isValidTopicName
     private String output;
+
+    /**
+     * Represents either a builtin schema type (e.g. 'avro', 'json', etc.) or the class name for a Schema or SerDe
+     * implementation
+     */
+    private String outputSchemaOrClassName;
+
     private boolean skipOutput;
-    @isImplementationOfClass(implementsClass = SerDe.class)
-    private String outputSerdeClassName;
     @isValidTopicName
     private String logTopic;
     private ProcessingGuarantees processingGuarantees;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index 7e97135131..5815c30682 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -18,21 +18,20 @@
  */
 package org.apache.pulsar.functions.utils;
 
+import java.util.Map;
+import java.util.TreeMap;
+
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
+
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidResources;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidSinkConfig;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName;
-import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
-
-import java.util.Map;
 
 @Getter
 @Setter
@@ -41,6 +40,8 @@
 @ToString
 @isValidSinkConfig
 public class SinkConfig {
+
+
     @NotNull
     private String tenant;
     @NotNull
@@ -50,11 +51,8 @@
     private String className;
     private String sourceSubscriptionName;
 
-    @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class },
-            valueValidatorClasses = { ValidatorImpls.SerdeValidator.class })
-    private Map<String, String> topicToSerdeClassName;
-    @isValidTopicName
-    private String topicsPattern;
+    private final Map<String, ConsumerConfig> topicsToSchema = new TreeMap<>();
+
     private Map<String, Object> configs;
     @isPositiveNumber
     private int parallelism = 1;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
index 5c53a4fbfd..483f2f648c 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
@@ -53,8 +53,9 @@
     @NotNull
     @isValidTopicName
     private String topicName;
-    @isImplementationOfClass(implementsClass = SerDe.class)
-    private String serdeClassName;
+
+    private String schemaTypeOrClassName;
+
     private Map<String, Object> configs;
     @isPositiveNumber
     private int parallelism = 1;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index 925f7bc4f6..3160b14028 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -18,14 +18,28 @@
  */
 package org.apache.pulsar.functions.utils.validation;
 
+import static org.apache.pulsar.functions.utils.Utils.fileExists;
+import static org.apache.pulsar.functions.utils.Utils.getSinkType;
+import static org.apache.pulsar.functions.utils.Utils.getSourceType;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
-import org.apache.commons.lang.StringUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Resources;
@@ -35,22 +49,13 @@
 import org.apache.pulsar.functions.utils.WindowConfig;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-
-import static org.apache.commons.lang.StringUtils.isBlank;
-import static org.apache.pulsar.functions.utils.Utils.fileExists;
-import static org.apache.pulsar.functions.utils.Utils.getSinkType;
-import static org.apache.pulsar.functions.utils.Utils.getSourceType;
+import net.jodah.typetools.TypeResolver;
 
 @Slf4j
 public class ValidatorImpls {
+
+    private static final String DEFAULT_SERDE = "org.apache.pulsar.functions.api.utils.DefaultSerDe";
+
     /**
      * Validates a positive number.
      */
@@ -104,7 +109,6 @@ public void validateField(String name, Object o) {
         }
     }
 
-    @NoArgsConstructor
     public static class ResourcesValidator extends Validator {
         @Override
         public void validateField(String name, Object o) {
@@ -268,6 +272,21 @@ public void validateField(String name, Object o) {
         }
     }
 
+    /** Validates the field against {@link Schema} first and, if that fails, against {@link SerDe}. */
+    @NoArgsConstructor
+    public static class SchemaOrSerdeValidator extends Validator {
+        @Override
+        public void validateField(String name, Object o) {
+            try {
+                new ValidatorImpls.ImplementsClassValidator(Schema.class).validateField(name, o);
+            } catch (RuntimeException e) {
+                // Not a Schema: fall back to SerDe; if this also fails, its exception propagates
+                new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, o);
+            }
+        }
+    }
+
+
     /**
      * validates each key and each value against the respective arrays of validators.
      */
@@ -359,92 +378,44 @@ private static void doJavaChecks(FunctionConfig functionConfig, String name) {
 
             ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
             // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
-            // implements SerDe class
-            functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> {
+            // implements Schema or SerDe classes
 
-                Class<?> serdeClass;
-                try {
-                    serdeClass = loadClass(inputSerializer);
-                } catch (ClassNotFoundException e) {
-                    throw new IllegalArgumentException(
-                            String.format("The input serialization/deserialization class %s does not exist",
-                                    inputSerializer));
+            functionConfig.getInputSpecs().forEach((topicName, conf) -> {
+                if (StringUtils.isEmpty(conf.getSchemaTypeOrClassName())
+                        || conf.getSchemaTypeOrClassName().equals(DEFAULT_SERDE)) {
+                    // If it's empty, we use the default schema and no need to validate
+                    return;
+                }
+
+                if (getBuiltinSchemaType(conf.getSchemaTypeOrClassName()) != null) {
+                    // If it's built-in, no need to validate
+                    return;
                 }
 
                 try {
-                    new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, inputSerializer);
+                    new SchemaOrSerdeValidator().validateField(name, conf.getSchemaTypeOrClassName());
                 } catch (IllegalArgumentException ex) {
                     throw new IllegalArgumentException(
                             String.format("The input serialization/deserialization class %s does not not implement %s",
-
-                                    inputSerializer, SerDe.class.getCanonicalName()));
+                                    conf.getSchemaTypeOrClassName(), Schema.class.getCanonicalName()));
                 }
 
-                if (inputSerializer.equals(DefaultSerDe.class.getName())) {
-                    if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
-                        throw new IllegalArgumentException("The default Serializer does not support type " +
-                                typeArgs[0]);
-                    }
-                } else {
-                    SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader);
-                    if (serDe == null) {
-                        throw new IllegalArgumentException(String.format("The SerDe class %s does not exist",
-                                inputSerializer));
-                    }
-                    Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-
-                    // type inheritance information seems to be lost in generic type
-                    // load the actual type class for verification
-                    Class<?> fnInputClass;
-                    Class<?> serdeInputClass;
-                    try {
-                        fnInputClass = Class.forName(typeArgs[0].getName(), true, clsLoader);
-                        serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
-                    } catch (ClassNotFoundException e) {
-                        throw new IllegalArgumentException("Failed to load type class", e);
-                    }
-
-                    if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
-                        throw new IllegalArgumentException("Serializer type mismatch " + typeArgs[0] + " vs " + serDeTypes[0]);
-                    }
-                }
+                validateSchemaOrSerDeType(conf.getSchemaTypeOrClassName(), typeArgs[0], clsLoader);
             });
-            functionConfig.getInputs().forEach((topicName) -> {
-                if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
-                    throw new RuntimeException("Default Serializer does not support type " + typeArgs[0]);
-                }
-            });
-            if (!Void.class.equals(typeArgs[1])) {
-                if (functionConfig.getOutputSerdeClassName() == null
-                        || functionConfig.getOutputSerdeClassName().isEmpty()
-                        || functionConfig.getOutputSerdeClassName().equals(DefaultSerDe.class.getName())) {
-                    if (!DefaultSerDe.IsSupportedType(typeArgs[1])) {
-                        throw new RuntimeException("Default Serializer does not support type " + typeArgs[1]);
-                    }
-                } else {
-                    SerDe serDe = (SerDe) Reflections.createInstance(functionConfig.getOutputSerdeClassName(),
-                            clsLoader);
-                    if (serDe == null) {
-                        throw new IllegalArgumentException(String.format("SerDe class %s does not exist",
-                                functionConfig.getOutputSerdeClassName()));
-                    }
-                    Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
 
-                    // type inheritance information seems to be lost in generic type
-                    // load the actual type class for verification
-                    Class<?> fnOutputClass;
-                    Class<?> serdeOutputClass;
-                    try {
-                        fnOutputClass = Class.forName(typeArgs[1].getName(), true, clsLoader);
-                        serdeOutputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
-                    } catch (ClassNotFoundException e) {
-                        throw new RuntimeException("Failed to load type class", e);
-                    }
+            if (Void.class.equals(typeArgs[1])) {
+                return;
+            }
 
-                    if (!serdeOutputClass.isAssignableFrom(fnOutputClass)) {
-                        throw new RuntimeException("Serializer type mismatch " + typeArgs[1] + " vs " + serDeTypes[0]);
-                    }
-                }
+            if (StringUtils.isEmpty(functionConfig.getOutputSchemaOrClassName())
+                    || functionConfig.getOutputSchemaOrClassName().equals(DEFAULT_SERDE)) {
+                // Default schema will be working for any type
+                return;
+            } else if (getBuiltinSchemaType(functionConfig.getOutputSchemaOrClassName()) != null) {
+                // If it's built-in, no need to validate
+                return;
+            } else {
+                validateSchemaOrSerDeType(functionConfig.getOutputSchemaOrClassName(), typeArgs[1], clsLoader);
             }
         }
 
@@ -457,9 +428,11 @@ private static void doPythonChecks(FunctionConfig functionConfig, String name) {
                 throw new IllegalArgumentException("There is currently no support windowing in python");
             }
 
-            if (StringUtils.isNotBlank(functionConfig.getTopicsPattern())) {
-                throw new IllegalArgumentException("Topic-patterns is not supported for python runtime");
-            }
+            functionConfig.getInputSpecs().forEach((topic, conf) -> {
+                if (conf.isRegexPattern()) {
+                    throw new IllegalArgumentException("Topic-patterns is not supported for python runtime");
+                }
+            });
         }
 
         private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {
@@ -471,13 +444,12 @@ private static void verifyNoTopicClash(Collection<String> inputTopics, String ou
         }
 
         private static void doCommonChecks(FunctionConfig functionConfig) {
-            if ((functionConfig.getInputs().isEmpty() && StringUtils.isEmpty(functionConfig.getTopicsPattern()))
-                    && functionConfig.getCustomSerdeInputs().isEmpty()) {
+            if (functionConfig.getInputSpecs().isEmpty()) {
                 throw new RuntimeException("No input topic(s) specified for the function");
             }
 
             // Ensure that topics aren't being used as both input and output
-            verifyNoTopicClash(functionConfig.getInputs(), functionConfig.getOutput());
+            verifyNoTopicClash(functionConfig.getInputSpecs().keySet(), functionConfig.getOutput());
 
             WindowConfig windowConfig = functionConfig.getWindowConfig();
             if (windowConfig != null) {
@@ -659,58 +631,28 @@ public void validateField(String name, Object o) {
 
             try (NarClassLoader clsLoader = NarClassLoader.getFromArchive(new File(sourceConfig.getArchive()),
                     Collections.emptySet())) {
-
                 Class<?> typeArg = getSourceType(sourceClassName, clsLoader);
-                String serdeClassname = sourceConfig.getSerdeClassName();
 
-                if (StringUtils.isEmpty(serdeClassname)) {
-                    serdeClassname = DefaultSerDe.class.getName();
+                if (StringUtils.isEmpty(sourceConfig.getSchemaTypeOrClassName())
+                        || sourceConfig.getSchemaTypeOrClassName().equals(DEFAULT_SERDE)) {
+                    // If it's empty, we use the default schema and no need to validate
+                    return;
                 }
 
-                try {
-                    loadClass(serdeClassname);
-                } catch (ClassNotFoundException e) {
-                    throw new IllegalArgumentException(String
-                            .format("The input serialization/deserialization class %s does not exist", serdeClassname));
+                if (getBuiltinSchemaType(sourceConfig.getSchemaTypeOrClassName()) != null) {
+                    // If it's built-in, no need to validate
+                    return;
                 }
 
                 try {
-                    new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, serdeClassname);
+                    new SchemaOrSerdeValidator().validateField(name, sourceConfig.getSchemaTypeOrClassName());
                 } catch (IllegalArgumentException ex) {
                     throw new IllegalArgumentException(
                             String.format("The input serialization/deserialization class %s does not not implement %s",
-                                    serdeClassname, SerDe.class.getCanonicalName()));
+                                    sourceConfig.getSchemaTypeOrClassName(), Schema.class.getCanonicalName()));
                 }
 
-                if (serdeClassname.equals(DefaultSerDe.class.getName())) {
-                    if (!DefaultSerDe.IsSupportedType(typeArg)) {
-                        throw new IllegalArgumentException("The default Serializer does not support type " + typeArg);
-                    }
-                } else {
-                    SerDe serDe = (SerDe) Reflections.createInstance(serdeClassname, clsLoader);
-                    if (serDe == null) {
-                        throw new IllegalArgumentException(
-                                String.format("The SerDe class %s does not exist", serdeClassname));
-                    }
-                    Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-
-                    // type inheritance information seems to be lost in generic type
-                    // load the actual type class for verification
-                    Class<?> fnInputClass;
-                    Class<?> serdeInputClass;
-                    try {
-                        fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
-                        // get output serde
-                        serdeInputClass = Class.forName(serDeTypes[1].getName(), true, clsLoader);
-                    } catch (ClassNotFoundException e) {
-                        throw new IllegalArgumentException("Failed to load type class", e);
-                    }
-
-                    if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
-                        throw new IllegalArgumentException(
-                                "Serializer type mismatch " + typeArg + " vs " + serDeTypes[1]);
-                    }
-                }
+                validateSchemaOrSerDeType(sourceConfig.getSchemaTypeOrClassName(), typeArg, clsLoader);
             } catch (IOException e) {
                 throw new IllegalArgumentException(e);
             }
@@ -733,8 +675,7 @@ public void validateField(String name, Object o) {
             }
 
             // make we sure we have one source of input
-            if ((sinkConfig.getTopicToSerdeClassName() == null || sinkConfig.getTopicToSerdeClassName().isEmpty())
-                    && isBlank(sinkConfig.getTopicsPattern())) {
+            if (sinkConfig.getTopicsToSchema().isEmpty()) {
                 throw new IllegalArgumentException("Must specify at least one topic of input via inputs, " +
                         "customSerdeInputs, or topicPattern");
             }
@@ -745,60 +686,30 @@ public void validateField(String name, Object o) {
                 String sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive());
                 Class<?> typeArg = getSinkType(sinkClassName, clsLoader);
 
-                if (sinkConfig.getTopicToSerdeClassName() != null) {
-                    sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassname) -> {
-                        if (StringUtils.isEmpty(serdeClassname)) {
-                            serdeClassname = DefaultSerDe.class.getName();
-                        }
-
-                        try {
-                            loadClass(serdeClassname);
-                        } catch (ClassNotFoundException e) {
-                            throw new IllegalArgumentException(String.format(
-                                    "The input serialization/deserialization class %s does not exist", serdeClassname));
-                        }
-
-                        try {
-                            new ValidatorImpls.ImplementsClassValidator(SerDe.class).validateField(name, serdeClassname);
-
-                        } catch (IllegalArgumentException ex) {
-                            throw new IllegalArgumentException(String.format(
-                                    "The input serialization/deserialization class %s does not not " + "implement %s",
-                                    serdeClassname, SerDe.class.getCanonicalName()));
-                        }
-
-                        if (serdeClassname.equals(DefaultSerDe.class.getName())) {
-                            if (!DefaultSerDe.IsSupportedType(typeArg)) {
-                                throw new IllegalArgumentException("The default Serializer does not support type " +
-                                        typeArg);
-                            }
-                        } else {
-                            SerDe serDe = (SerDe) Reflections.createInstance(serdeClassname, clsLoader);
-                            if (serDe == null) {
-                                throw new IllegalArgumentException(
-                                        String.format("The SerDe class %s does not exist", serdeClassname));
-                            }
-                            Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-
-                            // type inheritance information seems to be lost in generic type
-                            // load the actual type class for verification
-                            Class<?> fnInputClass;
-                            Class<?> serdeInputClass;
-                            try {
-                                fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
-                                // get input serde
-                                serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
-                            } catch (ClassNotFoundException e) {
-                                throw new IllegalArgumentException("Failed to load type class", e);
-                            }
-
-                            if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
-                                throw new IllegalArgumentException(
-                                        "Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
-                            }
-                        }
-                    });
-                }
+                sinkConfig.getTopicsToSchema().forEach((topicName, consumerSpec) -> {
+                    if (StringUtils.isEmpty(consumerSpec.getSchemaTypeOrClassName())
+                            || consumerSpec.getSchemaTypeOrClassName().equals(DEFAULT_SERDE)) {
+                        // If it's empty, we use the default schema and no need to validate
+                        return;
+                    }
+
+                    if (getBuiltinSchemaType(consumerSpec.getSchemaTypeOrClassName()) != null) {
+                        // If it's built-in, no need to validate
+                        return;
+                    }
+
+                    String className = consumerSpec.getSchemaTypeOrClassName();
+
+                    try {
+                        new SchemaOrSerdeValidator().validateField(name, className);
+                    } catch (IllegalArgumentException ex) {
+                        throw new IllegalArgumentException(
+                                String.format("The input serialization/deserialization class %s does not not implement %s",
+                                        className, Schema.class.getCanonicalName()));
+                    }
+
+                    validateSchemaOrSerDeType(className, typeArg, clsLoader);
+                });
             } catch (IOException e) {
                 throw new IllegalArgumentException(e);
             }
@@ -867,4 +778,72 @@ public void validateField(String name, Object o) {
         }
         return objectClass;
     }
+
+
+    private static SchemaType getBuiltinSchemaType(String schemaTypeOrClassName) {
+        try {
+            return SchemaType.valueOf(schemaTypeOrClassName.toUpperCase());
+        } catch (IllegalArgumentException e) {
+            // schemaType is not referring to builtin type
+            return null;
+        }
+    }
+
+    private static void validateSchemaOrSerDeType(String scheamOrSerdeClassName, Class<?> typeArg, ClassLoader clsLoader) {
+        try {
+            validateSerDeType(scheamOrSerdeClassName, typeArg, clsLoader);
+        } catch (Throwable t) {
+            validateCustomSchemaType(scheamOrSerdeClassName, typeArg, clsLoader);
+        }
+    }
+
+    private static void validateSerDeType(String serdeClassName, Class<?> typeArg, ClassLoader clsLoader) {
+        SerDe<?> serDe = (SerDe<?>) Reflections.createInstance(serdeClassName, clsLoader);
+        if (serDe == null) {
+            throw new IllegalArgumentException(String.format("The SerDe class %s does not exist",
+                    serdeClassName));
+        }
+        Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
+
+        // type inheritance information seems to be lost in generic type
+        // load the actual type class for verification
+        Class<?> fnInputClass;
+        Class<?> serdeInputClass;
+        try {
+            fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
+            serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Failed to load type class", e);
+        }
+
+        if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
+            throw new IllegalArgumentException(
+                    "Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
+        }
+    }
+
+    private static void validateCustomSchemaType(String schemaClassName, Class<?> typeArg, ClassLoader clsLoader) {
+        Schema<?> schema = (Schema<?>) Reflections.createInstance(schemaClassName, clsLoader);
+        if (schema == null) {
+            throw new IllegalArgumentException(String.format("The Schema class %s does not exist",
+                    schemaClassName));
+        }
+        Class<?>[] schemaTypes = TypeResolver.resolveRawArguments(Schema.class, schema.getClass());
+
+        // type inheritance information seems to be lost in generic type
+        // load the actual type class for verification
+        Class<?> fnInputClass;
+        Class<?> schemaInputClass;
+        try {
+            fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
+            schemaInputClass = Class.forName(schemaTypes[0].getName(), true, clsLoader);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Failed to load type class", e);
+        }
+
+        if (!fnInputClass.isAssignableFrom(schemaInputClass)) {
+            throw new IllegalArgumentException(
+                    "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
+        }
+    }
 }
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index e4cf2cc99d..eb543c4668 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -75,11 +75,11 @@ public WorkerService(WorkerConfig workerConfig) {
 
     public void start(URI dlogUri) throws InterruptedException {
         log.info("Starting worker {}...", workerConfig.getWorkerId());
-        
+
         this.admin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
                 workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(),
                 workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection());
-        
+
         try {
             log.info("Worker Configs: {}", new ObjectMapper().writerWithDefaultPrettyPrinter()
                     .writeValueAsString(workerConfig));
@@ -140,7 +140,7 @@ public void start(URI dlogUri) throws InterruptedException {
 
             // initialize function metadata manager
             this.functionMetaDataManager.initialize();
-            
+
             authenticationService = new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig));
 
             // Starting cluster services
@@ -163,7 +163,7 @@ public void start(URI dlogUri) throws InterruptedException {
             this.isInitialized = true;
 
             this.connectorsManager = new ConnectorsManager(workerConfig);
-            
+
             int metricsSamplingPeriodSec = this.workerConfig.getMetricsSamplingPeriodSec();
             if (metricsSamplingPeriodSec > 0) {
                 this.statsUpdater.scheduleAtFixedRate(() -> this.functionRuntimeManager.updateRates(),
@@ -214,7 +214,7 @@ public void stop() {
         if (null != schedulerManager) {
             schedulerManager.close();
         }
-        
+
         if (null != this.admin) {
             this.admin.close();
         }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index c59d03dde4..8b12d5ed07 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -55,7 +55,6 @@
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
@@ -63,6 +62,7 @@
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.SubscriptionType;
+import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.Utils;
 import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -119,12 +119,12 @@ public void write(Record<byte[]> record) throws Exception {
     private static final String namespace = "test-namespace";
     private static final String function = "test-function";
     private static final String outputTopic = "test-output-topic";
-    private static final String outputSerdeClassName = DefaultSerDe.class.getName();
+    private static final String outputSerdeClassName = TopicSchema.DEFAULT_SERDE;
     private static final String className = TestFunction.class.getName();
     private SubscriptionType subscriptionType = SubscriptionType.FAILOVER;
     private static final Map<String, String> topicsToSerDeClassName = new HashMap<>();
     static {
-        topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", DefaultSerDe.class.getName());
+        topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", TopicSchema.DEFAULT_SERDE);
     }
     private static final int parallelism = 1;
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message