streampipes-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zehn...@apache.org
Subject [incubator-streampipes] 09/25: Add examples for runtime-resolveable properties
Date Tue, 17 Dec 2019 09:48:10 GMT
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 5eb7aa5b3d295ed841ae3aabdc94706fbc014ac3
Author: Dominik Riemer <riemer@fzi.de>
AuthorDate: Mon Jul 8 06:56:12 2019 +0200

    Add examples for runtime-resolveable properties
---
 pom.xml                                            |  4 +--
 .../pom.xml                                        |  6 +++-
 .../streampipes/pe/examples/jvm/ExamplesInit.java  |  2 ++
 .../RuntimeResolvableSingleValue.java              | 33 +++++++++++++++++---
 ...a => StaticPropertyAlternativesController.java} | 36 ++++++++++------------
 5 files changed, 53 insertions(+), 28 deletions(-)

diff --git a/pom.xml b/pom.xml
index 3c0b75a..be1da97 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,13 +7,13 @@
     <groupId>org.streampipes</groupId>
     <artifactId>streampipes-pipeline-elements-examples</artifactId>
     <packaging>pom</packaging>
-    <version>0.60.2-SNAPSHOT</version>
+    <version>0.62.1-SNAPSHOT</version>
     <modules>
         <module>streampipes-pipeline-elements-examples-processors-jvm</module>
     </modules>
 
     <properties>
-        <streampipes.version>0.60.2-SNAPSHOT</streampipes.version>
+        <streampipes.version>0.62.1-SNAPSHOT</streampipes.version>
         <lightcouch.version>0.1.8</lightcouch.version>
     </properties>
 
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/pom.xml b/streampipes-pipeline-elements-examples-processors-jvm/pom.xml
index 2aeb124..60d4215 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/pom.xml
+++ b/streampipes-pipeline-elements-examples-processors-jvm/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>streampipes-pipeline-elements-examples</artifactId>
         <groupId>org.streampipes</groupId>
-        <version>0.60.2-SNAPSHOT</version>
+        <version>0.62.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
@@ -36,6 +36,10 @@
             <groupId>org.streampipes</groupId>
             <artifactId>streampipes-messaging-kafka</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.streampipes</groupId>
+            <artifactId>streampipes-messaging-jms</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/ExamplesInit.java
b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/ExamplesInit.java
index 90f1303..ec91503 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/ExamplesInit.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/ExamplesInit.java
@@ -34,6 +34,7 @@ import org.streampipes.pe.examples.jvm.staticproperty.NumberParameterExampleCont
 import org.streampipes.pe.examples.jvm.staticproperty.NumberParameterWithRangeExampleController;
 import org.streampipes.pe.examples.jvm.staticproperty.RuntimeResolvableSingleValue;
 import org.streampipes.pe.examples.jvm.staticproperty.SingleValueSelectionExampleController;
+import org.streampipes.pe.examples.jvm.staticproperty.StaticPropertyAlternativesController;
 import org.streampipes.pe.examples.jvm.staticproperty.TextParameterExampleController;
 import org.streampipes.pe.examples.jvm.staticproperty.UnaryMappingPropertyExampleController;
 
@@ -51,6 +52,7 @@ public class ExamplesInit extends StandaloneModelSubmitter {
             .add(new MultiValueSelectionExampleController())
             .add(new CollectionExampleController())
             .add(new RuntimeResolvableSingleValue())
+            .add(new StaticPropertyAlternativesController())
 
             .add(new AppendOutputController())
             .add(new CustomOutputController())
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/staticproperty/RuntimeResolvableSingleValue.java
b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/staticproperty/RuntimeResolvableSingleValue.java
index 26850f2..aa1bcf0 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/staticproperty/RuntimeResolvableSingleValue.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/staticproperty/RuntimeResolvableSingleValue.java
@@ -15,16 +15,17 @@ limitations under the License.
 */
 package org.streampipes.pe.examples.jvm.staticproperty;
 
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.streampipes.container.api.ResolvesContainerProvidedOptions;
 import org.streampipes.model.graph.DataProcessorDescription;
 import org.streampipes.model.graph.DataProcessorInvocation;
-import org.streampipes.model.runtime.RuntimeOptions;
-import org.streampipes.model.schema.EventProperty;
+import org.streampipes.model.staticproperty.Option;
 import org.streampipes.pe.examples.jvm.base.DummyEngine;
 import org.streampipes.pe.examples.jvm.base.DummyParameters;
 import org.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.streampipes.sdk.helpers.EpRequirements;
 import org.streampipes.sdk.helpers.Labels;
 import org.streampipes.sdk.helpers.OutputStrategies;
@@ -35,10 +36,16 @@ import org.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDecl
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public class RuntimeResolvableSingleValue extends
         StandaloneEventProcessingDeclarer<DummyParameters> implements ResolvesContainerProvidedOptions
{
 
+  private static final String KafkaHost = "kafka-host";
+  private static final String KafkaPort = "kafka-port";
+
   @Override
   public DataProcessorDescription declareModel() {
     return ProcessingElementBuilder.create("org.streampipes.examples.staticproperty" +
@@ -50,10 +57,12 @@ public class RuntimeResolvableSingleValue extends
             .outputStrategy(OutputStrategies.keep())
             .supportedProtocols(SupportedProtocols.kafka())
             .supportedFormats(SupportedFormats.jsonFormat())
+            .requiredTextParameter(Labels.from(KafkaHost, "Kafka Host", ""))
+            .requiredIntegerParameter(Labels.from(KafkaPort, "Kafka Port", ""))
 
             // create a single value selection parameter that is resolved at runtime
             .requiredSingleValueSelectionFromContainer(Labels.from("id", "Example Name",
"Example " +
-                    "Description"))
+                    "Description"), Arrays.asList(KafkaHost, KafkaPort))
 
             .build();
   }
@@ -70,7 +79,21 @@ public class RuntimeResolvableSingleValue extends
   }
 
   @Override
-  public List<RuntimeOptions> resolveOptions(String requestId, EventProperty linkedEventProperty)
{
-    return Arrays.asList(new RuntimeOptions("I was defined at runtime", ""));
+  public List<Option> resolveOptions(String requestId, StaticPropertyExtractor extractor)
{
+    String host = extractor.singleValueParameter(KafkaHost, String.class);
+    Integer port = extractor.singleValueParameter(KafkaPort, Integer.class);
+
+    String kafkaAddress = host + ":" + port;
+
+    Properties props = new Properties();
+    props.put("bootstrap.servers", kafkaAddress);
+    props.put("group.id", "test-consumer-group");
+    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
+    Set<String> topics = consumer.listTopics().keySet();
+    consumer.close();
+    return topics.stream().map(Option::new).collect(Collectors.toList());
   }
 }
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/staticproperty/RuntimeResolvableSingleValue.java
b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/staticproperty/StaticPropertyAlternativesController.java
similarity index 67%
copy from streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/staticproperty/RuntimeResolvableSingleValue.java
copy to streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/staticproperty/StaticPropertyAlternativesController.java
index 26850f2..24b45c9 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/staticproperty/RuntimeResolvableSingleValue.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/streampipes/pe/examples/jvm/staticproperty/StaticPropertyAlternativesController.java
@@ -15,16 +15,15 @@ limitations under the License.
 */
 package org.streampipes.pe.examples.jvm.staticproperty;
 
-import org.streampipes.container.api.ResolvesContainerProvidedOptions;
 import org.streampipes.model.graph.DataProcessorDescription;
 import org.streampipes.model.graph.DataProcessorInvocation;
-import org.streampipes.model.runtime.RuntimeOptions;
-import org.streampipes.model.schema.EventProperty;
 import org.streampipes.pe.examples.jvm.base.DummyEngine;
 import org.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.streampipes.sdk.StaticProperties;
 import org.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.streampipes.sdk.helpers.Alternatives;
 import org.streampipes.sdk.helpers.EpRequirements;
 import org.streampipes.sdk.helpers.Labels;
 import org.streampipes.sdk.helpers.OutputStrategies;
@@ -33,16 +32,16 @@ import org.streampipes.sdk.helpers.SupportedProtocols;
 import org.streampipes.wrapper.standalone.ConfiguredEventProcessor;
 import org.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
 
-import java.util.Arrays;
-import java.util.List;
+public class StaticPropertyAlternativesController extends
+        StandaloneEventProcessingDeclarer<DummyParameters> {
 
-public class RuntimeResolvableSingleValue extends
-        StandaloneEventProcessingDeclarer<DummyParameters> implements ResolvesContainerProvidedOptions
{
+  private static final String KafkaHost = "kafka-host";
+  private static final String KafkaPort = "kafka-port";
 
   @Override
   public DataProcessorDescription declareModel() {
     return ProcessingElementBuilder.create("org.streampipes.examples.staticproperty" +
-            ".runtimeresolvablesingle", "Runtime-resolvable single value example", "")
+            ".alternatives", "Static property alternatives example", "")
             .requiredStream(StreamRequirementsBuilder.
                     create()
                     .requiredProperty(EpRequirements.anyProperty())
@@ -50,27 +49,24 @@ public class RuntimeResolvableSingleValue extends
             .outputStrategy(OutputStrategies.keep())
             .supportedProtocols(SupportedProtocols.kafka())
             .supportedFormats(SupportedFormats.jsonFormat())
+            .requiredTextParameter(Labels.from(KafkaHost, "Kafka Host", ""))
 
-            // create a single value selection parameter that is resolved at runtime
-            .requiredSingleValueSelectionFromContainer(Labels.from("id", "Example Name",
"Example " +
-                    "Description"))
-
+            .requiredAlternatives(Labels.from("window", "Window", ""),
+                    Alternatives.from(Labels.from("count", "Count Window", ""),
+                            StaticProperties.integerFreeTextProperty(Labels.from("count-window-size",
+                                    "Count Window Size", ""))),
+                    Alternatives.from(Labels.from("time", "Time Window", ""),
+                            StaticProperties.integerFreeTextProperty(Labels.from("time" +
+                                    "-window-size", "Time Window Size", ""))))
             .build();
   }
 
   @Override
   public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation
graph, ProcessingElementParameterExtractor extractor) {
 
-    // Extract the text parameter value
-    String selectedSingleValue = extractor.selectedSingleValue("id", String.class);
-
-    // now the text parameter would be added to a parameter class (omitted for this example)
+System.out.println("incov");
 
     return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
   }
 
-  @Override
-  public List<RuntimeOptions> resolveOptions(String requestId, EventProperty linkedEventProperty)
{
-    return Arrays.asList(new RuntimeOptions("I was defined at runtime", ""));
-  }
 }


Mime
View raw message