From commits-return-42-archive-asf-public=cust-asf.ponee.io@streampipes.apache.org Sun Dec 8 14:07:45 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 168E618037A for ; Sun, 8 Dec 2019 15:07:44 +0100 (CET) Received: (qmail 53046 invoked by uid 500); 8 Dec 2019 14:07:44 -0000 Mailing-List: contact commits-help@streampipes.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@streampipes.apache.org Delivered-To: mailing list commits@streampipes.apache.org Received: (qmail 53037 invoked by uid 99); 8 Dec 2019 14:07:44 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 08 Dec 2019 14:07:44 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 482A08D809; Sun, 8 Dec 2019 14:07:44 +0000 (UTC) Date: Sun, 08 Dec 2019 14:07:44 +0000 To: "commits@streampipes.apache.org" Subject: [incubator-streampipes-extensions] branch dev updated: Add Adapter for TI Sensor MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <157581406424.15551.6246227811132952885@gitbox.apache.org> From: zehnder@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-streampipes-extensions X-Git-Refname: refs/heads/dev X-Git-Reftype: branch X-Git-Oldrev: 3761e78147470d22c43f523ed19904e19d1bf93f X-Git-Newrev: 343c35809c95868c109c4da7c2204201ee8af959 X-Git-Rev: 343c35809c95868c109c4da7c2204201ee8af959 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. zehnder pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git The following commit(s) were added to refs/heads/dev by this push: new 343c358 Add Adapter for TI Sensor 343c358 is described below commit 343c35809c95868c109c4da7c2204201ee8af959 Author: Philipp Zehnder AuthorDate: Sun Dec 8 14:48:33 2019 +0100 Add Adapter for TI Sensor --- .../streampipes/connect/ConnectAdapterInit.java | 2 + .../connect/adapters/ti/TISensorTag.java | 229 +++++++++++++++++++++ .../connect/protocol/stream/MqttProtocol.java | 6 - .../connect/adapters/ti/TISensorTagTest.java | 61 ++++++ 4 files changed, 292 insertions(+), 6 deletions(-) diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/streampipes/connect/ConnectAdapterInit.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/streampipes/connect/ConnectAdapterInit.java index 3706ccc..8385848 100644 --- a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/streampipes/connect/ConnectAdapterInit.java +++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/streampipes/connect/ConnectAdapterInit.java @@ -18,6 +18,7 @@ package org.streampipes.connect; import org.streampipes.connect.adapters.plc4x.passive.Plc4xPassiveAdapter; +import org.streampipes.connect.adapters.ti.TISensorTag; import org.streampipes.connect.protocol.set.HttpProtocol; import org.streampipes.connect.adapters.coindesk.CoindeskBitcoinAdapter; import org.streampipes.connect.adapters.gdelt.GdeltAdapter; @@ -78,6 +79,7 @@ public class ConnectAdapterInit extends AdapterWorkerContainer { .add(new OpcUaAdapter()) .add(new InfluxDbStreamAdapter()) .add(new InfluxDbSetAdapter()) + .add(new TISensorTag()) .add(new Plc4xS7Adapter()); String workerUrl = ConnectWorkerConfig.INSTANCE.getConnectContainerWorkerUrl(); diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/streampipes/connect/adapters/ti/TISensorTag.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/streampipes/connect/adapters/ti/TISensorTag.java new file mode 100644 index 0000000..ec5fbec --- /dev/null +++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/streampipes/connect/adapters/ti/TISensorTag.java @@ -0,0 +1,229 @@ +/* + * Copyright 2018 FZI Forschungszentrum Informatik + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.streampipes.connect.adapters.ti; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.streampipes.connect.adapter.Adapter; +import org.streampipes.connect.adapter.exception.AdapterException; +import org.streampipes.connect.adapter.model.pipeline.AdapterPipeline; +import org.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter; +import org.streampipes.connect.protocol.stream.MqttConfig; +import org.streampipes.connect.protocol.stream.MqttConsumer; +import org.streampipes.messaging.InternalEventProcessor; +import org.streampipes.model.AdapterType; +import org.streampipes.model.connect.adapter.SpecificAdapterStreamDescription; +import org.streampipes.model.connect.guess.GuessSchema; +import org.streampipes.sdk.StaticProperties; +import org.streampipes.sdk.builder.adapter.GuessSchemaBuilder; +import org.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder; +import org.streampipes.sdk.extractor.StaticPropertyExtractor; +import org.streampipes.sdk.helpers.Alternatives; +import org.streampipes.sdk.helpers.Labels; +import org.streampipes.vocabulary.SO; +import org.streampipes.vocabulary.SPSensor; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import static org.streampipes.sdk.helpers.EpProperties.*; + +public class TISensorTag extends SpecificDataStreamAdapter { + + private Logger logger = LoggerFactory.getLogger(TISensorTag.class); + + public static final String ID = "http://streampipes.org/adapter/specific/tisensortag"; + + private static final String ACCESS_MODE = "access_mode"; + private static final String ANONYMOUS_ACCESS = "anonymous-alternative"; + private static final String USERNAME_ACCESS = "username-alternative"; + private static final String USERNAME = "username"; + private static final String PASSWORD = "password"; + + + private static final String TIMESTAMP = "timestamp"; + private static final String AMBIENT_TEMP = "ambientTemp"; + private static final String OBJECT_TEMP = "objectTemp"; + private static final String HUMIDITY = "humidity"; + private static final String ACCELERATION_X = "accelX"; + private static final String ACCELERATION_Y = "accelY"; + private static final String ACCELERATION_Z = "accelZ"; + private static final String GYROSCOPE_X = "gyroX"; + private static final String GYROSCOPE_Y = "gyroY"; + private static final String GYROSCOPE_Z = "gyroZ"; + private static final String MAGNETOMETER_X = "magX"; + private static final String MAGNETOMETER_Y = "magY"; + private static final String MAGNETOMETER_Z = "magZ"; + private static final String LIGHT = "light"; + private static final String KEY_1 = "key1"; + private static final String KEY_2 = "key2"; + + private MqttConsumer mqttConsumer; + private MqttConfig mqttConfig; + private Thread thread; + + public TISensorTag() { + super(); + } + + public TISensorTag(SpecificAdapterStreamDescription adapterDescription, MqttConfig mqttConfig) { + super(adapterDescription); + this.mqttConfig = mqttConfig; + } + + @Override + public SpecificAdapterStreamDescription declareModel() { + + SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder.create(ID, "TI Sensor Tag", "") + .iconUrl("ti_sensor_tag.png") + .category(AdapterType.Environment, AdapterType.OpenData) + .requiredTextParameter(Labels.from("broker_url", "Broker URL", + "Example: tcp://test-server.com:1883 (Protocol required. Port required)")) + .requiredAlternatives(Labels.from(ACCESS_MODE, "Access Mode", ""), + Alternatives.from(Labels.from(ANONYMOUS_ACCESS, "Unauthenticated", "")), + Alternatives.from(Labels.from(USERNAME_ACCESS, "Username/Password", ""), + StaticProperties.group(Labels.withId("username-group"), + StaticProperties.stringFreeTextProperty(Labels.from(USERNAME, + "Username", "")), + StaticProperties.secretValue(Labels.from(PASSWORD, + "Password", ""))))) + .requiredTextParameter(Labels.from("topic", "Topic","Example: test/topic")) + .build(); + + description.setAppId(ID); + return description; + } + + @Override + public void startAdapter() throws AdapterException { + this.mqttConsumer = new MqttConsumer(this.mqttConfig, new EventProcessor(adapterPipeline)); + + thread = new Thread(this.mqttConsumer); + thread.start(); + } + + @Override + public void stopAdapter() throws AdapterException { + this.mqttConsumer.close(); + } + + @Override + public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) { + MqttConfig mqttConfig; + StaticPropertyExtractor extractor = + StaticPropertyExtractor.from(adapterDescription.getConfig(), new ArrayList<>()); + + String brokerUrl = extractor.singleValueParameter("broker_url", String.class); + String topic = extractor.singleValueParameter("topic", String.class); + String selectedAlternative = extractor.selectedAlternativeInternalId("access_mode"); + + if (selectedAlternative.equals(ANONYMOUS_ACCESS)) { + mqttConfig = new MqttConfig(brokerUrl, topic); + } else { + String username = extractor.singleValueParameter(USERNAME, String.class); + String password = extractor.secretValue(PASSWORD); + mqttConfig = new MqttConfig(brokerUrl, topic, username, password); + } + + return new TISensorTag(adapterDescription, mqttConfig); + } + + @Override + public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) { + return GuessSchemaBuilder.create() + .property(timestampProperty(TIMESTAMP)) + .property(doubleEp(Labels.from(AMBIENT_TEMP, "Ambient Temperature", ""), + AMBIENT_TEMP, SO.Number)) + .property(doubleEp(Labels.from(OBJECT_TEMP, "Object Temperature", ""), + OBJECT_TEMP, SO.Number)) + .property(doubleEp(Labels.from(HUMIDITY, "Humidity", ""), + HUMIDITY, SO.Number)) + .property(doubleEp(Labels.from(ACCELERATION_X, "Acceleration X", ""), + ACCELERATION_X, SPSensor.ACCELERATION_X)) + .property(doubleEp(Labels.from(ACCELERATION_Y, "Acceleration Y", ""), + ACCELERATION_Y, SPSensor.ACCELERATION_Y)) + .property(doubleEp(Labels.from(ACCELERATION_Z, "Acceleration Z", ""), + ACCELERATION_Z, SPSensor.ACCELERATION_Z)) + .property(doubleEp(Labels.from(GYROSCOPE_X, "Gyroscope X", ""), + GYROSCOPE_X, SO.Number)) + .property(doubleEp(Labels.from(GYROSCOPE_Y, "Gyroscope Y", ""), + GYROSCOPE_Y, SO.Number)) + .property(doubleEp(Labels.from(GYROSCOPE_Z, "Gyroscope Z", ""), + GYROSCOPE_Z, SO.Number)) + .property(doubleEp(Labels.from(MAGNETOMETER_X, "Magnetometer X", ""), + MAGNETOMETER_X, SO.Number)) + .property(doubleEp(Labels.from(MAGNETOMETER_Y, "Magnetometer Y", ""), + MAGNETOMETER_Y, SO.Number)) + .property(doubleEp(Labels.from(MAGNETOMETER_Z, "Magnetometer Z", ""), + MAGNETOMETER_Z, SO.Number)) + .property(doubleEp(Labels.from(LIGHT, "Light", ""), + LIGHT, SO.Number)) + .property(booleanEp(Labels.from(KEY_1, "Key 1", ""), + KEY_1, SO.Boolean)) + .property(booleanEp(Labels.from(KEY_2, "Key 2", ""), + KEY_2, SO.Boolean)) + .build(); + } + + @Override + public String getId() { + return ID; + } + + private class EventProcessor implements InternalEventProcessor { + private AdapterPipeline adapterPipeline; + + public EventProcessor(AdapterPipeline adapterpipeline) { + this.adapterPipeline = adapterpipeline; + } + + @Override + public void onEvent(byte[] payload) { + Map result = parseEvent(new String(payload)); + adapterPipeline.process(result); + } + } + + public static Map parseEvent(String s) { + Map result = new HashMap<>(); + String[] lines = s.split("\n"); + for (String line : lines) { + if (line.startsWith("\"")) { + line = line.replaceAll(",", "").replaceAll("\"", ""); + String[] keyValue = line.split(":"); + + // convert keys to boolean, other sensor values are doubles + if (keyValue[0].startsWith("key")) { + result.put(keyValue[0], Double.parseDouble(keyValue[1]) == 1.0); + } else { + result.put(keyValue[0], Double.parseDouble(keyValue[1])); + } + } + } + + if (!result.containsKey("key1") || !result.containsKey("key2")) { + result.put("key1", false); + result.put("key2", false); + } + + result.put(TIMESTAMP, System.currentTimeMillis()); + + return result; + } +} \ No newline at end of file diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/streampipes/connect/protocol/stream/MqttProtocol.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/streampipes/connect/protocol/stream/MqttProtocol.java index 5dad50e..089cd8c 100644 --- a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/streampipes/connect/protocol/stream/MqttProtocol.java +++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/streampipes/connect/protocol/stream/MqttProtocol.java @@ -66,14 +66,8 @@ public class MqttProtocol extends BrokerProtocol { StaticPropertyExtractor.from(protocolDescription.getConfig(), new ArrayList<>()); String brokerUrl = extractor.singleValueParameter("broker_url", String.class); -// String brokerUrl = "tcp://ipe-girlitz.fzi.de:1883"; - String topic = extractor.singleValueParameter("topic", String.class); -// String topic = "acceleration"; - String selectedAlternative = extractor.selectedAlternativeInternalId("access_mode"); -// String selectedAlternative = ANONYMOUS_ACCESS; - if (selectedAlternative.equals(ANONYMOUS_ACCESS)) { mqttConfig = new MqttConfig(brokerUrl, topic); diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/test/java/org/streampipes/connect/adapters/ti/TISensorTagTest.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/test/java/org/streampipes/connect/adapters/ti/TISensorTagTest.java new file mode 100644 index 0000000..a590abc --- /dev/null +++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/test/java/org/streampipes/connect/adapters/ti/TISensorTagTest.java @@ -0,0 +1,61 @@ +package org.streampipes.connect.adapters.ti; + +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.*; + +/* +Copyright 2019 FZI Forschungszentrum Informatik + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +public class TISensorTagTest { + String event = "{\n" + + "\"ambientTemp\":\"19.58\",\n" + + "\"objectTemp\":\"16.59\",\n" + + "\"humidity\":\"38.3917\",\n" + + "\"accelX\":\"0.28\",\n" + + "\"accelY\":\"-0.48\",\n" + + "\"accelZ\":\"-0.91\",\n" + + "\"gyroX\":\"4.88\",\n" + + "\"gyroY\":\"4.53\",\n" + + "\"gyroZ\":\"-2.96\",\n" + + "\"magX\":\"134.16\",\n" + + "\"magY\":\"-17.84\" \n" + + "\"magZ\":\"18.44\" \n" + + "\"light\":\"0.00\" \n" + + "\n" + + "}"; + + @Test + public void parseEvent() { + Map result = TISensorTag.parseEvent(event); + + assertEquals(14, result.keySet().size()); + assertEquals(result.get("ambientTemp"), 19.58); + assertEquals(result.get("objectTemp"), 16.59); + assertEquals(result.get("humidity"), 38.3917); + assertEquals(result.get("accelX"), 0.28); + assertEquals(result.get("accelY"), -0.48); + assertEquals(result.get("accelZ"),-0.91); + assertEquals(result.get("gyroX"), 4.88); + assertEquals(result.get("gyroY"), 4.53); + assertEquals(result.get("gyroZ"), -2.96); + assertEquals(result.get("magX"), 134.16); + assertEquals(result.get("magY"), -17.84); + assertEquals(result.get("magZ"), 18.44); + assertEquals(result.get("light"), 0.00); + } +} \ No newline at end of file