pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] srkukarni closed pull request #1698: Use Function ClassLoader to load types
Date Tue, 01 May 2018 01:53:34 GMT
srkukarni closed pull request #1698: Use Function ClassLoader to load types
URL: https://github.com/apache/incubator-pulsar/pull/1698
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e181204189..85cce9a242 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -159,7 +159,7 @@ JavaInstance setupJavaInstance() throws Exception {
         // start the output producer
         processor.setupOutput(outputSerDe);
         // start the input consumer
-        processor.setupInput(typeArgs[0]);
+        processor.setupInput(typeArgs[0], clsLoader);
         // start any log topic handler
         setupLogHandler();
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
index 0dcf12c556..6baa113e8a 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
@@ -63,7 +63,7 @@ static MessageProcessor create(PulsarClient client,
      * @param inputType the input type of the function
      * @throws Exception
      */
-    void setupInput(Class<?> inputType)
+    void setupInput(Class<?> inputType, ClassLoader clsLoader)
         throws Exception;
 
     /**
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
index a2a5d8bc8a..ff57bd8744 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
@@ -61,7 +61,7 @@ protected MessageProcessorBase(PulsarClient client,
     //
 
     @Override
-    public void setupInput(Class<?> inputType) throws Exception {
+    public void setupInput(Class<?> inputType, ClassLoader clsLoader) throws Exception
{
 
         org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec = this.functionDetails.getSource();
         Object object;
@@ -75,6 +75,7 @@ public void setupInput(Class<?> inputType) throws Exception {
             pulsarConfig.setSubscriptionType(
                     FunctionConfig.SubscriptionType.valueOf(this.functionDetails.getSource().getSubscriptionType().name()));
             pulsarConfig.setTypeClassName(inputType.getName());
+            pulsarConfig.setClsLoader(clsLoader);
 
             Object[] params = {this.client, pulsarConfig};
             Class[] paramTypes = {PulsarClient.class, PulsarConfig.class};
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
index 2a5dc44d32..d33fc057ff 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
@@ -40,6 +40,7 @@
     private String subscriptionName;
     private Map<String, String> topicSerdeClassNameMap;
     private String typeClassName;
+    private ClassLoader clsLoader;
 
     public static PulsarConfig load(Map<String, Object> map) throws IOException {
         ObjectMapper mapper = new ObjectMapper();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index caaa7bf3f1..dae6dfbce1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -128,7 +128,7 @@ public void close() throws Exception {
 
     private void setupSerde() throws ClassNotFoundException {
 
-        Class<?> typeArg = Class.forName(this.pulsarConfig.getTypeClassName());
+        Class<?> typeArg = pulsarConfig.getClsLoader().loadClass(pulsarConfig.getTypeClassName());
         if (Void.class.equals(typeArg)) {
             throw new RuntimeException("Input type of Pulsar Function cannot be Void");
         }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message