pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sanjee...@apache.org
Subject [incubator-pulsar] branch master updated: Fix Handling of user defined nar sources/sinks (#2502)
Date Wed, 05 Sep 2018 16:05:46 GMT
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d58114  Fix Handling of user defined nar sources/sinks (#2502)
6d58114 is described below

commit 6d5811445c150b728a83194b2d3868333d1bfaa3
Author: Sanjeev Kulkarni <sanjeevrk@gmail.com>
AuthorDate: Wed Sep 5 09:05:40 2018 -0700

    Fix Handling of user defined nar sources/sinks (#2502)
    
    * Fix handling of nar archives
    
    * Adress review comments
    
    * Added a integration test to test archive
    
    * Fixed build
    
    * Fix build
    
    * Fix integratin test
---
 .../functions/instance/JavaInstanceRunnable.java   |  9 +--
 .../integration/functions/PulsarFunctionsTest.java | 82 ++++++++++++++--------
 ...Tester.java => CassandraSinkArchiveTester.java} |  8 +--
 .../tests/integration/io/CassandraSinkTester.java  |  2 +-
 .../tests/integration/io/JdbcSinkTester.java       |  2 +-
 .../tests/integration/io/KafkaSinkTester.java      |  2 +-
 .../pulsar/tests/integration/io/SinkTester.java    | 24 ++++++-
 7 files changed, 85 insertions(+), 44 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index cdcad60..344d9ee 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -27,6 +27,7 @@ import com.google.gson.reflect.TypeToken;
 
 import io.netty.buffer.ByteBuf;
 
+import java.io.FileNotFoundException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -246,12 +247,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable
{
 
     private void loadJars() throws Exception {
 
-        if (jarFile.endsWith(".nar")) {
-            // The functions code is contained in a NAR archive
+        try {
+            // Let's first try to treat it as a nar archive
             fnCache.registerFunctionInstanceWithArchive(instanceConfig.getFunctionId(), instanceConfig.getInstanceId(),
                     jarFile);
-        } else {
-            log.info("Loading JAR files for function {} from archive {}", instanceConfig,
jarFile);
+        } catch (FileNotFoundException e) {
+            log.info("For Function {} Loading as NAR failed with {}; treating it as Jar instead",
instanceConfig, e);
             // create the function class loader
             fnCache.registerFunctionInstance(
                     instanceConfig.getFunctionId(),
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 5cefc6a..7b3cd4f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -42,13 +42,7 @@ import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
-import org.apache.pulsar.tests.integration.io.CassandraSinkTester;
-import org.apache.pulsar.tests.integration.io.JdbcSinkTester;
-import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
-import org.apache.pulsar.tests.integration.io.KafkaSinkTester;
-import org.apache.pulsar.tests.integration.io.KafkaSourceTester;
-import org.apache.pulsar.tests.integration.io.SinkTester;
-import org.apache.pulsar.tests.integration.io.SourceTester;
+import org.apache.pulsar.tests.integration.io.*;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testng.Assert;
@@ -66,20 +60,25 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
 
     @Test
     public void testKafkaSink() throws Exception {
-        testSink(new KafkaSinkTester());
+        testSink(new KafkaSinkTester(), true);
     }
 
     @Test
     public void testCassandraSink() throws Exception {
-        testSink(new CassandraSinkTester());
+        testSink(new CassandraSinkTester(), true);
+    }
+
+    @Test
+    public void testCassandraArchiveSink() throws Exception {
+        testSink(new CassandraSinkArchiveTester(), false);
     }
 
     @Test
     public void testJdbcSink() throws Exception {
-        testSink(new JdbcSinkTester());
+        testSink(new JdbcSinkTester(), true);
     }
 
-    private void testSink(SinkTester tester) throws Exception {
+    private void testSink(SinkTester tester, boolean builtin) throws Exception {
         tester.findSinkServiceContainer(pulsarCluster.getExternalServices());
 
         final String tenant = TopicName.PUBLIC_TENANT;
@@ -87,7 +86,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
         final String inputTopicName = "test-sink-connector-"
             + tester.getSinkType() + "-" + functionRuntimeType + "-input-topic-" + randomName(8);
         final String sinkName = "test-sink-connector-"
-            + tester.getSinkType() + "-" + functionRuntimeType + "-name-" + randomName(8);
+            + tester.getSinkType().name().toLowerCase() + "-" + functionRuntimeType + "-name-"
+ randomName(8);
         final int numMessages = 20;
 
         // prepare the testing environment for sink
@@ -97,7 +96,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
         submitSinkConnector(tester, tenant, namespace, sinkName, inputTopicName);
 
         // get sink info
-        getSinkInfoSuccess(tester, tenant, namespace, sinkName);
+        getSinkInfoSuccess(tester, tenant, namespace, sinkName, builtin);
 
         // get sink status
         getSinkStatus(tenant, namespace, sinkName);
@@ -105,7 +104,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
         // produce messages
         Map<String, String> kvs;
         if (tester instanceof JdbcSinkTester) {
-            kvs = produceSchemaMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(Foo.class));
+            kvs = produceSchemaMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
         } else {
             kvs = produceMessagesToInputTopic(inputTopicName, numMessages);
         }
@@ -132,16 +131,31 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
                                        String namespace,
                                        String sinkName,
                                        String inputTopicName) throws Exception {
-        String[] commands = {
-            PulsarCluster.ADMIN_SCRIPT,
-            "sink", "create",
-            "--tenant", tenant,
-            "--namespace", namespace,
-            "--name", sinkName,
-            "--sink-type", tester.sinkType(),
-            "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
-            "--inputs", inputTopicName
-        };
+        String[] commands;
+        if (tester.getSinkType() != SinkTester.SinkType.UNDEFINED) {
+            commands = new String[] {
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "sink", "create",
+                    "--tenant", tenant,
+                    "--namespace", namespace,
+                    "--name", sinkName,
+                    "--sink-type", tester.sinkType().name().toLowerCase(),
+                    "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
+                    "--inputs", inputTopicName
+            };
+        } else {
+            commands = new String[] {
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "sink", "create",
+                    "--tenant", tenant,
+                    "--namespace", namespace,
+                    "--name", sinkName,
+                    "--archive", tester.getSinkArchive(),
+                    "--classname", tester.getSinkClassName(),
+                    "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
+                    "--inputs", inputTopicName
+            };
+        }
         log.info("Run command : {}", StringUtils.join(commands, ' '));
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
         assertTrue(
@@ -152,7 +166,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
     protected void getSinkInfoSuccess(SinkTester tester,
                                       String tenant,
                                       String namespace,
-                                      String sinkName) throws Exception {
+                                      String sinkName,
+                                      boolean builtin) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "functions",
@@ -163,10 +178,17 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
         };
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
         log.info("Get sink info : {}", result.getStdout());
-        assertTrue(
-            result.getStdout().contains("\"builtin\": \"" + tester.getSinkType() + "\""),
-            result.getStdout()
-        );
+        if (builtin) {
+            assertTrue(
+                    result.getStdout().contains("\"builtin\": \"" + tester.getSinkType().name().toLowerCase()
+ "\""),
+                    result.getStdout()
+            );
+        } else {
+            assertTrue(
+                    result.getStdout().contains("\"className\": \"" + tester.getSinkClassName()
+ "\""),
+                    result.getStdout()
+            );
+        }
     }
 
     protected void getSinkStatus(String tenant, String namespace, String sinkName) throws
Exception {
@@ -231,7 +253,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
         for (int i = 0; i < numMessages; i++) {
             String key = "key-" + i;
 
-            Foo obj = new Foo();
+            JdbcSinkTester.Foo obj = new JdbcSinkTester.Foo();
             obj.setField1("field1_" + i);
             obj.setField2("field2_" + i);
             obj.setField3(i);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
similarity index 93%
copy from tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
copy to tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
index e31de9f..86c7689 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
@@ -35,10 +35,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 /**
- * A tester for testing cassandra sink.
+ * A tester for testing cassandra sink submitted as an archive.
  */
 @Slf4j
-public class CassandraSinkTester extends SinkTester {
+public class CassandraSinkArchiveTester extends SinkTester {
 
     private static final String NAME = "cassandra";
 
@@ -54,8 +54,8 @@ public class CassandraSinkTester extends SinkTester {
     private Cluster cluster;
     private Session session;
 
-    public CassandraSinkTester() {
-        super("cassandra");
+    public CassandraSinkArchiveTester() {
+        super("/pulsar/connectors/pulsar-io-cassandra-2.2.0-incubating-SNAPSHOT.nar", "org.apache.pulsar.io.cassandra.CassandraStringSink");
 
         String suffix = randomName(8) + "_" + System.currentTimeMillis();
         this.keySpace = "keySpace_" + suffix;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
index e31de9f..c9d3e5a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
@@ -55,7 +55,7 @@ public class CassandraSinkTester extends SinkTester {
     private Session session;
 
     public CassandraSinkTester() {
-        super("cassandra");
+        super(SinkType.CASSANDRA);
 
         String suffix = randomName(8) + "_" + System.currentTimeMillis();
         this.keySpace = "keySpace_" + suffix;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
index 6a102f1..e4aa401 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -63,7 +63,7 @@ public class JdbcSinkTester extends SinkTester {
     private Connection connection;
 
     public JdbcSinkTester() {
-        super(NAME);
+        super(SinkType.JDBC);
 
         // container default value is test
         sinkConfig.put("userName", "test");
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
index 1cd58f2..ff79e1a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
@@ -52,7 +52,7 @@ public class KafkaSinkTester extends SinkTester {
     private KafkaConsumer<String, String> kafkaConsumer;
 
     public KafkaSinkTester() {
-        super(NAME);
+        super(SinkType.KAFKA);
         String suffix = randomName(8) + "_" + System.currentTimeMillis();
         this.kafkaTopicName = "kafka_sink_topic_" + suffix;
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index 3eee821..098b8bf 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -29,17 +29,35 @@ import org.testng.collections.Maps;
 @Getter
 public abstract class SinkTester {
 
-    protected final String sinkType;
+    public enum SinkType {
+        UNDEFINED,
+        CASSANDRA,
+        KAFKA,
+        JDBC
+    }
+
+    protected final SinkType sinkType;
+    protected final String sinkArchive;
+    protected final String sinkClassName;
     protected final Map<String, Object> sinkConfig;
 
-    public SinkTester(String sinkType) {
+    public SinkTester(SinkType sinkType) {
         this.sinkType = sinkType;
+        this.sinkArchive = null;
+        this.sinkClassName = null;
+        this.sinkConfig = Maps.newHashMap();
+    }
+
+    public SinkTester(String sinkArchive, String sinkClassName) {
+        this.sinkType = SinkType.UNDEFINED;
+        this.sinkArchive = sinkArchive;
+        this.sinkClassName = sinkClassName;
         this.sinkConfig = Maps.newHashMap();
     }
 
     public abstract void findSinkServiceContainer(Map<String, GenericContainer<?>>
externalServices);
 
-    public String sinkType() {
+    public SinkType sinkType() {
         return sinkType;
     }
 


Mime
View raw message