Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EAEFE2009A8 for ; Tue, 17 May 2016 13:46:51 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E96EF1609F5; Tue, 17 May 2016 11:46:51 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E33BE1609AE for ; Tue, 17 May 2016 13:46:50 +0200 (CEST) Received: (qmail 95502 invoked by uid 500); 17 May 2016 11:46:50 -0000 Mailing-List: contact commits-help@falcon.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.apache.org Delivered-To: mailing list commits@falcon.apache.org Received: (qmail 95492 invoked by uid 99); 17 May 2016 11:46:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 May 2016 11:46:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 04DA9DFADE; Tue, 17 May 2016 11:46:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pavankumar@apache.org To: commits@falcon.apache.org Message-Id: <295d583d5ca04da1860f2d6a12f65145@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: falcon git commit: FALCON-1956 Graphite Plugin for monitoring Date: Tue, 17 May 2016 11:46:50 +0000 (UTC) archived-at: Tue, 17 May 2016 11:46:52 -0000 Repository: falcon Updated Branches: refs/heads/master a31fa545f -> c59aa1070 FALCON-1956 Graphite Plugin for monitoring Author: Praveen Adlakha Reviewers: "Pavan Kumar Kolamuri , Srikanth Sundarrajan " Closes #141 from PraveenAdlakha/codahale Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c59aa107 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c59aa107 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c59aa107 Branch: refs/heads/master Commit: c59aa1070b9445227b41ad0ba04ab23ed8885689 Parents: a31fa54 Author: Praveen Adlakha Authored: Tue May 17 17:15:04 2016 +0530 Committer: pavankumar526 Committed: Tue May 17 17:15:04 2016 +0530 ---------------------------------------------------------------------- .../metrics/MetricNotificationService.java | 109 +++++++++++++++++++ common/src/main/resources/startup.properties | 8 ++ metrics/pom.xml | 10 ++ pom.xml | 18 +++ .../plugin/GraphiteNotificationPlugin.java | 82 ++++++++++++++ src/conf/startup.properties | 9 +- 6 files changed, 235 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/c59aa107/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java b/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java new file mode 100644 index 0000000..30e6bb6 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.metrics; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.graphite.Graphite; +import com.codahale.metrics.graphite.GraphiteReporter; +import org.apache.falcon.FalconException; +import org.apache.falcon.service.FalconService; +import org.apache.falcon.util.StartupProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * Service for metrics notification. + */ +public class MetricNotificationService implements FalconService { + private static final Logger LOG = LoggerFactory.getLogger(MetricNotificationService.class); + + public static final String SERVICE_NAME = MetricNotificationService.class.getSimpleName(); + private static final MetricNotificationService METRIC_NOTIFICATION_SERVICE = new MetricNotificationService(); + private final GraphiteReporter graphiteReporter; + private final MetricRegistry metricRegistry; + + private Map metricMap = new ConcurrentHashMap<>(); + + public static MetricNotificationService get(){ + return METRIC_NOTIFICATION_SERVICE; + } + + public MetricNotificationService(){ + Graphite graphite = new Graphite(new InetSocketAddress(StartupProperties + .get().getProperty("falcon.graphite.hostname"), Integer.parseInt(StartupProperties.get() + .getProperty("falcon.graphite.port")))); + metricRegistry=new MetricRegistry(); + this.graphiteReporter = GraphiteReporter.forRegistry(metricRegistry) + .convertDurationsTo(TimeUnit.SECONDS) + .filter(MetricFilter.ALL) + .build(graphite); + } + + @Override + public String getName() { + return SERVICE_NAME; + } + + @Override + public void init() throws FalconException { + LOG.info("Starting Graphite Service"); + graphiteReporter.start(Long.parseLong(StartupProperties.get().getProperty("falcon.graphite.frequency")), + TimeUnit.SECONDS); + } + + @Override + public void destroy() throws FalconException { + graphiteReporter.stop(); + } + + private MetricGauge createMetric(final String metricName){ + if (!metricMap.containsKey(metricName)) { + MetricGauge metricGauge = new MetricGauge(); + metricMap.put(metricName, metricGauge); + metricRegistry.register(metricName, metricGauge); + } + return metricMap.get(metricName); + } + + public void publish(String metricsName, Long value){ + synchronized(this){ + createMetric(metricsName).setValue(value); + } + } + + private static class MetricGauge implements Gauge { + + private Long value=0L; + public void setValue(Long value){ + this.value=value; + } + + @Override + public Long getValue() { + return value; + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/c59aa107/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 37c09e0..2229edf 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -45,6 +45,8 @@ org.apache.falcon.service.ProxyUserService,\ org.apache.falcon.service.FalconJPAService,\ org.apache.falcon.extensions.ExtensionService +##Add if you want to send data to graphite +# org.apache.falcon.metrics.MetricNotificationService\ ## Add if you want to use Falcon Azure integration ## # org.apache.falcon.adfservice.ADFProviderService ## If you wish to use Falcon native scheduler add the commented out services below to application.services ## @@ -310,3 +312,9 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle ## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP. ## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up. #*.falcon.statestore.create.db.schema=true + +# Graphite properties +#*.falcon.graphite.hostname=localhost +#*.falcon.graphite.port=2003 +#*.falcon.graphite.frequency=1 +#*.falcon.graphite.prefix=falcon http://git-wip-us.apache.org/repos/asf/falcon/blob/c59aa107/metrics/pom.xml ---------------------------------------------------------------------- diff --git a/metrics/pom.xml b/metrics/pom.xml index 5924605..1e87ac4 100644 --- a/metrics/pom.xml +++ b/metrics/pom.xml @@ -92,5 +92,15 @@ mail 1.4.7 + + + com.codahale.metrics + metrics-core + + + com.codahale.metrics + metrics-graphite + + http://git-wip-us.apache.org/repos/asf/falcon/blob/c59aa107/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index da63feb..1a651ea 100644 --- a/pom.xml +++ b/pom.xml @@ -1164,6 +1164,24 @@ 1.1.3 + + com.codahale.metrics + metrics-core + 3.0.2 + + + org.acplt + oncrpc + + + + + + com.codahale.metrics + metrics-graphite + 3.0.2 + + http://git-wip-us.apache.org/repos/asf/falcon/blob/c59aa107/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java b/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java new file mode 100644 index 0000000..9d46b0d --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.plugin; + +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.aspect.ResourceMessage; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.metrics.MetricNotificationService; +import org.apache.falcon.service.Services; +import org.apache.falcon.util.StartupProperties; +import org.joda.time.DateTime; +import org.joda.time.Seconds; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Graphite Notification Plugin. + */ +public class GraphiteNotificationPlugin implements MonitoringPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(GraphiteNotificationPlugin.class); + + @Override + public void monitor(ResourceMessage message) { + MetricNotificationService metricNotificationService = + Services.get().getService(MetricNotificationService.SERVICE_NAME); + try { + String entityType = message.getDimensions().get("entity-type"); + String entityName = message.getDimensions().get("entity-name"); + String prefix = StartupProperties.get().getProperty("falcon.graphite.prefix"); + if (entityType.equals(EntityType.PROCESS.name())) { + Entity entity = ConfigurationStore.get().get(EntityType.PROCESS, entityName); + Process process = (Process) entity; + String pipeline = StringUtils.isNotBlank(process.getPipelines()) ? process.getPipelines() : "default"; + + + if ((message.getAction().equals("wf-instance-succeeded"))) { + Long timeTaken = message.getExecutionTime() / 1000000000; + String metricsName = prefix + message.getDimensions().get("cluster") + pipeline + + ".GENERATE." + entityName + ".processing_time"; + metricNotificationService.publish(metricsName, timeTaken); + + DateTime nominalTime = new DateTime(message.getDimensions().get("nominal-time")); + DateTime startTime = new DateTime(message.getDimensions().get("start-time")); + metricsName = prefix + message.getDimensions().get("cluster") + pipeline + + ".GENERATE." + entityName + ".start_delay"; + metricNotificationService.publish(metricsName, + (long)Seconds.secondsBetween(nominalTime, startTime).getSeconds()); + } + + if (message.getAction().equals("wf-instance-failed")){ + String metricName = prefix + message.getDimensions().get("cluster") + pipeline + + ".GENERATE." + entityName + ".failure" + + message.getDimensions().get("error-message"); + metricNotificationService.publish(metricName, (long) 1); + } + } + } catch (Exception e) { + LOG.error("Exception in sending metrics to Graphite:", e); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/c59aa107/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index 47895dd..797b60e 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -42,7 +42,6 @@ *.application.services=org.apache.falcon.security.AuthenticationInitializationService,\ org.apache.falcon.workflow.WorkflowJobEndNotificationService, \ org.apache.falcon.service.ProcessSubscriberService,\ - org.apache.falcon.service.FalconJPAService,\ org.apache.falcon.service.FeedSLAMonitoringService,\ org.apache.falcon.service.LifecyclePolicyMap,\ org.apache.falcon.entity.store.ConfigurationStore,\ @@ -53,6 +52,8 @@ org.apache.falcon.service.GroupsService,\ org.apache.falcon.service.ProxyUserService,\ org.apache.falcon.extensions.ExtensionService +##Add if you want to send data to graphite +# org.apache.falcon.metrics.MetricNotificationService\ ## Add if you want to use Falcon Azure integration ## # org.apache.falcon.adfservice.ADFProviderService ## If you wish to use Falcon native scheduler uncomment out below application services and comment out above application services ## @@ -80,6 +81,7 @@ prism.application.services=org.apache.falcon.service.LifecyclePolicyMap,\ org.apache.falcon.entity.store.ConfigurationStore + # List of Lifecycle policies configured. *.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete # List of builders for the policies. @@ -309,3 +311,8 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ # Setting monitoring plugin, if SMTP parameters is defined #*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\ # org.apache.falcon.plugin.EmailNotificationPlugin +# Graphite properties +#*.falcon.graphite.hostname=localhost +#*.falcon.graphite.port=2003 +#*.falcon.graphite.frequency=1 +#*.falcon.graphite.prefix=falcon \ No newline at end of file