flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [7/7] flink git commit: [FLINK-6013][metrics] Add Datadog HTTP metrics reporter
Date Wed, 10 May 2017 07:06:35 GMT
[FLINK-6013][metrics] Add Datadog HTTP metrics reporter

This closes #3736.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54ceec16
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54ceec16
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54ceec16

Branch: refs/heads/master
Commit: 54ceec16c11655da4181c0816a3b12d1c4bab465
Parents: 50baec6
Author: Bowen Li <bowenli86@gmail.com>
Authored: Tue Apr 18 10:27:17 2017 -0700
Committer: zentol <chesnay@apache.org>
Committed: Tue May 9 22:56:49 2017 +0200

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      |  24 +++
 flink-dist/pom.xml                              |   7 +
 flink-dist/src/main/assemblies/opt.xml          |   7 +
 flink-metrics/flink-metrics-datadog/pom.xml     | 108 ++++++++++
 .../apache/flink/metrics/datadog/DCounter.java  |  44 ++++
 .../apache/flink/metrics/datadog/DGauge.java    |  45 ++++
 .../apache/flink/metrics/datadog/DMeter.java    |  42 ++++
 .../apache/flink/metrics/datadog/DMetric.java   |  84 ++++++++
 .../apache/flink/metrics/datadog/DSeries.java   |  45 ++++
 .../metrics/datadog/DatadogHttpClient.java      |  97 +++++++++
 .../metrics/datadog/DatadogHttpReporter.java    | 210 +++++++++++++++++++
 .../flink/metrics/datadog/MetricType.java       |  30 +++
 .../metrics/datadog/DatadogHttpClientTest.java  | 199 ++++++++++++++++++
 .../src/test/resources/log4j-test.properties    |  27 +++
 flink-metrics/pom.xml                           |   1 +
 15 files changed, 970 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 290a452..2bc65a6 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -436,6 +436,30 @@ metrics.reporter.stsd.port: 8125
 
 {% endhighlight %}
 
+### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter)
+
+In order to use this reporter you must copy `/opt/flink-metrics-datadog-{{site.version}}.jar` into the `/lib` folder
+of your Flink distribution.
+
+Note any variables in Flink metrics, such as `<host>`, `<job_name>`, `<tm_id>`, `<subtask_index>`, `<task_name>`, and `<operator_name>`,
+will be sent to Datadog as tags. Tags will look like `host:localhost` and `job_name:myjobname`.
+
+Parameters:
+
+- `apikey` - the Datadog API key
+- `tags` - (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by comma only
+
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: dghttp
+metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
+metrics.reporter.dghttp.apikey: xxx
+metrics.reporter.dghttp.tags: myflinkapp,prod
+
+{% endhighlight %}
+
 ## System metrics
 
 By default Flink gathers several metrics that provide deep insights on the current state.

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 9773991..6d8debf 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -202,6 +202,13 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-datadog</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
 		<!-- end optional Flink metrics reporters -->
 
 		<!-- start optional Flink libraries -->

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-dist/src/main/assemblies/opt.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index 95218d7..0386b92 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -105,6 +105,13 @@
 		</file>
 
 		<file>
+			<source>../flink-metrics/flink-metrics-datadog/target/flink-metrics-datadog-${project.version}-shaded.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-metrics-datadog-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
+		<file>
 			<source>../flink-shaded-hadoop/flink-shaded-hadoop2/target/flink-shaded-hadoop2-${project.version}.jar</source>
 			<outputDirectory>opt/</outputDirectory>
 			<destName>flink-shaded-hadoop2-${project.version}.jar</destName>

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/pom.xml b/flink-metrics/flink-metrics-datadog/pom.xml
new file mode 100644
index 0000000..0d473fc
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-metrics-datadog</artifactId>
+	<name>flink-metrics-datadog</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-databind</artifactId>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.squareup.okhttp3</groupId>
+			<artifactId>okhttp</artifactId>
+			<version>3.7.0</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.squareup.okio</groupId>
+			<artifactId>okio</artifactId>
+			<version>1.12.0</version>
+		</dependency>
+
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadedArtifactAttached>true</shadedArtifactAttached>
+							<relocations combine.children="append">
+								<relocation>
+									<pattern>okhttp3</pattern>
+									<shadedPattern>org.apache.flink.shaded.okhttp3</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>okio</pattern>
+									<shadedPattern>org.apache.flink.shaded.okio</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
new file mode 100644
index 0000000..58abbd6
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Counter;
+
+import java.util.List;
+
+/**
+ * Mapping of counter between Flink and Datadog
+ * */
+public class DCounter extends DMetric {
+	private final Counter counter;
+
+	public DCounter(Counter c, String metricName, String host, List<String> tags) {
+		super(MetricType.counter, metricName, host, tags);
+		counter = c;
+	}
+
+	/**
+	 * Visibility of this method must not be changed
+	 * since we deliberately not map it to json object in a Datadog-defined format
+	 * */
+	@Override
+	public Number getMetricValue() {
+		return counter.getCount();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
new file mode 100644
index 0000000..8deb117
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+
+import org.apache.flink.metrics.Gauge;
+
+import java.util.List;
+
+/**
+ * Mapping of gauge between Flink and Datadog
+ * */
+public class DGauge extends DMetric {
+	private final Gauge<Number> gauge;
+
+	public DGauge(Gauge<Number> g, String metricName, String host, List<String> tags) {
+		super(MetricType.gauge, metricName, host, tags);
+		gauge = g;
+	}
+
+	/**
+	 * Visibility of this method must not be changed
+	 * since we deliberately not map it to json object in a Datadog-defined format
+	 * */
+	@Override
+	public Number getMetricValue() {
+		return gauge.getValue();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
new file mode 100644
index 0000000..181a00c
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Meter;
+
+import java.util.List;
+
+/**
+ * Mapping of meter between Flink and Datadog
+ *
+ * Only consider rate of the meter, due to Datadog HTTP API's limited support of meter
+ * */
+public class DMeter extends DMetric {
+	private final Meter meter;
+
+	public DMeter(Meter m, String metricName, String host, List<String> tags) {
+		super(MetricType.gauge, metricName, host, tags);
+		meter = m;
+	}
+
+	@Override
+	public Number getMetricValue() {
+		return meter.getRate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
new file mode 100644
index 0000000..3f9d6ff
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract metric of Datadog for serialization
+ * */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public abstract class DMetric {
+	private static final long MILLIS_TO_SEC = 1000L;
+
+	/**
+	 * Names of metric/type/tags field and their getters must not be changed
+	 * since they are mapped to json objects in a Datadog-defined format
+	 * */
+	private final String metric; // Metric name
+	private final MetricType type;
+	private final String host;
+	private final List<String> tags;
+
+	public DMetric(MetricType metricType, String metric, String host, List<String> tags) {
+		this.type = metricType;
+		this.metric = metric;
+		this.host = host;
+		this.tags = tags;
+	}
+
+	public MetricType getType() {
+		return type;
+	}
+
+	public String getMetric() {
+		return metric;
+	}
+
+	public String getHost() {
+		return host;
+	}
+
+	public List<String> getTags() {
+		return tags;
+	}
+
+	public List<List<Number>> getPoints() {
+		// One single data point
+		List<Number> point = new ArrayList<>();
+		point.add(getUnixEpochTimestamp());
+		point.add(getMetricValue());
+
+		List<List<Number>> points = new ArrayList<>();
+		points.add(point);
+
+		return points;
+	}
+
+	@JsonIgnore
+	public abstract Number getMetricValue();
+
+	public static long getUnixEpochTimestamp() {
+		return (System.currentTimeMillis() / MILLIS_TO_SEC);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
new file mode 100644
index 0000000..fb0bb09
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Json serialization between Flink and Datadog
+ **/
+public class DSeries {
+	/**
+	 * Names of series field and its getters must not be changed
+	 * since they are mapped to json objects in a Datadog-defined format
+	 * */
+	private List<DMetric> series;
+
+	public DSeries() {
+		series = new ArrayList<>();
+	}
+
+	public void addMetric(DMetric metric) {
+		series.add(metric);
+	}
+
+	public List<DMetric> getSeries() {
+		return series;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
new file mode 100644
index 0000000..dfbcee1
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.RequestBody;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Http client talking to Datadog
+ * */
+public class DatadogHttpClient{
+	private static final String SERIES_URL_FORMAT = "https://app.datadoghq.com/api/v1/series?api_key=%s";
+	private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.com/api/v1/validate?api_key=%s";
+	private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8");
+	private static final int TIMEOUT = 3;
+	private static final ObjectMapper MAPPER = new ObjectMapper();
+
+	private final String seriesUrl;
+	private final String validateUrl;
+	private final OkHttpClient client;
+	private final String apiKey;
+
+	public DatadogHttpClient(String dgApiKey) {
+		if (dgApiKey == null || dgApiKey.isEmpty()) {
+			throw new IllegalArgumentException("Invalid API key:" + dgApiKey);
+		}
+
+		apiKey = dgApiKey;
+		client = new OkHttpClient.Builder()
+			.connectTimeout(TIMEOUT, TimeUnit.SECONDS)
+			.writeTimeout(TIMEOUT, TimeUnit.SECONDS)
+			.readTimeout(TIMEOUT, TimeUnit.SECONDS)
+			.build();
+
+		seriesUrl = String.format(SERIES_URL_FORMAT, apiKey);
+		validateUrl = String.format(VALIDATE_URL_FORMAT, apiKey);
+		validateApiKey();
+	}
+
+	private void validateApiKey() {
+		Request r = new Request.Builder().url(validateUrl).get().build();
+
+		try {
+			Response response = client.newCall(r).execute();
+			if (!response.isSuccessful()) {
+				throw new IllegalArgumentException(
+					String.format("API key: %s is invalid", apiKey));
+			}
+		} catch(IOException e) {
+			throw new IllegalStateException("Failed contacting Datadog to validate API key", e);
+		}
+	}
+
+	public void send(DatadogHttpReporter.DatadogHttpRequest request) throws Exception {
+		String postBody = serialize(request.getSeries());
+
+		Request r = new Request.Builder()
+			.url(seriesUrl)
+			.post(RequestBody.create(MEDIA_TYPE, postBody))
+			.build();
+
+		client.newCall(r).execute().close();
+	}
+
+	public static String serialize(Object obj) throws JsonProcessingException {
+		return MAPPER.writeValueAsString(obj);
+	}
+
+	public void close() {
+		client.dispatcher().executorService().shutdown();
+		client.connectionPool().evictAll();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
new file mode 100644
index 0000000..fcb5c4b
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, Scheduled {
+	private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class);
+	private static final String HOST_VARIABLE = "<host>";
+
+	// Both Flink's Gauge and Meter values are taken as gauge in Datadog
+	private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
+	private final Map<Counter, DCounter> counters = new ConcurrentHashMap<>();
+	private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
+
+	private DatadogHttpClient client;
+	private List<String> configTags;
+
+	public static final String API_KEY = "apikey";
+	public static final String TAGS = "tags";
+
+	@Override
+	public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+		final String name = group.getMetricIdentifier(metricName);
+
+		List<String> tags = new ArrayList<>(configTags);
+		tags.addAll(getTagsFromMetricGroup(group));
+		String host = getHostFromMetricGroup(group);
+
+		if (metric instanceof Counter) {
+			Counter c = (Counter) metric;
+			counters.put(c, new DCounter(c, name, host, tags));
+		} else if (metric instanceof Gauge) {
+			Gauge g = (Gauge) metric;
+			gauges.put(g, new DGauge(g, name, host, tags));
+		} else if (metric instanceof Meter) {
+			Meter m = (Meter) metric;
+			// Only consider rate
+			meters.put(m, new DMeter(m, name, host, tags));
+		} else if (metric instanceof Histogram) {
+			LOGGER.warn("Cannot add {} because Datadog HTTP API doesn't support Histogram", metricName);
+		} else {
+			LOGGER.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
+				"does not support this metric type.", metric.getClass().getName());
+		}
+	}
+
+	@Override
+	public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
+		if (metric instanceof Counter) {
+			counters.remove(metric);
+		} else if (metric instanceof Gauge) {
+			gauges.remove(metric);
+		} else if (metric instanceof Meter) {
+			meters.remove(metric);
+		} else if (metric instanceof Histogram) {
+			// No Histogram is registered
+		} else {
+			LOGGER.warn("Cannot remove unknown metric type {}. This indicates that the reporter " +
+				"does not support this metric type.", metric.getClass().getName());
+		}
+	}
+
+	@Override
+	public void open(MetricConfig config) {
+		client = new DatadogHttpClient(config.getString(API_KEY, null));
+		LOGGER.info("Configured DatadogHttpReporter");
+
+		configTags = getTagsFromConfig(config.getString(TAGS, ""));
+	}
+
+	@Override
+	public void close() {
+		client.close();
+		LOGGER.info("Shut down DatadogHttpReporter");
+	}
+
+	@Override
+	public void report() {
+		DatadogHttpRequest request = new DatadogHttpRequest();
+
+		for (Map.Entry<Gauge, DGauge> entry : gauges.entrySet()) {
+			DGauge g = entry.getValue();
+			try {
+				// Will throw exception if the Gauge is not of Number type
+				// Flink uses Gauge to store many types other than Number
+				g.getMetricValue();
+				request.addGauge(g);
+			} catch (Exception e) {
+				// Remove that Gauge if it's not of Number type
+				gauges.remove(entry.getKey());
+			}
+		}
+
+		for (DCounter c : counters.values()) {
+			request.addCounter(c);
+		}
+
+		for (DMeter m : meters.values()) {
+			request.addMeter(m);
+		}
+
+		try {
+			client.send(request);
+		} catch (Exception e) {
+			LOGGER.warn("Failed reporting metrics to Datadog.", e);
+		}
+	}
+
+	/**
+	 * Get config tags from config 'metrics.reporter.dghttp.tags'
+	 * */
+	private List<String> getTagsFromConfig(String str) {
+		return Arrays.asList(str.split(","));
+	}
+
+	/**
+	 * Get tags from MetricGroup#getAllVariables(), excluding 'host'
+	 * */
+	private List<String> getTagsFromMetricGroup(MetricGroup metricGroup) {
+		List<String> tags = new ArrayList<>();
+
+		for (Map.Entry<String, String> entry: metricGroup.getAllVariables().entrySet()) {
+			if(!entry.getKey().equals(HOST_VARIABLE)) {
+				tags.add(getVariableName(entry.getKey()) + ":" + entry.getValue());
+			}
+		}
+
+		return tags;
+	}
+
+	/**
+	 * Get host from MetricGroup#getAllVariables() if it exists; returns Null otherwise
+	 * */
+	private String getHostFromMetricGroup(MetricGroup metricGroup) {
+		return metricGroup.getAllVariables().get(HOST_VARIABLE);
+	}
+
+	/**
+	 * Given "<xxx>", return "xxx"
+	 * */
+	private String getVariableName(String str) {
+		return str.substring(1, str.length() - 1);
+	}
+
+	/**
+	 * Compact metrics in batch, serialize them, and send to Datadog via HTTP
+	 * */
+	static class DatadogHttpRequest {
+		private final DSeries series;
+
+		public DatadogHttpRequest() {
+			series = new DSeries();
+		}
+
+		public void addGauge(DGauge gauge) {
+			series.addMetric(gauge);
+		}
+
+		public void addCounter(DCounter counter) {
+			series.addMetric(counter);
+		}
+
+		public void addMeter(DMeter meter) {
+			series.addMetric(meter);
+		}
+
+		public DSeries getSeries() {
+			return series;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
new file mode 100644
index 0000000..97f9b29
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+/**
+ * Metric types supported by Datadog.
+ */
+public enum MetricType {
+	/*
+	 * The constant names 'gauge' and 'counter' must not be changed,
+	 * since they are mapped to json objects in a Datadog-defined format.
+	 */
+	gauge,
+	counter
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
b/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
new file mode 100644
index 0000000..bda5d47
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@code DatadogHttpClient}: API key validation and the JSON
+ * produced by {@code DatadogHttpClient.serialize} for each metric wrapper.
+ */
+@RunWith(Enclosed.class)
+public class DatadogHttpClientTest {
+	/**
+	 * Constructing a client with an empty or null API key must be rejected.
+	 */
+	public static class TestApiKey {
+		@Test(expected = IllegalArgumentException.class)
+		public void testClientWithEmptyKey() {
+			new DatadogHttpClient("");
+		}
+
+		@Test(expected = IllegalArgumentException.class)
+		public void testClientWithNullKey() {
+			new DatadogHttpClient(null);
+		}
+	}
+
+	/**
+	 * Serialization tests. DMetric.getUnixEpochTimestamp() is mocked so that
+	 * the timestamp inside "points" is a fixed, predictable value.
+	 */
+	@RunWith(PowerMockRunner.class)
+	@PrepareForTest(DMetric.class)
+	public static class TestSerialization {
+		private static final List<String> TAGS = Arrays.asList("tag1", "tag2");
+
+		private static final long MOCKED_SYSTEM_MILLIS = 123L;
+
+		@Before
+		public void mockSystemMillis() {
+			PowerMockito.mockStatic(DMetric.class);
+			PowerMockito.when(DMetric.getUnixEpochTimestamp()).thenReturn(MOCKED_SYSTEM_MILLIS);
+		}
+
+		/** Gauge stub that always reports the value 1. */
+		private static Gauge<Number> gaugeStub() {
+			return new Gauge<Number>() {
+				@Override
+				public Number getValue() {
+					return 1;
+				}
+			};
+		}
+
+		/** Counter stub whose count is always 1; inc/dec do nothing. */
+		private static Counter counterStub() {
+			return new Counter() {
+				@Override
+				public void inc() {}
+
+				@Override
+				public void inc(long n) {}
+
+				@Override
+				public void dec() {}
+
+				@Override
+				public void dec(long n) {}
+
+				@Override
+				public long getCount() {
+					return 1;
+				}
+			};
+		}
+
+		/** Meter stub with a rate of 1 and a count of 0; markEvent does nothing. */
+		private static Meter meterStub() {
+			return new Meter() {
+				@Override
+				public void markEvent() {}
+
+				@Override
+				public void markEvent(long n) {}
+
+				@Override
+				public double getRate() {
+					return 1;
+				}
+
+				@Override
+				public long getCount() {
+					return 0;
+				}
+			};
+		}
+
+		@Test
+		public void serializeGauge() throws JsonProcessingException {
+			DGauge g = new DGauge(gaugeStub(), "testGauge", "localhost", TAGS);
+
+			assertEquals(
+				"{\"metric\":\"testGauge\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+				DatadogHttpClient.serialize(g));
+		}
+
+		@Test
+		public void serializeGaugeWithoutHost() throws JsonProcessingException {
+			// a null host must be left out of the JSON entirely
+			DGauge g = new DGauge(gaugeStub(), "testGauge", null, TAGS);
+
+			assertEquals(
+				"{\"metric\":\"testGauge\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+				DatadogHttpClient.serialize(g));
+		}
+
+		@Test
+		public void serializeCounter() throws JsonProcessingException {
+			DCounter c = new DCounter(counterStub(), "testCounter", "localhost", TAGS);
+
+			assertEquals(
+				"{\"metric\":\"testCounter\",\"type\":\"counter\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+				DatadogHttpClient.serialize(c));
+		}
+
+		@Test
+		public void serializeCounterWithoutHost() throws JsonProcessingException {
+			// a null host must be left out of the JSON entirely
+			DCounter c = new DCounter(counterStub(), "testCounter", null, TAGS);
+
+			assertEquals(
+				"{\"metric\":\"testCounter\",\"type\":\"counter\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+				DatadogHttpClient.serialize(c));
+		}
+
+		@Test
+		public void serializeMeter() throws JsonProcessingException {
+			// the expected JSON shows a meter being reported with type "gauge"
+			DMeter m = new DMeter(meterStub(), "testMeter", "localhost", TAGS);
+
+			assertEquals(
+				"{\"metric\":\"testMeter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}",
+				DatadogHttpClient.serialize(m));
+		}
+
+		@Test
+		public void serializeMeterWithoutHost() throws JsonProcessingException {
+			// a null host must be left out of the JSON entirely
+			DMeter m = new DMeter(meterStub(), "testMeter", null, TAGS);
+
+			assertEquals(
+				"{\"metric\":\"testMeter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}",
+				DatadogHttpClient.serialize(m));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties
b/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
index 317dde8..e1b66c2 100644
--- a/flink-metrics/pom.xml
+++ b/flink-metrics/pom.xml
@@ -40,6 +40,7 @@ under the License.
 		<module>flink-metrics-graphite</module>
 		<module>flink-metrics-jmx</module>
 		<module>flink-metrics-statsd</module>
+		<module>flink-metrics-datadog</module>
 	</modules>
 
 	<!-- override these root dependencies as 'provided', so they don't end up


Mime
View raw message