pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] rdhabalia commented on a change in pull request #2114: Submit and run locally builtin connectors
Date Tue, 10 Jul 2018 20:15:07 GMT
rdhabalia commented on a change in pull request #2114: Submit and run locally builtin connectors
URL: https://github.com/apache/incubator-pulsar/pull/2114#discussion_r201478836
 
 

 ##########
 File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 ##########
 @@ -243,4 +257,70 @@ private String getDownloadPackagePath(FunctionMetaData functionMetaData,
int ins
                 },
                 File.separatorChar);
     }
+
+    public static boolean isFunctionCodeBuiltin(FunctionDetailsOrBuilder functionDetails)
{
+        if (functionDetails.hasSource()) {
+            SourceSpec sourceSpec = functionDetails.getSource();
+            if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+                return true;
+            }
+        }
+
+        if (functionDetails.hasSink()) {
+            SinkSpec sinkSpec = functionDetails.getSink();
+            if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws IOException
{
+        if (functionDetails.hasSource()) {
+            SourceSpec sourceSpec = functionDetails.getSource();
+            if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+                File archive = connectorsManager.getSourceArchive(sourceSpec.getBuiltin()).toFile();
+                String sourceClass = ConnectorUtils.getConnectorDefinition(archive.toString()).getSourceClass();
+                SourceSpec.Builder builder = SourceSpec.newBuilder(functionDetails.getSource());
+                builder.setClassName(sourceClass);
+                functionDetails.setSource(builder);
+
+                fillSourceSinkTypeClass(functionDetails, archive, sourceClass);
+                return archive;
+            }
+        }
+
+        if (functionDetails.hasSink()) {
+            SinkSpec sinkSpec = functionDetails.getSink();
+            if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
+                File archive = connectorsManager.getSinkArchive(sinkSpec.getBuiltin()).toFile();
 
 Review comment:
   KinesisSink has an option to provide a classname of a credential-generation-provider using
which it can generate aws-credential and it loads that provider dynamically at runtime (same
as pulsar-broker loads authentication-provider dynamically). so, if user uses `built-in` kinesis-connector
then credential-generation provider jar can't be part of the NAR archive. so, right now, we
keep this provider-jar into function-worker classpath and `ThreadRuntime` can access it while
initializing kinesisSink.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message