falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject git commit: FALCON-355 Remove SLAMonitoringService. Contributed by Shwetha GS
Date Fri, 14 Mar 2014 09:30:55 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master bb55a2c9f -> 5e4352151


FALCON-355 Remove SLAMonitoringService. Contributed by Shwetha GS


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/5e435215
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/5e435215
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/5e435215

Branch: refs/heads/master
Commit: 5e43521519900f7778c7db938aaec85ee4489a52
Parents: bb55a2c
Author: Shwetha GS <shwethags@gmail.com>
Authored: Fri Mar 14 15:00:43 2014 +0530
Committer: Shwetha GS <shwethags@gmail.com>
Committed: Fri Mar 14 15:00:43 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 common/src/main/resources/startup.properties    |   1 -
 .../org/apache/falcon/aspect/GenericAlert.java  |   9 -
 .../workflow/engine/OozieWorkflowEngine.java    | 325 ++++++++-----------
 .../falcon/service/FalconTopicSubscriber.java   |  15 -
 .../falcon/service/SLAMonitoringService.java    | 237 --------------
 src/conf/startup.properties                     |   1 -
 7 files changed, 136 insertions(+), 454 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5e435215/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b1f8cb1..5748d27 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,8 @@ Trunk (Unreleased)
     (Venkatesh Seetharam)
    
   IMPROVEMENTS
+    FALCON-355 Remove SLAMonitoringService. (Shwetha GS)
+
     FALCON-333 jsp-api dependency is defined twice. (Jean-Baptiste
     Onofré via Shaik Idris)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5e435215/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 0b941eb..457b3a6 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -33,7 +33,6 @@
                         org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.rerun.service.RetryService,\
                         org.apache.falcon.rerun.service.LateRunService,\
-                        org.apache.falcon.service.SLAMonitoringService,\
                         org.apache.falcon.service.LogCleanupService
 *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
                         org.apache.falcon.entity.ColoClusterRelation,\

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5e435215/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
index 0b680ba..5ab2f72 100644
--- a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
+++ b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
@@ -102,15 +102,6 @@ public final class GenericAlert {
 
     }
 
-    @Monitored(event = "sla-miss")
-    public static String alertOnLikelySLAMiss(
-            @Dimension(value = "cluster") String cluster,
-            @Dimension(value = "entity-type") String entityType,
-            @Dimension(value = "entity-name") String entityName,
-            @Dimension(value = "nominal-time") String nominalTime) {
-        return "IGNORE";
-    }
-
     @Monitored(event = "log-cleanup-service-failed")
     public static String alertLogCleanupServiceFailed(
             @Dimension(value = "message") String message,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5e435215/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index dee77c0..ac8862e 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -60,34 +60,32 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     public static final String ENGINE = "oozie";
     private static final BundleJob MISSING = new NullBundleJob();
 
-    private static final List<WorkflowJob.Status> WF_KILL_PRECOND = Arrays.asList(WorkflowJob.Status.PREP,
-            WorkflowJob.Status.RUNNING, WorkflowJob.Status.SUSPENDED, WorkflowJob.Status.FAILED);
-    private static final List<WorkflowJob.Status> WF_SUSPEND_PRECOND = Arrays
-            .asList(WorkflowJob.Status.RUNNING);
-    private static final List<WorkflowJob.Status> WF_RESUME_PRECOND = Arrays
-            .asList(WorkflowJob.Status.SUSPENDED);
-    private static final List<WorkflowJob.Status> WF_RERUN_PRECOND = Arrays.asList(WorkflowJob.Status.FAILED,
-            WorkflowJob.Status.KILLED, WorkflowJob.Status.SUCCEEDED);
-    private static final List<CoordinatorAction.Status> COORD_RERUN_PRECOND = Arrays
-            .asList(CoordinatorAction.Status.TIMEDOUT, CoordinatorAction.Status.FAILED);
-
-    private static final List<Job.Status> BUNDLE_ACTIVE_STATUS = Arrays.asList(
-            Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUSPENDED,
-            Job.Status.PREPSUSPENDED, Job.Status.DONEWITHERROR);
-    private static final List<Job.Status> BUNDLE_SUSPENDED_STATUS = Arrays.asList(
-            Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED);
-    private static final List<Job.Status> BUNDLE_RUNNING_STATUS = Arrays.asList(
-            Job.Status.PREP, Job.Status.RUNNING);
-
-    private static final List<Job.Status> BUNDLE_SUSPEND_PRECOND = Arrays.asList(
-            Job.Status.PREP, Job.Status.RUNNING, Job.Status.DONEWITHERROR);
-    private static final List<Job.Status> BUNDLE_RESUME_PRECOND = Arrays.asList(
-            Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED);
+    private static final List<WorkflowJob.Status> WF_KILL_PRECOND =
+        Arrays.asList(WorkflowJob.Status.PREP, WorkflowJob.Status.RUNNING, WorkflowJob.Status.SUSPENDED,
+            WorkflowJob.Status.FAILED);
+    private static final List<WorkflowJob.Status> WF_SUSPEND_PRECOND = Arrays.asList(WorkflowJob.Status.RUNNING);
+    private static final List<WorkflowJob.Status> WF_RESUME_PRECOND = Arrays.asList(WorkflowJob.Status.SUSPENDED);
+    private static final List<WorkflowJob.Status> WF_RERUN_PRECOND =
+        Arrays.asList(WorkflowJob.Status.FAILED, WorkflowJob.Status.KILLED, WorkflowJob.Status.SUCCEEDED);
+    private static final List<CoordinatorAction.Status> COORD_RERUN_PRECOND =
+        Arrays.asList(CoordinatorAction.Status.TIMEDOUT, CoordinatorAction.Status.FAILED);
+
+    private static final List<Job.Status> BUNDLE_ACTIVE_STATUS =
+        Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED,
+            Job.Status.DONEWITHERROR);
+    private static final List<Job.Status> BUNDLE_SUSPENDED_STATUS =
+        Arrays.asList(Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED);
+    private static final List<Job.Status> BUNDLE_RUNNING_STATUS = Arrays.asList(Job.Status.PREP, Job.Status.RUNNING);
+
+    private static final List<Job.Status> BUNDLE_SUSPEND_PRECOND =
+        Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, Job.Status.DONEWITHERROR);
+    private static final List<Job.Status> BUNDLE_RESUME_PRECOND =
+        Arrays.asList(Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED);
     private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
     private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters";
 
-    private static final String[] BUNDLE_UPDATEABLE_PROPS = new String[]{
-        "parallel", "clusters.clusters[\\d+].validity.end", };
+    private static final String[] BUNDLE_UPDATEABLE_PROPS =
+        new String[]{"parallel", "clusters.clusters[\\d+].validity.end", };
 
     public OozieWorkflowEngine() {
         registerListener(new OozieHouseKeepingService());
@@ -153,8 +151,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         ACTIVE, RUNNING, SUSPENDED
     }
 
-    private boolean isBundleInState(Entity entity, BundleStatus status)
-        throws FalconException {
+    private boolean isBundleInState(Entity entity, BundleStatus status) throws FalconException {
 
         Map<String, BundleJob> bundles = findLatestBundle(entity);
         for (BundleJob bundle : bundles.values()) {
@@ -190,8 +187,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     private List<BundleJob> findBundles(Entity entity, String clusterName) throws FalconException {
 
         try {
-            List<BundleJob> jobs = OozieClientFactory.get(clusterName).getBundleJobsInfo(
-                    OozieClient.FILTER_NAME + "=" + EntityUtil.getWorkflowName(entity) + ";", 0, 256);
+            List<BundleJob> jobs = OozieClientFactory.get(clusterName).getBundleJobsInfo(OozieClient.FILTER_NAME
+                + "=" + EntityUtil.getWorkflowName(entity) + ";", 0, 256);
             if (jobs != null) {
                 List<BundleJob> filteredJobs = new ArrayList<BundleJob>();
                 for (BundleJob job : jobs) {
@@ -276,8 +273,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return result;
     }
 
-    private String doBundleAction(Entity entity, BundleAction action, String cluster)
-        throws FalconException {
+    private String doBundleAction(Entity entity, BundleAction action, String cluster) throws FalconException {
 
         List<BundleJob> jobs = findBundles(entity, cluster);
         if (jobs.isEmpty()) {
@@ -290,16 +286,16 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             switch (action) {
             case SUSPEND:
                 // not already suspended and preconditions are true
-                if (!BUNDLE_SUSPENDED_STATUS.contains(job.getStatus())
-                        && BUNDLE_SUSPEND_PRECOND.contains(job.getStatus())) {
+                if (!BUNDLE_SUSPENDED_STATUS.contains(job.getStatus()) && BUNDLE_SUSPEND_PRECOND.contains(
+                    job.getStatus())) {
                     suspend(cluster, job.getId());
                 }
                 break;
 
             case RESUME:
                 // not already running and preconditions are true
-                if (!BUNDLE_RUNNING_STATUS.contains(job.getStatus())
-                        && BUNDLE_RESUME_PRECOND.contains(job.getStatus())) {
+                if (!BUNDLE_RUNNING_STATUS.contains(job.getStatus()) && BUNDLE_RESUME_PRECOND.contains(
+                    job.getStatus())) {
                     resume(cluster, job.getId());
                 }
                 break;
@@ -327,8 +323,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             }
 
             //set end time of bundle
-            client.change(job.getId(),
-                    OozieClient.CHANGE_VALUE_ENDTIME + "=" + SchemaHelper.formatDateUTC(new Date()));
+            client.change(job.getId(), OozieClient.CHANGE_VALUE_ENDTIME + "=" + SchemaHelper.formatDateUTC(new Date()));
             LOG.debug("Changed end time of bundle " + job.getId() + " on cluster " + clusterName);
 
             //kill bundle
@@ -339,8 +334,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private void beforeAction(Entity entity, BundleAction action, String cluster)
-        throws FalconException {
+    private void beforeAction(Entity entity, BundleAction action, String cluster) throws FalconException {
 
         for (WorkflowEngineActionListener listener : listeners) {
             switch (action) {
@@ -360,8 +354,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private void afterAction(Entity entity, BundleAction action, String cluster)
-        throws FalconException {
+    private void afterAction(Entity entity, BundleAction action, String cluster) throws FalconException {
 
         for (WorkflowEngineActionListener listener : listeners) {
             switch (action) {
@@ -382,18 +375,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
-    public InstancesResult getRunningInstances(Entity entity)
-        throws FalconException {
+    public InstancesResult getRunningInstances(Entity entity) throws FalconException {
         try {
-            WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(
-                    ENGINE, entity);
+            WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
             Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
             List<Instance> runInstances = new ArrayList<Instance>();
             String[] wfNames = builder.getWorkflowNames(entity);
             List<String> coordNames = new ArrayList<String>();
             for (String wfName : wfNames) {
-                if (EntityUtil.getWorkflowName(Tag.RETENTION, entity)
-                        .toString().equals(wfName)) {
+                if (EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString().equals(wfName)) {
                     continue;
                 }
                 coordNames.add(wfName);
@@ -409,23 +399,18 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                             continue;
                         }
 
-                        CoordinatorAction action = client.getCoordActionInfo(wf
-                                .getParentId());
-                        String nominalTimeStr = SchemaHelper
-                                .formatDateUTC(action.getNominalTime());
-                        Instance instance = new Instance(cluster,
-                                nominalTimeStr, WorkflowStatus.RUNNING);
+                        CoordinatorAction action = client.getCoordActionInfo(wf.getParentId());
+                        String nominalTimeStr = SchemaHelper.formatDateUTC(action.getNominalTime());
+                        Instance instance = new Instance(cluster, nominalTimeStr, WorkflowStatus.RUNNING);
                         instance.startTime = wf.getStartTime();
                         if (entity.getEntityType() == EntityType.FEED) {
-                            instance.sourceCluster = getSourceCluster(cluster,
-                                    action, entity);
+                            instance.sourceCluster = getSourceCluster(cluster, action, entity);
                         }
                         runInstances.add(instance);
                     }
                 }
             }
-            return new InstancesResult("Running Instances",
-                    runInstances.toArray(new Instance[runInstances.size()]));
+            return new InstancesResult("Running Instances", runInstances.toArray(new Instance[runInstances.size()]));
 
         } catch (OozieClientException e) {
             throw new FalconException(e);
@@ -433,39 +418,36 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
-    public InstancesResult killInstances(Entity entity, Date start, Date end,
-                                         Properties props) throws FalconException {
+    public InstancesResult killInstances(Entity entity, Date start, Date end, Properties props) throws FalconException {
         return doJobAction(JobAction.KILL, entity, start, end, props);
     }
 
     @Override
-    public InstancesResult reRunInstances(Entity entity, Date start, Date end,
-                                          Properties props) throws FalconException {
+    public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props)
+        throws FalconException {
         return doJobAction(JobAction.RERUN, entity, start, end, props);
     }
 
     @Override
-    public InstancesResult suspendInstances(Entity entity, Date start,
-                                            Date end, Properties props) throws FalconException {
+    public InstancesResult suspendInstances(Entity entity, Date start, Date end, Properties props)
+        throws FalconException {
         return doJobAction(JobAction.SUSPEND, entity, start, end, props);
     }
 
     @Override
-    public InstancesResult resumeInstances(Entity entity, Date start, Date end,
-                                           Properties props) throws FalconException {
+    public InstancesResult resumeInstances(Entity entity, Date start, Date end, Properties props)
+        throws FalconException {
         return doJobAction(JobAction.RESUME, entity, start, end, props);
     }
 
     @Override
-    public InstancesResult getStatus(Entity entity, Date start, Date end)
-        throws FalconException {
+    public InstancesResult getStatus(Entity entity, Date start, Date end) throws FalconException {
 
         return doJobAction(JobAction.STATUS, entity, start, end, null);
     }
 
     @Override
-    public InstancesSummaryResult getSummary(Entity entity, Date start, Date end)
-        throws FalconException {
+    public InstancesSummaryResult getSummary(Entity entity, Date start, Date end) throws FalconException {
 
         return doSummaryJobAction(entity, start, end, null);
     }
@@ -474,8 +456,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY
     }
 
-    private WorkflowJob getWorkflowInfo(String cluster, String wfId)
-        throws FalconException {
+    private WorkflowJob getWorkflowInfo(String cluster, String wfId) throws FalconException {
         try {
             return OozieClientFactory.get(cluster).getJobInfo(wfId);
         } catch (OozieClientException e) {
@@ -483,8 +464,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private InstancesResult doJobAction(JobAction action, Entity entity,
-                                        Date start, Date end, Properties props) throws FalconException {
+    private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, Properties props)
+        throws FalconException {
         Map<String, List<CoordinatorAction>> actionsMap = getCoordActions(entity, start, end);
         List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS);
         List<String> sourceClusterList = getIncludedClusters(props, FALCON_INSTANCE_SOURCE_CLUSTERS);
@@ -518,8 +499,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 }
 
                 String nominalTimeStr = SchemaHelper.formatDateUTC(coordinatorAction.getNominalTime());
-                InstancesResult.Instance instance = new InstancesResult.Instance(
-                        cluster, nominalTimeStr, WorkflowStatus.valueOf(status));
+                InstancesResult.Instance instance =
+                    new InstancesResult.Instance(cluster, nominalTimeStr, WorkflowStatus.valueOf(status));
                 if (StringUtils.isNotEmpty(coordinatorAction.getExternalId())) {
                     WorkflowJob jobInfo = getWorkflowInfo(cluster, coordinatorAction.getExternalId());
                     instance.startTime = jobInfo.getStartTime();
@@ -539,8 +520,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return instancesResult;
     }
 
-    private InstancesSummaryResult doSummaryJobAction(Entity entity,
-                                        Date start, Date end, Properties props) throws FalconException {
+    private InstancesSummaryResult doSummaryJobAction(Entity entity, Date start, Date end, Properties props)
+        throws FalconException {
 
         Map<String, List<BundleJob>> bundlesMap = findBundles(entity);
         List<InstanceSummary> instances = new ArrayList<InstanceSummary>();
@@ -565,15 +546,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 TimeZone tz = EntityUtil.getTimeZone(coord.getTimeZone());
                 Date iterStart = EntityUtil.getNextStartTime(coord.getStartTime(), freq, tz, start);
                 Date iterEnd = (coord.getLastActionTime() != null && coord.getLastActionTime().before(end)
-                        ? coord.getLastActionTime() : end);
+                    ? coord.getLastActionTime() : end);
 
                 if (i == (applicableCoords.size() - 1)) {
                     isLastCoord = true;
                 }
 
                 int startActionNumber = EntityUtil.getInstanceSequence(coord.getStartTime(), freq, tz, iterStart);
-                int lastMaterializedActionNumber = EntityUtil.getInstanceSequence(coord.getStartTime(),
-                        freq, tz, iterEnd);
+                int lastMaterializedActionNumber =
+                    EntityUtil.getInstanceSequence(coord.getStartTime(), freq, tz, iterEnd);
                 int endActionNumber = EntityUtil.getInstanceSequence(coord.getStartTime(), freq, tz, end);
 
                 if (lastMaterializedActionNumber < startActionNumber) {
@@ -587,7 +568,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 CoordinatorJob coordJob;
                 try {
                     coordJob = client.getCoordJobInfo(coord.getId(), null, startActionNumber,
-                            (lastMaterializedActionNumber - startActionNumber));
+                        (lastMaterializedActionNumber - startActionNumber));
                 } catch (OozieClientException e) {
                     LOG.debug("Unable to get details for coordinator " + coord.getId() + " " + e.getMessage());
                     throw new FalconException(e);
@@ -602,12 +583,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 instancesSummary.put("UNSCHEDULED", unscheduledInstances);
             }
 
-            InstanceSummary summary= new InstanceSummary(cluster, instancesSummary);
+            InstanceSummary summary = new InstanceSummary(cluster, instancesSummary);
             instances.add(summary);
         }
 
-        InstancesSummaryResult instancesSummaryResult = new InstancesSummaryResult(APIResult.Status.SUCCEEDED,
-                JobAction.SUMMARY.name());
+        InstancesSummaryResult instancesSummaryResult =
+            new InstancesSummaryResult(APIResult.Status.SUCCEEDED, JobAction.SUMMARY.name());
         instancesSummaryResult.setInstancesSummary(instances.toArray(new InstanceSummary[instances.size()]));
         return instancesSummaryResult;
     }
@@ -615,10 +596,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     private void updateInstanceSummary(CoordinatorJob coordJob, Map<String, Long> instancesSummary) {
         List<CoordinatorAction> actions = coordJob.getActions();
 
-        for (CoordinatorAction coordAction :  actions) {
+        for (CoordinatorAction coordAction : actions) {
             if (instancesSummary.containsKey(coordAction.getStatus().name())) {
                 instancesSummary.put(coordAction.getStatus().name(),
-                        instancesSummary.get(coordAction.getStatus().name()) + 1L);
+                    instancesSummary.get(coordAction.getStatus().name()) + 1L);
             } else {
                 instancesSummary.put(coordAction.getStatus().name(), 1L);
             }
@@ -626,7 +607,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private String performAction(String cluster, JobAction action, CoordinatorAction coordinatorAction,
-                                 Properties props) throws FalconException {
+        Properties props) throws FalconException {
         WorkflowJob jobInfo = null;
         String status = coordinatorAction.getStatus().name();
         if (StringUtils.isNotEmpty(coordinatorAction.getExternalId())) {
@@ -684,30 +665,28 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return mapActionStatus(status);
     }
 
-    private void reRunCoordAction(String cluster, CoordinatorAction coordinatorAction) throws FalconException  {
-        try{
+    private void reRunCoordAction(String cluster, CoordinatorAction coordinatorAction) throws FalconException {
+        try {
             OozieClient client = OozieClientFactory.get(cluster);
-            client.reRunCoord(coordinatorAction.getJobId(),
-                RestConstants.JOB_COORD_RERUN_ACTION,
-                    Integer.toString(coordinatorAction.getActionNumber()), true, true);
+            client.reRunCoord(coordinatorAction.getJobId(), RestConstants.JOB_COORD_RERUN_ACTION,
+                Integer.toString(coordinatorAction.getActionNumber()), true, true);
             assertCoordActionStatus(cluster, coordinatorAction.getId(),
                 org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
-                    org.apache.oozie.client.CoordinatorAction.Status.WAITING,
+                org.apache.oozie.client.CoordinatorAction.Status.WAITING,
                 org.apache.oozie.client.CoordinatorAction.Status.READY);
             LOG.info("Rerun job " + coordinatorAction.getId() + " on cluster " + cluster);
-        }catch (Exception e) {
+        } catch (Exception e) {
             LOG.error("Unable to rerun workflows", e);
             throw new FalconException(e);
         }
     }
 
     private void assertCoordActionStatus(String cluster, String coordActionId,
-            org.apache.oozie.client.CoordinatorAction.Status... statuses)
-        throws FalconException, OozieClientException {
+        org.apache.oozie.client.CoordinatorAction.Status... statuses) throws FalconException, OozieClientException {
         OozieClient client = OozieClientFactory.get(cluster);
         CoordinatorAction actualStatus = client.getCoordActionInfo(coordActionId);
         for (int counter = 0; counter < 3; counter++) {
-            for(org.apache.oozie.client.CoordinatorAction.Status status : statuses) {
+            for (org.apache.oozie.client.CoordinatorAction.Status status : statuses) {
                 if (status.equals(actualStatus.getStatus())) {
                     return;
                 }
@@ -719,9 +698,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             }
             actualStatus = client.getCoordActionInfo(coordActionId);
         }
-        throw new FalconException("For Job" + coordActionId + ", actual statuses: "
-            +actualStatus + ", expected statuses: "
-                 + Arrays.toString(statuses));
+        throw new FalconException("For Job" + coordActionId + ", actual statuses: " + actualStatus + ", "
+            + "expected statuses: " + Arrays.toString(statuses));
     }
 
     private String getSourceCluster(String cluster, CoordinatorAction coordinatorAction, Entity entity)
@@ -734,10 +712,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private List<String> getIncludedClusters(Properties props,
-                                             String clustersType) {
-        String clusters = props == null ? "" : props.getProperty(clustersType,
-                "");
+    private List<String> getIncludedClusters(Properties props, String clustersType) {
+        String clusters = props == null ? "" : props.getProperty(clustersType, "");
         List<String> clusterList = new ArrayList<String>();
         for (String cluster : clusters.split(",")) {
             if (StringUtils.isNotEmpty(cluster)) {
@@ -749,8 +725,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     private String mapActionStatus(String status) {
         if (CoordinatorAction.Status.READY.toString().equals(status)
-                || CoordinatorAction.Status.WAITING.toString().equals(status)
-                || CoordinatorAction.Status.SUBMITTED.toString().equals(status)) {
+            || CoordinatorAction.Status.WAITING.toString().equals(status)
+            || CoordinatorAction.Status.SUBMITTED.toString().equals(status)) {
             return InstancesResult.WorkflowStatus.WAITING.name();
         } else if (CoordinatorAction.Status.DISCARDED.toString().equals(status)) {
             return InstancesResult.WorkflowStatus.KILLED.name();
@@ -763,8 +739,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    protected Map<String, List<CoordinatorAction>> getCoordActions(
-            Entity entity, Date start, Date end) throws FalconException {
+    protected Map<String, List<CoordinatorAction>> getCoordActions(Entity entity, Date start, Date end)
+        throws FalconException {
         Map<String, List<BundleJob>> bundlesMap = findBundles(entity);
         Map<String, List<CoordinatorAction>> actionsMap = new HashMap<String, List<CoordinatorAction>>();
 
@@ -804,17 +780,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private Frequency createFrequency(String frequency, Timeunit timeUnit) {
-        return new Frequency(frequency, OozieTimeUnit.valueOf(timeUnit.name())
-                .getFalconTimeUnit());
+        return new Frequency(frequency, OozieTimeUnit.valueOf(timeUnit.name()).getFalconTimeUnit());
     }
 
     /**
      * TimeUnit as understood by Oozie.
      */
     private enum OozieTimeUnit {
-        MINUTE(TimeUnit.minutes), HOUR(TimeUnit.hours), DAY(TimeUnit.days), WEEK(
-                null), MONTH(TimeUnit.months), END_OF_DAY(null), END_OF_MONTH(
-                null), NONE(null);
+        MINUTE(TimeUnit.minutes), HOUR(TimeUnit.hours), DAY(TimeUnit.days), WEEK(null), MONTH(TimeUnit.months),
+        END_OF_DAY(null), END_OF_MONTH(null), NONE(null);
 
         private TimeUnit falconTimeUnit;
 
@@ -824,32 +798,27 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
         public TimeUnit getFalconTimeUnit() {
             if (falconTimeUnit == null) {
-                throw new IllegalStateException("Invalid coord frequency: "
-                        + name());
+                throw new IllegalStateException("Invalid coord frequency: " + name());
             }
             return falconTimeUnit;
         }
     }
 
-    private List<CoordinatorJob> getApplicableCoords(Entity entity, ProxyOozieClient client,
-                                                     Date start, Date end, List<BundleJob> bundles)
-        throws FalconException {
+    private List<CoordinatorJob> getApplicableCoords(Entity entity, ProxyOozieClient client, Date start, Date end,
+        List<BundleJob> bundles) throws FalconException {
 
         List<CoordinatorJob> applicableCoords = new ArrayList<CoordinatorJob>();
         try {
             for (BundleJob bundle : bundles) {
-                List<CoordinatorJob> coords = client.getBundleJobInfo(
-                        bundle.getId()).getCoordinators();
+                List<CoordinatorJob> coords = client.getBundleJobInfo(bundle.getId()).getCoordinators();
                 for (CoordinatorJob coord : coords) {
-                    String coordName = EntityUtil.getWorkflowName(
-                            Tag.RETENTION, entity).toString();
+                    String coordName = EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString();
                     if (coordName.equals(coord.getAppName())) {
                         continue;
                     }
                     // if end time is before coord-start time or start time is
                     // after coord-end time ignore.
-                    if (!(end.compareTo(coord.getStartTime()) <= 0 || start
-                            .compareTo(coord.getEndTime()) >= 0)) {
+                    if (!(end.compareTo(coord.getStartTime()) <= 0 || start.compareTo(coord.getEndTime()) >= 0)) {
                         applicableCoords.add(coord);
                     }
                 }
@@ -894,8 +863,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             LOG.info("Updating entity through Workflow Engine" + newEntity.toShortString());
             Date newEndTime = EntityUtil.getEndTime(newEntity, cluster);
             if (newEndTime.before(now())) {
-                throw new FalconException("New end time for " + newEntity.getName()
-                        + " is past current time. Entity can't be updated. Use remove and add");
+                throw new FalconException("New end time for " + newEntity.getName() + " is past current time. Entity "
+                    + "can't be updated. Use remove and add");
             }
 
             LOG.debug("Updating for cluster : " + cluster + ", bundle: " + bundle.getId());
@@ -904,12 +873,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 // only concurrency and endtime are changed. So, change coords
                 LOG.info("Change operation is adequate! : " + cluster + ", bundle: " + bundle.getId());
                 updateCoords(cluster, bundle, EntityUtil.getParallel(newEntity),
-                        EntityUtil.getEndTime(newEntity, cluster));
+                    EntityUtil.getEndTime(newEntity, cluster));
                 return newEndTime;
             }
 
-            LOG.debug("Going to update ! : " + newEntity.toShortString() + "for cluster " + cluster + ", bundle: "
-                    + bundle.getId());
+            LOG.debug("Going to update ! : " + newEntity.toShortString() + "for cluster " + cluster + ", "
+                + "bundle: " + bundle.getId());
             effectiveTime = updateInternal(oldEntity, newEntity, cluster, bundle, false, effectiveTime);
             LOG.info("Entity update complete : " + newEntity.toShortString() + cluster + ", bundle: " + bundle.getId());
         }
@@ -933,22 +902,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
             LOG.info("Triggering update for " + cluster + ", " + affectedProcBundle.getId());
 
-            //TODO handle roll forward
-//            BundleJob feedBundle = findLatestBundle(newEntity, cluster);
-//            if (feedBundle == MISSING) {
-//                throw new IllegalStateException("Unable to find feed bundle in " + cluster
-//                        + " for entity " + newEntity.getName());
-//            }
-//            boolean processCreated = feedBundle.getCreatedTime().before(
-//                    affectedProcBundle.getCreatedTime());
-
             Date depEndTime =
-                    updateInternal(affectedEntity, affectedEntity, cluster, affectedProcBundle, false, effectiveTime);
+                updateInternal(affectedEntity, affectedEntity, cluster, affectedProcBundle, false, effectiveTime);
             if (effectiveTime == null || effectiveTime.after(depEndTime)) {
                 effectiveTime = depEndTime;
             }
-            LOG.info("Entity update complete : " + affectedEntity.toShortString() + cluster
-                    + ", bundle: " + affectedProcBundle.getId());
+            LOG.info("Entity update complete : " + affectedEntity.toShortString() + cluster + ", "+ "bundle: "
+                + affectedProcBundle.getId());
         }
         LOG.info("Entity update and all dependent entities updated: " + oldEntity.toShortString());
         return effectiveTime;
@@ -979,8 +939,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     private Date getCoordLastActionTime(CoordinatorJob coord) {
         if (coord.getNextMaterializedTime() != null) {
-            Calendar cal = Calendar.getInstance(EntityUtil.getTimeZone(coord
-                    .getTimeZone()));
+            Calendar cal = Calendar.getInstance(EntityUtil.getTimeZone(coord.getTimeZone()));
             cal.setTime(coord.getLastActionTime());
             Frequency freq = createFrequency(String.valueOf(coord.getFrequency()), coord.getTimeUnit());
             cal.add(freq.getTimeUnit().getCalendarUnit(), -freq.getFrequencyAsInt());
@@ -989,16 +948,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return null;
     }
 
-    private void updateCoords(String cluster, BundleJob bundle, int concurrency,
-                              Date endTime) throws FalconException {
+    private void updateCoords(String cluster, BundleJob bundle, int concurrency, Date endTime) throws FalconException {
         if (endTime.compareTo(now()) <= 0) {
             throw new FalconException("End time " + SchemaHelper.formatDateUTC(endTime) + " can't be in the past");
         }
 
         // change coords
         for (CoordinatorJob coord : bundle.getCoordinators()) {
-            LOG.debug("Updating endtime of coord " + coord.getId() + " to "
-                    + SchemaHelper.formatDateUTC(endTime) + " on cluster " + cluster);
+            LOG.debug("Updating endtime of coord " + coord.getId() + " to " + SchemaHelper.formatDateUTC(endTime)
+                + " on cluster " + cluster);
             Date lastActionTime = getCoordLastActionTime(coord);
             if (lastActionTime == null) { // nothing is materialized
                 LOG.info("Nothing is materialized for this coord: " + coord.getId());
@@ -1011,12 +969,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 }
             } else {
                 LOG.info("Actions have materialized for this coord: " + coord.getId() + ", last action "
-                        + SchemaHelper.formatDateUTC(lastActionTime));
+                    + SchemaHelper.formatDateUTC(lastActionTime));
                 if (!endTime.after(lastActionTime)) {
                     Date pauseTime = offsetTime(endTime, -1);
                     // set pause time which deletes future actions
-                    LOG.info("Setting pause time on coord : " + coord.getId() + " to "
-                            + SchemaHelper.formatDateUTC(pauseTime));
+                    LOG.info("Setting pause time on coord : " + coord.getId() + " to " + SchemaHelper.formatDateUTC(
+                        pauseTime));
                     change(cluster, coord.getId(), concurrency, null, SchemaHelper.formatDateUTC(pauseTime));
                 }
                 change(cluster, coord.getId(), concurrency, endTime, "");
@@ -1037,9 +995,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private Date updateInternal(Entity oldEntity, Entity newEntity, String cluster, BundleJob oldBundle,
-                                boolean alreadyCreated, Date inEffectiveTime) throws FalconException {
+        boolean alreadyCreated, Date inEffectiveTime) throws FalconException {
         OozieWorkflowBuilder<Entity> builder =
-                (OozieWorkflowBuilder<Entity>) WorkflowBuilder.getBuilder(ENGINE, oldEntity);
+            (OozieWorkflowBuilder<Entity>) WorkflowBuilder.getBuilder(ENGINE, oldEntity);
 
         Job.Status oldBundleStatus = oldBundle.getStatus();
         //Suspend coords as bundle suspend doesn't suspend coords synchronously
@@ -1075,7 +1033,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             }
         } else {
             LOG.info("New bundle has already been created. Bundle Id: " + latestBundle.getId() + ", Start: "
-                    + SchemaHelper.formatDateUTC(latestBundle.getStartTime()) + ", End: " + latestBundle.getEndTime());
+                + SchemaHelper.formatDateUTC(latestBundle.getStartTime()) + ", End: " + latestBundle.getEndTime());
 
             //pick effectiveTime from already created bundle
             effectiveTime = getMinStartTime(latestBundle);
@@ -1132,14 +1090,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private List<WorkflowJob> getRunningWorkflows(String cluster,
-                                                  List<String> wfNames) throws FalconException {
+    private List<WorkflowJob> getRunningWorkflows(String cluster, List<String> wfNames) throws FalconException {
         StringBuilder filter = new StringBuilder();
-        filter.append(OozieClient.FILTER_STATUS).append('=')
-                .append(Job.Status.RUNNING.name());
+        filter.append(OozieClient.FILTER_STATUS).append('=').append(Job.Status.RUNNING.name());
         for (String wfName : wfNames) {
-            filter.append(';').append(OozieClient.FILTER_NAME).append('=')
-                    .append(wfName);
+            filter.append(';').append(OozieClient.FILTER_NAME).append('=').append(wfName);
         }
 
         try {
@@ -1150,8 +1105,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
-    public void reRun(String cluster, String jobId, Properties props)
-        throws FalconException {
+    public void reRun(String cluster, String jobId, Properties props) throws FalconException {
 
         ProxyOozieClient client = OozieClientFactory.get(cluster);
         try {
@@ -1175,8 +1129,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private void assertStatus(String cluster, String jobId, Status... statuses)
-        throws FalconException {
+    private void assertStatus(String cluster, String jobId, Status... statuses) throws FalconException {
 
         String actualStatus = getWorkflowStatus(cluster, jobId);
         for (int counter = 0; counter < 3; counter++) {
@@ -1191,9 +1144,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             }
             actualStatus = getWorkflowStatus(cluster, jobId);
         }
-        throw new FalconException("For Job" + jobId + ", actual statuses: "
-                + actualStatus + ", expected statuses: "
-                + Arrays.toString(statuses));
+        throw new FalconException("For Job" + jobId + ", actual statuses: " + actualStatus + ", expected statuses: "
+            + Arrays.toString(statuses));
     }
 
     private boolean statusEquals(String left, Status... right) {
@@ -1206,8 +1158,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
-    public String getWorkflowStatus(String cluster, String jobId)
-        throws FalconException {
+    public String getWorkflowStatus(String cluster, String jobId) throws FalconException {
 
         ProxyOozieClient client = OozieClientFactory.get(cluster);
         try {
@@ -1242,8 +1193,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     private String run(String cluster, Properties props) throws FalconException {
         try {
             String jobId = OozieClientFactory.get(cluster).run(props);
-            LOG.info("Submitted " + jobId + " on cluster " + cluster
-                    + " with properties : " + props);
+            LOG.info("Submitted " + jobId + " on cluster " + cluster + " with properties : " + props);
             return jobId;
         } catch (OozieClientException e) {
             LOG.error("Unable to schedule workflows", e);
@@ -1254,8 +1204,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     private void suspend(String cluster, String jobId) throws FalconException {
         try {
             OozieClientFactory.get(cluster).suspend(jobId);
-            assertStatus(cluster, jobId, Status.PREPSUSPENDED, Status.SUSPENDED, Status.SUCCEEDED,
-                Status.FAILED, Status.KILLED);
+            assertStatus(cluster, jobId, Status.PREPSUSPENDED, Status.SUSPENDED, Status.SUCCEEDED, Status.FAILED,
+                Status.KILLED);
             LOG.info("Suspended job " + jobId + " on cluster " + cluster);
         } catch (OozieClientException e) {
             throw new FalconException(e);
@@ -1275,27 +1225,24 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     private void kill(String cluster, String jobId) throws FalconException {
         try {
             OozieClientFactory.get(cluster).kill(jobId);
-            assertStatus(cluster, jobId, Status.KILLED, Status.SUCCEEDED,
-                    Status.FAILED);
+            assertStatus(cluster, jobId, Status.KILLED, Status.SUCCEEDED, Status.FAILED);
             LOG.info("Killed job " + jobId + " on cluster " + cluster);
         } catch (OozieClientException e) {
             throw new FalconException(e);
         }
     }
 
-    private void change(String cluster, String jobId, String changeValue)
-        throws FalconException {
+    private void change(String cluster, String jobId, String changeValue) throws FalconException {
         try {
             OozieClientFactory.get(cluster).change(jobId, changeValue);
-            LOG.info("Changed bundle/coord " + jobId + ": " + changeValue
-                    + " on cluster " + cluster);
+            LOG.info("Changed bundle/coord " + jobId + ": " + changeValue + " on cluster " + cluster);
         } catch (OozieClientException e) {
             throw new FalconException(e);
         }
     }
 
-    private void change(String cluster, String id, int concurrency,
-                        Date endTime, String pauseTime) throws FalconException {
+    private void change(String cluster, String id, int concurrency, Date endTime, String pauseTime)
+        throws FalconException {
         StringBuilder changeValue = new StringBuilder();
         changeValue.append(OozieClient.CHANGE_VALUE_CONCURRENCY).append("=").append(concurrency).append(";");
         if (endTime != null) {
@@ -1319,9 +1266,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             CoordinatorJob coord = client.getCoordJobInfo(id);
             for (int counter = 0; counter < 3; counter++) {
                 Date intendedPauseTime = (StringUtils.isEmpty(pauseTime) ? null : SchemaHelper.parseDateUTC(pauseTime));
-                if (coord.getConcurrency() != concurrency
-                        || (endTime != null && !coord.getEndTime().equals(endTime))
-                        || (intendedPauseTime != null && !intendedPauseTime.equals(coord.getPauseTime()))) {
+                if (coord.getConcurrency() != concurrency || (endTime != null && !coord.getEndTime().equals(endTime))
+                    || (intendedPauseTime != null && !intendedPauseTime.equals(coord.getPauseTime()))) {
                     try {
                         Thread.sleep(100);
                     } catch (InterruptedException ignore) {
@@ -1332,12 +1278,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 }
                 coord = client.getCoordJobInfo(id);
             }
-            LOG.error("Failed to change coordinator. Current value "
-                    + coord.getConcurrency() + ", "
-                    + SchemaHelper.formatDateUTC(coord.getEndTime()) + ", "
-                    + SchemaHelper.formatDateUTC(coord.getPauseTime()));
-            throw new FalconException("Failed to change coordinator " + id
-                    + " with change value " + changeValueStr);
+            LOG.error("Failed to change coordinator. Current value " + coord.getConcurrency() + ", "
+                + SchemaHelper.formatDateUTC(coord.getEndTime()) + ", " + SchemaHelper.formatDateUTC(
+                    coord.getPauseTime()));
+            throw new FalconException("Failed to change coordinator " + id + " with change value " + changeValueStr);
         } catch (OozieClientException e) {
             throw new FalconException(e);
         }
@@ -1355,8 +1299,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
-    public InstancesResult getJobDetails(String cluster, String jobId)
-        throws FalconException {
+    public InstancesResult getJobDetails(String cluster, String jobId) throws FalconException {
         Instance[] instances = new Instance[1];
         Instance instance = new Instance();
         try {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5e435215/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
index e780c18..a23c396 100644
--- a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
+++ b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
@@ -126,7 +126,6 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
                     entityName, nominalTime, workflowId, workflowUser, runId, operation,
                     SchemaHelper.formatDateUTC(startTime), duration);
 
-                notifySLAService(cluster, entityName, entityType, nominalTime, duration);
                 notifyMetadataMappingService(entityName, operation, mapMessage.getString(ARG.logDir.getArgName()));
             }
         } catch (JMSException e) {
@@ -138,20 +137,6 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
         }
     }
 
-    private void notifySLAService(String cluster, String entityName,
-                                  String entityType, String nominalTime, Long duration) {
-        try {
-            getSLAMonitoringService().notifyCompletion(EntityUtil.getEntity(entityType, entityName),
-                    cluster, SchemaHelper.parseDateUTC(nominalTime), duration);
-        } catch (Throwable e) {
-            LOG.warn("Unable to notify SLA Service", e);
-        }
-    }
-
-    private SLAMonitoringService getSLAMonitoringService() {
-        return Services.get().getService(SLAMonitoringService.SERVICE_NAME);
-    }
-
     private void notifyMetadataMappingService(String entityName, String operation,
                                               String logDir) throws FalconException {
         if (Services.get().isRegistered(MetadataMappingService.SERVICE_NAME)) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5e435215/prism/src/main/java/org/apache/falcon/service/SLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/SLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/SLAMonitoringService.java
deleted file mode 100644
index d3d9e19..0000000
--- a/prism/src/main/java/org/apache/falcon/service/SLAMonitoringService.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.service;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.aspect.GenericAlert;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.workflow.WorkflowEngineFactory;
-import org.apache.falcon.workflow.engine.WorkflowEngineActionListener;
-import org.apache.log4j.Logger;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-
-/**
- * A service implementation for SLA Monitoring.
- */
-public class SLAMonitoringService implements FalconService, WorkflowEngineActionListener {
-    private static final Logger LOG = Logger.getLogger(SLAMonitoringService.class);
-    public static final String SERVICE_NAME = "SLAMonitor";
-
-    private ConcurrentMap<String, Long> monitoredEntities = new ConcurrentHashMap<String, Long>();
-
-    private ConcurrentMap<String, ConcurrentMap<Date, Date>> pendingJobs
-        = new ConcurrentHashMap<String, ConcurrentMap<Date, Date>>();
-
-    private static final long INITIAL_LATENCY_SECS = 12 * 3600;
-
-    private static final long POLL_PERIODICITY_SECS = 300;
-
-    @Override
-    public String getName() {
-        return SERVICE_NAME;
-    }
-
-    @Override
-    public void init() throws FalconException {
-        WorkflowEngineFactory.getWorkflowEngine().registerListener(this);
-        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
-        executor.scheduleWithFixedDelay(new Monitor(), POLL_PERIODICITY_SECS, POLL_PERIODICITY_SECS, TimeUnit.SECONDS);
-    }
-
-    @Override
-    public void destroy() throws FalconException {
-    }
-
-    @Override
-    public void afterSchedule(Entity entity, String cluster) throws FalconException {
-        addEntityForMonitoring(entity, cluster);
-    }
-
-    @Override
-    public void afterDelete(Entity entity, String cluster) throws FalconException {
-        removeMonitoredEntity(entity, cluster);
-    }
-
-    @Override
-    public void afterSuspend(Entity entity, String cluster) throws FalconException {
-        removeMonitoredEntity(entity, cluster);
-    }
-
-    @Override
-    public void afterResume(Entity entity, String cluster) throws FalconException {
-        addEntityForMonitoring(entity, cluster);
-    }
-
-    public void notifyCompletion(Entity entity, String cluster, Date nominalTime, long duration) {
-        if (!isEntityMonitored(entity, cluster)) {
-            addEntityForMonitoring(entity, cluster);
-        }
-        updateLatency(entity, cluster, duration);
-        removeFromPendingList(entity, cluster, nominalTime);
-    }
-
-    private String getKey(Entity entity, String cluster) {
-        return entity.toShortString() + "/" + cluster;
-    }
-
-    private void addEntityForMonitoring(Entity entity, String cluster) {
-        monitoredEntities.putIfAbsent(getKey(entity, cluster), INITIAL_LATENCY_SECS);
-    }
-
-    private void removeMonitoredEntity(Entity entity, String cluster) {
-        monitoredEntities.remove(getKey(entity, cluster));
-        pendingJobs.remove(getKey(entity, cluster));
-    }
-
-    private boolean isEntityMonitored(Entity entity, String cluster) {
-        return monitoredEntities.containsKey(getKey(entity, cluster));
-    }
-
-    private void updateLatency(Entity entity, String cluster, long duration) {
-        long newLatency = (duration + monitoredEntities.get(getKey(entity, cluster))) / 2;
-        monitoredEntities.put(getKey(entity, cluster), newLatency);
-    }
-
-    private void removeFromPendingList(Entity entity, String cluster, Date nominalTime) {
-        ConcurrentMap<Date, Date> pendingInstances = pendingJobs.get(getKey(entity, cluster));
-        if (pendingInstances != null) {
-            LOG.debug("Removing from pending jobs: " + getKey(entity, cluster) + " ---> "
-                    + SchemaHelper.formatDateUTC(nominalTime));
-            pendingInstances.remove(nominalTime);
-        }
-    }
-
-    private class Monitor implements Runnable {
-
-        @Override
-        public void run() {
-            try {
-                if (monitoredEntities.isEmpty()) {
-                    return;
-                }
-                Set<String> keys = new HashSet<String>(monitoredEntities.keySet());
-                checkSLAMissOnPendingEntities(keys);
-                addNewPendingEntities(keys);
-            } catch (Throwable e) {
-                LOG.error("Monitor failed: ", e);
-            }
-        }
-
-        private void checkSLAMissOnPendingEntities(Set<String> keys) throws FalconException {
-            Date now = new Date();
-            for (String key : keys) {
-                ConcurrentMap<Date, Date> pendingInstances = pendingJobs.get(key);
-                if (pendingInstances == null) {
-                    continue;
-                }
-                ConcurrentMap<Date, Date> interim =
-                        new ConcurrentHashMap<Date, Date>(pendingInstances);
-                for (Map.Entry<Date, Date> entry : interim.entrySet()) {
-                    if (entry.getValue().before(now)) {
-                        Entity entity = getEntity(key);
-                        String cluster = getCluster(key);
-                        GenericAlert.alertOnLikelySLAMiss(cluster, entity.getEntityType().name(),
-                                entity.getName(), SchemaHelper.formatDateUTC(entry.getKey()));
-                        LOG.debug("Removing from pending jobs: " + key + " ---> "
-                                + SchemaHelper.formatDateUTC(entry.getKey()));
-                        pendingInstances.remove(entry.getKey());
-                    }
-                }
-                interim.clear();
-            }
-        }
-
-        private void addNewPendingEntities(Set<String> keys) throws FalconException {
-            Date now = new Date();
-            Date windowEndTime = new Date(now.getTime() + POLL_PERIODICITY_SECS * 1000);
-            for (String key : keys) {
-                Entity entity = getEntity(key);
-                String cluster = getCluster(key);
-                if (entity == null) {
-                    LOG.warn("No entity for " + key);
-                    continue;
-                }
-                Date startTime = EntityUtil.getStartTime(entity, cluster);
-                Frequency frequency = EntityUtil.getFrequency(entity);
-                TimeZone timeZone = EntityUtil.getTimeZone(entity);
-                Date nextStart = EntityUtil.getNextStartTime(startTime, frequency, timeZone, now);
-                if (nextStart.after(windowEndTime)) {
-                    continue;
-                }
-                ConcurrentMap<Date, Date> pendingInstances = pendingJobs.get(key);
-                while (!nextStart.after(windowEndTime)) {
-                    if (pendingInstances == null) {
-                        pendingJobs.putIfAbsent(key, new ConcurrentHashMap<Date, Date>());
-                        pendingInstances = pendingJobs.get(key);
-                    }
-                    Long latency = monitoredEntities.get(key);
-                    if (latency == null) {
-                        break;
-                    }
-                    // 1.5 times latency is when it is supposed to have breached
-                    pendingInstances.putIfAbsent(nextStart, new Date(nextStart.getTime() + latency * 1500));
-                    LOG.debug("Adding to pending jobs: " + key + " ---> " + SchemaHelper.formatDateUTC(nextStart));
-                    Calendar startCal = Calendar.getInstance(timeZone);
-                    startCal.setTime(nextStart);
-                    startCal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt());
-                    nextStart = startCal.getTime();
-                }
-            }
-        }
-    }
-
-    private static final Pattern PATTERN = Pattern.compile("[()\\s/]");
-
-    private Entity getEntity(String key) throws FalconException {
-        String[] parts = PATTERN.split(key);
-        String name = parts[3];
-        String type = parts[1];
-        return EntityUtil.getEntity(type, name);
-    }
-
-    private String getCluster(String key) throws FalconException {
-        String[] parts = PATTERN.split(key);
-        return parts[4];
-    }
-
-    @Override
-    public void beforeSchedule(Entity entity, String cluster) throws FalconException {
-    }
-
-    @Override
-    public void beforeDelete(Entity entity, String cluster) throws FalconException {
-    }
-
-    @Override
-    public void beforeSuspend(Entity entity, String cluster) throws FalconException {
-    }
-
-    @Override
-    public void beforeResume(Entity entity, String cluster) throws FalconException {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5e435215/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index f413019..2457862 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -37,7 +37,6 @@
                         org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.rerun.service.RetryService,\
                         org.apache.falcon.rerun.service.LateRunService,\
-                        org.apache.falcon.service.SLAMonitoringService,\
                         org.apache.falcon.metadata.MetadataMappingService,\
                         org.apache.falcon.service.LogCleanupService
 prism.application.services=org.apache.falcon.entity.store.ConfigurationStore


Mime
View raw message