falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject falcon git commit: FALCON-2044 Persist Process stats in db
Date Tue, 29 Nov 2016 18:09:45 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 49fa46e29 -> 1f28bde6f


FALCON-2044 Persist Process stats in db

Author: Praveen Adlakha <adlakha.praveen@gmail.com>

Reviewers: Ajay Yadava <ajayyadava@apache.org>

Closes #196 from PraveenAdlakha/2044


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/1f28bde6
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/1f28bde6
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/1f28bde6

Branch: refs/heads/master
Commit: 1f28bde6f49aedd2ca95181f483c609a5304aecc
Parents: 49fa46e
Author: Praveen Adlakha <adlakha.praveen@gmail.com>
Authored: Tue Nov 29 13:09:23 2016 -0500
Committer: Ajay Yadava <ajayyadava@apache.org>
Committed: Tue Nov 29 13:09:23 2016 -0500

----------------------------------------------------------------------
 .../persistence/PersistenceConstants.java       |   2 +-
 .../persistence/ProcessInstanceInfoBean.java    | 131 +++++++++++++++++++
 .../falcon/tools/FalconStateStoreDBCLI.java     |   1 +
 .../src/main/resources/META-INF/persistence.xml |  16 ++-
 common/src/main/resources/startup.properties    |   3 +-
 .../site/twiki/GraphiteMetricCollection.twiki   |  24 ----
 docs/src/site/twiki/MetricCollection.twiki      |  37 ++++++
 docs/src/site/twiki/Operability.twiki           |   6 +-
 .../falcon/jdbc/MonitoringJdbcStateStore.java   |  36 +++++
 .../plugin/ProcessExecutionStatsPlugin.java     |  76 +++++++++++
 .../jdbc/MonitoringJdbcStateStoreTest.java      |  11 ++
 src/build/findbugs-exclude.xml                  |   5 +
 src/conf/startup.properties                     |   5 +-
 13 files changed, 317 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index fc82ae7..26a5cd4 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -79,5 +79,5 @@ public final class PersistenceConstants {
     public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS";
     public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB";
     public static final String GET_EXTENSION_JOB = "GET_EXTENSION_JOB";
-
+    public static final String GET_ALL_PROCESS_INFO_INSTANCES = "GET_ALL_PROCESS_INFO_INSTANCES";
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/common/src/main/java/org/apache/falcon/persistence/ProcessInstanceInfoBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/ProcessInstanceInfoBean.java
b/common/src/main/java/org/apache/falcon/persistence/ProcessInstanceInfoBean.java
new file mode 100644
index 0000000..c408510
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/ProcessInstanceInfoBean.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import javax.persistence.Entity;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Column;
+import javax.validation.constraints.NotNull;
+import java.util.Date;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * Class to store info regarding process history.
+ */
+@Entity
+@NamedQueries({
+        @NamedQuery(name= PersistenceConstants.GET_ALL_PROCESS_INFO_INSTANCES , query = "select
 OBJECT(a) from ProcessInstanceInfoBean a ")
+})
+@Table(name = "ProcessInstanceInfo")
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+public class ProcessInstanceInfoBean {
+    @NotNull
+    @GeneratedValue(strategy = GenerationType.AUTO)
+    @Id
+    private String id;
+
+    @NotNull
+    @Column(name = "process_name")
+    private String processName;
+
+    @NotNull
+    @Column(name = "colo")
+    private String colo;
+
+    public String getPipeline() {
+        return pipeline;
+    }
+
+    public void setPipeline(String pipeline) {
+        this.pipeline = pipeline;
+    }
+
+    @NotNull
+    @Column(name = "pipeline")
+    private String pipeline;
+
+    @NotNull
+    @Column(name = "status")
+    private String status;
+
+    @NotNull
+    @Column(name = "nominal_time")
+    private Date nominalTime;
+
+    @NotNull
+    @Column(name = "start_delay")
+    private long startDelay;
+
+    @NotNull
+    @Column(name = "processing_time")
+    private long processingTime;
+
+    public Date getNominalTime() {
+        return nominalTime;
+    }
+
+    public void setNominalTime(Date nominalTime) {
+        this.nominalTime = nominalTime;
+    }
+
+    public String getProcessName() {
+        return processName;
+    }
+
+    public void setProcessName(String processName) {
+        this.processName = processName;
+    }
+
+    public String getColo() {
+        return colo;
+    }
+
+    public void setColo(String colo) {
+        this.colo = colo;
+    }
+
+    public long getStartDelay() {
+        return startDelay;
+    }
+
+    public void setStartDelay(long startDelay) {
+        this.startDelay = startDelay;
+    }
+
+    public long getProcessingTime() {
+        return processingTime;
+    }
+
+    public void setProcessingTime(long processingTime) {
+        this.processingTime = processingTime;
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
index 0c04da3..6ad887e 100644
--- a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
+++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -249,6 +249,7 @@ public class FalconStateStoreDBCLI {
         args.add("org.apache.falcon.persistence.BacklogMetricBean");
         args.add("org.apache.falcon.persistence.ExtensionBean");
         args.add("org.apache.falcon.persistence.ExtensionJobsBean");
+        args.add("org.apache.falcon.persistence.ProcessInstanceInfoBean");
         return args.toArray(new String[args.size()]);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/common/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml
index 0f20103..8d0bd25 100644
--- a/common/src/main/resources/META-INF/persistence.xml
+++ b/common/src/main/resources/META-INF/persistence.xml
@@ -31,7 +31,7 @@
         <class>org.apache.falcon.persistence.BacklogMetricBean</class>
         <class>org.apache.falcon.persistence.ExtensionBean</class>
         <class>org.apache.falcon.persistence.ExtensionJobsBean</class>
-
+        <class>org.apache.falcon.persistence.ProcessInstanceInfoBean</class>
         <properties>
             <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
 
@@ -41,8 +41,8 @@
                       value="jpa(Types=org.apache.falcon.persistence.EntityBean;
                 org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
                 org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean;
-                org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean)"/>
-
+                org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean;
+                org.apache.falcon.persistence.ProcessInstanceInfoBean)"/>
             <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
             <property name="openjpa.LockManager" value="pessimistic"/>
             <property name="openjpa.ReadLockLevel" value="read"/>
@@ -67,6 +67,7 @@
         <class>org.apache.falcon.persistence.BacklogMetricBean</class>
         <class>org.apache.falcon.persistence.ExtensionBean</class>
         <class>org.apache.falcon.persistence.ExtensionJobsBean</class>
+        <class>org.apache.falcon.persistence.ProcessInstanceInfoBean</class>
         <properties>
             <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
 
@@ -76,8 +77,8 @@
                       value="jpa(Types=org.apache.falcon.persistence.EntityBean;
                 org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
                 org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean;
-                org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean)"/>
-
+                org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean;
+                org.apache.falcon.persistence.ProcessInstanceInfoBean)"/>
             <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
             <property name="openjpa.LockManager" value="pessimistic"/>
             <property name="openjpa.ReadLockLevel" value="read"/>
@@ -101,6 +102,7 @@
         <class>org.apache.falcon.persistence.BacklogMetricBean</class>
         <class>org.apache.falcon.persistence.ExtensionBean</class>
         <class>org.apache.falcon.persistence.ExtensionJobsBean</class>
+        <class>org.apache.falcon.persistence.ProcessInstanceInfoBean</class>
         <properties>
             <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
 
@@ -110,8 +112,8 @@
                       value="jpa(Types=org.apache.falcon.persistence.EntityBean;
                 org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
                 org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean;
-                org.apache.falcon.persistence.BacklogMetricBean;org.apache.falcon.persistence.ExtensionBean;
-                org.apache.falcon.persistence.ExtensionJobsBean)"/>
+                org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean;
+                org.apache.falcon.persistence.ProcessInstanceInfoBean)"/>
             <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
             <property name="openjpa.LockManager" value="pessimistic"/>
             <property name="openjpa.ReadLockLevel" value="read"/>

http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 5d5da5a..f91f3b6 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -313,7 +313,8 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
 
 # Setting monitoring plugin, if SMTP parameters is defined
 #*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\
-#                     org.apache.falcon.plugin.EmailNotificationPlugin
+#                     org.apache.falcon.plugin.EmailNotificationPlugin,\
+#                     org.apache.falcon.plugin.ProcessExecutionStatsPlugin
 
 ######### StateStore Properties #####
 #*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore

http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/docs/src/site/twiki/GraphiteMetricCollection.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/GraphiteMetricCollection.twiki b/docs/src/site/twiki/GraphiteMetricCollection.twiki
deleted file mode 100644
index 0ae0498..0000000
--- a/docs/src/site/twiki/GraphiteMetricCollection.twiki
+++ /dev/null
@@ -1,24 +0,0 @@
----++Graphite Metric Collection
-
-Graphite Metric Collection currently allows to collect the following metrics at process level
:
-
-1. Processing time the process spent in the running state in seconds (workflow_end_time -
workflow_start_time)
-2. Wait time that the process spent in the waiting/ready state. (workflow_start_time - workflow_nominal_time)
-3. Number of instances that are failed for a process.
-
-To send data to graphite we need to intialize metricNotificationService in startup.properties:
-
-<verbatim>
-*.application.services= org.apache.falcon.metrics.MetricNotificationService,
-</verbatim>
-
-Add following properties for graphiteNotificationPlugin :
-
-*Graphite properties*
-<verbatim>
-   * *.falcon.graphite.hostname=localhost
-   * *.falcon.graphite.port=2003
-   * *.falcon.graphite.frequency=1
-   * *.falcon.graphite.prefix=falcon
-</verbatim>
-The falcon.graphite.frequency is in seconds and all the time that is being sent to graphite
is in seconds.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/docs/src/site/twiki/MetricCollection.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/MetricCollection.twiki b/docs/src/site/twiki/MetricCollection.twiki
new file mode 100644
index 0000000..636c739
--- /dev/null
+++ b/docs/src/site/twiki/MetricCollection.twiki
@@ -0,0 +1,37 @@
+---++Metric Collection
+
+Metric Collection currently allows to collect the following metrics at process level:
+
+   1. Processing time the process spent in the running state in seconds (workflow_end_time
- workflow_start_time)
+   1.Wait time that the process spent in the waiting/ready state. (workflow_start_time -
workflow_nominal_time)
+   1.Number of instances that are failed for a process.
+
+To send data to *Graphite*
+
+Falcon need to intialize metricNotificationService in startup.properties:
+
+<verbatim>
+*.application.services= org.apache.falcon.metrics.MetricNotificationService,
+</verbatim>
+
+Add following properties for graphiteNotificationPlugin :
+
+*Graphite properties*
+<verbatim>
+   * *.falcon.graphite.hostname=localhost
+   * *.falcon.graphite.port=2003
+   * *.falcon.graphite.frequency=1
+   * *.falcon.graphite.prefix=falcon
+</verbatim>
+The falcon.graphite.frequency is in seconds and all the time that is being sent to graphite
is in seconds.
+
+
+To send data to *Falcon DB*
+
+Falcon needs to *!ProcessInstanceInfo* table in the database have a look at [[FalconDatabase]]
to know how to create it.
+
+Add the following properties in the startup.properties:
+
+<verbatim>
+#*.monitoring.plugins=org.apache.falcon.plugin.ProcessExecutionStatsPlugin
+</verbatim>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/docs/src/site/twiki/Operability.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Operability.twiki b/docs/src/site/twiki/Operability.twiki
index f01c235..e21ada8 100644
--- a/docs/src/site/twiki/Operability.twiki
+++ b/docs/src/site/twiki/Operability.twiki
@@ -228,6 +228,8 @@ Users may also extend the Falcon Audit plugin to send audits to systems
like Apa
 extending org.apache.falcon.plugin.AuditingPlugin interface.
 
 
----++ Metrics Collection In Graphite
+---++ Metrics Collection In Graphite and Database
 
-Falcon has support to send metrics to graphite more details regarding this can be found on
[[GraphiteMetricCollection][Graphite Metric Collection]]
\ No newline at end of file
+Falcon has support to send process metrics like waiting time ,exection time and number of
failures to graphite and falcon db.
+
+For details go through [[MetricCollection][Metric Collection]]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
index 552ebde..669e18d 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -25,6 +25,7 @@ import org.apache.falcon.persistence.PendingInstanceBean;
 import org.apache.falcon.persistence.PersistenceConstants;
 import org.apache.falcon.persistence.ResultNotFoundException;
 import org.apache.falcon.persistence.EntitySLAAlertBean;
+import org.apache.falcon.persistence.ProcessInstanceInfoBean;
 import org.apache.falcon.service.FalconJPAService;
 
 import javax.persistence.EntityManager;
@@ -198,6 +199,41 @@ public class MonitoringJdbcStateStore {
         return result;
     }
 
+    public void putProcessInstance(String processName, String colo, Long nominalTime, Long
startDelay,
+                                   Long processingTime, String pipeline, String status){
+        ProcessInstanceInfoBean processInstanceInfoBean = new ProcessInstanceInfoBean();
+        processInstanceInfoBean.setProcessName(processName);
+        processInstanceInfoBean.setColo(colo);
+        processInstanceInfoBean.setNominalTime(new Date(nominalTime));
+        processInstanceInfoBean.setStartDelay(startDelay);
+        processInstanceInfoBean.setProcessingTime(processingTime);
+        processInstanceInfoBean.setPipeline(pipeline);
+        processInstanceInfoBean.setStatus(status);
+
+        EntityManager entityManager = getEntityManager();
+        try {
+            beginTransaction(entityManager);
+            entityManager.persist(processInstanceInfoBean);
+        } finally {
+            commitAndCloseTransaction(entityManager);
+        }
+    }
+
+    public List<ProcessInstanceInfoBean> getAllInstancesProcessInstance(){
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PROCESS_INFO_INSTANCES);
+        List result = q.getResultList();
+
+        try {
+            if (CollectionUtils.isEmpty(result)) {
+                return null;
+            }
+        } finally{
+            entityManager.close();
+        }
+        return result;
+    }
+
     private void commitAndCloseTransaction(EntityManager entityManager) {
         entityManager.getTransaction().commit();
         entityManager.close();

http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/prism/src/main/java/org/apache/falcon/plugin/ProcessExecutionStatsPlugin.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/plugin/ProcessExecutionStatsPlugin.java
b/prism/src/main/java/org/apache/falcon/plugin/ProcessExecutionStatsPlugin.java
new file mode 100644
index 0000000..676c17b
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/plugin/ProcessExecutionStatsPlugin.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.plugin;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.aspect.ResourceMessage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
+import org.joda.time.DateTime;
+import org.joda.time.Seconds;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This plugin writes process completion time ,number of failures and wait time to DB.
+ */
+public class ProcessExecutionStatsPlugin implements MonitoringPlugin {
+    private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutionStatsPlugin.class);
+
+    private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore();
+
+    @Override
+    public void monitor(ResourceMessage message) {
+        try {
+            String entityType = StringUtils.isNotBlank(message.getDimensions().get("entityType"))
+                    ? message.getDimensions().get("entityType") :message.getDimensions().get("entity-type");
+            String entityName = StringUtils.isNotBlank(message.getDimensions().get("entityName"))
+                    ? message.getDimensions().get("entityName") :message.getDimensions().get("entity-name");
+            LOG.debug("message:" + message.getAction());
+            if (entityType.equalsIgnoreCase(EntityType.PROCESS.name())
+                    && ConfigurationStore.get().get(EntityType.PROCESS, entityName)
!= null) {
+                Process process = ConfigurationStore.get().get(EntityType.PROCESS, entityName);
+                String pipelines =  StringUtils.isNotBlank(process.getPipelines()) ? process.getPipelines()
+                        : "__untagged";
+                String cluster =  message.getDimensions().get("cluster");
+                DateTime nominalTime = new DateTime(message.getDimensions().get("nominal-time"));
+                DateTime startTime = new DateTime(message.getDimensions().get("start-time"));
+                Long startDelay = (long) Seconds.secondsBetween(nominalTime, startTime).getSeconds();
+                Long timeTaken =  message.getExecutionTime() / 1000000000;
+
+                String [] pipelineNames = pipelines.split(",");
+
+                for(String name : pipelineNames){
+
+                    if ((message.getAction().equals("wf-instance-succeeded"))) {
+                        MONITORING_JDBC_STATE_STORE.putProcessInstance(entityName, cluster,
nominalTime.getMillis(),
+                                startDelay, timeTaken, name, "succeeded");
+                    }
+                    if (message.getAction().equals("wf-instance-failed")){
+                        MONITORING_JDBC_STATE_STORE.putProcessInstance(entityName, cluster,
nominalTime.getMillis(),
+                                startDelay, timeTaken, name, "failed");
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Exception in sending metrics to FalconDB:", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
index a64b654..860fbfc 100644
--- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
+++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.falcon.jdbc;
 
 import java.io.File;
 import java.util.Date;
+import java.util.List;
 
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
@@ -27,6 +28,7 @@ import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.AbstractTestBase;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.persistence.ProcessInstanceInfoBean;
 import org.apache.falcon.service.FalconJPAService;
 import org.apache.falcon.tools.FalconStateStoreDBCLI;
 import org.apache.falcon.util.StateStoreProperties;
@@ -173,6 +175,15 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
                 "test-cluster", dateOne, EntityType.PROCESS.toString()).getIsSLAHighMissed());
     }
 
+    @Test
+    public void putProcessInstance() throws Exception{
+        MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
+        store.putProcessInstance("test-process", "test-colo", 1466602429423L, 99999999L,
99999999L, "test", "failed");
+        List<ProcessInstanceInfoBean> list =  store.getAllInstancesProcessInstance();
+        ProcessInstanceInfoBean processInstanceInfoBean =  list.get(0);
+        Assert.assertEquals("test-process", processInstanceInfoBean.getProcessName());
+    }
+
     private void clear() {
         EntityManager em = FalconJPAService.get().getEntityManager();
         em.getTransaction().begin();

http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/src/build/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml
index 04e267f..189f2f8 100644
--- a/src/build/findbugs-exclude.xml
+++ b/src/build/findbugs-exclude.xml
@@ -75,6 +75,11 @@
     </Match>
 
     <Match>
+        <Class name="org.apache.falcon.persistence.ProcessInstanceInfoBean" />
+        <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" />
+    </Match>
+
+    <Match>
         <Class name="org.apache.falcon.persistence.ExtensionJobsBean" />
         <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" />
     </Match>

http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 6a95cce..901c3a9 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -358,8 +358,11 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 #*.falcon.email.smtp.password=""
 
 # Setting monitoring plugin, if SMTP parameters is defined
+# DefaultMonitoringPlugin
 #*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\
-#                     org.apache.falcon.plugin.EmailNotificationPlugin
+#                     org.apache.falcon.plugin.EmailNotificationPlugin,\
+#                     org.apache.falcon.plugin.ProcessExecutionStatsPlugin
+
 # Graphite properties
 #*.falcon.graphite.hostname=localhost
 #*.falcon.graphite.port=2003


Mime
View raw message