pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] srkukarni closed pull request #2907: Move all validation/inferring missing args to serverside
Date Fri, 02 Nov 2018 21:44:14 GMT
srkukarni closed pull request #2907: Move all validation/inferring missing args to serverside
URL: https://github.com/apache/pulsar/pull/2907
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 7cd870f519..077a74217b 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -273,38 +273,6 @@ public void stopFunctionInstances() throws Exception {
         verify(functions, times(1)).stopFunction(tenant, namespace, fnName);
     }
 
-    @Test
-    public void testCreateFunctionWithHttpUrl() throws Exception {
-        String fnName = TEST_NAME + "-function";
-        String inputTopicName = TEST_NAME + "-input-topic";
-        String outputTopicName = TEST_NAME + "-output-topic";
-
-        ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer();
-        consoleOutputCapturer.start();
-
-        final String url = "http://localhost:1234/test";
-        cmd.run(new String[] {
-            "create",
-            "--name", fnName,
-            "--inputs", inputTopicName,
-            "--output", outputTopicName,
-            "--jar", url,
-            "--tenant", "sample",
-            "--namespace", "ns1",
-            "--className", DummyFunction.class.getName(),
-        });
-
-        CreateFunction creater = cmd.getCreater();
-
-        consoleOutputCapturer.stop();
-        String output = consoleOutputCapturer.getStderr();
-
-        assertTrue(output.contains("Corrupted Jar File"));
-        assertEquals(fnName, creater.getFunctionName());
-        assertEquals(inputTopicName, creater.getInputs());
-        assertEquals(outputTopicName, creater.getOutput());
-    }
-
     @Test
     public void testGetFunctionStatus() throws Exception {
         String fnName = TEST_NAME + "-function";
@@ -346,60 +314,6 @@ public void testCreateFunctionWithFileUrl() throws Exception {
         verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
     }
 
-    @Test
-    public void testCreateSink() throws Exception {
-        String fnName = TEST_NAME + "-function";
-        String inputTopicName = TEST_NAME + "-input-topic";
-
-
-        ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer();
-        consoleOutputCapturer.start();
-
-        final String url = "http://localhost:1234/test";
-        cmdSinks.run(new String[] {
-            "create",
-            "--name", fnName,
-            "--inputs", inputTopicName,
-            "--archive", url,
-            "--tenant", "sample",
-            "--namespace", "ns1"
-        });
-
-        CreateSink creater = cmdSinks.getCreateSink();
-
-        consoleOutputCapturer.stop();
-        String output = consoleOutputCapturer.getStderr();
-
-        assertTrue(output.contains("Corrupt User PackageFile " + url));
-        assertEquals(url, creater.archive);
-    }
-
-    @Test
-    public void testCreateSource() throws Exception {
-        String fnName = TEST_NAME + "-function";
-
-        ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer();
-        consoleOutputCapturer.start();
-
-        final String url = "http://localhost:1234/test";
-        cmdSources.run(new String[] {
-            "create",
-            "--name", fnName,
-            "--archive", url,
-            "--tenant", "sample",
-            "--namespace", "ns1",
-            "--destination-topic-name", "input",
-        });
-
-        CreateSource creater = cmdSources.getCreateSource();
-
-        consoleOutputCapturer.stop();
-        String output = consoleOutputCapturer.getStderr();
-
-        assertTrue(output.contains("Corrupt User PackageFile " + url));
-        assertEquals(url, creater.archive);
-    }
-
     @Test
     public void testCreateFunctionWithTopicPatterns() throws Exception {
         String fnName = TEST_NAME + "-function";
@@ -425,46 +339,6 @@ public void testCreateFunctionWithTopicPatterns() throws Exception {
 
     }
 
-    @Test
-    public void testCreateWithoutTenant() throws Exception {
-        String fnName = TEST_NAME + "-function";
-        String inputTopicName = "persistent://tenant/standalone/namespace/input-topic";
-        String outputTopicName = "persistent://tenant/standalone/namespace/output-topic";
-        cmd.run(new String[] {
-                "create",
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", JAR_NAME,
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-        });
-
-        CreateFunction creater = cmd.getCreater();
-        assertEquals("public", creater.getFunctionConfig().getTenant());
-        verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
-    }
-
-    @Test
-    public void testCreateWithoutNamespace() throws Exception {
-        String fnName = TEST_NAME + "-function";
-        String inputTopicName = "persistent://tenant/standalone/namespace/input-topic";
-        String outputTopicName = "persistent://tenant/standalone/namespace/output-topic";
-        cmd.run(new String[] {
-                "create",
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", JAR_NAME,
-                "--className", DummyFunction.class.getName(),
-        });
-
-        CreateFunction creater = cmd.getCreater();
-        assertEquals("public", creater.getFunctionConfig().getTenant());
-        assertEquals("default", creater.getFunctionConfig().getNamespace());
-        verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
-    }
-
     @Test
     public void testCreateUsingFullyQualifiedFunctionName() throws Exception {
         String inputTopicName = TEST_NAME + "-input-topic";
@@ -490,25 +364,6 @@ public void testCreateUsingFullyQualifiedFunctionName() throws Exception {
         verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
     }
 
-    @Test
-    public void testCreateWithoutFunctionName() throws Exception {
-        String inputTopicName = TEST_NAME + "-input-topic";
-        String outputTopicName = TEST_NAME + "-output-topic";
-        cmd.run(new String[] {
-                "create",
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", JAR_NAME,
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-        });
-
-        CreateFunction creater = cmd.getCreater();
-        assertEquals("CmdFunctionsTest$DummyFunction", creater.getFunctionConfig().getName());
-        verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
-    }
-
     @Test
     public void testCreateWithoutOutputTopicWithSkipFlag() throws Exception {
         String inputTopicName = TEST_NAME + "-input-topic";
@@ -728,89 +583,6 @@ private void testValidateFunctionsConfigs(String[] correctArgs, String[] incorre
         }
     }
 
-    @Test
-    public void TestCreateFunctionParallelism() throws Exception{
-
-        String[] correctArgs = new String[]{
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", JAR_NAME,
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-                "--parallelism", "1"
-        };
-
-        String[] incorrectArgs = new String[]{
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", JAR_NAME,
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-                "--parallelism", "-1"
-        };
-
-        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Function parallelism should positive number");
-
-    }
-
-    @Test
-    public void TestCreateTopicName() throws Exception {
-
-        String[] correctArgs = new String[]{
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", JAR_NAME,
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-        };
-
-        String wrongOutputTopicName = TEST_NAME + "-output-topic/test:";
-        String[] incorrectArgs = new String[]{
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", wrongOutputTopicName,
-                "--jar", JAR_NAME,
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-        };
-
-        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Output topic " + wrongOutputTopicName + " is invalid");
-    }
-
-    @Test
-    public void TestCreateClassName() throws Exception {
-
-        String[] correctArgs = new String[]{
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", DummyFunction.class.getProtectionDomain().getCodeSource().getLocation().getPath(),
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-        };
-
-        String cannotLoadClass = "com.test.Function";
-        String[] incorrectArgs = new String[]{
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", DummyFunction.class.getProtectionDomain().getCodeSource().getLocation().getPath(),
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", cannotLoadClass,
-        };
-
-        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "User class must be in class path");
-    }
-
     @Test
     public void testCreateFunctionWithCpu() throws Exception {
         String fnName = TEST_NAME + "-function";
@@ -998,35 +770,6 @@ public void testUpdateFunctionWithDisk() throws Exception {
         verify(functions, times(1)).updateFunctionWithUrl(any(FunctionConfig.class), anyString());
     }
 
-    @Test
-    public void TestCreateSameInOutTopic() throws Exception {
-
-        String[] correctArgs = new String[]{
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", outputTopicName,
-                "--jar", JAR_NAME,
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-        };
-
-        String[] incorrectArgs = new String[]{
-                "--name", fnName,
-                "--inputs", inputTopicName,
-                "--output", inputTopicName,
-                "--jar", JAR_NAME,
-                "--tenant", "sample",
-                "--namespace", "ns1",
-                "--className", DummyFunction.class.getName(),
-        };
-
-        testValidateFunctionsConfigs(correctArgs, incorrectArgs,
-                "Output topic " + inputTopicName
-                        + " is also being used as an input topic (topics must be one or the other)");
-
-    }
-
 
     public static class ConsoleOutputCapturer {
         private ByteArrayOutputStream stdout;
diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml
index 73faa52189..8e168f3169 100644
--- a/pulsar-client-tools/pom.xml
+++ b/pulsar-client-tools/pom.xml
@@ -69,38 +69,11 @@
 
     <!-- functions related dependencies (begin) -->
 
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-functions-worker</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>net.jodah</groupId>
-      <artifactId>typetools</artifactId>
-    </dependency>
-
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>stream-storage-java-client</artifactId>
     </dependency>
 
-    <!-- functions related dependencies (end) -->
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-io-cassandra</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-io-twitter</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-
   </dependencies>
 
   <build>
@@ -118,12 +91,7 @@
             </goals>
             <configuration>
               <tasks>
-                <echo>copy test sink package</echo>
-                <mkdir dir="${basedir}/src/test/resources"/>
-                <copy file="${basedir}/../pulsar-io/cassandra/target/pulsar-io-cassandra-${project.version}.nar" tofile="${basedir}/src/test/resources/pulsar-io-cassandra.nar"/>
-                <echo>copy test source package</echo>
-                <mkdir dir="${basedir}/src/test/resources"/>
-                <copy file="${basedir}/../pulsar-io/twitter/target/pulsar-io-twitter-${project.version}.nar" tofile="${basedir}/src/test/resources/pulsar-io-twitter.nar"/>
+                <copy file="${basedir}/pom.xml" tofile="${basedir}/src/test/resources/dummy.nar"/>
               </tasks>
             </configuration>
           </execution>
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 2ec5affca8..24597eb202 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -65,7 +65,6 @@
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.WindowConfig;
 import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 
 @Slf4j
 @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)")
@@ -297,9 +296,6 @@ void processArguments() throws Exception {
         protected String deadLetterTopic;
         protected FunctionConfig functionConfig;
         protected String userCodeFile;
-        // The classLoader associated with this function defn
-        protected ClassLoader classLoader;
-
 
         private void mergeArgs() {
             if (!StringUtils.isBlank(DEPRECATED_className)) className = DEPRECATED_className;
@@ -476,9 +472,6 @@ void processArguments() throws Exception {
                 userCodeFile = functionConfig.getPy();
             }
 
-            // infer default vaues
-            FunctionConfigUtils.inferMissingArguments(functionConfig);
-
             // check if configs are valid
             validateFunctionConfigs(functionConfig);
         }
@@ -501,15 +494,7 @@ protected void validateFunctionConfigs(FunctionConfig functionConfig) {
             }
             if (!isBlank(functionConfig.getPy()) && !Utils.isFunctionPackageUrlSupported(functionConfig.getPy()) &&
                     !new File(functionConfig.getPy()).exists()) {
-                throw new ParameterException("The specified jar file does not exist");
-            }
-
-            try {
-                // Need to load jar and set context class loader before calling
-                String functionPkgUrl = Utils.isFunctionPackageUrlSupported(userCodeFile) ? userCodeFile : null;
-                classLoader = FunctionConfigUtils.validate(functionConfig, functionPkgUrl, null);
-            } catch (Exception e) {
-                throw new IllegalArgumentException(e.getMessage());
+                throw new ParameterException("The specified python file does not exist");
             }
         }
     }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 48601b2ed5..d14c0534b0 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -22,7 +22,6 @@
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.apache.pulsar.functions.utils.SinkConfigUtils.inferMissingArguments;
 import static org.apache.pulsar.functions.utils.Utils.BUILTIN;
 
 import com.beust.jcommander.Parameter;
@@ -38,7 +37,6 @@
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Type;
-import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -51,12 +49,10 @@
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.utils.*;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.utils.io.Connectors;
@@ -306,8 +302,6 @@ void runCmd() throws Exception {
 
         protected SinkConfig sinkConfig;
 
-        protected NarClassLoader classLoader;
-
         private void mergeArgs() {
             if (!StringUtils.isBlank(DEPRECATED_subsName)) subsName = DEPRECATED_subsName;
             if (!StringUtils.isBlank(DEPRECATED_topicsPattern)) topicsPattern = DEPRECATED_topicsPattern;
@@ -421,8 +415,6 @@ void processArguments() throws Exception {
             if (null != sinkConfigString) {
                 sinkConfig.setConfigs(parseConfigs(sinkConfigString));
             }
-            
-            inferMissingArguments(sinkConfig);
 
             // check if configs are valid
             validateSinkConfigs(sinkConfig);
@@ -445,24 +437,6 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) {
                     throw new IllegalArgumentException(String.format("Sink Archive file %s does not exist", sinkConfig.getArchive()));
                 }
             }
-
-            try {
-                // Need to load jar and set context class loader before calling
-                String sourcePkgUrl = Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive()) ? sinkConfig.getArchive() : null;
-                Path archivePath = (Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive()) || sinkConfig.getArchive().startsWith(BUILTIN)) ? null : new File(sinkConfig.getArchive()).toPath();
-                classLoader = SinkConfigUtils.validate(sinkConfig, archivePath, sourcePkgUrl, null);
-            } catch (Exception e) {
-                throw new ParameterException(e.getMessage());
-            }
-        }
-
-
-        protected org.apache.pulsar.functions.proto.Function.FunctionDetails createSinkConfigProto2(SinkConfig sinkConfig)
-                throws IOException {
-            org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder
-                    = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
-            Utils.mergeJson(FunctionsImpl.printJson(SinkConfigUtils.convert(sinkConfig, classLoader)), functionDetailsBuilder);
-            return functionDetailsBuilder.build();
         }
 
         protected String validateSinkType(String sinkType) throws IOException {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index a2271e511f..32ebfc996b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -22,7 +22,6 @@
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.apache.pulsar.functions.utils.SourceConfigUtils.inferMissingArguments;
 import static org.apache.pulsar.functions.utils.Utils.BUILTIN;
 
 import com.beust.jcommander.Parameter;
@@ -38,7 +37,6 @@
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Type;
-import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.LinkedList;
 import java.util.List;
@@ -54,13 +52,10 @@
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.io.SourceConfig;
-import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.utils.io.Connectors;
@@ -296,8 +291,6 @@ void runCmd() throws Exception {
 
         protected SourceConfig sourceConfig;
 
-        protected NarClassLoader classLoader;
-
         private void mergeArgs() {
             if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees;
             if (!isBlank(DEPRECATED_destinationTopicName)) destinationTopicName = DEPRECATED_destinationTopicName;
@@ -380,8 +373,6 @@ void processArguments() throws Exception {
                 sourceConfig.setConfigs(parseConfigs(sourceConfigString));
             }
 
-            inferMissingArguments(sourceConfig);
-
             // check if source configs are valid
             validateSourceConfigs(sourceConfig);
         }
@@ -402,23 +393,6 @@ protected void validateSourceConfigs(SourceConfig sourceConfig) {
                     throw new IllegalArgumentException(String.format("Source Archive %s does not exist", sourceConfig.getArchive()));
                 }
             }
-
-            try {
-             // Need to load jar and set context class loader before calling
-                String sourcePkgUrl = Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) ? sourceConfig.getArchive() : null;
-                Path archivePath = (Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) || sourceConfig.getArchive().startsWith(BUILTIN)) ? null : new File(sourceConfig.getArchive()).toPath();
-                classLoader = SourceConfigUtils.validate(sourceConfig, archivePath, sourcePkgUrl, null);
-            } catch (Exception e) {
-                throw new ParameterException(e.getMessage());
-            }
-        }
-
-        protected org.apache.pulsar.functions.proto.Function.FunctionDetails createSourceConfigProto2(SourceConfig sourceConfig)
-                throws IOException {
-            org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder
-                    = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
-            Utils.mergeJson(FunctionsImpl.printJson(SourceConfigUtils.convert(sourceConfig, classLoader)), functionDetailsBuilder);
-            return functionDetailsBuilder.build();
         }
 
         protected String validateSourceType(String sourceType) throws IOException {
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index aea19fd0bb..cbe52961f9 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -44,8 +44,6 @@
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.functions.utils.*;
-import org.apache.pulsar.io.cassandra.CassandraStringSink;
-import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -68,7 +66,7 @@ public IObjectFactory getObjectFactory() {
     private static final String TENANT = "test-tenant";
     private static final String NAMESPACE = "test-namespace";
     private static final String NAME = "test";
-    private static final String CLASS_NAME = CassandraStringSink.class.getName();
+    private static final String CLASS_NAME = "SomeRandomClassName";
     private static final String INPUTS = "test-src1,test-src2";
     private static final List<String> INPUTS_LIST;
     static {
@@ -86,8 +84,7 @@ public IObjectFactory getObjectFactory() {
     private static final FunctionConfig.ProcessingGuarantees PROCESSING_GUARANTEES
             = FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE;
     private static final Integer PARALLELISM = 1;
-    private static final String JAR_FILE_NAME = "pulsar-io-cassandra.nar";
-    private static final String WRONG_JAR_FILE_NAME = "pulsar-io-twitter.nar";
+    private static final String JAR_FILE_NAME = "dummy.nar";
     private String JAR_FILE_PATH;
     private String WRONG_JAR_PATH;
     private static final Double CPU = 100.0;
@@ -123,7 +120,6 @@ public void setup() throws Exception {
             throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME);
         }
         JAR_FILE_PATH = file.getFile();
-        WRONG_JAR_PATH = Thread.currentThread().getContextClassLoader().getResource(WRONG_JAR_FILE_NAME).getFile();
         Thread.currentThread().setContextClassLoader(Utils.loadJar(new File(JAR_FILE_PATH)));
     }
 
@@ -167,72 +163,6 @@ public void testCliCorrect() throws Exception {
         );
     }
 
-    @Test
-    public void testMissingTenant() throws Exception {
-        SinkConfig sinkConfig = getSinkConfig();
-        sinkConfig.setTenant(PUBLIC_TENANT);
-        testCmdSinkCliMissingArgs(
-                null,
-                NAMESPACE,
-                NAME,
-                INPUTS,
-                TOPIC_PATTERN,
-                CUSTOM_SERDE_INPUT_STRING,
-                PROCESSING_GUARANTEES,
-                PARALLELISM,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sinkConfig
-        );
-    }
-
-    @Test
-    public void testMissingNamespace() throws Exception {
-        SinkConfig sinkConfig = getSinkConfig();
-        sinkConfig.setNamespace(DEFAULT_NAMESPACE);
-        testCmdSinkCliMissingArgs(
-                TENANT,
-                null,
-                NAME,
-                INPUTS,
-                TOPIC_PATTERN,
-                CUSTOM_SERDE_INPUT_STRING,
-                PROCESSING_GUARANTEES,
-                PARALLELISM,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sinkConfig
-        );
-    }
-
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink name cannot be null")
-    public void testMissingName() throws Exception {
-        SinkConfig sinkConfig = getSinkConfig();
-        sinkConfig.setName(null);
-        testCmdSinkCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                null,
-                INPUTS,
-                TOPIC_PATTERN,
-                CUSTOM_SERDE_INPUT_STRING,
-                PROCESSING_GUARANTEES,
-                PARALLELISM,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sinkConfig
-        );
-    }
-
     @Test
     public void testMissingInput() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
@@ -303,27 +233,6 @@ public void testMissingTopicPattern() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Must specify at least one topic of input via topicToSerdeClassName, topicsPattern, topicToSchemaType or inputSpecs")
-    public void testMissingAllInputTopics() throws Exception {
-        SinkConfig sinkConfig = getSinkConfig();
-        testCmdSinkCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                NAME,
-                null,
-                null,
-                null,
-                PROCESSING_GUARANTEES,
-                PARALLELISM,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sinkConfig
-        );
-    }
-
     @Test
     public void testMissingProcessingGuarantees() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
@@ -367,51 +276,7 @@ public void testMissingParallelism() throws Exception {
                 sinkConfig
         );
     }
-
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink parallelism should positive number")
-    public void testNegativeParallelism() throws Exception {
-        SinkConfig sinkConfig = getSinkConfig();
-        sinkConfig.setParallelism(-1);
-        testCmdSinkCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                NAME,
-                INPUTS,
-                TOPIC_PATTERN,
-                CUSTOM_SERDE_INPUT_STRING,
-                PROCESSING_GUARANTEES,
-                -1,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sinkConfig
-        );
-    }
-
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink parallelism should positive number")
-    public void testZeroParallelism() throws Exception {
-        SinkConfig sinkConfig = getSinkConfig();
-        sinkConfig.setParallelism(0);
-        testCmdSinkCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                NAME,
-                INPUTS,
-                TOPIC_PATTERN,
-                CUSTOM_SERDE_INPUT_STRING,
-                PROCESSING_GUARANTEES,
-                0,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sinkConfig
-        );
-    }
-
+    
     @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink archive not specfied")
     public void testMissingArchive() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
@@ -434,28 +299,6 @@ public void testMissingArchive() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Failed to extract sink class from archive")
-    public void testInvalidJarWithNoSource() throws Exception {
-        SinkConfig sinkConfig = getSinkConfig();
-        sinkConfig.setArchive(WRONG_JAR_PATH);
-        testCmdSinkCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                NAME,
-                INPUTS,
-                TOPIC_PATTERN,
-                CUSTOM_SERDE_INPUT_STRING,
-                PROCESSING_GUARANTEES,
-                PARALLELISM,
-                WRONG_JAR_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sinkConfig
-        );
-    }
-
     @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Sink Archive file /tmp/foo.jar" +
             " does not exist")
     public void testInvalidJar() throws Exception {
@@ -587,36 +430,6 @@ public void testCmdSinkConfigFileCorrect() throws Exception {
         testCmdSinkConfigFile(sinkConfig, sinkConfig);
     }
 
-    @Test
-    public void testCmdSinkConfigFileMissingTenant() throws Exception {
-        SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setTenant(null);
-
-        SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setTenant(PUBLIC_TENANT);
-        testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
-    }
-
-    @Test
-    public void testCmdSinkConfigFileMissingNamespace() throws Exception {
-        SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setNamespace(null);
-
-        SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setNamespace(DEFAULT_NAMESPACE);
-        testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
-    }
-
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink name cannot be null")
-    public void testCmdSinkConfigFileMissingName() throws Exception {
-        SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setName(null);
-
-        SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setName(null);
-        testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
-    }
-
     @Test
     public void testCmdSinkConfigFileMissingTopicToSerdeClassName() throws Exception {
         SinkConfig testSinkConfig = getSinkConfig();
@@ -633,19 +446,6 @@ public void testCmdSinkConfigFileMissingTopicsPattern() throws Exception {
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Must specify at least one topic of input via topicToSerdeClassName, topicsPattern, topicToSchemaType or inputSpecs")
-    public void testCmdSinkConfigFileMissingAllInput() throws Exception {
-        SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setInputs(null);
-        testSinkConfig.setTopicToSchemaType(null);
-        testSinkConfig.setTopicToSerdeClassName(null);
-        testSinkConfig.setTopicsPattern(null);
-        testSinkConfig.setInputSpecs(null);
-
-        SinkConfig expectedSinkConfig = getSinkConfig();
-        testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
-    }
-
     @Test
     public void testCmdSinkConfigFileMissingConfig() throws Exception {
         SinkConfig testSinkConfig = getSinkConfig();
@@ -656,26 +456,6 @@ public void testCmdSinkConfigFileMissingConfig() throws Exception {
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink parallelism should positive number")
-    public void testCmdSinkConfigFileZeroParallelism() throws Exception {
-        SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setParallelism(0);
-
-        SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setParallelism(0);
-        testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
-    }
-
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink parallelism should positive number")
-    public void testCmdSinkConfigFileNegativeParallelism() throws Exception {
-        SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setParallelism(-1);
-
-        SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setParallelism(-1);
-        testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
-    }
-
     @Test
     public void testCmdSinkConfigFileMissingProcessingGuarantees() throws Exception {
         SinkConfig testSinkConfig = getSinkConfig();
@@ -716,16 +496,6 @@ public void testCmdSinkConfigFileInvalidJar() throws Exception {
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Failed to extract sink class from archive")
-    public void testCmdSinkConfigFileInvalidJarNoSink() throws Exception {
-        SinkConfig testSinkConfig = getSinkConfig();
-        testSinkConfig.setArchive(WRONG_JAR_PATH);
-
-        SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setArchive(WRONG_JAR_PATH);
-        testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
-    }
-
     public void testCmdSinkConfigFile(SinkConfig testSinkConfig, SinkConfig expectedSinkConfig) throws Exception {
 
         File file = Files.createTempFile("", "").toFile();
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 32a6c60186..33d75139c7 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -70,10 +70,8 @@ public IObjectFactory getObjectFactory() {
     private static final FunctionConfig.ProcessingGuarantees PROCESSING_GUARANTEES
             = FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE;
     private static final Integer PARALLELISM = 1;
-    private static final String JAR_FILE_NAME = "pulsar-io-twitter.nar";
-    private static final String WRONG_JAR_FILE_NAME = "pulsar-io-cassandra.nar";
+    private static final String JAR_FILE_NAME = "dummy.nar";
     private String JAR_FILE_PATH;
-    private String WRONG_JAR_PATH;
     private static final Double CPU = 100.0;
     private static final Long RAM = 1024L * 1024L;
     private static final Long DISK = 1024L * 1024L * 1024L;
@@ -103,7 +101,6 @@ public void setup() throws Exception {
         mockStatic(CmdFunctions.class);
         PowerMockito.doNothing().when(localSourceRunner).runCmd();
         JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME).getFile();
-        WRONG_JAR_PATH = Thread.currentThread().getContextClassLoader().getResource(WRONG_JAR_FILE_NAME).getFile();
         Thread.currentThread().setContextClassLoader(Utils.loadJar(new File(JAR_FILE_PATH)));
     }
 
@@ -141,84 +138,6 @@ public void testCliCorrect() throws Exception {
         );
     }
 
-    @Test
-    public void testMissingTenant() throws Exception {
-        SourceConfig sourceConfig = getSourceConfig();
-        sourceConfig.setTenant(PUBLIC_TENANT);
-        testCmdSourceCliMissingArgs(
-                null,
-                NAMESPACE,
-                NAME,
-                TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES,
-                PARALLELISM,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sourceConfig
-        );
-    }
-
-    @Test
-    public void testMissingNamespace() throws Exception {
-        SourceConfig sourceConfig = getSourceConfig();
-        sourceConfig.setNamespace(DEFAULT_NAMESPACE);
-        testCmdSourceCliMissingArgs(
-                TENANT,
-                null,
-                NAME,
-                TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES,
-                PARALLELISM,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sourceConfig
-        );
-    }
-
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source name cannot be null")
-    public void testMissingName() throws Exception {
-        SourceConfig sourceConfig = getSourceConfig();
-        sourceConfig.setName(null);
-        testCmdSourceCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                null,
-                TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES,
-                PARALLELISM,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sourceConfig
-        );
-    }
-
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Topic name cannot be null")
-    public void testMissingTopicName() throws Exception {
-        SourceConfig sourceConfig = getSourceConfig();
-        sourceConfig.setTopicName(null);
-        testCmdSourceCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                NAME,
-                null,
-                SERDE_CLASS_NAME,
-                PROCESSING_GUARANTEES,
-                PARALLELISM,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sourceConfig
-        );
-    }
-
     @Test
     public void testMissingSerdeClassName() throws Exception {
         SourceConfig sourceConfig = getSourceConfig();
@@ -278,44 +197,6 @@ public void testMissingParallelism() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number")
-    public void testNegativeParallelism() throws Exception {
-        SourceConfig sourceConfig = getSourceConfig();
-        sourceConfig.setParallelism(-1);
-        testCmdSourceCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                NAME,
-                TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES,
-                -1,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sourceConfig
-        );
-    }
-
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number")
-    public void testZeroParallelism() throws Exception {
-        SourceConfig sourceConfig = getSourceConfig();
-        sourceConfig.setParallelism(0);
-        testCmdSourceCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                NAME,
-                TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES,
-                0,
-                JAR_FILE_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sourceConfig
-        );
-    }
-
     @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specfied")
     public void testMissingArchive() throws Exception {
         SourceConfig sourceConfig = getSourceConfig();
@@ -355,25 +236,6 @@ public void testInvalidJar() throws Exception {
         );
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Failed to extract source class from archive")
-    public void testInvalidJarWithNoSource() throws Exception {
-        SourceConfig sourceConfig = getSourceConfig();
-        sourceConfig.setArchive(WRONG_JAR_PATH);
-        testCmdSourceCliMissingArgs(
-                TENANT,
-                NAMESPACE,
-                NAME,
-                TOPIC_NAME, SERDE_CLASS_NAME, PROCESSING_GUARANTEES,
-                PARALLELISM,
-                WRONG_JAR_PATH,
-                CPU,
-                RAM,
-                DISK,
-                SINK_CONFIG_STRING,
-                sourceConfig
-        );
-    }
-
     @Test
     public void testMissingConfig() throws Exception {
         SourceConfig sourceConfig = getSourceConfig();
@@ -473,46 +335,6 @@ public void testCmdSourceConfigFileCorrect() throws Exception {
         testCmdSourceConfigFile(sourceConfig, sourceConfig);
     }
 
-    @Test
-    public void testCmdSourceConfigFileMissingTenant() throws Exception {
-        SourceConfig testSourceConfig = getSourceConfig();
-        testSourceConfig.setTenant(null);
-
-        SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setTenant(PUBLIC_TENANT);
-        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
-    }
-
-    @Test
-    public void testCmdSourceConfigFileMissingNamespace() throws Exception {
-        SourceConfig testSourceConfig = getSourceConfig();
-        testSourceConfig.setNamespace(null);
-
-        SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setNamespace(DEFAULT_NAMESPACE);
-        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
-    }
-
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source name cannot be null")
-    public void testCmdSourceConfigFileMissingName() throws Exception {
-        SourceConfig testSourceConfig = getSourceConfig();
-        testSourceConfig.setName(null);
-
-        SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setName(null);
-        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
-    }
-
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Topic name cannot be null")
-    public void testCmdSourceConfigFileMissingTopicName() throws Exception {
-        SourceConfig testSourceConfig = getSourceConfig();
-        testSourceConfig.setTopicName(null);
-
-        SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setTopicName(null);
-        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
-    }
-
     @Test
     public void testCmdSourceConfigFileMissingSerdeClassname() throws Exception {
         SourceConfig testSourceConfig = getSourceConfig();
@@ -533,26 +355,6 @@ public void testCmdSourceConfigFileMissingConfig() throws Exception {
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number")
-    public void testCmdSourceConfigFileZeroParallelism() throws Exception {
-        SourceConfig testSourceConfig = getSourceConfig();
-        testSourceConfig.setParallelism(0);
-
-        SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setParallelism(0);
-        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
-    }
-
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number")
-    public void testCmdSourceConfigFileNegativeParallelism() throws Exception {
-        SourceConfig testSourceConfig = getSourceConfig();
-        testSourceConfig.setParallelism(-1);
-
-        SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setParallelism(-1);
-        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
-    }
-
     @Test
     public void testCmdSourceConfigFileMissingProcessingGuarantees() throws Exception {
         SourceConfig testSourceConfig = getSourceConfig();
@@ -593,16 +395,6 @@ public void testCmdSourceConfigFileInvalidJar() throws Exception {
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Failed to extract source class from archive")
-    public void testCmdSourceConfigFileInvalidJarNoSource() throws Exception {
-        SourceConfig testSourceConfig = getSourceConfig();
-        testSourceConfig.setArchive(WRONG_JAR_PATH);
-
-        SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setArchive(WRONG_JAR_PATH);
-        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
-    }
-
     public void testCmdSourceConfigFile(SourceConfig testSourceConfig, SourceConfig expectedSourceConfig) throws Exception {
 
         File file = Files.createTempFile("", "").toFile();
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 558f33bfe7..2cbd460245 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -56,10 +56,10 @@ public static FunctionDetails convert(SinkConfig sinkConfig, NarClassLoader clas
 
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
 
-        boolean isBuiltin = sinkConfig.getArchive().startsWith(Utils.BUILTIN);
+        boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(Utils.BUILTIN);
 
         if (!isBuiltin) {
-            if (sinkConfig.getArchive().startsWith(Utils.FILE)) {
+            if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(Utils.FILE)) {
                 if (isBlank(sinkConfig.getClassName())) {
                     throw new IllegalArgumentException("Class-name must be present for archive with file-url");
                 }
@@ -286,9 +286,7 @@ public static NarClassLoader validate(SinkConfig sinkConfig, Path archivePath, S
 
         NarClassLoader classLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
         if (classLoader == null) {
-            // This happens at the cli for builtin. There is no need to check this since
-            // the actual check will be done at serverside
-            return null;
+            throw new IllegalArgumentException("Sink Package is not provided");
         }
 
         String sinkClassName;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 07d593d23c..6ce1db3fd0 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -210,9 +210,7 @@ public static NarClassLoader validate(SourceConfig sourceConfig, Path archivePat
 
         NarClassLoader classLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
         if (classLoader == null) {
-            // This happens at the cli for builtin. There is no need to check this since
-            // the actual check will be done at serverside
-            return null;
+            throw new IllegalArgumentException("Source Package is not provided");
         }
 
         String sourceClassName;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index c6feb5ac72..878067122d 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -46,8 +46,7 @@
     /**
      * Extract the Pulsar IO Source class from a connector archive.
      */
-    public static String getIOSourceClass(ClassLoader classLoader) throws IOException {
-        NarClassLoader ncl = (NarClassLoader) classLoader;
+    public static String getIOSourceClass(NarClassLoader ncl) throws IOException {
         String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
 
         ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index 24bbd6e0cc..eb677774f8 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -129,6 +129,50 @@
       </exclusions>
     </dependency>
 
+    <!-- functions related dependencies (end) -->
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-cassandra</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-twitter</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
+  <build>
+    <plugins>
+      <!-- this task will copy nar files to resources for the test to work -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>compile</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <tasks>
+                <echo>copy test sink package</echo>
+                <mkdir dir="${basedir}/src/test/resources"/>
+                <copy file="${basedir}/../../pulsar-io/cassandra/target/pulsar-io-cassandra-${project.version}.nar" tofile="${basedir}/src/test/resources/pulsar-io-cassandra.nar"/>
+                <echo>copy test source package</echo>
+                <mkdir dir="${basedir}/src/test/resources"/>
+                <copy file="${basedir}/../../pulsar-io/twitter/target/pulsar-io-twitter-${project.version}.nar" tofile="${basedir}/src/test/resources/pulsar-io-twitter.nar"/>
+              </tasks>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 89e3b48620..914a691524 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -1058,7 +1058,7 @@ private String getFunctionCodeBuiltin(FunctionDetails functionDetails) {
 
     private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName,
             String functionDetailsJson, String componentConfigJson, String componentType,
-            String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException {
+            String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException {
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
         }
@@ -1071,12 +1071,14 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
 
         if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) {
             FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
+            FunctionConfigUtils.inferMissingArguments(functionConfig);
             ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, functionPkgUrl, uploadedInputStreamAsFile);
             return FunctionConfigUtils.convert(functionConfig, clsLoader);
         }
         if (componentType.equals(SOURCE)) {
             Path archivePath = null;
             SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
+            SourceConfigUtils.inferMissingArguments(sourceConfig);
             if (!StringUtils.isEmpty(sourceConfig.getArchive())) {
                 String builtinArchive = sourceConfig.getArchive();
                 if (builtinArchive.startsWith(BUILTIN)) {
@@ -1094,6 +1096,7 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
         if (componentType.equals(SINK)) {
             Path archivePath = null;
             SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
+            SinkConfigUtils.inferMissingArguments(sinkConfig);
             if (!StringUtils.isEmpty(sinkConfig.getArchive())) {
                 String builtinArchive = sinkConfig.getArchive();
                 if (builtinArchive.startsWith(BUILTIN)) {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index cd55916dea..1a7b51a977 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -165,6 +165,7 @@ public void testRegisterFunctionMissingTenant() throws IOException {
                 outputSerdeClassName,
             className,
             parallelism,
+                null,
                 "Tenant is not provided");
     }
 
@@ -181,6 +182,7 @@ public void testRegisterFunctionMissingNamespace() throws IOException {
                 outputSerdeClassName,
             className,
             parallelism,
+                null,
                 "Namespace is not provided");
     }
 
@@ -197,6 +199,7 @@ public void testRegisterFunctionMissingFunctionName() throws IOException {
                 outputSerdeClassName,
             className,
             parallelism,
+                null,
                 "Function Name is not provided");
     }
 
@@ -213,6 +216,7 @@ public void testRegisterFunctionMissingPackage() throws IOException {
                 outputSerdeClassName,
             className,
             parallelism,
+                null,
                 "Function Package is not provided");
     }
 
@@ -229,6 +233,7 @@ public void testRegisterFunctionMissingInputTopics() throws IOException {
                 outputSerdeClassName,
                 className,
                 parallelism,
+                null,
                 "No input topic(s) specified for the function");
     }
 
@@ -245,6 +250,7 @@ public void testRegisterFunctionMissingPackageDetails() throws IOException {
                 outputSerdeClassName,
             className,
             parallelism,
+                null,
                 "Function Package is not provided");
     }
 
@@ -261,9 +267,96 @@ public void testRegisterFunctionMissingClassName() throws IOException {
                 outputSerdeClassName,
             null,
             parallelism,
+                null,
                 "Function classname cannot be null");
     }
 
+    @Test
+    public void testRegisterFunctionWrongClassName() throws IOException {
+        testRegisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                "UnknownClass",
+                parallelism,
+                null,
+                "User class must be in class path");
+    }
+
+    @Test
+    public void testRegisterFunctionWrongParallelism() throws IOException {
+        testRegisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                -2,
+                null,
+                "Function parallelism should positive number");
+    }
+
+    @Test
+    public void testRegisterFunctionSameInputOutput() throws IOException {
+        testRegisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                topicsToSerDeClassName.keySet().iterator().next(),
+                outputSerdeClassName,
+                className,
+                parallelism,
+                null,
+                "Output topic " + topicsToSerDeClassName.keySet().iterator().next()
+                        + " is also being used as an input topic (topics must be one or the other)");
+    }
+
+    @Test
+    public void testRegisterFunctionWrongOutputTopic() throws IOException {
+        testRegisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                function + "-output-topic/test:",
+                outputSerdeClassName,
+                className,
+                parallelism,
+                null,
+                "Output topic " + function + "-output-topic/test:" + " is invalid");
+    }
+
+    @Test
+    public void testRegisterFunctionHttpUrl() throws IOException {
+        testRegisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                null,
+                topicsToSerDeClassName,
+                null,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                parallelism,
+                "http://localhost:1234/test",
+                "Corrupted Jar File");
+    }
+
     private void testRegisterFunctionMissingArguments(
             String tenant,
             String namespace,
@@ -275,6 +368,7 @@ private void testRegisterFunctionMissingArguments(
             String outputSerdeClassName,
             String className,
             Integer parallelism,
+            String functionPkgUrl,
             String errorExpected) throws IOException {
         FunctionConfig functionConfig = new FunctionConfig();
         if (tenant != null) {
@@ -309,7 +403,7 @@ private void testRegisterFunctionMissingArguments(
                 function,
                 inputStream,
                 details,
-                null,
+                functionPkgUrl,
                 null,
                 new Gson().toJson(functionConfig),
                 FunctionsImpl.FUNCTION,
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 7bf1a4633c..57e3cc5e4b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -26,7 +26,6 @@
 import org.apache.logging.log4j.core.config.Configurator;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -38,8 +37,7 @@
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.cassandra.CassandraStringSink;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -53,8 +51,10 @@
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URL;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -62,7 +62,6 @@
 import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -78,7 +77,7 @@
  * Unit test of {@link SinkApiV2Resource}.
  */
 @PrepareForTest({Utils.class, SinkConfigUtils.class})
-@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
+@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
 @Slf4j
 public class SinkApiV2ResourceTest {
 
@@ -87,16 +86,6 @@ public IObjectFactory getObjectFactory() {
         return new org.powermock.modules.testng.PowerMockObjectFactory();
     }
 
-    private static final class TestSink implements Sink<String> {
-
-        @Override public void open(final Map<String, Object> config, SinkContext sinkContext) {
-        }
-
-        @Override public void write(Record<String> record) { }
-
-        @Override public void close() { }
-    }
-
     private static final String tenant = "test-tenant";
     private static final String namespace = "test-namespace";
     private static final String sink = "test-sink";
@@ -105,9 +94,12 @@ public IObjectFactory getObjectFactory() {
         topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", TopicSchema.DEFAULT_SERDE);
     }
     private static final String subscriptionName = "test-subscription";
-    private static final String className = TestSink.class.getName();
-    private static final String serde = TopicSchema.DEFAULT_SERDE;
+    private static final String className = CassandraStringSink.class.getName();
     private static final int parallelism = 1;
+    private static final String JAR_FILE_NAME = "pulsar-io-cassandra.nar";
+    private static final String INVALID_JAR_FILE_NAME = "pulsar-io-twitter.nar";
+    private String JAR_FILE_PATH;
+    private String INVALID_JAR_FILE_PATH;
 
     private WorkerService mockedWorkerService;
     private FunctionMetaDataManager mockedManager;
@@ -135,6 +127,14 @@ public void setup() throws Exception {
         when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
         when(mockedWorkerService.isInitialized()).thenReturn(true);
 
+        URL file = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME);
+        if (file == null)  {
+            throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME);
+        }
+        JAR_FILE_PATH = file.getFile();
+        INVALID_JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(INVALID_JAR_FILE_NAME).getFile();
+
+
         // worker config
         WorkerConfig workerConfig = new WorkerConfig()
             .setWorkerId("test")
@@ -146,9 +146,6 @@ public void setup() throws Exception {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
-        mockStatic(SinkConfigUtils.class);
-        when(SinkConfigUtils.convert(anyObject(), anyObject())).thenReturn(FunctionDetails.newBuilder().build());
-        when(SinkConfigUtils.validate(any(), any(), any(), any())).thenReturn(null);
         Mockito.doReturn("Sink").when(this.resource).calculateSubjectType(any());
     }
 
@@ -167,6 +164,7 @@ public void testRegisterSinkMissingTenant() throws IOException {
             topicsToSerDeClassName,
             className,
             parallelism,
+                null,
                 "Tenant is not provided");
     }
 
@@ -181,6 +179,7 @@ public void testRegisterSinkMissingNamespace() throws IOException {
             topicsToSerDeClassName,
             className,
             parallelism,
+                null,
                 "Namespace is not provided");
     }
 
@@ -195,6 +194,7 @@ public void testRegisterSinkMissingFunctionName() throws IOException {
             topicsToSerDeClassName,
             className,
             parallelism,
+                null,
                 "Sink Name is not provided");
     }
 
@@ -209,7 +209,8 @@ public void testRegisterSinkMissingPackage() throws IOException {
             topicsToSerDeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                null,
+                "Sink Package is not provided");
     }
 
     @Test
@@ -223,7 +224,84 @@ public void testRegisterSinkMissingPackageDetails() throws IOException {
             topicsToSerDeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                null,
+                "zip file is empty");
+    }
+
+    @Test
+    public void testRegisterSinkInvalidJarNoSink() throws IOException {
+        FileInputStream inputStream = new FileInputStream(INVALID_JAR_FILE_PATH);
+        testRegisterSinkMissingArguments(
+                tenant,
+                namespace,
+                sink,
+                inputStream,
+                null,
+                topicsToSerDeClassName,
+                className,
+                parallelism,
+                null,
+                "Failed to extract sink class from archive");
+    }
+
+    @Test
+    public void testRegisterSinkNoInput() throws IOException {
+        testRegisterSinkMissingArguments(
+                tenant,
+                namespace,
+                sink,
+                mockedInputStream,
+                mockedFormData,
+                null,
+                className,
+                parallelism,
+                null,
+                "Must specify at least one topic of input via topicToSerdeClassName, topicsPattern, topicToSchemaType or inputSpecs");
+    }
+
+    @Test
+    public void testRegisterSinkNegativeParallelism() throws IOException {
+        testRegisterSinkMissingArguments(
+                tenant,
+                namespace,
+                sink,
+                mockedInputStream,
+                mockedFormData,
+                topicsToSerDeClassName,
+                className,
+                -2,
+                null,
+                "Sink parallelism should positive number");
+    }
+
+    @Test
+    public void testRegisterSinkZeroParallelism() throws IOException {
+        testRegisterSinkMissingArguments(
+                tenant,
+                namespace,
+                sink,
+                mockedInputStream,
+                mockedFormData,
+                topicsToSerDeClassName,
+                className,
+                0,
+                null,
+                "Sink parallelism should positive number");
+    }
+
+    @Test
+    public void testRegisterSinkHttpUrl() throws IOException {
+        testRegisterSinkMissingArguments(
+                tenant,
+                namespace,
+                sink,
+                null,
+                null,
+                topicsToSerDeClassName,
+                className,
+                parallelism,
+                "http://localhost:1234/test",
+                "Corrupt User PackageFile " + "http://localhost:1234/test with error Connection refused (Connection refused)");
     }
 
     private void testRegisterSinkMissingArguments(
@@ -235,6 +313,7 @@ private void testRegisterSinkMissingArguments(
             Map<String, String> inputTopicMap,
             String className,
             Integer parallelism,
+            String pkgUrl,
             String errorExpected) throws IOException {
         SinkConfig sinkConfig = new SinkConfig();
         if (tenant != null) {
@@ -262,7 +341,7 @@ private void testRegisterSinkMissingArguments(
                 sink,
                 inputStream,
                 details,
-                null,
+                pkgUrl,
                 null,
                 new Gson().toJson(sinkConfig),
                 FunctionsImpl.SINK,
@@ -272,7 +351,7 @@ private void testRegisterSinkMissingArguments(
         Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(errorExpected).reason);
     }
 
-    private Response registerDefaultSink() {
+    private Response registerDefaultSink() throws IOException {
         SinkConfig sinkConfig = new SinkConfig();
         sinkConfig.setTenant(tenant);
         sinkConfig.setNamespace(namespace);
@@ -284,7 +363,7 @@ private Response registerDefaultSink() {
             tenant,
             namespace,
                 sink,
-            mockedInputStream,
+                new FileInputStream(JAR_FILE_PATH),
             mockedFormData,
             null,
             null,
@@ -440,7 +519,7 @@ public void testUpdateSinkMissingPackage() throws IOException {
             topicsToSerDeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                "Sink Package is not provided");
     }
 
     @Test
@@ -454,7 +533,7 @@ public void testUpdateSinkMissingPackageDetails() throws IOException {
             topicsToSerDeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                "zip file is empty");
     }
 
     private void testUpdateSinkMissingArguments(
@@ -518,7 +597,7 @@ private Response updateDefaultSink() throws IOException {
             tenant,
             namespace,
                 sink,
-            mockedInputStream,
+                new FileInputStream(JAR_FILE_PATH),
             mockedFormData,
             null,
             null,
@@ -577,8 +656,7 @@ public void testUpdateSinkSuccess() throws Exception {
     public void testUpdateSinkWithUrl() throws IOException {
         Configurator.setRootLevel(Level.DEBUG);
 
-        String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        String filePackageUrl = "file://" + fileLocation;
+        String filePackageUrl = "file://" + JAR_FILE_PATH;
 
         SinkConfig sinkConfig = new SinkConfig();
         sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index 2002b3a584..3a4785f15c 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -26,7 +26,6 @@
 import org.apache.logging.log4j.core.config.Configurator;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function.*;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
@@ -36,8 +35,7 @@
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.io.core.Source;
-import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.twitter.TwitterFireHose;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -51,9 +49,9 @@
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import java.io.*;
+import java.net.URL;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Matchers.*;
@@ -69,8 +67,8 @@
 /**
  * Unit test of {@link SourceApiV2Resource}.
  */
-@PrepareForTest({Utils.class,SourceConfigUtils.class})
-@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
+@PrepareForTest({Utils.class})
+@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
 @Slf4j
 public class SourceApiV2ResourceTest {
 
@@ -79,24 +77,17 @@ public IObjectFactory getObjectFactory() {
         return new org.powermock.modules.testng.PowerMockObjectFactory();
     }
 
-    private static final class TestSource implements Source<String> {
-
-        @Override public void open(final Map<String, Object> config, SourceContext sourceContext) {
-        }
-
-        @Override public Record<String> read() { return null; }
-
-        @Override public void close() { }
-    }
-
     private static final String tenant = "test-tenant";
     private static final String namespace = "test-namespace";
     private static final String source = "test-source";
     private static final String outputTopic = "test-output-topic";
     private static final String outputSerdeClassName = TopicSchema.DEFAULT_SERDE;
-    private static final String className = TestSource.class.getName();
-    private static final String serde = TopicSchema.DEFAULT_SERDE;
+    private static final String className = TwitterFireHose.class.getName();
     private static final int parallelism = 1;
+    private static final String JAR_FILE_NAME = "pulsar-io-twitter.nar";
+    private static final String INVALID_JAR_FILE_NAME = "pulsar-io-cassandra.nar";
+    private String JAR_FILE_PATH;
+    private String INVALID_JAR_FILE_PATH;
 
     private WorkerService mockedWorkerService;
     private FunctionMetaDataManager mockedManager;
@@ -124,6 +115,13 @@ public void setup() throws Exception {
         when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
         when(mockedWorkerService.isInitialized()).thenReturn(true);
 
+        URL file = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME);
+        if (file == null)  {
+            throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME);
+        }
+        JAR_FILE_PATH = file.getFile();
+        INVALID_JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(INVALID_JAR_FILE_NAME).getFile();
+
         // worker config
         WorkerConfig workerConfig = new WorkerConfig()
             .setWorkerId("test")
@@ -135,9 +133,6 @@ public void setup() throws Exception {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
-        mockStatic(SourceConfigUtils.class);
-        when(SourceConfigUtils.convert(anyObject(), anyObject())).thenReturn(FunctionDetails.newBuilder().build());
-        when(SourceConfigUtils.validate(any(), any(), any(), any())).thenReturn(null);
         Mockito.doReturn("Source").when(this.resource).calculateSubjectType(any());
     }
 
@@ -157,6 +152,7 @@ public void testRegisterSourceMissingTenant() throws IOException {
                 outputSerdeClassName,
             className,
             parallelism,
+                null,
                 "Tenant is not provided");
     }
 
@@ -172,11 +168,12 @@ public void testRegisterSourceMissingNamespace() throws IOException {
                 outputSerdeClassName,
             className,
             parallelism,
+                null,
                 "Namespace is not provided");
     }
 
     @Test
-    public void testRegisterSourceMissingFunctionName() throws IOException {
+    public void testRegisterSourceMissingSourceName() throws IOException {
         testRegisterSourceMissingArguments(
             tenant,
             namespace,
@@ -187,6 +184,7 @@ public void testRegisterSourceMissingFunctionName() throws IOException {
                 outputSerdeClassName,
             className,
             parallelism,
+                null,
                 "Source Name is not provided");
     }
 
@@ -202,7 +200,8 @@ public void testRegisterSourceMissingPackage() throws IOException {
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                null,
+                "Source Package is not provided");
     }
 
     @Test
@@ -217,7 +216,58 @@ public void testRegisterSourceMissingPackageDetails() throws IOException {
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                null,
+                "zip file is empty");
+    }
+
+    @Test
+    public void testRegisterSourceInvalidJarWithNoSource() throws IOException {
+        FileInputStream inputStream = new FileInputStream(INVALID_JAR_FILE_PATH);
+        testRegisterSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                inputStream,
+                null,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                parallelism,
+                null,
+                "Failed to extract source class from archive");
+    }
+
+    @Test
+    public void testRegisterSourceNoOutputTopic() throws IOException {
+        FileInputStream inputStream = new FileInputStream(JAR_FILE_PATH);
+        testRegisterSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                inputStream,
+                mockedFormData,
+                null,
+                outputSerdeClassName,
+                className,
+                parallelism,
+                null,
+                "Topic name cannot be null");
+    }
+
+    @Test
+    public void testRegisterSourceHttpUrl() throws IOException {
+        testRegisterSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                null,
+                null,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                parallelism,
+                "http://localhost:1234/test",
+                "Corrupt User PackageFile " + "http://localhost:1234/test with error Connection refused (Connection refused)");
     }
 
     private void testRegisterSourceMissingArguments(
@@ -230,7 +280,8 @@ private void testRegisterSourceMissingArguments(
             String outputSerdeClassName,
             String className,
             Integer parallelism,
-            String errorExpected) throws IOException {
+            String pkgUrl,
+            String errorExpected) {
         SourceConfig sourceConfig = new SourceConfig();
         if (tenant != null) {
             sourceConfig.setTenant(tenant);
@@ -260,7 +311,7 @@ private void testRegisterSourceMissingArguments(
                 function,
                 inputStream,
                 details,
-                null,
+                pkgUrl,
                 null,
                 new Gson().toJson(sourceConfig),
                 FunctionsImpl.SOURCE,
@@ -270,7 +321,7 @@ private void testRegisterSourceMissingArguments(
         Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(errorExpected).reason);
     }
 
-    private Response registerDefaultSource() {
+    private Response registerDefaultSource() throws IOException {
         SourceConfig sourceConfig = new SourceConfig();
         sourceConfig.setTenant(tenant);
         sourceConfig.setNamespace(namespace);
@@ -283,7 +334,7 @@ private Response registerDefaultSource() {
             tenant,
             namespace,
                 source,
-            mockedInputStream,
+            new FileInputStream(JAR_FILE_PATH),
             mockedFormData,
             null,
             null,
@@ -443,7 +494,7 @@ public void testUpdateSourceMissingPackage() throws IOException {
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                "Source Package is not provided");
     }
 
     @Test
@@ -458,7 +509,52 @@ public void testUpdateSourceMissingPackageDetails() throws IOException {
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                "zip file is empty");
+    }
+
+    @Test
+    public void testUpdateSourceMissingTopicName() throws IOException {
+        testUpdateSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                mockedInputStream,
+                mockedFormData,
+                null,
+                outputSerdeClassName,
+                className,
+                parallelism,
+                "Topic name cannot be null");
+    }
+
+    @Test
+    public void testUpdateSourceNegativeParallelism() throws IOException {
+        testUpdateSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                mockedInputStream,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                -2,
+                "Source parallelism should positive number");
+    }
+
+    @Test
+    public void testUpdateSourceZeroParallelism() throws IOException {
+        testUpdateSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                mockedInputStream,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                0,
+                "Source parallelism should positive number");
     }
 
     private void testUpdateSourceMissingArguments(
@@ -527,7 +623,7 @@ private Response updateDefaultSource() throws IOException {
             tenant,
             namespace,
                 source,
-            mockedInputStream,
+                new FileInputStream(JAR_FILE_PATH),
             mockedFormData,
             null,
             null,
@@ -586,8 +682,7 @@ public void testUpdateSourceSuccess() throws Exception {
     public void testUpdateSourceWithUrl() throws IOException {
         Configurator.setRootLevel(Level.DEBUG);
 
-        String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        String filePackageUrl = "file://" + fileLocation;
+        String filePackageUrl = "file://" + JAR_FILE_PATH;
 
         SourceConfig sourceConfig = new SourceConfig();
         sourceConfig.setTopicName(outputTopic);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message