pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sanjee...@apache.org
Subject [pulsar] branch master updated: Source/Sink Endpoint validations (#2807)
Date Fri, 19 Oct 2018 04:59:32 GMT
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1386e6d  Source/Sink Endpoint validations (#2807)
1386e6d is described below

commit 1386e6dfdba16bb1ee0a3c7649cb7daed50a5d8c
Author: Sanjeev Kulkarni <sanjeevrk@gmail.com>
AuthorDate: Thu Oct 18 21:59:27 2018 -0700

    Source/Sink Endpoint validations (#2807)
    
    * Added Get and List source/sink functionality
    
    * Fixed compile
    
    * Removed test that doesnt make sense any more
    
    * Fixed build
    
    * Fixed logic
    
    * Return error response
    
    * Return response on error
    
    * Fix unittest
    
    * Fixed unittest
    
    * Fixed unittest
    
    * Fixed unittest
    
    * Added get/list sinks tests
    
    * Added get/list tests
    
    * Add more unittests
    
    * Added more unittests
    
    * Added TODO
    
    * Took feedback
    
    * Fix unittest
    
    * Fix unittest
    
    * Fix unittest
    
    * Fixed integration tests
    
    * Fixed integration test
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  10 +-
 .../apache/pulsar/broker/admin/impl/SinkBase.java  |  10 +-
 .../pulsar/broker/admin/impl/SourceBase.java       |  10 +-
 .../worker/PulsarWorkerAssignmentTest.java         |   2 +-
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    |  30 ---
 .../org/apache/pulsar/client/admin/Functions.java  |   3 +-
 .../java/org/apache/pulsar/client/admin/Sink.java  |   6 +-
 .../org/apache/pulsar/client/admin/Source.java     |   6 +-
 .../client/admin/internal/FunctionsImpl.java       |   8 +-
 .../pulsar/client/admin/internal/SinkImpl.java     |  12 +-
 .../pulsar/client/admin/internal/SourceImpl.java   |  12 +-
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |   4 +-
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |  86 +++++--
 .../org/apache/pulsar/admin/cli/CmdSources.java    |  84 +++++--
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java  |   8 +-
 .../apache/pulsar/admin/cli/TestCmdSources.java    |   8 +-
 .../functions/utils/FunctionConfigUtils.java       |  82 +++++++
 .../pulsar/functions/utils/SinkConfigUtils.java    |  57 +++++
 .../pulsar/functions/utils/SourceConfigUtils.java  |  44 +++-
 .../org/apache/pulsar/functions/utils/Utils.java   |  20 ++
 .../functions/utils/FunctionConfigUtilsTest.java   |  61 +++++
 .../functions/utils/SinkConfigUtilsTest.java       |  59 +++++
 .../functions/utils/SourceConfigUtilsTest.java     |  53 +++++
 .../functions/worker/FunctionMetaDataManager.java  |   6 +-
 .../functions/worker/rest/api/FunctionsImpl.java   | 252 ++++++++++++---------
 .../worker/rest/api/v2/FunctionApiV2Resource.java  |  11 +-
 .../worker/rest/api/v2/SinkApiV2Resource.java      |  11 +-
 .../worker/rest/api/v2/SourceApiV2Resource.java    |  11 +-
 .../worker/FunctionMetaDataManagerTest.java        |  23 +-
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |  80 +++++--
 .../worker/rest/api/v2/SinkApiV2ResourceTest.java  | 168 ++++++++------
 .../rest/api/v2/SourceApiV2ResourceTest.java       | 143 +++++++-----
 .../integration/functions/PulsarFunctionsTest.java |  16 +-
 33 files changed, 984 insertions(+), 412 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index def4b2b..b50da21 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -84,7 +84,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
                                      final @FormDataParam("functionConfig") String functionConfigJson) {
 
         return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId());
     }
 
     @PUT
@@ -106,7 +106,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
                                    final @FormDataParam("functionConfig") String functionConfigJson) {
 
         return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId());
 
     }
 
@@ -124,7 +124,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     public Response deregisterFunction(final @PathParam("tenant") String tenant,
                                        final @PathParam("namespace") String namespace,
                                        final @PathParam("functionName") String functionName) {
-        return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+        return functions.deregisterFunction(tenant, namespace, functionName, FunctionsImpl.FUNCTION, clientAppId());
     }
 
     @GET
@@ -143,7 +143,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("functionName") String functionName) throws IOException {
         return functions.getFunctionInfo(
-            tenant, namespace, functionName);
+            tenant, namespace, functionName, FunctionsImpl.FUNCTION);
     }
 
     @GET
@@ -196,7 +196,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     public Response listFunctions(final @PathParam("tenant") String tenant,
                                   final @PathParam("namespace") String namespace) {
         return functions.listFunctions(
-            tenant, namespace);
+            tenant, namespace, FunctionsImpl.FUNCTION);
 
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
index 2fe3989..0f5a5c5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
@@ -72,7 +72,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
                                  final @FormDataParam("sinkConfig") String sinkConfigJson) {
 
         return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+                functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId());
     }
 
     @PUT
@@ -93,7 +93,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
                                final @FormDataParam("sinkConfig") String sinkConfigJson) {
 
         return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+                functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId());
 
     }
 
@@ -111,7 +111,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
     public Response deregisterSink(final @PathParam("tenant") String tenant,
                                    final @PathParam("namespace") String namespace,
                                    final @PathParam("sinkName") String sinkName) {
-        return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+        return functions.deregisterFunction(tenant, namespace, sinkName, FunctionsImpl.SINK, clientAppId());
     }
 
     @GET
@@ -129,7 +129,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
     public Response getSinkInfo(final @PathParam("tenant") String tenant,
                                 final @PathParam("namespace") String namespace,
                                 final @PathParam("sinkName") String sinkName) throws IOException {
-        return functions.getFunctionInfo(tenant, namespace, sinkName);
+        return functions.getFunctionInfo(tenant, namespace, sinkName, FunctionsImpl.SINK);
     }
 
     @GET
@@ -180,7 +180,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
     @Path("/{tenant}/{namespace}")
     public Response listSinks(final @PathParam("tenant") String tenant,
                               final @PathParam("namespace") String namespace) {
-        return functions.listFunctions(tenant, namespace);
+        return functions.listFunctions(tenant, namespace, FunctionsImpl.SINK);
 
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
index 1a82ac2..4bda489 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
@@ -73,7 +73,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
                                    final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
         return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+                functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId());
     }
 
     @PUT
@@ -94,7 +94,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
                                  final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
         return functions.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+                functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId());
 
     }
 
@@ -112,7 +112,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
     public Response deregisterSource(final @PathParam("tenant") String tenant,
                                        final @PathParam("namespace") String namespace,
                                        final @PathParam("sourceName") String sourceName) {
-        return functions.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+        return functions.deregisterFunction(tenant, namespace, sourceName, FunctionsImpl.SOURCE, clientAppId());
     }
 
     @GET
@@ -131,7 +131,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
                                   final @PathParam("namespace") String namespace,
                                   final @PathParam("sourceName") String sourceName) throws IOException {
         return functions.getFunctionInfo(
-            tenant, namespace, sourceName);
+            tenant, namespace, sourceName, FunctionsImpl.SOURCE);
     }
 
     @GET
@@ -183,7 +183,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
     public Response listSources(final @PathParam("tenant") String tenant,
                                 final @PathParam("namespace") String namespace) {
         return functions.listFunctions(
-            tenant, namespace);
+            tenant, namespace, FunctionsImpl.SOURCE);
 
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index afa7ef1..acd6ed3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -308,7 +308,7 @@ public class PulsarWorkerAssignmentTest {
         // validate updated function prop = auto-ack=false and instnaceid
         for (int i = 0; i < (totalFunctions - totalDeletedFunction); i++) {
             String functionName = baseFunctionName + i;
-            assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getAutoAck());
+            assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).isAutoAck());
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index babcdd6..21133d6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -407,36 +407,6 @@ public class PulsarFunctionE2ETest {
         }
     }
 
-    /**
-     * Test to verify: function-server loads jar using file-url and derives type-args classes if not provided
-     * @throws Exception
-     */
-    @Test(timeOut = 20000)
-    public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception {
-
-        final String namespacePortion = "io";
-        final String replNamespace = tenant + "/" + namespacePortion;
-        final String sinkTopic = "persistent://" + replNamespace + "/output";
-        final String functionName = "PulsarSink-test";
-        final String subscriptionName = "test-sub";
-        admin.namespaces().createNamespace(replNamespace);
-        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
-        admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
-
-        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-examples.jar").getFile();
-
-        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
-                "my.*", sinkTopic, subscriptionName);
-
-        admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
-
-        FunctionDetails functionMetadata = admin.source().getSource(tenant, namespacePortion, functionName);
-
-        assertEquals(functionMetadata.getSource().getTypeClassName(), String.class.getName());
-        assertEquals(functionMetadata.getSink().getTypeClassName(), String.class.getName());
-
-    }
-
     @Test(timeOut = 20000)
     public void testFunctionStopAndRestartApi() throws Exception {
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 10c890c..a722605 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -25,7 +25,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedExceptio
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
 import org.apache.pulsar.functions.utils.FunctionConfig;
@@ -77,7 +76,7 @@ public interface Functions {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    FunctionDetails getFunction(String tenant, String namespace, String function) throws PulsarAdminException;
+    FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException;
 
     /**
      * Create a new function.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
index 3f8fe2f..afad6f3 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
@@ -22,13 +22,11 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedExceptio
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
 import org.apache.pulsar.functions.utils.SinkConfig;
 
 import java.util.List;
-import java.util.Set;
 
 /**
  * Admin interface for Sink management.
@@ -50,7 +48,7 @@ public interface Sink {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    List<String> getSinks(String tenant, String namespace) throws PulsarAdminException;
+    List<String> listSinks(String tenant, String namespace) throws PulsarAdminException;
 
     /**
      * Get the configuration for the specified sink.
@@ -77,7 +75,7 @@ public interface Sink {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    Function.FunctionDetails getSink(String tenant, String namespace, String sink) throws PulsarAdminException;
+    SinkConfig getSink(String tenant, String namespace, String sink) throws PulsarAdminException;
 
     /**
      * Create a new sink.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
index 3c43cf2..9d1a318 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
@@ -22,13 +22,11 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedExceptio
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
 import org.apache.pulsar.functions.utils.SourceConfig;
 
 import java.util.List;
-import java.util.Set;
 
 /**
  * Admin interface for Source management.
@@ -50,7 +48,7 @@ public interface Source {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    List<String> getSources(String tenant, String namespace) throws PulsarAdminException;
+    List<String> listSources(String tenant, String namespace) throws PulsarAdminException;
 
     /**
      * Get the configuration for the specified source.
@@ -77,7 +75,7 @@ public interface Source {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    Function.FunctionDetails getSource(String tenant, String namespace, String source) throws PulsarAdminException;
+    SourceConfig getSource(String tenant, String namespace, String source) throws PulsarAdminException;
 
     /**
      * Create a new source.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index b9ea1a5..77cc3d6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -46,7 +46,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
 import org.apache.pulsar.functions.utils.FunctionConfig;
@@ -80,16 +79,13 @@ public class FunctionsImpl extends BaseResource implements Functions {
     }
 
     @Override
-    public FunctionDetails getFunction(String tenant, String namespace, String function) throws PulsarAdminException {
+    public FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException {
         try {
              Response response = request(functions.path(tenant).path(namespace).path(function)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
                 throw new ClientErrorException(response);
             }
-            String jsonResponse = response.readEntity(String.class);
-            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-            mergeJson(jsonResponse, functionDetailsBuilder);
-            return functionDetailsBuilder.build();
+            return response.readEntity(FunctionConfig.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
index 4e13693..d117374 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
@@ -27,7 +27,6 @@ import org.apache.pulsar.client.admin.Sink;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
 import org.apache.pulsar.functions.utils.SinkConfig;
@@ -44,8 +43,6 @@ import javax.ws.rs.core.Response;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 @Slf4j
 public class SinkImpl extends BaseResource implements Sink {
@@ -58,7 +55,7 @@ public class SinkImpl extends BaseResource implements Sink {
     }
 
     @Override
-    public List<String> getSinks(String tenant, String namespace) throws PulsarAdminException {
+    public List<String> listSinks(String tenant, String namespace) throws PulsarAdminException {
         try {
             Response response = request(sink.path(tenant).path(namespace)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
@@ -72,16 +69,13 @@ public class SinkImpl extends BaseResource implements Sink {
     }
 
     @Override
-    public FunctionDetails getSink(String tenant, String namespace, String sinkName) throws PulsarAdminException {
+    public SinkConfig getSink(String tenant, String namespace, String sinkName) throws PulsarAdminException {
         try {
              Response response = request(sink.path(tenant).path(namespace).path(sinkName)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
                 throw new ClientErrorException(response);
             }
-            String jsonResponse = response.readEntity(String.class);
-            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-            mergeJson(jsonResponse, functionDetailsBuilder);
-            return functionDetailsBuilder.build();
+            return response.readEntity(SinkConfig.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
index 65a2bfc..0c7a1df 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
@@ -27,7 +27,6 @@ import org.apache.pulsar.client.admin.Source;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
 import org.apache.pulsar.functions.utils.SourceConfig;
@@ -44,8 +43,6 @@ import javax.ws.rs.core.Response;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 @Slf4j
 public class SourceImpl extends BaseResource implements Source {
@@ -58,7 +55,7 @@ public class SourceImpl extends BaseResource implements Source {
     }
 
     @Override
-    public List<String> getSources(String tenant, String namespace) throws PulsarAdminException {
+    public List<String> listSources(String tenant, String namespace) throws PulsarAdminException {
         try {
             Response response = request(source.path(tenant).path(namespace)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
@@ -72,16 +69,13 @@ public class SourceImpl extends BaseResource implements Source {
     }
 
     @Override
-    public FunctionDetails getSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
+    public SourceConfig getSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
         try {
              Response response = request(source.path(tenant).path(namespace).path(sourceName)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
                 throw new ClientErrorException(response);
             }
-            String jsonResponse = response.readEntity(String.class);
-            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-            mergeJson(jsonResponse, functionDetailsBuilder);
-            return functionDetailsBuilder.build();
+            return response.readEntity(SourceConfig.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 2f23a63..7e44b44 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -697,9 +697,9 @@ public class CmdFunctions extends CmdBase {
     class GetFunction extends FunctionCommand {
         @Override
         void runCmd() throws Exception {
-            String json = Utils.printJson(admin.functions().getFunction(tenant, namespace, functionName));
+            FunctionConfig functionConfig = admin.functions().getFunction(tenant, namespace, functionName);
             Gson gson = new GsonBuilder().setPrettyPrinting().create();
-            System.out.println(gson.toJson(new JsonParser().parse(json)));
+            System.out.println(gson.toJson(functionConfig));
         }
     }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index e56a343..3b9159e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -30,16 +30,14 @@ import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
 
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 
 import lombok.Getter;
@@ -67,6 +65,8 @@ public class CmdSinks extends CmdBase {
     private final CreateSink createSink;
     private final UpdateSink updateSink;
     private final DeleteSink deleteSink;
+    private final ListSinks listSinks;
+    private final GetSink getSink;
     private final LocalSinkRunner localSinkRunner;
 
     public CmdSinks(PulsarAdmin admin) {
@@ -74,11 +74,15 @@ public class CmdSinks extends CmdBase {
         createSink = new CreateSink();
         updateSink = new UpdateSink();
         deleteSink = new DeleteSink();
+        listSinks = new ListSinks();
+        getSink = new GetSink();
         localSinkRunner = new LocalSinkRunner();
 
         jcommander.addCommand("create", createSink);
         jcommander.addCommand("update", updateSink);
         jcommander.addCommand("delete", deleteSink);
+        jcommander.addCommand("list", listSinks);
+        jcommander.addCommand("get", getSink);
         jcommander.addCommand("localrun", localSinkRunner);
         jcommander.addCommand("available-sinks", new ListBuiltInSinks());
     }
@@ -184,7 +188,7 @@ public class CmdSinks extends CmdBase {
     }
 
     @Parameters(commandDescription = "Submit a Pulsar IO sink connector to run in a Pulsar cluster")
-    protected class CreateSink extends SinkCommand {
+    protected class CreateSink extends SinkDetailsCommand {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(archive)) {
@@ -197,7 +201,7 @@ public class CmdSinks extends CmdBase {
     }
 
     @Parameters(commandDescription = "Update a Pulsar IO sink connector")
-    protected class UpdateSink extends SinkCommand {
+    protected class UpdateSink extends SinkDetailsCommand {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(archive)) {
@@ -209,7 +213,7 @@ public class CmdSinks extends CmdBase {
         }
     }
 
-    abstract class SinkCommand extends BaseCommand {
+    abstract class SinkDetailsCommand extends BaseCommand {
         @Parameter(names = "--tenant", description = "The sink's tenant")
         protected String tenant;
         @Parameter(names = "--namespace", description = "The sink's namespace")
@@ -506,25 +510,70 @@ public class CmdSinks extends CmdBase {
         }
     }
 
-    @Parameters(commandDescription = "Stops a Pulsar IO sink connector")
-    protected class DeleteSink extends BaseCommand {
-
-        @Parameter(names = "--tenant", description = "The tenant of the sink")
+    /**
+     * Sink level command
+     */
+    @Getter
+    abstract class SinkCommand extends BaseCommand {
+        @Parameter(names = "--tenant", description = "The sink's tenant")
         protected String tenant;
 
-        @Parameter(names = "--namespace", description = "The namespace of the sink")
+        @Parameter(names = "--namespace", description = "The sink's namespace")
         protected String namespace;
 
-        @Parameter(names = "--name", description = "The name of the sink")
-        protected String name;
+        @Parameter(names = "--name", description = "The sink's name")
+        protected String sinkName;
 
         @Override
         void processArguments() throws Exception {
             super.processArguments();
-            if (null == name) {
-                throw new ParameterException(
+            if (tenant == null) {
+                tenant = PUBLIC_TENANT;
+            }
+            if (namespace == null) {
+                namespace = DEFAULT_NAMESPACE;
+            }
+            if (null == sinkName) {
+                throw new RuntimeException(
                         "You must specify a name for the sink");
             }
+        }
+    }
+
+    @Parameters(commandDescription = "Stops a Pulsar IO sink connector")
+    protected class DeleteSink extends SinkCommand {
+
+        @Override
+        void runCmd() throws Exception {
+            admin.sink().deleteSink(tenant, namespace, sinkName);
+            print("Deleted successfully");
+        }
+    }
+
+    @Parameters(commandDescription = "Gets the information about a Pulsar IO sink connector")
+    protected class GetSink extends SinkCommand {
+
+        @Override
+        void runCmd() throws Exception {
+            SinkConfig sinkConfig = admin.sink().getSink(tenant, namespace, sinkName);
+            Gson gson = new GsonBuilder().setPrettyPrinting().create();
+            System.out.println(gson.toJson(sinkConfig));
+        }
+    }
+
+    /**
+     * List Sources command
+     */
+    @Parameters(commandDescription = "List all running Pulsar IO sink connectors")
+    protected class ListSinks extends BaseCommand {
+        @Parameter(names = "--tenant", description = "The sink's tenant")
+        protected String tenant;
+
+        @Parameter(names = "--namespace", description = "The sink's namespace")
+        protected String namespace;
+
+        @Override
+        public void processArguments() {
             if (tenant == null) {
                 tenant = PUBLIC_TENANT;
             }
@@ -535,8 +584,9 @@ public class CmdSinks extends CmdBase {
 
         @Override
         void runCmd() throws Exception {
-            admin.sink().deleteSink(tenant, namespace, name);
-            print("Deleted successfully");
+            List<String> sinks = admin.sink().listSinks(tenant, namespace);
+            Gson gson = new GsonBuilder().setPrettyPrinting().create();
+            System.out.println(gson.toJson(sinks));
         }
     }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index f2d7681..f27b0a5 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -28,6 +28,7 @@ import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
 
 import java.io.File;
@@ -35,6 +36,7 @@ import java.io.IOException;
 import java.lang.reflect.Type;
 import java.nio.file.Paths;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -66,6 +68,8 @@ public class CmdSources extends CmdBase {
 
     private final CreateSource createSource;
     private final DeleteSource deleteSource;
+    private final GetSource getSource;
+    private final ListSources listSources;
     private final UpdateSource updateSource;
     private final LocalSourceRunner localSourceRunner;
 
@@ -74,11 +78,15 @@ public class CmdSources extends CmdBase {
         createSource = new CreateSource();
         updateSource = new UpdateSource();
         deleteSource = new DeleteSource();
+        listSources = new ListSources();
+        getSource = new GetSource();
         localSourceRunner = new LocalSourceRunner();
 
         jcommander.addCommand("create", createSource);
         jcommander.addCommand("update", updateSource);
         jcommander.addCommand("delete", deleteSource);
+        jcommander.addCommand("get", getSource);
+        jcommander.addCommand("list", listSources);
         jcommander.addCommand("localrun", localSourceRunner);
         jcommander.addCommand("available-sources", new ListBuiltInSources());
     }
@@ -184,7 +192,7 @@ public class CmdSources extends CmdBase {
     }
 
     @Parameters(commandDescription = "Submit a Pulsar IO source connector to run in a Pulsar cluster")
-    protected class CreateSource extends SourceCommand {
+    protected class CreateSource extends SourceDetailsCommand {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(this.sourceConfig.getArchive())) {
@@ -197,7 +205,7 @@ public class CmdSources extends CmdBase {
     }
 
     @Parameters(commandDescription = "Update a Pulsar IO source connector")
-    protected class UpdateSource extends SourceCommand {
+    protected class UpdateSource extends SourceDetailsCommand {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive())) {
@@ -209,7 +217,7 @@ public class CmdSources extends CmdBase {
         }
     }
 
-    abstract class SourceCommand extends BaseCommand {
+    abstract class SourceDetailsCommand extends BaseCommand {
         @Parameter(names = "--tenant", description = "The source's tenant")
         protected String tenant;
         @Parameter(names = "--namespace", description = "The source's namespace")
@@ -460,40 +468,86 @@ public class CmdSources extends CmdBase {
         }
     }
 
-    @Parameters(commandDescription = "Stops a Pulsar IO source connector")
-    protected class DeleteSource extends BaseCommand {
-
-        @Parameter(names = "--tenant", description = "The tenant of a sink or source")
+    /**
+     * Function level command
+     */
+    @Getter
+    abstract class SourceCommand extends BaseCommand {
+        @Parameter(names = "--tenant", description = "The source's tenant")
         protected String tenant;
 
-        @Parameter(names = "--namespace", description = "The namespace of a sink or source")
+        @Parameter(names = "--namespace", description = "The source's namespace")
         protected String namespace;
 
-        @Parameter(names = "--name", description = "The name of a sink or source")
-        protected String name;
+        @Parameter(names = "--name", description = "The source's name")
+        protected String sourceName;
 
         @Override
         void processArguments() throws Exception {
             super.processArguments();
-            if (null == name) {
-                throw new ParameterException(
-                        "You must specify a name for the source");
-            }
             if (tenant == null) {
                 tenant = PUBLIC_TENANT;
             }
             if (namespace == null) {
                 namespace = DEFAULT_NAMESPACE;
             }
+            if (null == sourceName) {
+                throw new RuntimeException(
+                            "You must specify a name for the source");
+            }
         }
+    }
+
+    @Parameters(commandDescription = "Stops a Pulsar IO source connector")
+    protected class DeleteSource extends SourceCommand {
 
         @Override
         void runCmd() throws Exception {
-            admin.source().deleteSource(tenant, namespace, name);
+            admin.source().deleteSource(tenant, namespace, sourceName);
             print("Delete source successfully");
         }
     }
 
+    @Parameters(commandDescription = "Gets the information about a Pulsar IO source connector")
+    protected class GetSource extends SourceCommand {
+
+        @Override
+        void runCmd() throws Exception {
+            SourceConfig sourceConfig = admin.source().getSource(tenant, namespace, sourceName);
+            Gson gson = new GsonBuilder().setPrettyPrinting().create();
+            System.out.println(gson.toJson(sourceConfig));
+        }
+    }
+
+    /**
+     * List Sources command
+     */
+    @Parameters(commandDescription = "List all running Pulsar IO source connectors")
+    protected class ListSources extends BaseCommand {
+        @Parameter(names = "--tenant", description = "The sink's tenant")
+        protected String tenant;
+
+        @Parameter(names = "--namespace", description = "The sink's namespace")
+        protected String namespace;
+
+        @Override
+        public void processArguments() {
+            if (tenant == null) {
+                tenant = PUBLIC_TENANT;
+            }
+            if (namespace == null) {
+                namespace = DEFAULT_NAMESPACE;
+            }
+        }
+
+        @Override
+        void runCmd() throws Exception {
+            List<String> sources = admin.source().listSources(tenant, namespace);
+            Gson gson = new GsonBuilder().setPrettyPrinting().create();
+            System.out.println(gson.toJson(sources));
+        }
+    }
+
     @Parameters(commandDescription = "Get the list of Pulsar IO connector sources supported by Pulsar cluster")
     public class ListBuiltInSources extends BaseCommand {
         @Override
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 0c56145..b52bc17 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -962,7 +962,7 @@ public class TestCmdSinks {
     public void testDeleteMissingTenant() throws Exception {
         deleteSink.tenant = null;
         deleteSink.namespace = NAMESPACE;
-        deleteSink.name = NAME;
+        deleteSink.sinkName = NAME;
 
         deleteSink.processArguments();
 
@@ -975,7 +975,7 @@ public class TestCmdSinks {
     public void testDeleteMissingNamespace() throws Exception {
         deleteSink.tenant = TENANT;
         deleteSink.namespace = null;
-        deleteSink.name = NAME;
+        deleteSink.sinkName = NAME;
 
         deleteSink.processArguments();
 
@@ -984,11 +984,11 @@ public class TestCmdSinks {
         verify(sink).deleteSink(eq(TENANT), eq(DEFAULT_NAMESPACE), eq(NAME));
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "You must specify a name for the sink")
+    @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "You must specify a name for the sink")
     public void testDeleteMissingName() throws Exception {
         deleteSink.tenant = TENANT;
         deleteSink.namespace = NAMESPACE;
-        deleteSink.name = null;
+        deleteSink.sinkName = null;
 
         deleteSink.processArguments();
 
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index fe799bc..4a3b3cc 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -820,7 +820,7 @@ public class TestCmdSources {
     public void testDeleteMissingTenant() throws Exception {
         deleteSource.tenant = null;
         deleteSource.namespace = NAMESPACE;
-        deleteSource.name = NAME;
+        deleteSource.sourceName = NAME;
 
         deleteSource.processArguments();
 
@@ -833,7 +833,7 @@ public class TestCmdSources {
     public void testDeleteMissingNamespace() throws Exception {
         deleteSource.tenant = TENANT;
         deleteSource.namespace = null;
-        deleteSource.name = NAME;
+        deleteSource.sourceName = NAME;
 
         deleteSource.processArguments();
 
@@ -842,11 +842,11 @@ public class TestCmdSources {
         verify(source).deleteSource(eq(TENANT), eq(DEFAULT_NAMESPACE), eq(NAME));
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "You must specify a name for the source")
+    @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "You must specify a name for the source")
     public void testDeleteMissingName() throws Exception {
         deleteSource.tenant = TENANT;
         deleteSource.namespace = NAMESPACE;
-        deleteSource.name = null;
+        deleteSource.sourceName = null;
 
         deleteSource.processArguments();
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index cf182a8..11b623d 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -20,15 +20,18 @@
 package org.apache.pulsar.functions.utils;
 
 import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 
+import java.lang.reflect.Type;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.commons.lang.StringUtils.isNotBlank;
 import static org.apache.commons.lang.StringUtils.isNotEmpty;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
 
 public class FunctionConfigUtils {
 
@@ -195,4 +198,83 @@ public class FunctionConfigUtils {
         }
         return functionDetailsBuilder.build();
     }
+
+    public static FunctionConfig convertFromDetails(FunctionDetails functionDetails) {
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(functionDetails.getTenant());
+        functionConfig.setNamespace(functionDetails.getNamespace());
+        functionConfig.setName(functionDetails.getName());
+        functionConfig.setParallelism(functionDetails.getParallelism());
+        functionConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+        Map<String, ConsumerConfig> consumerConfigMap = new HashMap<>();
+        for (Map.Entry<String, Function.ConsumerSpec> input : functionDetails.getSource().getInputSpecsMap().entrySet()) {
+            ConsumerConfig consumerConfig = new ConsumerConfig();
+            if (!isEmpty(input.getValue().getSerdeClassName())) {
+                consumerConfig.setSerdeClassName(input.getValue().getSerdeClassName());
+            }
+            if (!isEmpty(input.getValue().getSchemaType())) {
+                consumerConfig.setSchemaType(input.getValue().getSchemaType());
+            }
+            consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern());
+            consumerConfigMap.put(input.getKey(), consumerConfig);
+        }
+        functionConfig.setInputSpecs(consumerConfigMap);
+        if (!isEmpty(functionDetails.getSource().getSubscriptionName())) {
+            functionConfig.setSubName(functionDetails.getSource().getSubscriptionName());
+        }
+        if (functionDetails.getSource().getSubscriptionType() == Function.SubscriptionType.FAILOVER) {
+            functionConfig.setRetainOrdering(true);
+            functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+        } else {
+            functionConfig.setRetainOrdering(false);
+            functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        }
+        functionConfig.setAutoAck(functionDetails.getAutoAck());
+        functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
+        if (!isEmpty(functionDetails.getSink().getTopic())) {
+            functionConfig.setOutput(functionDetails.getSink().getTopic());
+        }
+        if (!isEmpty(functionDetails.getSink().getSerDeClassName())) {
+            functionConfig.setOutputSerdeClassName(functionDetails.getSink().getSerDeClassName());
+        }
+        if (!isEmpty(functionDetails.getSink().getSchemaType())) {
+            functionConfig.setOutputSchemaType(functionDetails.getSink().getSchemaType());
+        }
+        if (!isEmpty(functionDetails.getLogTopic())) {
+            functionConfig.setLogTopic(functionDetails.getLogTopic());
+        }
+        functionConfig.setRuntime(Utils.convertRuntime(functionDetails.getRuntime()));
+        functionConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+        if (functionDetails.hasRetryDetails()) {
+            functionConfig.setMaxMessageRetries(functionDetails.getRetryDetails().getMaxMessageRetries());
+            if (!isEmpty(functionDetails.getRetryDetails().getDeadLetterTopic())) {
+                functionConfig.setDeadLetterTopic(functionDetails.getRetryDetails().getDeadLetterTopic());
+            }
+        }
+        Map<String, Object> userConfig;
+        if (!isEmpty(functionDetails.getUserConfig())) {
+            Type type = new TypeToken<Map<String, Object>>() {}.getType();
+            userConfig = new Gson().fromJson(functionDetails.getUserConfig(), type);
+        } else {
+            userConfig = new HashMap<>();
+        }
+        if (userConfig.containsKey(WindowConfig.WINDOW_CONFIG_KEY)) {
+            WindowConfig windowConfig = (WindowConfig) userConfig.get(WindowConfig.WINDOW_CONFIG_KEY);
+            userConfig.remove(WindowConfig.WINDOW_CONFIG_KEY);
+            functionConfig.setClassName(windowConfig.getActualWindowFunctionClassName());
+            functionConfig.setWindowConfig(windowConfig);
+        } else {
+            functionConfig.setClassName(functionDetails.getClassName());
+        }
+        functionConfig.setUserConfig(userConfig);
+
+        if (functionDetails.hasResources()) {
+            Resources resources = new Resources();
+            resources.setCpu(functionDetails.getResources().getCpu());
+            resources.setRam(functionDetails.getResources().getRam());
+            resources.setDisk(functionDetails.getResources().getDisk());
+        }
+
+        return functionConfig;
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 95803ab..545d344 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.functions.utils;
 
 import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
@@ -29,9 +30,13 @@ import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Type;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
 
@@ -175,4 +180,56 @@ public class SinkConfigUtils {
         }
         return functionDetailsBuilder.build();
     }
+
+    public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
+        SinkConfig sinkConfig = new SinkConfig();
+        sinkConfig.setTenant(functionDetails.getTenant());
+        sinkConfig.setNamespace(functionDetails.getNamespace());
+        sinkConfig.setName(functionDetails.getName());
+        sinkConfig.setParallelism(functionDetails.getParallelism());
+        sinkConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+        Map<String, ConsumerConfig> consumerConfigMap = new HashMap<>();
+        for (Map.Entry<String, Function.ConsumerSpec> input : functionDetails.getSource().getInputSpecsMap().entrySet()) {
+            ConsumerConfig consumerConfig = new ConsumerConfig();
+            if (!isEmpty(input.getValue().getSerdeClassName())) {
+                consumerConfig.setSerdeClassName(input.getValue().getSerdeClassName());
+            }
+            if (!isEmpty(input.getValue().getSchemaType())) {
+                consumerConfig.setSchemaType(input.getValue().getSchemaType());
+            }
+            consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern());
+            consumerConfigMap.put(input.getKey(), consumerConfig);
+        }
+        sinkConfig.setInputSpecs(consumerConfigMap);
+        if (!isEmpty(functionDetails.getSource().getSubscriptionName())) {
+            sinkConfig.setSourceSubscriptionName(functionDetails.getSource().getSubscriptionName());
+        }
+        if (functionDetails.getSource().getSubscriptionType() == Function.SubscriptionType.FAILOVER) {
+            sinkConfig.setRetainOrdering(true);
+            sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+        } else {
+            sinkConfig.setRetainOrdering(false);
+            sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        }
+        sinkConfig.setAutoAck(functionDetails.getAutoAck());
+        sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
+        if (!isEmpty(functionDetails.getSink().getClassName())) {
+            sinkConfig.setClassName(functionDetails.getSink().getClassName());
+        }
+        if (!isEmpty(functionDetails.getSink().getBuiltin())) {
+            sinkConfig.setArchive("builtin://" + functionDetails.getSink().getBuiltin());
+        }
+        if (!org.apache.commons.lang3.StringUtils.isEmpty(functionDetails.getSink().getConfigs())) {
+            Type type = new TypeToken<Map<String, String>>() {}.getType();
+            sinkConfig.setConfigs(new Gson().fromJson(functionDetails.getSink().getConfigs(), type));
+        }
+        if (functionDetails.hasResources()) {
+            Resources resources = new Resources();
+            resources.setCpu(functionDetails.getResources().getCpu());
+            resources.setRam(functionDetails.getResources().getRam());
+            resources.setDisk(functionDetails.getResources().getDisk());
+        }
+
+        return sinkConfig;
+    }
 }
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index a132c8a..3424062 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -20,6 +20,8 @@
 package org.apache.pulsar.functions.utils;
 
 import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
@@ -27,6 +29,8 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 
 import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.Map;
 
 import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
 import static org.apache.pulsar.functions.utils.Utils.getSourceType;
@@ -41,10 +45,10 @@ public class SourceConfigUtils {
 
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
 
-        boolean isBuiltin = sourceConfig.getArchive().startsWith(Utils.BUILTIN);
+        boolean isBuiltin = !StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(Utils.BUILTIN);
 
         if (!isBuiltin) {
-            if (sourceConfig.getArchive().startsWith(Utils.FILE)) {
+            if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(Utils.FILE)) {
                 if (org.apache.commons.lang3.StringUtils.isBlank(sourceConfig.getClassName())) {
                     throw new IllegalArgumentException("Class-name must be present for archive with file-url");
                 }
@@ -127,4 +131,40 @@ public class SourceConfigUtils {
 
         return functionDetailsBuilder.build();
     }
+
+    public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
+        SourceConfig sourceConfig = new SourceConfig();
+        sourceConfig.setTenant(functionDetails.getTenant());
+        sourceConfig.setNamespace(functionDetails.getNamespace());
+        sourceConfig.setName(functionDetails.getName());
+        sourceConfig.setParallelism(functionDetails.getParallelism());
+        sourceConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+        Function.SourceSpec sourceSpec = functionDetails.getSource();
+        if (!StringUtils.isEmpty(sourceSpec.getClassName())) {
+            sourceConfig.setClassName(sourceSpec.getClassName());
+        }
+        if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+            sourceConfig.setArchive("builtin://" + sourceSpec.getBuiltin());
+        }
+        if (!StringUtils.isEmpty(sourceSpec.getConfigs())) {
+            Type type = new TypeToken<Map<String, String>>() {}.getType();
+            sourceConfig.setConfigs(new Gson().fromJson(sourceSpec.getConfigs(), type));
+        }
+        Function.SinkSpec sinkSpec = functionDetails.getSink();
+        sourceConfig.setTopicName(sinkSpec.getTopic());
+        if (!StringUtils.isEmpty(sinkSpec.getSchemaType())) {
+            sourceConfig.setSchemaType(sinkSpec.getSchemaType());
+        }
+        if (!StringUtils.isEmpty(sinkSpec.getSerDeClassName())) {
+            sourceConfig.setSerdeClassName(sinkSpec.getSerDeClassName());
+        }
+        if (functionDetails.hasResources()) {
+            Resources resources = new Resources();
+            resources.setCpu(functionDetails.getResources().getCpu());
+            resources.setRam(functionDetails.getResources().getRam());
+            resources.setDisk(functionDetails.getResources().getDisk());
+            sourceConfig.setResources(resources);
+        }
+        return sourceConfig;
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index d35be61..adeaee1 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -151,6 +151,15 @@ public class Utils {
         throw new RuntimeException("Unrecognized runtime: " + runtime.name());
     }
 
+    public static FunctionConfig.Runtime convertRuntime(Runtime runtime) {
+        for (FunctionConfig.Runtime type : FunctionConfig.Runtime.values()) {
+            if (type.name().equals(runtime.name())) {
+                return type;
+            }
+        }
+        throw new RuntimeException("Unrecognized runtime: " + runtime.name());
+    }
+
     public static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees convertProcessingGuarantee(
             FunctionConfig.ProcessingGuarantees processingGuarantees) {
         for (org.apache.pulsar.functions.proto.Function.ProcessingGuarantees type : org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.values()) {
@@ -161,6 +170,17 @@ public class Utils {
         throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name());
     }
 
+    public static FunctionConfig.ProcessingGuarantees convertProcessingGuarantee(
+            org.apache.pulsar.functions.proto.Function.ProcessingGuarantees processingGuarantees) {
+        for (FunctionConfig.ProcessingGuarantees type : FunctionConfig.ProcessingGuarantees.values()) {
+            if (type.name().equals(processingGuarantees.name())) {
+                return type;
+            }
+        }
+        throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name());
+    }
+
+
     public static Class<?> getSourceType(String className, ClassLoader classloader) {
 
         Object userClass = Reflections.createInstance(className, classloader);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
new file mode 100644
index 0000000..1f67798
--- /dev/null
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import com.google.gson.Gson;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.proto.Function;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit test of {@link Reflections}.
+ */
+public class FunctionConfigUtilsTest {
+
+    @Test
+    public void testConvertBackFidelity() {
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant("test-tenant");
+        functionConfig.setNamespace("test-namespace");
+        functionConfig.setName("test-function");
+        functionConfig.setClassName(IdentityFunction.class.getName());
+        Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+        inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
+        functionConfig.setInputSpecs(inputSpecs);
+        functionConfig.setOutput("test-output");
+        functionConfig.setOutputSerdeClassName("test-serde");
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        functionConfig.setRetainOrdering(false);
+        functionConfig.setUserConfig(new HashMap<>());
+        functionConfig.setAutoAck(true);
+        functionConfig.setTimeoutMs(2000l);
+        Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
+        FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(convertedConfig)
+        );
+    }
+}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
new file mode 100644
index 0000000..c5d1ea0
--- /dev/null
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import com.google.gson.Gson;
+import org.apache.pulsar.functions.proto.Function;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit test of {@link Reflections}.
+ */
+public class SinkConfigUtilsTest {
+
+    @Test
+    public void testConvertBackFidelity() throws IOException  {
+        SinkConfig sinkConfig = new SinkConfig();
+        sinkConfig.setTenant("test-tenant");
+        sinkConfig.setNamespace("test-namespace");
+        sinkConfig.setName("test-source");
+        sinkConfig.setArchive("builtin://jdbc");
+        sinkConfig.setSourceSubscriptionName("test-subscription");
+        Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+        inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
+        sinkConfig.setInputSpecs(inputSpecs);
+        sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        sinkConfig.setConfigs(new HashMap<>());
+        sinkConfig.setRetainOrdering(false);
+        sinkConfig.setAutoAck(true);
+        sinkConfig.setTimeoutMs(2000l);
+        Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, null);
+        SinkConfig convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(convertedConfig)
+        );
+    }
+}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
new file mode 100644
index 0000000..ef4ce61
--- /dev/null
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import com.google.gson.Gson;
+import org.apache.pulsar.functions.proto.Function;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit test of {@link Reflections}.
+ */
+public class SourceConfigUtilsTest {
+
+    @Test
+    public void testConvertBackFidelity() throws IOException  {
+        SourceConfig sourceConfig = new SourceConfig();
+        sourceConfig.setTenant("test-tenant");
+        sourceConfig.setNamespace("test-namespace");
+        sourceConfig.setName("test-source");
+        sourceConfig.setArchive("builtin://jdbc");
+        sourceConfig.setTopicName("test-output");
+        sourceConfig.setSerdeClassName("test-serde");
+        sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        sourceConfig.setConfigs(new HashMap<>());
+        Function.FunctionDetails functionDetails = SourceConfigUtils.convert(sourceConfig, null);
+        SourceConfig convertedConfig = SourceConfigUtils.convertFromDetails(functionDetails);
+        assertEquals(
+                new Gson().toJson(sourceConfig),
+                new Gson().toJson(convertedConfig)
+        );
+    }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 4faed11..920063e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -141,8 +141,8 @@ public class FunctionMetaDataManager implements AutoCloseable {
      * @param namespace the namespace
      * @return a list of function names
      */
-    public synchronized Collection<String> listFunctions(String tenant, String namespace) {
-        List<String> ret = new LinkedList<>();
+    public synchronized Collection<FunctionMetaData> listFunctions(String tenant, String namespace) {
+        List<FunctionMetaData> ret = new LinkedList<>();
 
         if (!this.functionMetaDataMap.containsKey(tenant)) {
             return ret;
@@ -152,7 +152,7 @@ public class FunctionMetaDataManager implements AutoCloseable {
             return ret;
         }
         for (FunctionMetaData functionMetaData : this.functionMetaDataMap.get(tenant).get(namespace).values()) {
-            ret.add(functionMetaData.getFunctionDetails().getName());
+            ret.add(functionMetaData);
         }
         return ret;
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 0b245cf..44bb3bd 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -84,6 +84,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.utils.*;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
@@ -97,9 +98,15 @@ import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
 import net.jodah.typetools.TypeResolver;
 
+// TODO:-Currently The source/sink/functions all share this backend. In the future it might make sense
+// to seperate them out in their own implementations as well.
 @Slf4j
 public class FunctionsImpl {
 
+    public static final String FUNCTION = "Function";
+    public static final String SOURCE = "Source";
+    public static final String SINK = "Sink";
+
     private final Supplier<WorkerService> workerServiceSupplier;
 
     public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
@@ -126,11 +133,10 @@ public class FunctionsImpl {
         return true;
     }
 
-    public Response registerFunction(final String tenant, final String namespace, final String functionName,
+    public Response registerFunction(final String tenant, final String namespace, final String componentName,
             final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
-            final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson,
-            final String sourceConfigJson, final String sinkConfigJson,
-            final String clientRole) {
+            final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson,
+            final String componentType, final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -138,23 +144,23 @@ public class FunctionsImpl {
 
         try {
             if (!isAuthorizedRole(tenant, clientRole)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to register function", tenant, namespace,
-                        functionName, clientRole);
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
+                        componentName, clientRole, componentType);
                 return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
                         .entity(new ErrorData("client is not authorize to perform operation")).build();
             }
         } catch (PulsarAdminException e) {
-            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
             return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
 
-        if (functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function {}/{}/{} already exists", tenant, namespace, functionName);
+        if (functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} {}/{}/{} already exists", componentType, tenant, namespace, componentName);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s already exists", functionName))).build();
+                    .entity(new ErrorData(String.format("%s %s already exists", componentType, componentName))).build();
         }
 
         FunctionDetails functionDetails;
@@ -166,14 +172,14 @@ public class FunctionsImpl {
         // validate parameters
         try {
             if (isPkgUrlProvided) {
-                functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl,
-                        functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
+                functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, componentName, functionPkgUrl,
+                        functionDetailsJson, componentConfigJson, componentType);
             } else {
-                functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile,
-                        fileDetail, functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
+                functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile,
+                        fileDetail, functionDetailsJson, componentConfigJson, componentType);
             }
         } catch (Exception e) {
-            log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Invalid register {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
@@ -181,9 +187,9 @@ public class FunctionsImpl {
         try {
             worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
         } catch (Exception e) {
-            log.error("Function {}/{}/{} cannot be admitted by the runtime factory", tenant, namespace, functionName);
+            log.error("{} {}/{}/{} cannot be admitted by the runtime factory", componentType, tenant, namespace, componentName);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s cannot be admitted:- %s", functionName, e.getMessage()))).build();
+                    .entity(new ErrorData(String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()))).build();
         }
 
         // function state
@@ -196,7 +202,7 @@ public class FunctionsImpl {
             packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails));
         } else {
             packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? functionPkgUrl
-                    : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName()));
+                    : createPackagePath(tenant, namespace, componentName, fileDetail.getFileName()));
             if (!isPkgUrlProvided) {
                 packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
             }
@@ -207,10 +213,10 @@ public class FunctionsImpl {
                 : updateRequest(functionMetaDataBuilder.build(), uploadedInputStreamAsFile);
     }
 
-    public Response updateFunction(final String tenant, final String namespace, final String functionName,
+    public Response updateFunction(final String tenant, final String namespace, final String componentName,
             final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
-            final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson,
-            final String sourceConfigJson, final String sinkConfigJson, final String clientRole) {
+            final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson,
+            final String componentType, final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -218,22 +224,22 @@ public class FunctionsImpl {
 
         try {
             if (!isAuthorizedRole(tenant, clientRole)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to update function", tenant, namespace,
-                        functionName, clientRole);
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
+                        componentName, clientRole, componentType);
                 return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
                         .entity(new ErrorData("client is not authorize to perform operation")).build();
             }
         } catch (PulsarAdminException e) {
-            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
             return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
 
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
         }
 
         FunctionDetails functionDetails;
@@ -245,14 +251,14 @@ public class FunctionsImpl {
         // validate parameters
         try {
             if (isPkgUrlProvided) {
-                functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl,
-                        functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
+                functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, componentName, functionPkgUrl,
+                        functionDetailsJson, componentConfigJson, componentType);
             } else {
-                functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile,
-                        fileDetail, functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
+                functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile,
+                        fileDetail, functionDetailsJson, componentConfigJson, componentType);
             }
         } catch (Exception e) {
-            log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Invalid register {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
@@ -260,9 +266,9 @@ public class FunctionsImpl {
         try {
             worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
         } catch (Exception e) {
-            log.error("Updated Function {}/{}/{} cannot be submitted to runtime factory", tenant, namespace, functionName);
+            log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", componentType, tenant, namespace, componentName);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s cannot be admitted:- %s", functionName, e.getMessage()))).build();
+                    .entity(new ErrorData(String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()))).build();
         }
 
         // function state
@@ -276,7 +282,7 @@ public class FunctionsImpl {
             packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails));
         } else {
             packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? functionPkgUrl
-                    : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName()));
+                    : createPackagePath(tenant, namespace, componentName, fileDetail.getFileName()));
             if (!isPkgUrlProvided) {
                 packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
             }
@@ -287,8 +293,8 @@ public class FunctionsImpl {
                 : updateRequest(functionMetaDataBuilder.build(), uploadedInputStreamAsFile);
     }
 
-    public Response deregisterFunction(final String tenant, final String namespace, final String functionName,
-            String clientRole) {
+    public Response deregisterFunction(final String tenant, final String namespace, final String componentName,
+            String componentType, String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -296,35 +302,41 @@ public class FunctionsImpl {
 
         try {
             if (!isAuthorizedRole(tenant, clientRole)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister function", tenant, namespace,
-                        functionName, clientRole);
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister {}", tenant, namespace,
+                        componentName, clientRole, componentType);
                 return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
                         .entity(new ErrorData("client is not authorize to perform operation")).build();
             }
         } catch (PulsarAdminException e) {
-            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
             return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         // validate parameters
         try {
-            validateDeregisterRequestParams(tenant, namespace, functionName);
+            validateDeregisterRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid deregister function request @ /{}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Invalid deregister {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function to deregister does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} to deregister does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+        }
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
         }
 
         CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.deregisterFunction(tenant,
-                namespace, functionName);
+                namespace, componentName);
 
         RequestResult requestResult = null;
         try {
@@ -334,12 +346,12 @@ public class FunctionsImpl {
                         .entity(new ErrorData(requestResult.getMessage())).build();
             }
         } catch (ExecutionException e) {
-            log.error("Execution Exception while deregistering function @ /{}/{}/{}", tenant, namespace, functionName,
+            log.error("Execution Exception while deregistering {} @ /{}/{}/{}", componentType, tenant, namespace, componentName,
                     e);
             return Response.serverError().type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getCause().getMessage())).build();
         } catch (InterruptedException e) {
-            log.error("Interrupted Exception while deregistering function @ /{}/{}/{}", tenant, namespace, functionName,
+            log.error("Interrupted Exception while deregistering {} @ /{}/{}/{}", componentType, tenant, namespace, componentName,
                     e);
             return Response.status(Status.REQUEST_TIMEOUT).type(MediaType.APPLICATION_JSON).build();
         }
@@ -347,7 +359,8 @@ public class FunctionsImpl {
         return Response.status(Status.OK).entity(requestResult.toJson()).build();
     }
 
-    public Response getFunctionInfo(final String tenant, final String namespace, final String functionName)
+    public Response getFunctionInfo(final String tenant, final String namespace, final String componentName,
+                                    final String componentType)
             throws IOException {
 
         if (!isWorkerServiceAvailable()) {
@@ -356,25 +369,38 @@ public class FunctionsImpl {
 
         // validate parameters
         try {
-            validateGetFunctionRequestParams(tenant, namespace, functionName);
+            validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid getFunction request @ /{}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Invalid get {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function in getFunction does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+                    .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
+        }
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
         }
 
-        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace,
-                functionName);
-        String functionDetailsJson = org.apache.pulsar.functions.utils.Utils
-                .printJson(functionMetaData.getFunctionDetails());
-        return Response.status(Status.OK).entity(functionDetailsJson).build();
+        String retval;
+        if (componentType.equals(FUNCTION)) {
+            FunctionConfig config = FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
+            retval = new Gson().toJson(config);
+        } else if (componentType.equals(SOURCE)) {
+            SourceConfig config = SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
+            retval = new Gson().toJson(config);
+        } else {
+            SinkConfig config = SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
+            retval = new Gson().toJson(config);
+        }
+        return Response.status(Status.OK).entity(retval).build();
     }
 
     public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName,
@@ -485,7 +511,7 @@ public class FunctionsImpl {
 
         // validate parameters
         try {
-            validateGetFunctionRequestParams(tenant, namespace, functionName);
+            validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
         } catch (IllegalArgumentException e) {
             log.error("Invalid restart-Function request @ /{}/{}/{}", tenant, namespace, functionName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
@@ -519,7 +545,7 @@ public class FunctionsImpl {
 
         // validate parameters
         try {
-            validateGetFunctionRequestParams(tenant, namespace, functionName);
+            validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
         } catch (IllegalArgumentException e) {
             log.error("Invalid getFunctionStatus request @ /{}/{}/{}", tenant, namespace, functionName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
@@ -552,7 +578,7 @@ public class FunctionsImpl {
         return Response.status(Status.OK).entity(jsonResponse).build();
     }
 
-    public Response listFunctions(final String tenant, final String namespace) {
+    public Response listFunctions(final String tenant, final String namespace, String componentType) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -562,16 +588,22 @@ public class FunctionsImpl {
         try {
             validateListFunctionRequestParams(tenant, namespace);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid listFunctions request @ /{}/{}", tenant, namespace, e);
+            log.error("Invalid list {} request @ /{}/{}", componentType, tenant, namespace, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
 
-        Collection<String> functionStateList = functionMetaDataManager.listFunctions(tenant, namespace);
+        Collection<FunctionMetaData> functionStateList = functionMetaDataManager.listFunctions(tenant, namespace);
+        List<String> retval = new LinkedList<>();
+        for (FunctionMetaData functionMetaData : functionStateList) {
+            if (calculateSubjectType(functionMetaData).equals(componentType)) {
+                retval.add(functionMetaData.getFunctionDetails().getName());
+            }
+        }
 
-        return Response.status(Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
+        return Response.status(Status.OK).entity(new Gson().toJson(retval.toArray())).build();
     }
 
     private Response updateRequest(FunctionMetaData functionMetaData, File uploadedInputStreamAsFile) {
@@ -840,13 +872,13 @@ public class FunctionsImpl {
 
     private void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String functionName,
             String instanceId) throws IllegalArgumentException {
-        validateGetFunctionRequestParams(tenant, namespace, functionName);
+        validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
         if (instanceId == null) {
             throw new IllegalArgumentException("Function Instance Id is not provided");
         }
     }
 
-    private void validateGetFunctionRequestParams(String tenant, String namespace, String functionName)
+    private void validateGetFunctionRequestParams(String tenant, String namespace, String subject, String subjectType)
             throws IllegalArgumentException {
 
         if (tenant == null) {
@@ -855,12 +887,12 @@ public class FunctionsImpl {
         if (namespace == null) {
             throw new IllegalArgumentException("Namespace is not provided");
         }
-        if (functionName == null) {
-            throw new IllegalArgumentException("Function Name is not provided");
+        if (subject == null) {
+            throw new IllegalArgumentException(subjectType + " Name is not provided");
         }
     }
 
-    private void validateDeregisterRequestParams(String tenant, String namespace, String functionName)
+    private void validateDeregisterRequestParams(String tenant, String namespace, String subject, String subjectType)
             throws IllegalArgumentException {
 
         if (tenant == null) {
@@ -869,30 +901,30 @@ public class FunctionsImpl {
         if (namespace == null) {
             throw new IllegalArgumentException("Namespace is not provided");
         }
-        if (functionName == null) {
-            throw new IllegalArgumentException("Function Name is not provided");
+        if (subject == null) {
+            throw new IllegalArgumentException(subjectType + " Name is not provided");
         }
     }
 
-    private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String functionName,
-            String functionPkgUrl, String functionDetailsJson, String functionConfigJson,
-            String sourceConfigJson, String sinkConfigJson)
+    private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String componentName,
+            String functionPkgUrl, String functionDetailsJson, String componentConfigJson,
+            String componentType)
             throws IllegalArgumentException, IOException, URISyntaxException {
         if (!isFunctionPackageUrlSupported(functionPkgUrl)) {
             throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
         }
-        FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
-                functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson, functionPkgUrl, null);
+        FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
+                functionDetailsJson, componentConfigJson, componentType, functionPkgUrl, null);
         return functionDetails;
     }
 
-    private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName,
+    private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName,
             File uploadedInputStreamAsFile, FormDataContentDisposition fileDetail, String functionDetailsJson,
-            String functionConfigJson, String sourceConfigJson, String sinkConfigJson)
+            String componentConfigJson, String componentType)
             throws IllegalArgumentException, IOException, URISyntaxException {
 
-        FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
-                functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson, null, uploadedInputStreamAsFile);
+        FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
+                functionDetailsJson, componentConfigJson, componentType,null, uploadedInputStreamAsFile);
         if (!isFunctionCodeBuiltin(functionDetails) && (uploadedInputStreamAsFile == null || fileDetail == null)) {
             throw new IllegalArgumentException("Function Package is not provided");
         }
@@ -964,40 +996,21 @@ public class FunctionsImpl {
         return null;
     }
 
-    private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName,
-            String functionDetailsJson, String functionConfigJson, String sourceConfigJson,
-            String sinkConfigJson, String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException {
+    private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName,
+            String functionDetailsJson, String componentConfigJson, String componentType,
+            String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException {
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
         }
         if (namespace == null) {
             throw new IllegalArgumentException("Namespace is not provided");
         }
-        if (functionName == null) {
-            throw new IllegalArgumentException("Function Name is not provided");
+        if (componentName == null) {
+            throw new IllegalArgumentException(String.format("%s Name is not provided", componentType));
         }
 
-        int numDefinitions = 0;
-        if (!StringUtils.isEmpty(functionDetailsJson)) {
-            numDefinitions++;
-        }
-        if (!StringUtils.isEmpty(functionConfigJson)) {
-            numDefinitions++;
-        }
-        if (!StringUtils.isEmpty(sourceConfigJson)) {
-            numDefinitions++;
-        }
-        if (!StringUtils.isEmpty(sinkConfigJson)) {
-            numDefinitions++;
-        }
-        if (numDefinitions == 0) {
-            throw new IllegalArgumentException("Function Info is not provided");
-        }
-        if (numDefinitions > 1) {
-            throw new IllegalArgumentException("Conflicting Info provided");
-        }
-        if (!StringUtils.isEmpty(functionConfigJson)) {
-            FunctionConfig functionConfig = new Gson().fromJson(functionConfigJson, FunctionConfig.class);
+        if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) {
+            FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
             ClassLoader clsLoader = null;
             if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
                 clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile);
@@ -1008,14 +1021,14 @@ public class FunctionsImpl {
             ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name(), clsLoader);
             return FunctionConfigUtils.convert(functionConfig, clsLoader);
         }
-        if (!StringUtils.isEmpty(sourceConfigJson)) {
-            SourceConfig sourceConfig = new Gson().fromJson(sourceConfigJson, SourceConfig.class);
+        if (componentType.equals(SOURCE)) {
+            SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
             NarClassLoader clsLoader = extractNarClassLoader(sourceConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, true);
             ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader);
             return SourceConfigUtils.convert(sourceConfig, clsLoader);
         }
-        if (!StringUtils.isEmpty(sinkConfigJson)) {
-            SinkConfig sinkConfig = new Gson().fromJson(sinkConfigJson, SinkConfig.class);
+        if (componentType.equals(SINK)) {
+            SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
             NarClassLoader clsLoader = extractNarClassLoader(sinkConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, false);
             ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader);
             return SinkConfigUtils.convert(sinkConfig, clsLoader);
@@ -1260,4 +1273,23 @@ public class FunctionsImpl {
         return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
     }
 
+    public String calculateSubjectType(FunctionMetaData functionMetaData) {
+        SourceSpec sourceSpec = functionMetaData.getFunctionDetails().getSource();
+        SinkSpec sinkSpec = functionMetaData.getFunctionDetails().getSink();
+        if (sourceSpec.getInputSpecsCount() == 0) {
+            return SOURCE;
+        }
+        // Now its between sink and function
+
+        if (!isEmpty(sinkSpec.getBuiltin())) {
+            // if its built in, its a sink
+            return SINK;
+        }
+
+        if (isEmpty(sinkSpec.getClassName()) || sinkSpec.getClassName().equals(PulsarSink.class.getName())) {
+            return FUNCTION;
+        }
+        return SINK;
+    }
+
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index d6e1439..405f88f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker.rest.api.v2;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -60,7 +61,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                                      final @FormDataParam("functionConfig") String functionConfigJson) {
 
         return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId());
 
     }
 
@@ -77,7 +78,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                                    final @FormDataParam("functionConfig") String functionConfigJson) {
 
         return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId());
 
     }
 
@@ -86,7 +87,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     @Path("/{tenant}/{namespace}/{functionName}")
     public Response deregisterFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+        return functions.deregisterFunction(tenant, namespace, functionName, FunctionsImpl.FUNCTION, clientAppId());
     }
 
     @GET
@@ -96,7 +97,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                                     final @PathParam("functionName") String functionName)
             throws IOException {
         return functions.getFunctionInfo(
-            tenant, namespace, functionName);
+            tenant, namespace, functionName, FunctionsImpl.FUNCTION);
     }
 
     @GET
@@ -123,7 +124,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     public Response listFunctions(final @PathParam("tenant") String tenant,
                                   final @PathParam("namespace") String namespace) {
         return functions.listFunctions(
-            tenant, namespace);
+            tenant, namespace, FunctionsImpl.FUNCTION);
 
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
index 151c2c1..488f47d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
@@ -52,7 +53,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
                                  final @FormDataParam("sinkConfig") String sinkConfigJson) {
 
         return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+                functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId());
 
     }
 
@@ -68,7 +69,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
                                final @FormDataParam("sinkConfig") String sinkConfigJson) {
 
         return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+                functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId());
 
     }
 
@@ -77,7 +78,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
     @Path("/{tenant}/{namespace}/{sinkName}")
     public Response deregisterSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
-        return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+        return functions.deregisterFunction(tenant, namespace, sinkName, FunctionsImpl.SINK, clientAppId());
     }
 
     @GET
@@ -86,7 +87,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
                                 final @PathParam("namespace") String namespace,
                                 final @PathParam("sinkName") String sinkName)
             throws IOException {
-        return functions.getFunctionInfo(tenant, namespace, sinkName);
+        return functions.getFunctionInfo(tenant, namespace, sinkName, FunctionsImpl.SINK);
     }
 
     @GET
@@ -111,7 +112,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
     @Path("/{tenant}/{namespace}")
     public Response listSink(final @PathParam("tenant") String tenant,
                              final @PathParam("namespace") String namespace) {
-        return functions.listFunctions(tenant, namespace);
+        return functions.listFunctions(tenant, namespace, FunctionsImpl.SINK);
 
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
index 44fac19..3b1222e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
@@ -52,7 +53,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
                                    final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
         return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+                functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId());
 
     }
 
@@ -68,7 +69,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
                                  final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
         return functions.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+                functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId());
 
     }
 
@@ -77,7 +78,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
     @Path("/{tenant}/{namespace}/{sourceName}")
     public Response deregisterSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
-        return functions.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+        return functions.deregisterFunction(tenant, namespace, sourceName, FunctionsImpl.SOURCE, clientAppId());
     }
 
     @GET
@@ -86,7 +87,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
                                   final @PathParam("namespace") String namespace,
                                   final @PathParam("sourceName") String sourceName)
             throws IOException {
-        return functions.getFunctionInfo(tenant, namespace, sourceName);
+        return functions.getFunctionInfo(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
     }
 
     @GET
@@ -111,7 +112,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
     @Path("/{tenant}/{namespace}")
     public Response listSources(final @PathParam("tenant") String tenant,
                                 final @PathParam("namespace") String namespace) {
-        return functions.listFunctions(tenant, namespace);
+        return functions.listFunctions(tenant, namespace, FunctionsImpl.SOURCE);
 
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index ac99f9a..7fc7c7f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -66,15 +66,16 @@ public class FunctionMetaDataManagerTest {
                         mockPulsarClient()));
 
         Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new HashMap<>();
-        functionMetaDataMap1.put("func-1", Function.FunctionMetaData.newBuilder().setFunctionDetails(
-                        Function.FunctionDetails.newBuilder().setName("func-1")).build());
-        functionMetaDataMap1.put("func-2",
-                Function.FunctionMetaData.newBuilder().setFunctionDetails(
-                        Function.FunctionDetails.newBuilder().setName("func-2")).build());
+        Function.FunctionMetaData f1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder().setName("func-1")).build();
+        functionMetaDataMap1.put("func-1", f1);
+        Function.FunctionMetaData f2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder().setName("func-2")).build();
+        functionMetaDataMap1.put("func-2", f2);
+        Function.FunctionMetaData f3 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder().setName("func-3")).build();
         Map<String, Function.FunctionMetaData> functionMetaDataInfoMap2 = new HashMap<>();
-        functionMetaDataInfoMap2.put("func-3",
-                Function.FunctionMetaData.newBuilder().setFunctionDetails(
-                        Function.FunctionDetails.newBuilder().setName("func-3")).build());
+        functionMetaDataInfoMap2.put("func-3", f3);
 
 
         functionMetaDataManager.functionMetaDataMap.put("tenant-1", new HashMap<>());
@@ -86,13 +87,13 @@ public class FunctionMetaDataManagerTest {
         Assert.assertEquals(2, functionMetaDataManager.listFunctions(
                 "tenant-1", "namespace-1").size());
         Assert.assertTrue(functionMetaDataManager.listFunctions(
-                "tenant-1", "namespace-1").contains("func-1"));
+                "tenant-1", "namespace-1").contains(f1));
         Assert.assertTrue(functionMetaDataManager.listFunctions(
-                "tenant-1", "namespace-1").contains("func-2"));
+                "tenant-1", "namespace-1").contains(f2));
         Assert.assertEquals(1, functionMetaDataManager.listFunctions(
                 "tenant-1", "namespace-2").size());
         Assert.assertTrue(functionMetaDataManager.listFunctions(
-                "tenant-1", "namespace-2").contains("func-3"));
+                "tenant-1", "namespace-2").contains(f3));
     }
 
     @Test
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index d514fab..460d050 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -21,10 +21,9 @@ package org.apache.pulsar.functions.worker.rest.api.v2;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
 import static org.powermock.api.mockito.PowerMockito.doThrow;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.testng.Assert.assertEquals;
@@ -62,6 +61,7 @@ import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
@@ -145,6 +145,7 @@ public class FunctionApiV2ResourceTest {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
+        doReturn("Function").when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -311,8 +312,7 @@ public class FunctionApiV2ResourceTest {
                 null,
                 null,
                 new Gson().toJson(functionConfig),
-                null,
-                null,
+                FunctionsImpl.FUNCTION,
                 null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -339,8 +339,7 @@ public class FunctionApiV2ResourceTest {
             null,
             null,
             new Gson().toJson(functionConfig),
-            null,
-                null,
+            FunctionsImpl.FUNCTION,
                 null);
     }
 
@@ -600,8 +599,7 @@ public class FunctionApiV2ResourceTest {
             null,
             null,
             new Gson().toJson(functionConfig),
-            null,
-                null,
+            FunctionsImpl.FUNCTION,
                 null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -629,8 +627,7 @@ public class FunctionApiV2ResourceTest {
             null,
             null,
             new Gson().toJson(functionConfig),
-            null,
-                null,
+            FunctionsImpl.FUNCTION,
                 null);
     }
 
@@ -714,8 +711,7 @@ public class FunctionApiV2ResourceTest {
             filePackageUrl,
             null,
             new Gson().toJson(functionConfig),
-            null,
-                null,
+            FunctionsImpl.FUNCTION,
                 null);
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -804,7 +800,8 @@ public class FunctionApiV2ResourceTest {
             tenant,
             namespace,
             function,
-            null);
+            FunctionsImpl.FUNCTION,
+                null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
@@ -815,7 +812,8 @@ public class FunctionApiV2ResourceTest {
             tenant,
             namespace,
             function,
-            null);
+            FunctionsImpl.FUNCTION,
+                 null);
     }
 
     @Test
@@ -910,7 +908,8 @@ public class FunctionApiV2ResourceTest {
         Response response = resource.getFunctionInfo(
             tenant,
             namespace,
-            function);
+            function,
+                FunctionsImpl.FUNCTION);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
@@ -920,7 +919,8 @@ public class FunctionApiV2ResourceTest {
         return resource.getFunctionInfo(
             tenant,
             namespace,
-            function);
+            function,
+                FunctionsImpl.FUNCTION);
     }
 
     @Test
@@ -960,8 +960,8 @@ public class FunctionApiV2ResourceTest {
         Response response = getDefaultFunctionInfo();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
         assertEquals(
-            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
-            response.getEntity());
+                new Gson().toJson(FunctionConfigUtils.convertFromDetails(functionDetails)),
+                response.getEntity());
     }
 
     //
@@ -991,7 +991,8 @@ public class FunctionApiV2ResourceTest {
     ) {
         Response response = resource.listFunctions(
             tenant,
-            namespace);
+            namespace,
+                FunctionsImpl.FUNCTION);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
@@ -1000,13 +1001,46 @@ public class FunctionApiV2ResourceTest {
     private Response listDefaultFunctions() {
         return resource.listFunctions(
             tenant,
-            namespace);
+            namespace,
+                FunctionsImpl.FUNCTION);
     }
 
     @Test
     public void testListFunctionsSuccess() throws Exception {
         List<String> functions = Lists.newArrayList("test-1", "test-2");
-        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functions);
+        List<FunctionMetaData> metaDataList = new LinkedList<>();
+        FunctionMetaData functionMetaData1 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-1").build()
+        ).build();
+        FunctionMetaData functionMetaData2 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-2").build()
+        ).build();
+        metaDataList.add(functionMetaData1);
+        metaDataList.add(functionMetaData2);
+        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(metaDataList);
+
+        Response response = listDefaultFunctions();
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        assertEquals(new Gson().toJson(functions), response.getEntity());
+    }
+
+    @Test
+    public void testOnlyGetSources() throws Exception {
+        List<String> functions = Lists.newArrayList("test-2");
+        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+        FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-1").build()).build();
+        functionMetaDataList.add(f1);
+        FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-2").build()).build();
+        functionMetaDataList.add(f2);
+        FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-3").build()).build();
+        functionMetaDataList.add(f3);
+        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
+        doReturn("Source").when(this.resource).calculateSubjectType(f1);
+        doReturn("Function").when(this.resource).calculateSubjectType(f2);
+        doReturn("Sink").when(this.resource).calculateSubjectType(f3);
 
         Response response = listDefaultFunctions();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -1068,7 +1102,7 @@ public class FunctionApiV2ResourceTest {
         functionConfig.setOutput(outputTopic);
         functionConfig.setOutputSerdeClassName(outputSerdeClassName);
         Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl,
-                null, new Gson().toJson(functionConfig), null, null, null);
+                null, new Gson().toJson(functionConfig), FunctionsImpl.FUNCTION, null);
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 4e52c95..315f56d 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v2;
 
+import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
@@ -26,6 +27,8 @@ import org.apache.logging.log4j.core.config.Configurator;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
@@ -38,6 +41,7 @@ import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.Assert;
@@ -51,6 +55,8 @@ import javax.ws.rs.core.Response.Status;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -143,6 +149,7 @@ public class SinkApiV2ResourceTest {
         doReturn(null).when(resource).extractNarClassLoader(anyString(), anyString(), anyObject(), anyBoolean());
         mockStatic(SinkConfigUtils.class);
         when(SinkConfigUtils.convert(anyObject(), anyObject())).thenReturn(FunctionDetails.newBuilder().build());
+        Mockito.doReturn("Sink").when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -188,7 +195,7 @@ public class SinkApiV2ResourceTest {
             topicsToSerDeClassName,
             className,
             parallelism,
-                "Function Name is not provided");
+                "Sink Name is not provided");
     }
 
     @Test
@@ -257,9 +264,8 @@ public class SinkApiV2ResourceTest {
                 details,
                 null,
                 null,
-                null,
-                null,
                 new Gson().toJson(sinkConfig),
+                FunctionsImpl.SINK,
                 null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -282,9 +288,8 @@ public class SinkApiV2ResourceTest {
             mockedFormData,
             null,
             null,
-            null,
-            null,
             new Gson().toJson(sinkConfig),
+                FunctionsImpl.SINK,
                 null);
     }
 
@@ -296,7 +301,7 @@ public class SinkApiV2ResourceTest {
 
         Response response = registerDefaultSink();
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + sink + " already exists").reason, ((ErrorData) response.getEntity()).reason);
+        assertEquals(new ErrorData("Sink " + sink + " already exists").reason, ((ErrorData) response.getEntity()).reason);
     }
 
     @Test
@@ -421,7 +426,7 @@ public class SinkApiV2ResourceTest {
             topicsToSerDeClassName,
             className,
             parallelism,
-                "Function Name is not provided");
+                "Sink Name is not provided");
     }
 
     @Test
@@ -492,9 +497,8 @@ public class SinkApiV2ResourceTest {
             details,
             null,
             null,
-            null,
-            null,
             new Gson().toJson(sinkConfig),
+                FunctionsImpl.SINK,
                 null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -518,9 +522,8 @@ public class SinkApiV2ResourceTest {
             mockedFormData,
             null,
             null,
-            null,
-            null,
             new Gson().toJson(sinkConfig),
+                FunctionsImpl.SINK,
                 null);
     }
 
@@ -530,7 +533,7 @@ public class SinkApiV2ResourceTest {
 
         Response response = updateDefaultSink();
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        assertEquals(new ErrorData("Sink " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
     }
 
     @Test
@@ -600,9 +603,8 @@ public class SinkApiV2ResourceTest {
             null,
             filePackageUrl,
             null,
-            null,
-            null,
             new Gson().toJson(sinkConfig),
+                FunctionsImpl.SINK,
                 null);
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -678,7 +680,7 @@ public class SinkApiV2ResourceTest {
             tenant,
             namespace,
             null,
-            "Function Name");
+            "Sink Name");
     }
 
     private void testDeregisterSinkMissingArguments(
@@ -691,6 +693,7 @@ public class SinkApiV2ResourceTest {
             tenant,
             namespace,
             sink,
+            FunctionsImpl.SINK,
             null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -702,6 +705,7 @@ public class SinkApiV2ResourceTest {
             tenant,
             namespace,
                 sink,
+            FunctionsImpl.SINK,
             null);
     }
 
@@ -711,7 +715,7 @@ public class SinkApiV2ResourceTest {
 
         Response response = deregisterDefaultSink();
         assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        assertEquals(new ErrorData("Sink " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
     }
 
     @Test
@@ -757,109 +761,116 @@ public class SinkApiV2ResourceTest {
         assertEquals(new ErrorData("Function deregisteration interrupted").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    // Source Info doesn't exist. Maybe one day they might be added
     //
-    // Get Function Info
+    // Get Sink Info
     //
 
-    /*
     @Test
-    public void testGetFunctionMissingTenant() throws Exception {
-        testGetFunctionMissingArguments(
+    public void testGetSinkMissingTenant() throws Exception {
+        testGetSinkMissingArguments(
             null,
             namespace,
-                source,
+                sink,
             "Tenant");
     }
 
     @Test
-    public void testGetFunctionMissingNamespace() throws Exception {
-        testGetFunctionMissingArguments(
+    public void testGetSinkMissingNamespace() throws Exception {
+        testGetSinkMissingArguments(
             tenant,
             null,
-                source,
+                sink,
             "Namespace");
     }
 
     @Test
-    public void testGetFunctionMissingFunctionName() throws Exception {
-        testGetFunctionMissingArguments(
+    public void testGetSinkMissingFunctionName() throws Exception {
+        testGetSinkMissingArguments(
             tenant,
             namespace,
             null,
-            "Function Name");
+            "Sink Name");
     }
 
-    private void testGetFunctionMissingArguments(
+    private void testGetSinkMissingArguments(
         String tenant,
         String namespace,
-        String function,
+        String sink,
         String missingFieldName
     ) throws IOException {
         Response response = resource.getFunctionInfo(
             tenant,
             namespace,
-            function);
+            sink,
+                FunctionsImpl.SINK);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response getDefaultFunctionInfo() throws IOException {
+    private Response getDefaultSinkInfo() throws IOException {
         return resource.getFunctionInfo(
             tenant,
             namespace,
-                source);
+                sink,
+                FunctionsImpl.SINK);
     }
 
     @Test
-    public void testGetNotExistedFunction() throws IOException {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+    public void testGetNotExistedSink() throws IOException {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
 
-        Response response = getDefaultFunctionInfo();
+        Response response = getDefaultSinkInfo();
         assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        assertEquals(new ErrorData("Sink " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
     }
 
     @Test
-    public void testGetFunctionSuccess() throws Exception {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+    public void testGetSinkSuccess() throws Exception {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
-        SinkSpec sinkSpec = SinkSpec.newBuilder()
-                .setTopic(outputTopic)
-                .setSerDeClassName(outputSerdeClassName).build();
+        Function.SourceSpec sourceSpec = Function.SourceSpec.newBuilder()
+                .setSubscriptionType(Function.SubscriptionType.SHARED)
+                .setSubscriptionName(subscriptionName)
+                .putInputSpecs("input", Function.ConsumerSpec.newBuilder()
+                .setSerdeClassName(TopicSchema.DEFAULT_SERDE)
+                .setIsRegexPattern(false)
+                .build()).build();
+        Function.SinkSpec sinkSpec = Function.SinkSpec.newBuilder()
+                .setBuiltin("jdbc")
+                .build();
         FunctionDetails functionDetails = FunctionDetails.newBuilder()
-                .setClassName(className)
+                .setClassName(IdentityFunction.class.getName())
                 .setSink(sinkSpec)
-                .setName(source)
+                .setName(sink)
                 .setNamespace(namespace)
-                .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE)
+                .setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE)
                 .setTenant(tenant)
                 .setParallelism(parallelism)
-                .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
-                        .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
+                .setRuntime(FunctionDetails.Runtime.JAVA)
+                .setSource(sourceSpec).build();
         FunctionMetaData metaData = FunctionMetaData.newBuilder()
             .setCreateTime(System.currentTimeMillis())
             .setFunctionDetails(functionDetails)
-            .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package"))
+            .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package"))
             .setVersion(1234)
             .build();
-        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(metaData);
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(metaData);
 
-        Response response = getDefaultFunctionInfo();
+        Response response = getDefaultSinkInfo();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
         assertEquals(
-            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            new Gson().toJson(SinkConfigUtils.convertFromDetails(functionDetails)),
             response.getEntity());
     }
 
     //
-    // List Functions
+    // List Sinks
     //
 
     @Test
-    public void testListFunctionsMissingTenant() throws Exception {
-        testListFunctionsMissingArguments(
+    public void testListSinksMissingTenant() throws Exception {
+        testListSinksMissingArguments(
             null,
             namespace,
             "Tenant");
@@ -867,39 +878,70 @@ public class SinkApiV2ResourceTest {
 
     @Test
     public void testListFunctionsMissingNamespace() throws Exception {
-        testListFunctionsMissingArguments(
+        testListSinksMissingArguments(
             tenant,
             null,
             "Namespace");
     }
 
-    private void testListFunctionsMissingArguments(
+    private void testListSinksMissingArguments(
         String tenant,
         String namespace,
         String missingFieldName
     ) {
         Response response = resource.listFunctions(
             tenant,
-            namespace);
+            namespace,
+                FunctionsImpl.SINK);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response listDefaultFunctions() {
+    private Response listDefaultSinks() {
         return resource.listFunctions(
             tenant,
-            namespace);
+            namespace,
+                FunctionsImpl.SINK);
     }
 
     @Test
-    public void testListFunctionsSuccess() throws Exception {
+    public void testListSinksSuccess() throws Exception {
         List<String> functions = Lists.newArrayList("test-1", "test-2");
-        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functions);
+        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+        functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-1").build()
+        ).build());
+        functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-2").build()
+        ).build());
+        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
+
+        Response response = listDefaultSinks();
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        assertEquals(new Gson().toJson(functions), response.getEntity());
+    }
 
-        Response response = listDefaultFunctions();
+    @Test
+    public void testOnlyGetSinks() throws Exception {
+        List<String> functions = Lists.newArrayList("test-3");
+        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+        FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-1").build()).build();
+        functionMetaDataList.add(f1);
+        FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-2").build()).build();
+        functionMetaDataList.add(f2);
+        FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-3").build()).build();
+        functionMetaDataList.add(f3);
+        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
+        doReturn("Source").when(this.resource).calculateSubjectType(f1);
+        doReturn("Function").when(this.resource).calculateSubjectType(f2);
+        doReturn("Sink").when(this.resource).calculateSubjectType(f3);
+
+        Response response = listDefaultSinks();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
         assertEquals(new Gson().toJson(functions), response.getEntity());
     }
-    */
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index 1ca869e..eee684f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v2;
 
+import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
@@ -26,6 +27,7 @@ import org.apache.logging.log4j.core.config.Configurator;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function.*;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
@@ -37,6 +39,7 @@ import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.Assert;
@@ -48,6 +51,8 @@ import org.testng.annotations.Test;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import java.io.*;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -133,6 +138,7 @@ public class SourceApiV2ResourceTest {
         doReturn(null).when(resource).extractNarClassLoader(anyString(), anyString(), anyObject(), anyBoolean());
         mockStatic(SourceConfigUtils.class);
         when(SourceConfigUtils.convert(anyObject(), anyObject())).thenReturn(FunctionDetails.newBuilder().build());
+        Mockito.doReturn("Source").when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -181,7 +187,7 @@ public class SourceApiV2ResourceTest {
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Name is not provided");
+                "Source Name is not provided");
     }
 
     @Test
@@ -256,9 +262,8 @@ public class SourceApiV2ResourceTest {
                 details,
                 null,
                 null,
-                null,
                 new Gson().toJson(sourceConfig),
-                null,
+                FunctionsImpl.SOURCE,
                 null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -282,9 +287,8 @@ public class SourceApiV2ResourceTest {
             mockedFormData,
             null,
             null,
-            null,
             new Gson().toJson(sourceConfig),
-                null,
+                FunctionsImpl.SOURCE,
                 null);
     }
 
@@ -296,7 +300,7 @@ public class SourceApiV2ResourceTest {
 
         Response response = registerDefaultSource();
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + source + " already exists").reason, ((ErrorData) response.getEntity()).reason);
+        assertEquals(new ErrorData("Source " + source + " already exists").reason, ((ErrorData) response.getEntity()).reason);
     }
 
     @Test
@@ -424,7 +428,7 @@ public class SourceApiV2ResourceTest {
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Name is not provided");
+                "Source Name is not provided");
     }
 
     @Test
@@ -501,9 +505,8 @@ public class SourceApiV2ResourceTest {
             details,
             null,
             null,
-            null,
             new Gson().toJson(sourceConfig),
-                null,
+                FunctionsImpl.SOURCE,
                 null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -528,9 +531,8 @@ public class SourceApiV2ResourceTest {
             mockedFormData,
             null,
             null,
-            null,
             new Gson().toJson(sourceConfig),
-                null,
+                FunctionsImpl.SOURCE,
                 null);
     }
 
@@ -540,7 +542,7 @@ public class SourceApiV2ResourceTest {
 
         Response response = updateDefaultSource();
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        assertEquals(new ErrorData("Source " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
     }
 
     @Test
@@ -611,9 +613,8 @@ public class SourceApiV2ResourceTest {
             null,
             filePackageUrl,
             null,
-            null,
             new Gson().toJson(sourceConfig),
-                null,
+                FunctionsImpl.SOURCE,
                 null);
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -689,7 +690,7 @@ public class SourceApiV2ResourceTest {
             tenant,
             namespace,
             null,
-            "Function Name");
+            "Source Name");
     }
 
     private void testDeregisterSourceMissingArguments(
@@ -702,6 +703,7 @@ public class SourceApiV2ResourceTest {
             tenant,
             namespace,
             function,
+            FunctionsImpl.SOURCE,
             null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -713,6 +715,7 @@ public class SourceApiV2ResourceTest {
             tenant,
             namespace,
                 source,
+            FunctionsImpl.SOURCE,
             null);
     }
 
@@ -722,7 +725,7 @@ public class SourceApiV2ResourceTest {
 
         Response response = deregisterDefaultSource();
         assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        assertEquals(new ErrorData("Source " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
     }
 
     @Test
@@ -768,15 +771,13 @@ public class SourceApiV2ResourceTest {
         assertEquals(new ErrorData("Function deregisteration interrupted").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    // Source Info doesn't exist. Maybe one day they might be added
     //
-    // Get Function Info
+    // Get Source Info
     //
 
-    /*
     @Test
-    public void testGetFunctionMissingTenant() throws Exception {
-        testGetFunctionMissingArguments(
+    public void testGetSourceMissingTenant() throws Exception {
+        testGetSourceMissingArguments(
             null,
             namespace,
                 source,
@@ -784,8 +785,8 @@ public class SourceApiV2ResourceTest {
     }
 
     @Test
-    public void testGetFunctionMissingNamespace() throws Exception {
-        testGetFunctionMissingArguments(
+    public void testGetSourceMissingNamespace() throws Exception {
+        testGetSourceMissingArguments(
             tenant,
             null,
                 source,
@@ -793,62 +794,66 @@ public class SourceApiV2ResourceTest {
     }
 
     @Test
-    public void testGetFunctionMissingFunctionName() throws Exception {
-        testGetFunctionMissingArguments(
+    public void testGetSourceMissingFunctionName() throws Exception {
+        testGetSourceMissingArguments(
             tenant,
             namespace,
             null,
-            "Function Name");
+            "Source Name");
     }
 
-    private void testGetFunctionMissingArguments(
+    private void testGetSourceMissingArguments(
         String tenant,
         String namespace,
-        String function,
+        String source,
         String missingFieldName
     ) throws IOException {
         Response response = resource.getFunctionInfo(
             tenant,
             namespace,
-            function);
+            source,
+                FunctionsImpl.SOURCE);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response getDefaultFunctionInfo() throws IOException {
+    private Response getDefaultSourceInfo() throws IOException {
         return resource.getFunctionInfo(
             tenant,
             namespace,
-                source);
+                source,
+                FunctionsImpl.SOURCE);
     }
 
     @Test
-    public void testGetNotExistedFunction() throws IOException {
+    public void testGetNotExistedSource() throws IOException {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
 
-        Response response = getDefaultFunctionInfo();
+        Response response = getDefaultSourceInfo();
         assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        assertEquals(new ErrorData("Source " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
     }
 
     @Test
-    public void testGetFunctionSuccess() throws Exception {
+    public void testGetSourceSuccess() throws Exception {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
 
+        SourceSpec sourceSpec = SourceSpec.newBuilder().setBuiltin("jdbc").build();
         SinkSpec sinkSpec = SinkSpec.newBuilder()
                 .setTopic(outputTopic)
                 .setSerDeClassName(outputSerdeClassName).build();
         FunctionDetails functionDetails = FunctionDetails.newBuilder()
-                .setClassName(className)
+                .setClassName(IdentityFunction.class.getName())
                 .setSink(sinkSpec)
                 .setName(source)
                 .setNamespace(namespace)
-                .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE)
+                .setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE)
+                .setRuntime(FunctionDetails.Runtime.JAVA)
+                .setAutoAck(true)
                 .setTenant(tenant)
                 .setParallelism(parallelism)
-                .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
-                        .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
+                .setSource(sourceSpec).build();
         FunctionMetaData metaData = FunctionMetaData.newBuilder()
             .setCreateTime(System.currentTimeMillis())
             .setFunctionDetails(functionDetails)
@@ -857,60 +862,90 @@ public class SourceApiV2ResourceTest {
             .build();
         when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(metaData);
 
-        Response response = getDefaultFunctionInfo();
+        Response response = getDefaultSourceInfo();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
         assertEquals(
-            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            new Gson().toJson(SourceConfigUtils.convertFromDetails(functionDetails)),
             response.getEntity());
     }
 
     //
-    // List Functions
+    // List Sources
     //
 
     @Test
-    public void testListFunctionsMissingTenant() throws Exception {
-        testListFunctionsMissingArguments(
+    public void testListSourcesMissingTenant() throws Exception {
+        testListSourcesMissingArguments(
             null,
             namespace,
             "Tenant");
     }
 
     @Test
-    public void testListFunctionsMissingNamespace() throws Exception {
-        testListFunctionsMissingArguments(
+    public void testListSourcesMissingNamespace() throws Exception {
+        testListSourcesMissingArguments(
             tenant,
             null,
             "Namespace");
     }
 
-    private void testListFunctionsMissingArguments(
+    private void testListSourcesMissingArguments(
         String tenant,
         String namespace,
         String missingFieldName
     ) {
         Response response = resource.listFunctions(
             tenant,
-            namespace);
+            namespace,
+                FunctionsImpl.SOURCE);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response listDefaultFunctions() {
+    private Response listDefaultSources() {
         return resource.listFunctions(
             tenant,
-            namespace);
+            namespace,FunctionsImpl.SOURCE);
     }
 
     @Test
-    public void testListFunctionsSuccess() throws Exception {
+    public void testListSourcesSuccess() throws Exception {
         List<String> functions = Lists.newArrayList("test-1", "test-2");
-        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functions);
+        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+        functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-1").build()
+        ).build());
+        functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-2").build()
+        ).build());
+        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
+
+        Response response = listDefaultSources();
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        assertEquals(new Gson().toJson(functions), response.getEntity());
+    }
 
-        Response response = listDefaultFunctions();
+    @Test
+    public void testOnlyGetSources() throws Exception {
+        List<String> functions = Lists.newArrayList("test-1");
+        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+        FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-1").build()).build();
+        functionMetaDataList.add(f1);
+        FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-2").build()).build();
+        functionMetaDataList.add(f2);
+        FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-3").build()).build();
+        functionMetaDataList.add(f3);
+        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
+        doReturn("Source").when(this.resource).calculateSubjectType(f1);
+        doReturn("Function").when(this.resource).calculateSubjectType(f2);
+        doReturn("Sink").when(this.resource).calculateSubjectType(f3);
+
+        Response response = listDefaultSources();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
         assertEquals(new Gson().toJson(functions), response.getEntity());
     }
-    */
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index b8452f1..6659fcf 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -214,7 +214,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                                       boolean builtin) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
-            "functions",
+            "sink",
             "get",
             "--tenant", tenant,
             "--namespace", namespace,
@@ -224,7 +224,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         log.info("Get sink info : {}", result.getStdout());
         if (builtin) {
             assertTrue(
-                    result.getStdout().contains("\"builtin\": \"" + tester.getSinkType().name().toLowerCase() + "\""),
+                    result.getStdout().contains("\"archive\": \"builtin://" + tester.getSinkType().name().toLowerCase() + "\""),
                     result.getStdout()
             );
         } else {
@@ -366,7 +366,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
     protected void getSinkInfoNotFound(String tenant, String namespace, String sinkName) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
-            "functions",
+            "sink",
             "get",
             "--tenant", tenant,
             "--namespace", namespace,
@@ -376,7 +376,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
             pulsarCluster.getAnyWorker().execCmd(commands);
             fail("Command should have exited with non-zero");
         } catch (ContainerExecException e) {
-            assertTrue(e.getResult().getStderr().contains("Reason: Function " + sinkName + " doesn't exist"));
+            assertTrue(e.getResult().getStderr().contains("Reason: Sink " + sinkName + " doesn't exist"));
         }
     }
 
@@ -465,7 +465,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                                         String sourceName) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
-            "functions",
+            "source",
             "get",
             "--tenant", tenant,
             "--namespace", namespace,
@@ -474,7 +474,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
         log.info("Get source info : {}", result.getStdout());
         assertTrue(
-            result.getStdout().contains("\"builtin\": \"" + tester.getSourceType() + "\""),
+            result.getStdout().contains("\"archive\": \"builtin://" + tester.getSourceType() + "\""),
             result.getStdout()
         );
     }
@@ -564,7 +564,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
     protected void getSourceInfoNotFound(String tenant, String namespace, String sourceName) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
-            "functions",
+            "source",
             "get",
             "--tenant", tenant,
             "--namespace", namespace,
@@ -574,7 +574,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
             pulsarCluster.getAnyWorker().execCmd(commands);
             fail("Command should have exited with non-zero");
         } catch (ContainerExecException e) {
-            assertTrue(e.getResult().getStderr().contains("Reason: Function " + sourceName + " doesn't exist"));
+            assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist"));
         }
     }
 


Mime
View raw message