falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-1864 Retry event does not get removed from delay queue
Date Mon, 28 Mar 2016 06:41:38 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 820572d8a -> 8e79ba81a


FALCON-1864 Retry event does not get removed from delay queue

Author: Pallavi Rao <pallavi.rao@inmobi.com>

Reviewers: @sandeepSamudrala, @pavankumar526, @PraveenAdlakha

Closes #76 from pallavi-rao/1864


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8e79ba81
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8e79ba81
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8e79ba81

Branch: refs/heads/master
Commit: 8e79ba81ad5e228ab235477860134ec6891d291a
Parents: 820572d
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Mon Mar 28 12:11:35 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Mon Mar 28 12:11:35 2016 +0530

----------------------------------------------------------------------
 .../apache/falcon/rerun/handler/LateRerunConsumer.java  |  2 +-
 .../org/apache/falcon/rerun/handler/RetryConsumer.java  | 12 ++++++++----
 2 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/8e79ba81/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index f224805..047fa0f 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -60,7 +60,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
             if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")
                     || jobStatus.equals("SUSPENDED")) {
                 LOG.debug("Re-enqueing message in LateRerunHandler for workflow with same delay as "
-                        + "job status is running: {}", message.getWfId());
+                        + "job status is {} for : {}", jobStatus, message.getWfId());
                 message.setMsgInsertTime(System.currentTimeMillis());
                 handler.offerToQueue(message);
                 return;

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e79ba81/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index 836a172..4c763c2 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -43,14 +43,18 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
     protected void handleRerun(String clusterName, String jobStatus,
                                RetryEvent message, String entityType, String entityName) {
         try {
-            if (!jobStatus.equals("KILLED")) {
-                LOG.debug("Re-enqueing message in RetryHandler for workflow with same delay as job status is running:"
-                        + " {}", message.getWfId());
+            // Can happen when user does a manual re-run in-between retries.
+            if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")) {
+                LOG.debug("Re-enqueing message in RetryHandler for workflow with same delay as "
+                        + "job status is {} for : {}", jobStatus, message.getWfId());
                 message.setMsgInsertTime(System.currentTimeMillis());
                 handler.offerToQueue(message);
                 return;
+            } else if (jobStatus.equals("SUSPENDED") || jobStatus.equals("SUCCEEDED")) {
+                LOG.debug("Not retrying workflow {} anymore as it is in {} state. ", message.getWfId(), jobStatus);
+                return;
             }
-            LOG.info("Retrying attempt: {} out of configured: {} attempt for instance: {}:{} And WorkflowId: {}"
+            LOG.info("Retrying attempt: {} out of configured: {} attempts for instance: {}:{} And WorkflowId: {}"
                             + " At time: {}",
                     (message.getRunId() + 1), message.getAttempts(), message.getEntityName(), message.getInstance(),
                     message.getWfId(), SchemaHelper.formatDateUTC(new Date(System.currentTimeMillis())));


Mime
View raw message