falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-2173 BacklogEmitterService should emit 0 for process which don…
Date Wed, 26 Oct 2016 04:27:08 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 772e38779 -> c980aa800


FALCON-2173 BacklogEmitterService should emit 0 for process which don…

…ot have backlog

Author: Praveen Adlakha <adlakha.praveen@gmail.com>

Reviewers: @pallavi-rao

Closes #291 from PraveenAdlakha/2173 and squashes the following commits:

d53a4e3 [Praveen Adlakha] test case fixed
d9e6771 [Praveen Adlakha] else condition changed
5c687bc [Praveen Adlakha] comments addressed
738ad4e [Praveen Adlakha] FALCON-2173 BacklogEmitterService should emit 0 for process which
donot have backlog


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c980aa80
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c980aa80
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c980aa80

Branch: refs/heads/master
Commit: c980aa800afdcf38fcdafe004586abb3bafd4732
Parents: 772e387
Author: Praveen Adlakha <adlakha.praveen@gmail.com>
Authored: Wed Oct 26 09:56:55 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Wed Oct 26 09:56:55 2016 +0530

----------------------------------------------------------------------
 .../service/BacklogMetricEmitterService.java    | 52 ++++++++++++++------
 .../falcon/service/EntitySLAAlertService.java   |  2 +-
 .../service/EntitySLAMonitoringService.java     |  4 +-
 .../java/org/apache/falcon/util/MetricInfo.java |  2 +
 4 files changed, 42 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/c980aa80/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
index 3aa2155..b01b181 100644
--- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
+++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
@@ -25,6 +25,7 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.Clusters;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.jdbc.BacklogMetricStore;
 import org.apache.falcon.metrics.MetricNotificationService;
@@ -103,7 +104,7 @@ public final class BacklogMetricEmitterService implements FalconService,
 
     @Override
     public void onAdd(Entity entity) throws FalconException{
-        //DO Nothing
+        addToBacklog(entity);
     }
 
     @Override
@@ -148,12 +149,25 @@ public final class BacklogMetricEmitterService implements FalconService,
             for(Cluster cluster : process.getClusters().getClusters()){
                 dropMetric(cluster.getName(), process);
             }
+        }else{
+            addToBacklog(newEntity);
         }
     }
 
     @Override
     public void onReload(Entity entity) throws FalconException{
-        // Do Nothing
+        addToBacklog(entity);
+    }
+
+    public void addToBacklog(Entity entity) {
+        if (entity.getEntityType() != EntityType.PROCESS) {
+            return;
+        }
+        Process process = (Process) entity;
+        if (process.getSla() == null){
+            return;
+        }
+        entityBacklogs.putIfAbsent(entity, Collections.synchronizedList(new ArrayList<MetricInfo>()));
     }
 
     @Override
@@ -306,21 +320,29 @@ public final class BacklogMetricEmitterService implements FalconService,
             MetricInfo metricInfo = null;
             HashMap<String, Long> backLogsCluster = new HashMap<>();
             synchronized (metrics) {
-                long currentTime = System.currentTimeMillis();
-                Iterator iter = metrics.iterator();
-                while (iter.hasNext()) {
-                    try {
-                        metricInfo = (MetricInfo) iter.next();
-                        long time = DATE_FORMAT.get().parse(metricInfo.getNominalTime()).getTime();
-                        long backlog = backLogsCluster.containsKey(metricInfo.getCluster())
-                                ? backLogsCluster.get(metricInfo.getCluster()) : 0;
-                        backlog += (currentTime - time);
-                        backLogsCluster.put(metricInfo.getCluster(), backlog);
-                    } catch (ParseException e) {
-                        LOG.error("Unable to parse nominal time" + metricInfo.getNominalTime());
+                if (metrics.isEmpty()){
+                    Process process = (Process)entityObj;
+                    Clusters clusters = process.getClusters();
+                    for (Cluster cluster : clusters.getClusters()){
+                        publishBacklog(process, cluster.getName(), 0L);
+                    }
+                }else{
+                    long currentTime = System.currentTimeMillis();
+                    Iterator iter = metrics.iterator();
+                    while (iter.hasNext()) {
+                        try {
+                            metricInfo = (MetricInfo) iter.next();
+                            long time = DATE_FORMAT.get().parse(metricInfo.getNominalTime()).getTime();
+                            long backlog = backLogsCluster.containsKey(metricInfo.getCluster())
+                                    ? backLogsCluster.get(metricInfo.getCluster()) : 0;
+                            backlog += (currentTime - time);
+                            backLogsCluster.put(metricInfo.getCluster(), backlog);
+                        } catch (ParseException e) {
+                            LOG.error("Unable to parse nominal time" + metricInfo.getNominalTime());
+                        }
                     }
-                }
 
+                }
             }
             org.apache.falcon.entity.v0.process.Process process = (Process) entityObj;
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/c980aa80/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
index c4069dd..837a170 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
@@ -163,7 +163,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
                               ) throws FalconException {
         for (EntitySLAListener listener : listeners) {
             listener.highSLAMissed(entityName, clusterName, entityType, nominalTime);
-            store.deleteEntityAlertInstance(entityName, clusterName, nominalTime, entityType.name());
         }
+        store.deleteEntityAlertInstance(entityName, clusterName, nominalTime, entityType.name());
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c980aa80/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
index 7ff9309..00e116b 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
@@ -418,8 +418,8 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
             return;
         }
         for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllPendingInstances()){
-            for (Date instanceTime : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getEntityName(),
-                    pendingInstanceBean.getClusterName(), entityType)) {
+            for (Date instanceTime : MONITORING_JDBC_STATE_STORE.getNominalInstances(
+                    pendingInstanceBean.getEntityName(), pendingInstanceBean.getClusterName(),
entityType)) {
                 boolean status = checkEntityInstanceAvailability(pendingInstanceBean.getEntityName(),
                         pendingInstanceBean.getClusterName(), instanceTime, entityType);
                 if (status) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/c980aa80/prism/src/main/java/org/apache/falcon/util/MetricInfo.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/util/MetricInfo.java b/prism/src/main/java/org/apache/falcon/util/MetricInfo.java
index 694bb87..59c2bfd 100644
--- a/prism/src/main/java/org/apache/falcon/util/MetricInfo.java
+++ b/prism/src/main/java/org/apache/falcon/util/MetricInfo.java
@@ -25,6 +25,8 @@ public class MetricInfo {
     private String nominalTime;
     private String cluster;
 
+    public MetricInfo(){}
+
     public MetricInfo(String nominalTimeStr, String clusterName) {
         this.nominalTime = nominalTimeStr;
         this.cluster = clusterName;


Mime
View raw message