falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject [2/4] falcon git commit: FALCON-1727 Suspend fails with InvalidStateTransitionException if entity has 'KILLED' instances (by Pallavi Rao)
Date Tue, 19 Jan 2016 17:14:37 GMT
FALCON-1727 Suspend fails with InvalidStateTransitionException if entity has 'KILLED' instances
(by Pallavi Rao)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/5fb3a7ab
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/5fb3a7ab
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/5fb3a7ab

Branch: refs/heads/master
Commit: 5fb3a7ab830255d9321ff4a5a8adb481c6c0d683
Parents: ccd536a
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Tue Jan 19 19:27:21 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Tue Jan 19 19:27:21 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../falcon/execution/ProcessExecutor.java       | 39 +++++++++++++-------
 .../org/apache/falcon/state/EntityState.java    |  3 +-
 3 files changed, 29 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/5fb3a7ab/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f80e7a1..255706d 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -116,6 +116,8 @@ Proposed Release Version: 0.9
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1727 Suspend fails with InvalidStateTransitionException if entity has 'KILLED' instances (Pallavi Rao)
+
     FALCON-1723 Rerun with skip fail actions won't work in few cases (Pavan Kolamuri via Pallavi Rao)
 
     FALCON-1538 Prism status gives wrong info(Praveen Adlakha via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/5fb3a7ab/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
index 40fe1b3..188cec2 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
@@ -56,7 +56,6 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Properties;
-import java.util.TimeZone;
 
 /**
  * This class is responsible for managing execution instances of a process.
@@ -153,8 +152,7 @@ public class ProcessExecutor extends EntityExecutor {
                 suspend(instance);
             } catch (FalconException e) {
                 // Proceed with next
-                errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage());
-                LOG.error("Instance suspend failed for : " + instance.getId(), e);
+                errMsg.append(handleError(instance, e, EntityState.EVENT.SUSPEND));
             }
         }
         for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster,
@@ -163,8 +161,7 @@ public class ProcessExecutor extends EntityExecutor {
             try {
                 suspend(instance);
             } catch (FalconException e) {
-                errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage());
-                LOG.error("Instance suspend failed for : " + instance.getId(), e);
+                errMsg.append(handleError(instance, e, EntityState.EVENT.SUSPEND));
             }
         }
         // Some errors
@@ -173,6 +170,24 @@ public class ProcessExecutor extends EntityExecutor {
         }
     }
 
+    // Error handling for an operation.
+    private String handleError(ExecutionInstance instance, FalconException e, EntityState.EVENT action)
+        throws StateStoreException {
+        try {
+            // If the instance terminated while a kill/suspend operation was in progress, ignore the exception.
+            InstanceState.STATE currentState = STATE_STORE.getExecutionInstance(instance.getId()).getCurrentState();
+            if (InstanceState.getTerminalStates().contains(currentState)) {
+                return "";
+            }
+        } catch (StateStoreException sse) {
+            throw sse;
+        }
+
+        String errMsg = "Instance " + action.name() + " failed for: " + instance.getId() + " due to " + e.getMessage();
+        LOG.error(errMsg, e);
+        return errMsg;
+    }
+
     //  Returns last materialized instance's time.
     private Date getLastInstanceTime() throws StateStoreException {
         InstanceState instanceState = STATE_STORE.getLastExecutionInstance(process, cluster);
@@ -198,8 +213,8 @@ public class ProcessExecutor extends EntityExecutor {
             try {
                 resume(instance);
             } catch (FalconException e) {
-                errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage());
-                LOG.error("Instance suspend failed for : " + instance.getId(), e);
+                errMsg.append("Instance resume failed for : " + instance.getId() + " due to " + e.getMessage());
+                LOG.error("Instance resume failed for : " + instance.getId(), e);
             }
         }
         registerForNotifications(getLastInstanceTime());
@@ -219,8 +234,7 @@ public class ProcessExecutor extends EntityExecutor {
                 kill(instance);
             } catch (FalconException e) {
                 // Proceed with next
-                errMsg.append("Instance kill failed for : " + instance.getId() + " due to " + e.getMessage());
-                LOG.error("Instance kill failed for : " + instance.getId(), e);
+                errMsg.append(handleError(instance, e, EntityState.EVENT.KILL));
             }
         }
         for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster,
@@ -229,8 +243,7 @@ public class ProcessExecutor extends EntityExecutor {
             try {
                 kill(instance);
             } catch (FalconException e) {
-                errMsg.append("Instance kill failed for : " + instance.getId() + " due to " + e.getMessage());
-                LOG.error("Instance kill failed for : " + instance.getId(), e);
+                errMsg.append(handleError(instance, e, EntityState.EVENT.KILL));
             }
         }
         // Some errors
@@ -248,12 +261,10 @@ public class ProcessExecutor extends EntityExecutor {
             LOG.error("Suspend failed for instance id : " + instance.getId(), e);
             throw new FalconException("Suspend failed for instance : " + instance.getId(), e);
         }
-
     }
 
     @Override
     public void resume(ExecutionInstance instance) throws FalconException {
-
         try {
             instance.resume();
             if (((ProcessExecutionInstance) instance).isScheduled()) {
@@ -452,7 +463,7 @@ public class ProcessExecutor extends EntityExecutor {
         requestBuilder.setFrequency(process.getFrequency())
                 .setStartTime(new DateTime(startTime))
                 .setEndTime(new DateTime(endTime))
-                .setTimeZone(TimeZone.getTimeZone("UTC"));
+                .setTimeZone(EntityUtil.getTimeZone(process));
         NotificationServicesRegistry.register(requestBuilder.build());
         LOG.info("Registered for a time based notification for process {}  with frequency: {}, "
                 + "start time: {}, end time: {}", process.getName(), process.getFrequency(), startTime, endTime);

http://git-wip-us.apache.org/repos/asf/falcon/blob/5fb3a7ab/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
index f44f174..ae57fa1 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
@@ -81,7 +81,8 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState.
         SUBMIT,
         SCHEDULE,
         SUSPEND,
-        RESUME
+        RESUME,
+        KILL
     }
 
     /**


Mime
View raw message