falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-2074 Bugs in Process SLA monitoring after dev testing
Date Wed, 13 Jul 2016 09:17:49 GMT
Repository: falcon
Updated Branches:
  refs/heads/master e46e5c4c6 -> f14eca88e


FALCON-2074 Bugs in Process SLA monitoring after dev testing

Author: Praveen Adlakha <adlakha.praveen@gmail.com>

Reviewers: @pallavi-rao

Closes #221 from PraveenAdlakha/processSLATesting


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f14eca88
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f14eca88
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f14eca88

Branch: refs/heads/master
Commit: f14eca88e7875e5c8cc88a31c0e9ae58928f29a1
Parents: e46e5c4
Author: Praveen Adlakha <adlakha.praveen@gmail.com>
Authored: Wed Jul 13 14:47:27 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Wed Jul 13 14:47:27 2016 +0530

----------------------------------------------------------------------
 .../falcon/persistence/MonitoredEntityBean.java |  4 +-
 .../persistence/PersistenceConstants.java       |  3 +-
 .../falcon/jdbc/MonitoringJdbcStateStore.java   | 15 +++++--
 .../falcon/service/EntitySLAAlertService.java   | 19 +++++----
 .../service/EntitySLAMonitoringService.java     | 42 ++++++++++++--------
 .../jdbc/MonitoringJdbcStateStoreTest.java      |  2 +-
 6 files changed, 54 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
index 20ce537..1db3d04 100644
--- a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
@@ -42,7 +42,9 @@ import javax.validation.constraints.NotNull;
                 + "MonitoredEntityBean a where a.entityName = :entityName and a.entityType = :entityType"),
         @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredEntityBean "
                 + "a where a.entityName = :entityName and a.entityType = :entityType"),
-        @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) "
+        @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_ENTITY_FOR_TYPE, query = "select OBJECT(a) "
+                + "from MonitoredEntityBean a where a.entityType = :entityType"),
+        @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_ENTITY, query = "select OBJECT(a) "
                 + "from MonitoredEntityBean a")
 })
 @Table(name="MONITORED_ENTITY")

http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index f9aa1f5..7c2479d 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -26,7 +26,7 @@ public final class PersistenceConstants {
     }
     public static final String GET_MONITERED_INSTANCE = "GET_MONITERED_INSTANCE";
     public static final String DELETE_MONITORED_INSTANCES = "DELETE_MONITORED_INSTANCES";
-    public static final String GET_ALL_MONITORING_FEEDS = "GET_ALL_MONITORING_FEEDS";
+    public static final String GET_ALL_MONITORING_ENTITY_FOR_TYPE = "GET_ALL_MONITORING_ENTITY_FOR_TYPE";
     public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES";
     public static final String GET_PENDING_INSTANCE = "GET_PENDING_INSTANCE";
     public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES";
@@ -61,4 +61,5 @@ public final class PersistenceConstants {
     public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH";
     public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE";
     public static final String DELETE_ENTITY_ALERT_INSTANCE = "DELETE_ENTITY_ALERT_INSTANCE";
+    public static final String GET_ALL_MONITORING_ENTITY = "GET_ALL_MONITORING_ENTITY";
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
index c1f818a..6a38b0a 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -87,9 +87,18 @@ public class MonitoringJdbcStateStore {
         }
     }
 
-    public List<MonitoredEntityBean> getAllMonitoredFeed() throws ResultNotFoundException {
+    public List<MonitoredEntityBean> getAllMonitoredEntity() throws ResultNotFoundException {
         EntityManager entityManager = getEntityManager();
-        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_FEEDS);
+        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_ENTITY);
+        List result = q.getResultList();
+        entityManager.close();
+        return result;
+    }
+
+    public List<MonitoredEntityBean> getAllMonitoredEntityForEntity(String entityType) throws ResultNotFoundException {
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_ENTITY_FOR_TYPE);
+        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
         List result = q.getResultList();
         entityManager.close();
         return result;
@@ -159,7 +168,7 @@ public class MonitoringJdbcStateStore {
         return result;
     }
 
-    public List<PendingInstanceBean> getAllInstances(){
+    public List<PendingInstanceBean> getAllPendingInstances(){
         EntityManager entityManager = getEntityManager();
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES);
         List result = q.getResultList();

http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
index f023c35..57e46b7 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
@@ -69,13 +69,15 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
     public void init() throws FalconException {
         String listenerClassNames = StartupProperties.get().
                 getProperty("feedAlert.listeners");
-        for (String listenerClassName : listenerClassNames.split(",")) {
-            listenerClassName = listenerClassName.trim();
-            if (listenerClassName.isEmpty()) {
-                continue;
+        if (listenerClassNames != null && !listenerClassNames.isEmpty()) {
+            for (String listenerClassName : listenerClassNames.split(",")) {
+                listenerClassName = listenerClassName.trim();
+                if (listenerClassName.isEmpty()) {
+                    continue;
+                }
+                EntitySLAListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName);
+                registerListener(listener);
             }
-            EntitySLAListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName);
-            registerListener(listener);
         }
 
         String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
@@ -105,7 +107,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
 
     void processSLACandidates(){
         //Get all feeds instances to be monitored
-        List<PendingInstanceBean> pendingInstanceBeanList = store.getAllInstances();
+        List<PendingInstanceBean> pendingInstanceBeanList = store.getAllPendingInstances();
         if (pendingInstanceBeanList == null || pendingInstanceBeanList.isEmpty()){
             return;
         }
@@ -152,7 +154,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
                 }
             }
         } catch (FalconException e){
-            LOG.error("Exception in FeedSLAALertService:", e);
+            LOG.error("Exception in EntitySLAALertService:", e);
         }
 
     }
@@ -160,6 +162,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
     @Override
     public void highSLAMissed(String entityName, String clusterName, String entityType , Date nominalTime
                               ) throws FalconException {
+        LOG.debug("Listners called...");
         for (EntitySLAListener listener : listeners) {
             listener.highSLAMissed(entityName, clusterName, entityType, nominalTime);
             store.deleteEntityAlertInstance(entityName, clusterName, nominalTime, entityType);

http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
index f931625..6616f8b 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
@@ -344,7 +344,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
         @Override
         public void run() {
             try {
-                if (MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed().size() > 0) {
+                if (MONITORING_JDBC_STATE_STORE.getAllMonitoredEntity().size() > 0) {
                     checkPendingInstanceAvailability(EntityType.FEED.toString());
                     checkPendingInstanceAvailability(EntityType.PROCESS.toString());
 
@@ -363,31 +363,34 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
 
     void addNewPendingFeedInstances(Date to, String entityType) throws FalconException {
         Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
-        List<MonitoredEntityBean> feedsBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed();
-        for(MonitoredEntityBean monitoredEntityBean : feedsBeanList) {
+        List<MonitoredEntityBean> entityBeanList = MONITORING_JDBC_STATE_STORE.
+                getAllMonitoredEntityForEntity(entityType);
+        for(MonitoredEntityBean monitoredEntityBean : entityBeanList) {
             String entityName = monitoredEntityBean.getFeedName();
             Entity entity = EntityUtil.getEntity(entityType, entityName);
+            LOG.debug("entityName:"+ entityName+"entity:"+entity);
             Set<String> clusters =  EntityUtil.getClustersDefined(entity);
             List<org.apache.falcon.entity.v0.cluster.Cluster> cluster = new ArrayList();
             for(String string : clusters){
                 cluster.add(ClusterHelper.getCluster(string));
             }
-            for (org.apache.falcon.entity.v0.cluster.Cluster feedCluster : cluster) {
-                if (currentClusters.contains(feedCluster.getName())) {
+            for (org.apache.falcon.entity.v0.cluster.Cluster entityCluster : cluster) {
+                if (currentClusters.contains(entityCluster.getName())) {
                     // get start of instances from the database
                     Date nextInstanceTime = MONITORING_JDBC_STATE_STORE.getLastInstanceTime(entityName,
-                            EntityType.FEED.toString());
-                    Pair<String, String> key = new Pair<>(entity.getName(), feedCluster.getName());
+                            entityType);
+                    Pair<String, String> key = new Pair<>(entity.getName(), entityCluster.getName());
                     if (nextInstanceTime == null) {
-                        nextInstanceTime = getInitialStartTime(entity, feedCluster.getName(), entityType);
+                        nextInstanceTime = getInitialStartTime(entity, entityCluster.getName(), entityType);
                     } else {
                         nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
                     }
 
                     Set<Date> instances = new HashSet<>();
                     org.apache.falcon.entity.v0.cluster.Cluster currentCluster =
-                            EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName());
+                            EntityUtil.getEntity(EntityType.CLUSTER, entityCluster.getName());
                     nextInstanceTime = EntityUtil.getNextStartTime(entity, currentCluster, nextInstanceTime);
+                    LOG.info("nextInstanceTime:"+ nextInstanceTime + "entityName:"+entityName);
                     Date endDate;
                     if (entityType.equals(EntityType.FEED.toString())){
                         endDate =  FeedHelper.getClusterValidity((Feed) entity, currentCluster.getName()).getEnd();
@@ -396,14 +399,14 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
                                 currentCluster.getName()).getEnd();
                     }
                     while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate)) {
-                        LOG.debug("Adding instance={} for <feed,cluster>={}", nextInstanceTime, key);
+                        LOG.info("Adding instance={} for <entity,cluster>={}", nextInstanceTime, key);
                         instances.add(nextInstanceTime);
                         nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
                         nextInstanceTime = EntityUtil.getNextStartTime(entity, currentCluster, nextInstanceTime);
                     }
 
                     for(Date date:instances){
-                        MONITORING_JDBC_STATE_STORE.putPendingInstances(entity.getName(), feedCluster.getName(), date,
+                        MONITORING_JDBC_STATE_STORE.putPendingInstances(entity.getName(), entityCluster.getName(), date,
                                 entityType);
                     }
                 }
@@ -416,7 +419,12 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
      * Checks the availability of all the pendingInstances and removes the ones which have become available.
      */
     private void checkPendingInstanceAvailability(String entityType) throws FalconException {
-        for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){
+        LOG.debug("Size "+MONITORING_JDBC_STATE_STORE.getAllMonitoredEntity().size());
+        if (MONITORING_JDBC_STATE_STORE.getAllPendingInstances() == null){
+            LOG.info("Returning as size of pending instance is zero");
+            return;
+        }
+        for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllPendingInstances()){
             for (Date date : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getEntityName(),
                     pendingInstanceBean.getClusterName(), entityType)) {
                 boolean status = checkEntityInstanceAvailability(pendingInstanceBean.getEntityName(),
@@ -462,10 +470,10 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
                 }
             }
         } catch (Throwable e) {
-            LOG.error("Couldn't find status for Entity:{}, cluster:{}, entityType{}", entityName, clusterName,
-                    entityType, e);
+            LOG.error("Couldn't find status for Entity:{}, cluster:{}, entityType{}, nominalTime{}", entityName,
+                    clusterName, entityType, nominalTime, e);
         }
-        LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is not available.", entity.getName(),
+        LOG.debug("Entity instance(entity:{}, cluster:{}, instanceTime:{}) is not available.", entity.getName(),
             clusterName, nominalTime);
         return false;
     }
@@ -486,7 +494,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
     public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(Date start, Date end)
         throws FalconException {
         Set<SchedulableEntityInstance> result = new HashSet<>();
-        for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){
+        for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllPendingInstances()){
             Pair<String, String> feedClusterPair = new Pair<>(pendingInstanceBean.getEntityName(),
                     pendingInstanceBean.getClusterName());
             Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first);
@@ -602,7 +610,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
 
     @VisibleForTesting
     Date getInitialStartTime(Entity entity, String clusterName, String entityType) throws FalconException {
-        if (entityType.equals(EntityType.PROCESS.toString())){
+        if (entityType.equals(EntityType.FEED.toString())){
             Sla sla = FeedHelper.getSLA(clusterName, (Feed) entity);
             if (sla == null) {
                 throw new IllegalStateException("InitialStartTime can not be determined as the feed: "

http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
index 8cf2b2d..018c562 100644
--- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
+++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
@@ -90,7 +90,7 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
         monitoringJdbcStateStore.putMonitoredEntity("test_feed2", EntityType.FEED.toString());
         Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredEntity("test_feed1",
                 EntityType.FEED.toString()).getFeedName());
-        Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredFeed().size(), 2);
+        Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredEntity().size(), 2);
 
         monitoringJdbcStateStore.deleteMonitoringEntity("test_feed1", EntityType.FEED.toString());
         monitoringJdbcStateStore.deleteMonitoringEntity("test_feed2", EntityType.FEED.toString());


Mime
View raw message