Mailing-List: contact commits-help@camel.apache.org; run by ezmlm
Reply-To: dev@camel.apache.org
From: acosentino@apache.org
To: commits@camel.apache.org
Message-Id: <8c2cfa98e2dc42db9e87184bc5480716@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: camel git commit: CAMEL-10690: Camel-InfluxDB: Support Querying
Date: Wed, 11 Jan 2017 10:35:39 +0000 (UTC)
archived-at: Wed, 11 Jan 2017 10:35:42 -0000

Repository: camel
Updated Branches:
  refs/heads/master 14095d0e7 -> c92845358

CAMEL-10690: Camel-InfluxDB: Support Querying

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c9284535
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c9284535
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c9284535

Branch: refs/heads/master
Commit: c92845358d9f1a4e81416d3c3b560a1a474186af
Parents: 14095d0
Author: Andrea Cosentino
Authored: Wed Jan 11 11:33:57 2017 +0100
Committer: Andrea Cosentino
Committed: Wed Jan 11 11:33:57 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/influxdb-component.adoc       |  4 +-
 .../component/influxdb/InfluxDbConstants.java   |  1 +
 .../component/influxdb/InfluxDbEndpoint.java    | 26 ++++++
 .../component/influxdb/InfluxDbOperations.java  | 22 +++++
 .../component/influxdb/InfluxDbProducer.java    | 39 +++++++++
 .../influxdb/InfluxDbProducerQueryTest.java     | 87 ++++++++++++++++++++
 .../influxdb/MockedInfluxDbConfiguration.java   |  1 +
 .../src/test/resources/log4j2.properties        |  2 +-
 8 files changed, 180 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/main/docs/influxdb-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/docs/influxdb-component.adoc b/components/camel-influxdb/src/main/docs/influxdb-component.adoc
index ff0a5f3..d171c5f 100644
--- a/components/camel-influxdb/src/main/docs/influxdb-component.adoc
+++ b/components/camel-influxdb/src/main/docs/influxdb-component.adoc
@@ -52,7 +52,7 @@ The InfluxDB component has no options.
 // endpoint options: START
-The InfluxDB component supports 5 endpoint options which are listed below:
+The InfluxDB component supports 7 endpoint options which are listed below:
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -61,6 +61,8 @@ The InfluxDB component supports 5 endpoint options which are listed below:
 | connectionBean | producer | | String | *Required* Connection to the influx database of class InfluxDB.class
 | batch | producer | false | boolean | Define if this operation is a batch operation or not
 | databaseName | producer | | String | The name of the database where the time series will be stored
+| operation | producer | insert | String | Define if this operation is an insert or a query
+| query | producer | | String | Define the query in case of operation query
 | retentionPolicy | producer | default | String | The string that defines the retention policy to the data created by the endpoint
 | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
 |=======================================================================

http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbConstants.java b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbConstants.java
index 06a7124..867d4f6 100644
--- a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbConstants.java
+++ b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbConstants.java
@@ -21,6 +21,7 @@ public final class InfluxDbConstants {
     public static final String MEASUREMENT_NAME = "camelInfluxDB.MeasurementName";
     public static final String RETENTION_POLICY_HEADER = "camelInfluxDB.RetentionPolicy";
     public static final String DBNAME_HEADER = "camelInfluxDB.databaseName";
+    public static final String INFLUXDB_QUERY = "camelInfluxDB.query";

     private InfluxDbConstants() {

http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java
index efd6d09..1dac976 100644
--- a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java
+++ b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java
@@ -48,6 +48,10 @@ public class InfluxDbEndpoint extends DefaultEndpoint {
     private String retentionPolicy = "default";
     @UriParam(defaultValue = "false")
     private boolean batch;
+    @UriParam(defaultValue = InfluxDbOperations.INSERT)
+    private String operation = InfluxDbOperations.INSERT;
+    @UriParam
+    private String query;

     public InfluxDbEndpoint(String uri, InfluxDbComponent component) {
         super(uri, component);
@@ -134,4 +138,26 @@ public class InfluxDbEndpoint extends DefaultEndpoint {
     public void setBatch(boolean batch) {
         this.batch = batch;
     }
+
+    public String getOperation() {
+        return operation;
+    }
+
+    /**
+     * Define if this operation is an insert or a query
+     */
+    public void setOperation(String operation) {
+        this.operation = operation;
+    }
+
+    public String getQuery() {
+        return query;
+    }
+
+    /**
+     * Define the query in case of operation query
+     */
+    public void setQuery(String query) {
+        this.query = query;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbOperations.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbOperations.java b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbOperations.java
new file mode 100644
index 0000000..3a6f114
--- /dev/null
+++ b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbOperations.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.influxdb;
+
+public interface InfluxDbOperations {
+    String INSERT = "insert";
+    String QUERY = "query";
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java
index b071c13..569784f 100644
--- a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java
+++ b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java
@@ -17,10 +17,14 @@
 package org.apache.camel.component.influxdb;

 import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
 import org.influxdb.InfluxDB;
 import org.influxdb.dto.BatchPoints;
 import org.influxdb.dto.Point;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -60,6 +64,19 @@ public class InfluxDbProducer extends DefaultProducer {
         String dataBaseName = calculateDatabaseName(exchange);
         String retentionPolicy = calculateRetentionPolicy(exchange);
+        switch (endpoint.getOperation()) {
+        case InfluxDbOperations.INSERT:
+            doInsert(exchange, dataBaseName, retentionPolicy);
+            break;
+        case InfluxDbOperations.QUERY:
+            doQuery(exchange, dataBaseName, retentionPolicy);
+            break;
+        default:
+            throw new IllegalArgumentException("The operation " + endpoint.getOperation() + " is not supported");
+        }
+    }
+
+    private void doInsert(Exchange exchange, String dataBaseName, String retentionPolicy) throws InvalidPayloadException {
         if (!endpoint.isBatch()) {
             Point p = exchange.getIn().getMandatoryBody(Point.class);

@@ -83,6 +100,13 @@ public class InfluxDbProducer extends DefaultProducer {
         }
     }

+    private void doQuery(Exchange exchange, String dataBaseName, String retentionPolicy) {
+        String query = calculateQuery(exchange);
+        Query influxdbQuery = new Query(query, dataBaseName);
+        QueryResult resultSet = connection.query(influxdbQuery);
+        exchange.getOut().setBody(resultSet);
+    }
+
     private String calculateRetentionPolicy(Exchange exchange) {
         String retentionPolicy = exchange.getIn().getHeader(InfluxDbConstants.RETENTION_POLICY_HEADER, String.class);

@@ -102,5 +126,20 @@ public class InfluxDbProducer extends DefaultProducer {
         return endpoint.getDatabaseName();
     }
+
+    private String calculateQuery(Exchange exchange) {
+        String query = exchange.getIn().getHeader(InfluxDbConstants.INFLUXDB_QUERY, String.class);
+
+        if (query != null) {
+            return query;
+        } else {
+            query = endpoint.getQuery();
+        }
+
+        if (ObjectHelper.isEmpty(query)) {
+            throw new IllegalArgumentException("The query option must be set if you want to run a query operation");
+        }
+        return query;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerQueryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerQueryTest.java b/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerQueryTest.java
new file mode 100644
index 0000000..b35e662
--- /dev/null
+++ b/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerQueryTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.influxdb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InfluxDbProducerQueryTest extends AbstractInfluxDbTest {
+
+    @EndpointInject(uri = "mock:test")
+    MockEndpoint successEndpoint;
+
+    @EndpointInject(uri = "mock:error")
+    MockEndpoint errorEndpoint;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+
+                errorHandler(deadLetterChannel("mock:error").redeliveryDelay(0).maximumRedeliveries(0));
+
+                //test route
+                from("direct:test")
+                    .to("influxdb:influxDbBean?databaseName={{influxdb.testDb}}")
+                    .process(new Processor() {
+
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            exchange.getIn().setHeader(InfluxDbConstants.INFLUXDB_QUERY, "select * from cpu");
+                        }
+                    })
+                    .to("influxdb:influxDbBean?databaseName={{influxdb.testDb}}&operation=query")
+                    .to("mock:test");
+            }
+        };
+    }
+
+    @Before
+    public void resetEndpoints() {
+        errorEndpoint.reset();
+        successEndpoint.reset();
+    }
+
+    @Test
+    public void writePointFromMapAndStaticDbName() throws InterruptedException {
+
+        errorEndpoint.expectedMessageCount(0);
+        successEndpoint.expectedMessageCount(1);
+
+        Map pointMap = createMapPoint();
+        sendBody("direct:test", pointMap);
+
+        errorEndpoint.assertIsSatisfied();
+        successEndpoint.assertIsSatisfied();
+    }
+
+    private Map createMapPoint() {
+        Map pointMap = new HashMap<>();
+        pointMap.put(InfluxDbConstants.MEASUREMENT_NAME, "MyTestMeasurement");
+        pointMap.put("CPU", 1);
+        return pointMap;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/MockedInfluxDbConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/MockedInfluxDbConfiguration.java b/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/MockedInfluxDbConfiguration.java
index 1705aa2..632200d 100644
--- a/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/MockedInfluxDbConfiguration.java
+++ b/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/MockedInfluxDbConfiguration.java
@@ -21,6 +21,7 @@ import java.net.UnknownHostException;
 import static junit.framework.TestCase.assertNotNull;
 import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Bean;

http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/test/resources/log4j2.properties b/components/camel-influxdb/src/test/resources/log4j2.properties
index aa36063..58c249f 100644
--- a/components/camel-influxdb/src/test/resources/log4j2.properties
+++ b/components/camel-influxdb/src/test/resources/log4j2.properties
@@ -24,5 +24,5 @@ appender.out.type = Console
 appender.out.name = out
 appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
-rootLogger.level = INFO
+rootLogger.level = DEBUG
 rootLogger.appenderRef.file.ref = file
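
Note on query resolution: the calculateQuery method added to InfluxDbProducer in this commit prefers the camelInfluxDB.query message header over the endpoint's query option, and fails if neither supplies a usable value. The following is a minimal, Camel-free sketch of that precedence rule, not the component's code. The class name QueryResolution, the method resolveQuery, and the plain Map standing in for the Camel message headers are all hypothetical; only the header name and the error message come from the diff above. The blank check is a simplified stand-in for ObjectHelper.isEmpty, and toString() replaces Camel's type conversion.

```java
import java.util.HashMap;
import java.util.Map;

public class QueryResolution {

    // Header name as defined by InfluxDbConstants.INFLUXDB_QUERY in the commit.
    public static final String INFLUXDB_QUERY = "camelInfluxDB.query";

    /**
     * Hypothetical stand-in for InfluxDbProducer.calculateQuery.
     * A non-null header wins outright; otherwise the endpoint option is used,
     * and a null or blank endpoint option is rejected.
     */
    public static String resolveQuery(Map<String, Object> headers, String endpointQuery) {
        Object header = headers.get(INFLUXDB_QUERY);
        if (header != null) {
            // Simplification: the real producer asks Camel to convert the header to String.
            return header.toString();
        }
        if (endpointQuery == null || endpointQuery.trim().isEmpty()) {
            throw new IllegalArgumentException(
                "The query option must be set if you want to run a query operation");
        }
        return endpointQuery;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();

        // No header set: the endpoint option is used.
        System.out.println(resolveQuery(headers, "select * from memory"));

        // Header set: it overrides the endpoint option.
        headers.put(INFLUXDB_QUERY, "select * from cpu");
        System.out.println(resolveQuery(headers, "select * from memory"));
    }
}
```

In a route, the same rule means that an endpoint URI carrying both operation=query and a query option can still be overridden per message by setting the camelInfluxDB.query header, which is what the processor in InfluxDbProducerQueryTest does.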