streampipes-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zehn...@apache.org
Subject [incubator-streampipes] 13/25: Add example to invoke external processor
Date Tue, 17 Dec 2019 09:48:14 GMT
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 0aadf00e776d54b41c5d2cff838321f8f5bde5ab
Author: Dominik Riemer <riemer@fzi.de>
AuthorDate: Tue Aug 27 19:51:59 2019 +0200

    Add example to invoke external processor
---
 pom.xml                                            |  7 ++++
 .../streampipes/pe/examples/jvm/ExamplesInit.java  |  5 ++-
 .../examples/jvm/engine/ExampleExternalEngine.java | 39 ++++++++++++++++++
 .../engine/ExampleExternalEngineController.java    | 48 ++++++++++++++++++++++
 .../engine/ExampleExternalEngineParameters.java    | 26 ++++++++++++
 5 files changed, 124 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 8c2dee4..9fe40c2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,6 +16,8 @@
         <streampipes.version>0.62.1-SNAPSHOT</streampipes.version>
         <lightcouch.version>0.1.8</lightcouch.version>
         <spring.version>2.1.6.RELEASE</spring.version>
+
+        <maven-shade-plugin.version>3.0.0</maven-shade-plugin.version>
     </properties>
 
     <dependencyManagement>
@@ -200,6 +202,11 @@
                     <version>3.0.0</version>
                 </plugin>
                 <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-shade-plugin</artifactId>
+                    <version>${maven-shade-plugin.version}</version>
+                </plugin>
+                <plugin>
                     <groupId>org.springframework.boot</groupId>
                     <artifactId>spring-boot-maven-plugin</artifactId>
                     <version>${spring.version}</version>
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/ExamplesInit.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/ExamplesInit.java
index 131d6aa..b7e5ae5 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/ExamplesInit.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/ExamplesInit.java
@@ -21,6 +21,7 @@ import org.streampipes.dataformat.json.JsonDataFormatFactory;
 import org.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.streampipes.pe.examples.jvm.config.ExamplesJvmConfig;
+import org.streampipes.pe.examples.jvm.engine.ExampleExternalEngineController;
 import org.streampipes.pe.examples.jvm.outputstrategy.AppendOutputController;
 import org.streampipes.pe.examples.jvm.outputstrategy.CustomOutputController;
 import org.streampipes.pe.examples.jvm.outputstrategy.CustomTransformOutputController;
@@ -61,7 +62,9 @@ public class ExamplesInit extends StandaloneModelSubmitter {
             .add(new FixedOutputController())
             .add(new CustomTransformOutputController())
             .add(new TransformOutputController())
-            .add(new KeepOutputController());
+            .add(new KeepOutputController())
+
+            .add(new ExampleExternalEngineController());
 
     DeclarersSingleton.getInstance().registerDataFormat(new JsonDataFormatFactory());
     DeclarersSingleton.getInstance().registerProtocol(new SpKafkaProtocolFactory());
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/engine/ExampleExternalEngine.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/engine/ExampleExternalEngine.java
new file mode 100644
index 0000000..5ebf93d
--- /dev/null
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/engine/ExampleExternalEngine.java
@@ -0,0 +1,39 @@
+/*
+Copyright 2019 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package org.streampipes.pe.examples.jvm.engine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.streampipes.commons.exceptions.SpRuntimeException;
+import org.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.streampipes.wrapper.runtime.ExternalEventProcessor;
+
+public class ExampleExternalEngine
+        implements ExternalEventProcessor<ExampleExternalEngineParameters> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExampleExternalEngine.class);
+
+  @Override
+  public void onInvocation(ExampleExternalEngineParameters parameters,
+                             EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+    LOG.info("I'm invoked!");
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    LOG.info("I'm detached!");
+  }
+}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/engine/ExampleExternalEngineController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/engine/ExampleExternalEngineController.java
new file mode 100644
index 0000000..b68475d
--- /dev/null
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/engine/ExampleExternalEngineController.java
@@ -0,0 +1,48 @@
+/*
+Copyright 2019 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package org.streampipes.pe.examples.jvm.engine;
+
+import org.streampipes.model.graph.DataProcessorDescription;
+import org.streampipes.model.graph.DataProcessorInvocation;
+import org.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.streampipes.sdk.helpers.EpRequirements;
+import org.streampipes.sdk.helpers.OutputStrategies;
+import org.streampipes.wrapper.standalone.ConfiguredExternalEventProcessor;
+import org.streampipes.wrapper.standalone.declarer.StandaloneExternalEventProcessingDeclarer;
+
+public class ExampleExternalEngineController
+        extends StandaloneExternalEventProcessingDeclarer<ExampleExternalEngineParameters> {
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.streampipes.examples.engine.external", "Example " +
+            "External Engine", "")
+            .requiredStream(StreamRequirementsBuilder.
+                    create()
+                    .requiredProperty(EpRequirements.anyProperty())
+                    .build())
+            .outputStrategy(OutputStrategies.keep())
+            .build();
+  }
+
+  @Override
+  public ConfiguredExternalEventProcessor<ExampleExternalEngineParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+    return new ConfiguredExternalEventProcessor<>(new ExampleExternalEngineParameters(graph),
+            ExampleExternalEngine::new);
+  }
+}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/engine/ExampleExternalEngineParameters.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/engine/ExampleExternalEngineParameters.java
new file mode 100644
index 0000000..9f49bf7
--- /dev/null
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/engine/ExampleExternalEngineParameters.java
@@ -0,0 +1,26 @@
+/*
+Copyright 2019 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package org.streampipes.pe.examples.jvm.engine;
+
+import org.streampipes.model.graph.DataProcessorInvocation;
+import org.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+
+public class ExampleExternalEngineParameters extends EventProcessorBindingParams {
+
+  public ExampleExternalEngineParameters(DataProcessorInvocation graph) {
+    super(graph);
+  }
+}


Mime
View raw message