streampipes-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zehn...@apache.org
Subject [incubator-streampipes] 05/25: Add examples for defining output strategies
Date Tue, 17 Dec 2019 09:48:06 GMT
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 772b55f07181c3293ed3f7f97ecd1ccedeb5ac4d
Author: Dominik Riemer <riemer@fzi.de>
AuthorDate: Mon Feb 18 19:32:25 2019 +0100

    Add examples for defining output strategies
---
 .gitlab-ci.yml                                     | 34 +++++-----
 .../streampipes/pe/examples/jvm/ExamplesInit.java  | 15 ++++-
 .../jvm/outputstrategy/AppendOutputController.java | 60 ++++++++++++++++++
 .../jvm/outputstrategy/CustomOutputController.java | 59 +++++++++++++++++
 .../CustomTransformOutputController.java           | 74 ++++++++++++++++++++++
 .../jvm/outputstrategy/FixedOutputController.java  | 59 +++++++++++++++++
 .../jvm/outputstrategy/KeepOutputController.java   | 56 ++++++++++++++++
 .../outputstrategy/TransformOutputController.java  | 65 +++++++++++++++++++
 8 files changed, 404 insertions(+), 18 deletions(-)

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 0837c89..393fda6 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -10,25 +10,25 @@ variables:
 
 
 stages:
-  - build
+#  - build
   - github
 
-build:
-  image: maven:3-jdk-8
-  stage: build
-  script:
-    - echo "$GPG_PRIVATE_KEY" | gpg --batch --import --passphrase "$GPG_PASSPHRASE"
-    - echo "$MAVEN_CREDENTIALS" > /root/.m2/settings.xml
-    - mvn clean package javadoc:aggregate -U -DskipTests
-    - export MVN_VERSION=$(mvn org.apache.maven.plugins:maven-help-plugin:2.1.1:evaluate -Dexpression=project.version | grep -v '\[')
-    - "echo $MVN_VERSION >> ./target/mvn_version"
-  artifacts:
-    paths:
-      - ./*/target/*.jar
-      - ./target/mvn_version
-    expire_in:  1 hour
-  except:
-      - /release-.*$/
+#build:
+#  image: maven:3-jdk-8
+#  stage: build
+#  script:
+#    - echo "$GPG_PRIVATE_KEY" | gpg --batch --import --passphrase "$GPG_PASSPHRASE"
+#    - echo "$MAVEN_CREDENTIALS" > /root/.m2/settings.xml
+#    - mvn clean package javadoc:aggregate -U -DskipTests
+#    - export MVN_VERSION=$(mvn org.apache.maven.plugins:maven-help-plugin:2.1.1:evaluate -Dexpression=project.version | grep -v '\[')
+#    - "echo $MVN_VERSION >> ./target/mvn_version"
+#  artifacts:
+#    paths:
+#      - ./*/target/*.jar
+#      - ./target/mvn_version
+#    expire_in:  1 hour
+#  except:
+#      - /release-.*$/
 
 
 github:
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/ExamplesInit.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/ExamplesInit.java
index 548722b..90f1303 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/ExamplesInit.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/ExamplesInit.java
@@ -21,6 +21,12 @@ import org.streampipes.dataformat.json.JsonDataFormatFactory;
 import org.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.streampipes.pe.examples.jvm.config.ExamplesJvmConfig;
+import org.streampipes.pe.examples.jvm.outputstrategy.AppendOutputController;
+import org.streampipes.pe.examples.jvm.outputstrategy.CustomOutputController;
+import org.streampipes.pe.examples.jvm.outputstrategy.CustomTransformOutputController;
+import org.streampipes.pe.examples.jvm.outputstrategy.FixedOutputController;
+import org.streampipes.pe.examples.jvm.outputstrategy.KeepOutputController;
+import org.streampipes.pe.examples.jvm.outputstrategy.TransformOutputController;
 import org.streampipes.pe.examples.jvm.staticproperty.CollectionExampleController;
 import org.streampipes.pe.examples.jvm.staticproperty.MultiValueSelectionExampleController;
 import org.streampipes.pe.examples.jvm.staticproperty.NaryMappingPropertyExampleController;
@@ -44,7 +50,14 @@ public class ExamplesInit extends StandaloneModelSubmitter {
             .add(new SingleValueSelectionExampleController())
             .add(new MultiValueSelectionExampleController())
             .add(new CollectionExampleController())
-            .add(new RuntimeResolvableSingleValue());
+            .add(new RuntimeResolvableSingleValue())
+
+            .add(new AppendOutputController())
+            .add(new CustomOutputController())
+            .add(new FixedOutputController())
+            .add(new CustomTransformOutputController())
+            .add(new TransformOutputController())
+            .add(new KeepOutputController());
 
     DeclarersSingleton.getInstance().registerDataFormat(new JsonDataFormatFactory());
     DeclarersSingleton.getInstance().registerProtocol(new SpKafkaProtocolFactory());
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/AppendOutputController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/AppendOutputController.java
new file mode 100644
index 0000000..cb5d44b
--- /dev/null
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/AppendOutputController.java
@@ -0,0 +1,60 @@
+/*
+Copyright 2019 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package org.streampipes.pe.examples.jvm.outputstrategy;
+
+import org.streampipes.model.graph.DataProcessorDescription;
+import org.streampipes.model.graph.DataProcessorInvocation;
+import org.streampipes.pe.examples.jvm.base.DummyEngine;
+import org.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.streampipes.sdk.helpers.EpProperties;
+import org.streampipes.sdk.helpers.EpRequirements;
+import org.streampipes.sdk.helpers.Labels;
+import org.streampipes.sdk.helpers.OutputStrategies;
+import org.streampipes.sdk.helpers.SupportedFormats;
+import org.streampipes.sdk.helpers.SupportedProtocols;
+import org.streampipes.vocabulary.SO;
+import org.streampipes.wrapper.standalone.ConfiguredEventProcessor;
+import org.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+
+public class AppendOutputController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.streampipes.examples.outputstrategy" +
+            ".append", "Append output example", "")
+            .requiredStream(StreamRequirementsBuilder.
+                    create()
+                    .requiredProperty(EpRequirements.anyProperty())
+                    .build())
+            .supportedProtocols(SupportedProtocols.kafka())
+            .supportedFormats(SupportedFormats.jsonFormat())
+
+            .outputStrategy(OutputStrategies.append(EpProperties.integerEp(Labels.from("avg",
+                    "The average value", ""), "avg", SO.Number)))
+
+            .build();
+  }
+
+  @Override
+  public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+
+
+    return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+  }
+}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/CustomOutputController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/CustomOutputController.java
new file mode 100644
index 0000000..d8a4d71
--- /dev/null
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/CustomOutputController.java
@@ -0,0 +1,59 @@
+/*
+Copyright 2019 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package org.streampipes.pe.examples.jvm.outputstrategy;
+
+import org.streampipes.model.graph.DataProcessorDescription;
+import org.streampipes.model.graph.DataProcessorInvocation;
+import org.streampipes.pe.examples.jvm.base.DummyEngine;
+import org.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.streampipes.sdk.helpers.EpRequirements;
+import org.streampipes.sdk.helpers.OutputStrategies;
+import org.streampipes.sdk.helpers.SupportedFormats;
+import org.streampipes.sdk.helpers.SupportedProtocols;
+import org.streampipes.wrapper.standalone.ConfiguredEventProcessor;
+import org.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+
+import java.util.List;
+
+public class CustomOutputController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.streampipes.examples.outputstrategy" +
+            ".custom", "Custom output example", "")
+            .requiredStream(StreamRequirementsBuilder.
+                    create()
+                    .requiredProperty(EpRequirements.anyProperty())
+                    .build())
+            .supportedProtocols(SupportedProtocols.kafka())
+            .supportedFormats(SupportedFormats.jsonFormat())
+
+            .outputStrategy(OutputStrategies.custom())
+
+            .build();
+  }
+
+  @Override
+  public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+
+    List<String> outputSelectors = extractor.outputKeySelectors();
+
+    return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+  }
+}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/CustomTransformOutputController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/CustomTransformOutputController.java
new file mode 100644
index 0000000..772bda7
--- /dev/null
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/CustomTransformOutputController.java
@@ -0,0 +1,74 @@
+/*
+Copyright 2019 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package org.streampipes.pe.examples.jvm.outputstrategy;
+
+import org.streampipes.commons.exceptions.SpRuntimeException;
+import org.streampipes.container.api.ResolvesContainerProvidedOutputStrategy;
+import org.streampipes.model.graph.DataProcessorDescription;
+import org.streampipes.model.graph.DataProcessorInvocation;
+import org.streampipes.model.schema.EventSchema;
+import org.streampipes.model.schema.PropertyScope;
+import org.streampipes.pe.examples.jvm.base.DummyEngine;
+import org.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.streampipes.sdk.helpers.EpProperties;
+import org.streampipes.sdk.helpers.EpRequirements;
+import org.streampipes.sdk.helpers.Labels;
+import org.streampipes.sdk.helpers.OutputStrategies;
+import org.streampipes.sdk.helpers.SupportedFormats;
+import org.streampipes.sdk.helpers.SupportedProtocols;
+import org.streampipes.vocabulary.SO;
+import org.streampipes.wrapper.standalone.ConfiguredEventProcessor;
+import org.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+
+import java.util.Arrays;
+
+public class CustomTransformOutputController extends
+        StandaloneEventProcessingDeclarer<DummyParameters> implements
+        ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, ProcessingElementParameterExtractor> {
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.streampipes.examples.outputstrategy" +
+            ".customtransform", "Custom transform output example example", "")
+            .requiredStream(StreamRequirementsBuilder.
+                    create()
+                    .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(), Labels.from
+                            ("str", "The date property as a string", ""), PropertyScope.NONE)
+                    .build())
+            .supportedProtocols(SupportedProtocols.kafka())
+            .supportedFormats(SupportedFormats.jsonFormat())
+
+            .outputStrategy(OutputStrategies.customTransformation())
+
+            .build();
+  }
+
+  @Override
+  public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+
+    return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+  }
+
+  @Override
+  public EventSchema resolveOutputStrategy(DataProcessorInvocation processingElement, ProcessingElementParameterExtractor parameterExtractor) throws SpRuntimeException {
+    return new EventSchema(Arrays
+            .asList(EpProperties
+                    .stringEp(Labels.from("runtime", "I was added at runtime", ""), "runtime", SO.Text)));
+  }
+}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/FixedOutputController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/FixedOutputController.java
new file mode 100644
index 0000000..ccdb6e0
--- /dev/null
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/FixedOutputController.java
@@ -0,0 +1,59 @@
+/*
+Copyright 2019 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package org.streampipes.pe.examples.jvm.outputstrategy;
+
+import org.streampipes.model.graph.DataProcessorDescription;
+import org.streampipes.model.graph.DataProcessorInvocation;
+import org.streampipes.pe.examples.jvm.base.DummyEngine;
+import org.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.streampipes.sdk.helpers.EpProperties;
+import org.streampipes.sdk.helpers.EpRequirements;
+import org.streampipes.sdk.helpers.Labels;
+import org.streampipes.sdk.helpers.OutputStrategies;
+import org.streampipes.sdk.helpers.SupportedFormats;
+import org.streampipes.sdk.helpers.SupportedProtocols;
+import org.streampipes.vocabulary.SO;
+import org.streampipes.wrapper.standalone.ConfiguredEventProcessor;
+import org.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+
+public class FixedOutputController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.streampipes.examples.outputstrategy" +
+            ".fixed", "Fixed output example", "")
+            .requiredStream(StreamRequirementsBuilder.
+                    create()
+                    .requiredProperty(EpRequirements.anyProperty())
+                    .build())
+            .supportedProtocols(SupportedProtocols.kafka())
+            .supportedFormats(SupportedFormats.jsonFormat())
+
+            .outputStrategy(OutputStrategies.fixed(EpProperties.timestampProperty("timestamp"),
+                    EpProperties.doubleEp(Labels.from("avg", "Average value", ""), "avg", SO.Number)))
+
+            .build();
+  }
+
+  @Override
+  public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+
+    return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+  }
+}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/KeepOutputController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/KeepOutputController.java
new file mode 100644
index 0000000..8b95adf
--- /dev/null
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/KeepOutputController.java
@@ -0,0 +1,56 @@
+/*
+Copyright 2019 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package org.streampipes.pe.examples.jvm.outputstrategy;
+
+import org.streampipes.model.graph.DataProcessorDescription;
+import org.streampipes.model.graph.DataProcessorInvocation;
+import org.streampipes.pe.examples.jvm.base.DummyEngine;
+import org.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.streampipes.sdk.helpers.EpRequirements;
+import org.streampipes.sdk.helpers.OutputStrategies;
+import org.streampipes.sdk.helpers.SupportedFormats;
+import org.streampipes.sdk.helpers.SupportedProtocols;
+import org.streampipes.wrapper.standalone.ConfiguredEventProcessor;
+import org.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+
+public class KeepOutputController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.streampipes.examples.outputstrategy" +
+            ".keep", "Keep output example example", "")
+            .requiredStream(StreamRequirementsBuilder.
+                    create()
+                    .requiredProperty(EpRequirements.anyProperty())
+                    .build())
+            .supportedProtocols(SupportedProtocols.kafka())
+            .supportedFormats(SupportedFormats.jsonFormat())
+
+            .outputStrategy(OutputStrategies.keep())
+
+            .build();
+  }
+
+  @Override
+  public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+
+
+    return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+  }
+}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/TransformOutputController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/TransformOutputController.java
new file mode 100644
index 0000000..ef5b5e2
--- /dev/null
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/outputstrategy/TransformOutputController.java
@@ -0,0 +1,65 @@
+/*
+Copyright 2019 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package org.streampipes.pe.examples.jvm.outputstrategy;
+
+import org.streampipes.model.graph.DataProcessorDescription;
+import org.streampipes.model.graph.DataProcessorInvocation;
+import org.streampipes.model.schema.PropertyScope;
+import org.streampipes.pe.examples.jvm.base.DummyEngine;
+import org.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.streampipes.sdk.helpers.EpRequirements;
+import org.streampipes.sdk.helpers.Labels;
+import org.streampipes.sdk.helpers.OutputStrategies;
+import org.streampipes.sdk.helpers.SupportedFormats;
+import org.streampipes.sdk.helpers.SupportedProtocols;
+import org.streampipes.sdk.helpers.TransformOperations;
+import org.streampipes.sdk.utils.Datatypes;
+import org.streampipes.wrapper.standalone.ConfiguredEventProcessor;
+import org.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+
+import java.util.List;
+
+public class TransformOutputController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.streampipes.examples.outputstrategy" +
+            ".transform", "Transform output example example", "")
+            .requiredStream(StreamRequirementsBuilder.
+                    create()
+                    .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(), Labels.from
+                            ("str", "The date property as a string", ""), PropertyScope.NONE)
+                    .build())
+            .supportedProtocols(SupportedProtocols.kafka())
+            .supportedFormats(SupportedFormats.jsonFormat())
+
+            .outputStrategy(OutputStrategies.transform(TransformOperations
+                    .staticDatatypeTransformation("str", Datatypes.Long)))
+
+            .build();
+  }
+
+  @Override
+  public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+
+    List<String> outputSelectors = extractor.outputKeySelectors();
+
+    return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+  }
+}


Mime
View raw message