falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject [1/2] falcon git commit: FALCON-1757 EntityNotRegisteredException when entity is deleted from falcon (By Pavan Kolamuri)
Date Fri, 22 Jan 2016 11:48:48 GMT
Repository: falcon
Updated Branches:
  refs/heads/0.9 ae002a01e -> 1fb294fbe


FALCON-1757 EntityNotRegisteredException when entity is deleted from falcon (By Pavan Kolamuri)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c5db759c
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c5db759c
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c5db759c

Branch: refs/heads/0.9
Commit: c5db759c18ce13f2b7ed6db3a82cca7eb507ea0f
Parents: ae002a0
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Fri Jan 22 16:25:19 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Fri Jan 22 16:25:19 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                               |  2 ++
 .../falcon/rerun/handler/AbstractRerunConsumer.java       |  9 ++++++++-
 .../apache/falcon/rerun/handler/LateRerunConsumer.java    |  9 ++++++++-
 .../org/apache/falcon/rerun/handler/LateRerunHandler.java |  7 +++++++
 .../org/apache/falcon/rerun/handler/RetryConsumer.java    | 10 ++++++++--
 5 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/c5db759c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 765569e..038c6f2 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -106,6 +106,8 @@ Release Version: 0.9
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1757 EntityNotRegisteredException when entity is deleted from falcon (Pavan Kolamuri
via Pallavi Rao)
+
     FALCON-1748 Client throws FalconWebException irrespective of type of error(Praveen Adlakha
via Ajay Yadava)
 
     FALCON-1727 Suspend fails with InvalidStateTransitionException if entity has 'KILLED'
instances (Pallavi Rao)

http://git-wip-us.apache.org/repos/asf/falcon/blob/c5db759c/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
index f60b927..61ca8c0 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
@@ -20,6 +20,7 @@ package org.apache.falcon.rerun.handler;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.rerun.event.RerunEvent;
 import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
@@ -53,8 +54,8 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends
Abst
         AbstractRerunPolicy policy = new ExpBackoffPolicy();
         Frequency frequency = new Frequency("minutes(1)");
         while (!Thread.currentThread().isInterrupted()) {
+            T message = null;
             try {
-                T message;
                 try {
                     message = handler.takeFromQueue();
                     attempt = 1;
@@ -83,6 +84,12 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M
extends Abst
                         message.getEntityType(), message.getEntityName());
 
             } catch (Throwable e) {
+                if (e instanceof EntityNotRegisteredException) {
+                    LOG.warn("Entity {} of type {} doesn't exist in config store. Rerun "
+                                    + "cannot be done for workflow ", message.getEntityName(),
+                            message.getEntityType(), message.getWfId());
+                    return;
+                }
                 LOG.error("Error in rerun consumer", e);
             }
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c5db759c/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index 4297788..fa0d6ae 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -18,6 +18,7 @@
 package org.apache.falcon.rerun.handler;
 
 import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.SchemaHelper;
@@ -58,7 +59,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
             if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")
                     || jobStatus.equals("SUSPENDED")) {
                 LOG.debug("Re-enqueing message in LateRerunHandler for workflow with same
delay as "
-                    + "job status is running: {}", message.getWfId());
+                        + "job status is running: {}", message.getWfId());
                 message.setMsgInsertTime(System.currentTimeMillis());
                 handler.offerToQueue(message);
                 return;
@@ -81,6 +82,12 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
             LOG.info("Scheduled late rerun for wf-id: {} on cluster: {}",
                     message.getWfId(), message.getClusterName());
         } catch (Exception e) {
+            if (e instanceof EntityNotRegisteredException) {
+                LOG.warn("Entity {} of type {} doesn't exist in config store. Late rerun
"
+                                + "cannot be done for workflow ", message.getEntityName(),
+                        message.getEntityType(), message.getWfId());
+                return;
+            }
             LOG.warn("Late Re-run failed for instance {}:{} after {}",
                     message.getEntityName(), message.getInstance(), message.getDelayInMilliSec(),
e);
             GenericAlert.alertLateRerunFailed(message.getEntityType(), message.getEntityName(),

http://git-wip-us.apache.org/repos/asf/falcon/blob/c5db759c/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 1d2ed37..0be6252 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -20,6 +20,7 @@ package org.apache.falcon.rerun.handler;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
@@ -91,6 +92,12 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>>
extends
                     wait, entityType, entityName, nominalTime, intRunId, workflowUser);
             offerToQueue(event);
         } catch (Exception e) {
+            if (e instanceof EntityNotRegisteredException) {
+                LOG.warn("Entity {} of type {} doesn't exist in config store. So late rerun
"
+                                + "cannot be done for workflow ", entityName,
+                        entityType, wfId);
+                return;
+            }
             LOG.error("Unable to schedule late rerun for entity instance: {} ({}): {} And
WorkflowId: {}",
                     entityType, entityName, nominalTime, wfId, e);
             GenericAlert.alertLateRerunFailed(entityType, entityName,

http://git-wip-us.apache.org/repos/asf/falcon/blob/c5db759c/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index 96300d9..9b46713 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -18,6 +18,7 @@
 package org.apache.falcon.rerun.handler;
 
 import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.rerun.event.RetryEvent;
 import org.apache.falcon.rerun.queue.DelayedQueue;
@@ -43,17 +44,22 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
         try {
             if (!jobStatus.equals("KILLED")) {
                 LOG.debug("Re-enqueing message in RetryHandler for workflow with same delay
as job status is running:"
-                    + " {}", message.getWfId());
+                        + " {}", message.getWfId());
                 message.setMsgInsertTime(System.currentTimeMillis());
                 handler.offerToQueue(message);
                 return;
             }
             LOG.info("Retrying attempt: {} out of configured: {} attempt for instance: {}:{}
And WorkflowId: {}"
-                    + " At time: {}",
+                            + " At time: {}",
                     (message.getRunId() + 1), message.getAttempts(), message.getEntityName(),
message.getInstance(),
                     message.getWfId(), SchemaHelper.formatDateUTC(new Date(System.currentTimeMillis())));
             handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), message.getWfId(),
null, false);
         } catch (Exception e) {
+            if (e instanceof EntityNotRegisteredException) {
+                LOG.warn("Entity {} of type {} doesn't exist in config store. So retry "
+                        + "cannot be done for workflow ", entityName, entityType, message.getWfId());
+                return;
+            }
             int maxFailRetryCount = Integer.parseInt(StartupProperties.get()
                     .getProperty("max.retry.failure.count", "1"));
             if (message.getFailRetryCount() < maxFailRetryCount) {


Mime
View raw message