falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pavanku...@apache.org
Subject falcon git commit: FALCON-1956 Graphite Plugin for monitoring
Date Tue, 17 May 2016 11:46:50 GMT
Repository: falcon
Updated Branches:
  refs/heads/master a31fa545f -> c59aa1070


FALCON-1956 Graphite Plugin for monitoring

Author: Praveen Adlakha <adlakha.praveen@gmail.com>

Reviewers: "Pavan Kumar Kolamuri <pavan.kolamuri@gmail.com>, Srikanth Sundarrajan <sriksun@hotmail.com>"

Closes #141 from PraveenAdlakha/codahale


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c59aa107
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c59aa107
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c59aa107

Branch: refs/heads/master
Commit: c59aa1070b9445227b41ad0ba04ab23ed8885689
Parents: a31fa54
Author: Praveen Adlakha <adlakha.praveen@gmail.com>
Authored: Tue May 17 17:15:04 2016 +0530
Committer: pavankumar526 <pavan.kolamuri@gmail.com>
Committed: Tue May 17 17:15:04 2016 +0530

----------------------------------------------------------------------
 .../metrics/MetricNotificationService.java      | 109 +++++++++++++++++++
 common/src/main/resources/startup.properties    |   8 ++
 metrics/pom.xml                                 |  10 ++
 pom.xml                                         |  18 +++
 .../plugin/GraphiteNotificationPlugin.java      |  82 ++++++++++++++
 src/conf/startup.properties                     |   9 +-
 6 files changed, 235 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/c59aa107/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java
b/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java
new file mode 100644
index 0000000..30e6bb6
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.graphite.GraphiteReporter;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.service.FalconService;
+import org.apache.falcon.util.StartupProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service for metrics notification.
+ */
+public class MetricNotificationService implements FalconService {
+    private static final Logger LOG = LoggerFactory.getLogger(MetricNotificationService.class);
+
+    public static final String SERVICE_NAME = MetricNotificationService.class.getSimpleName();
+    private static final MetricNotificationService METRIC_NOTIFICATION_SERVICE = new MetricNotificationService();
+    private final GraphiteReporter graphiteReporter;
+    private final MetricRegistry metricRegistry;
+
+    private Map<String, MetricGauge> metricMap = new ConcurrentHashMap<>();
+
+    public static MetricNotificationService get(){
+        return METRIC_NOTIFICATION_SERVICE;
+    }
+
+    public MetricNotificationService(){
+        Graphite graphite = new Graphite(new InetSocketAddress(StartupProperties
+                .get().getProperty("falcon.graphite.hostname"), Integer.parseInt(StartupProperties.get()
+                    .getProperty("falcon.graphite.port"))));
+        metricRegistry=new MetricRegistry();
+        this.graphiteReporter = GraphiteReporter.forRegistry(metricRegistry)
+                .convertDurationsTo(TimeUnit.SECONDS)
+                .filter(MetricFilter.ALL)
+                .build(graphite);
+    }
+
+    @Override
+    public String getName() {
+        return SERVICE_NAME;
+    }
+
+    @Override
+    public void init() throws FalconException {
+        LOG.info("Starting Graphite Service");
+        graphiteReporter.start(Long.parseLong(StartupProperties.get().getProperty("falcon.graphite.frequency")),
+                TimeUnit.SECONDS);
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        graphiteReporter.stop();
+    }
+
+    private MetricGauge createMetric(final String metricName){
+        if (!metricMap.containsKey(metricName)) {
+            MetricGauge metricGauge = new MetricGauge();
+            metricMap.put(metricName, metricGauge);
+            metricRegistry.register(metricName, metricGauge);
+        }
+        return metricMap.get(metricName);
+    }
+
+    public void publish(String metricsName, Long value){
+        synchronized(this){
+            createMetric(metricsName).setValue(value);
+        }
+    }
+
+    private static class MetricGauge implements Gauge<Long> {
+
+        private Long value=0L;
+        public void setValue(Long value){
+            this.value=value;
+        }
+
+        @Override
+        public Long getValue() {
+            return value;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/c59aa107/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 37c09e0..2229edf 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -45,6 +45,8 @@
                         org.apache.falcon.service.ProxyUserService,\
                         org.apache.falcon.service.FalconJPAService,\
                         org.apache.falcon.extensions.ExtensionService
+## Add if you want to send data to Graphite ##
+#                        org.apache.falcon.metrics.MetricNotificationService\
 ## Add if you want to use Falcon Azure integration ##
 #                        org.apache.falcon.adfservice.ADFProviderService
 ## If you wish to use Falcon native scheduler add the commented out services below to application.services
##
@@ -310,3 +312,9 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
 ## If set to true, it creates the DB schema if it does not exist. If the DB schema exists
is a NOP.
 ## If set to false, it does not create the DB schema. If the DB schema does not exist it
fails start up.
 #*.falcon.statestore.create.db.schema=true
+
+# Graphite properties
+#*.falcon.graphite.hostname=localhost
+#*.falcon.graphite.port=2003
+#*.falcon.graphite.frequency=1
+#*.falcon.graphite.prefix=falcon

http://git-wip-us.apache.org/repos/asf/falcon/blob/c59aa107/metrics/pom.xml
----------------------------------------------------------------------
diff --git a/metrics/pom.xml b/metrics/pom.xml
index 5924605..1e87ac4 100644
--- a/metrics/pom.xml
+++ b/metrics/pom.xml
@@ -92,5 +92,15 @@
             <artifactId>mail</artifactId>
             <version>1.4.7</version>
         </dependency>
+
+        <dependency>
+            <groupId>com.codahale.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.codahale.metrics</groupId>
+            <artifactId>metrics-graphite</artifactId>
+        </dependency>
+
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/falcon/blob/c59aa107/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index da63feb..1a651ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1164,6 +1164,24 @@
                 <version>1.1.3</version>
             </dependency>
 
+            <dependency>
+                <groupId>com.codahale.metrics</groupId>
+                <artifactId>metrics-core</artifactId>
+                <version>3.0.2</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.acplt</groupId>
+                        <artifactId>oncrpc</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>com.codahale.metrics</groupId>
+                <artifactId>metrics-graphite</artifactId>
+                <version>3.0.2</version>
+            </dependency>
+
         </dependencies>
 
     </dependencyManagement>

http://git-wip-us.apache.org/repos/asf/falcon/blob/c59aa107/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java
b/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java
new file mode 100644
index 0000000..9d46b0d
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.plugin;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.aspect.ResourceMessage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.metrics.MetricNotificationService;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.util.StartupProperties;
+import org.joda.time.DateTime;
+import org.joda.time.Seconds;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Graphite Notification Plugin.
+ */
+public class GraphiteNotificationPlugin implements MonitoringPlugin {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GraphiteNotificationPlugin.class);
+
+    @Override
+    public void monitor(ResourceMessage message) {
+        MetricNotificationService metricNotificationService =
+                Services.get().getService(MetricNotificationService.SERVICE_NAME);
+        try {
+            String entityType = message.getDimensions().get("entity-type");
+            String entityName = message.getDimensions().get("entity-name");
+            String prefix = StartupProperties.get().getProperty("falcon.graphite.prefix");
+            if (entityType.equals(EntityType.PROCESS.name())) {
+                Entity entity = ConfigurationStore.get().get(EntityType.PROCESS, entityName);
+                Process process = (Process) entity;
+                String pipeline =  StringUtils.isNotBlank(process.getPipelines()) ? process.getPipelines()
: "default";
+
+
+                if ((message.getAction().equals("wf-instance-succeeded"))) {
+                    Long timeTaken =  message.getExecutionTime() / 1000000000;
+                    String metricsName = prefix + message.getDimensions().get("cluster")
+ pipeline
+                            + ".GENERATE." + entityName + ".processing_time";
+                    metricNotificationService.publish(metricsName, timeTaken);
+
+                    DateTime nominalTime = new DateTime(message.getDimensions().get("nominal-time"));
+                    DateTime startTime = new DateTime(message.getDimensions().get("start-time"));
+                    metricsName = prefix + message.getDimensions().get("cluster") + pipeline
+                            + ".GENERATE." + entityName + ".start_delay";
+                    metricNotificationService.publish(metricsName,
+                        (long)Seconds.secondsBetween(nominalTime, startTime).getSeconds());
+                }
+
+                if (message.getAction().equals("wf-instance-failed")){
+                    String metricName =  prefix + message.getDimensions().get("cluster")
+ pipeline
+                            + ".GENERATE." +  entityName + ".failure"
+                        + message.getDimensions().get("error-message");
+                    metricNotificationService.publish(metricName, (long) 1);
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Exception in sending metrics to Graphite:", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/c59aa107/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 47895dd..797b60e 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -42,7 +42,6 @@
 *.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
                         org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
                         org.apache.falcon.service.ProcessSubscriberService,\
-                        org.apache.falcon.service.FalconJPAService,\
                         org.apache.falcon.service.FeedSLAMonitoringService,\
                         org.apache.falcon.service.LifecyclePolicyMap,\
                         org.apache.falcon.entity.store.ConfigurationStore,\
@@ -53,6 +52,8 @@
                         org.apache.falcon.service.GroupsService,\
                         org.apache.falcon.service.ProxyUserService,\
                         org.apache.falcon.extensions.ExtensionService
+## Add if you want to send data to Graphite ##
+#                        org.apache.falcon.metrics.MetricNotificationService\
 ## Add if you want to use Falcon Azure integration ##
 #                        org.apache.falcon.adfservice.ADFProviderService
 ## If you wish to use Falcon native scheduler uncomment out below  application services and
comment out above application services ##
@@ -80,6 +81,7 @@ prism.application.services=org.apache.falcon.service.LifecyclePolicyMap,\
                         org.apache.falcon.entity.store.ConfigurationStore
 
 
+
 # List of Lifecycle policies configured.
 *.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
 # List of builders for the policies.
@@ -309,3 +311,8 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 # Setting monitoring plugin, if SMTP parameters is defined
 #*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\
 #                     org.apache.falcon.plugin.EmailNotificationPlugin
+# Graphite properties
+#*.falcon.graphite.hostname=localhost
+#*.falcon.graphite.port=2003
+#*.falcon.graphite.frequency=1
+#*.falcon.graphite.prefix=falcon
\ No newline at end of file


Mime
View raw message