falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject [2/2] falcon git commit: FALCON-1568 Process Instances are not getting scheduled in Falcon Native Scheduler
Date Mon, 07 Dec 2015 06:13:08 GMT
FALCON-1568 Process Instances are not getting scheduled in Falcon Native Scheduler


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/97e35874
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/97e35874
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/97e35874

Branch: refs/heads/master
Commit: 97e35874fa62103ba40a8bbb908c79c4f3b79a2d
Parents: 3487f71
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Mon Dec 7 10:47:16 2015 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Mon Dec 7 10:47:16 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                        |  4 ++++
 .../falcon/execution/ProcessExecutionInstance.java |  4 +++-
 .../service/impl/JobCompletionService.java         | 17 +++++++++++------
 3 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/97e35874/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 193a26e..4690c95 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -58,6 +58,10 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1598 Flaky test : EntityManagerJerseyIT.testDuplicateDeleteCommands (Narayan Periwal via Pallavi Rao)
+
+    FALCON-1568 Process Instances are not getting scheduled in Falcon Native Scheduler (Pallavi Rao)
+
     FALCON-1595 In secure cluster, Falcon server loses ability to communicate with HDFS over time (Balu Vellanki)
 
     FALCON-1490 Fixing inconsistencies in filterBy behavior (Narayan Periwal via Balu Vellanki)

http://git-wip-us.apache.org/repos/asf/falcon/blob/97e35874/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
index cff4a73..f3beabc 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
@@ -320,6 +320,8 @@ public class ProcessExecutionInstance extends ExecutionInstance {
 
     @Override
     public void destroy() throws FalconException {
-        NotificationServicesRegistry.unregister(executionService, getId());
+        // Only Registration to Data service happens via process execution instance. So, handle just that.
+        NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA)
+                .unregister(executionService, getId());
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/97e35874/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
index 501c6aa..23f2b4e 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
@@ -41,12 +41,13 @@ import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 /**
  * This notification service notifies {@link NotificationHandler} when an external job
@@ -57,7 +58,7 @@ public class JobCompletionService implements FalconNotificationService, Workflow
     private static final Logger LOG = LoggerFactory.getLogger(JobCompletionService.class);
     private static final DateTimeZone UTC = DateTimeZone.UTC;
 
-    private List<NotificationHandler> listeners = Collections.synchronizedList(new ArrayList<NotificationHandler>());
+    private Set<NotificationHandler> listeners = Collections.synchronizedSet(new HashSet<NotificationHandler>());
 
     @Override
     public void register(NotificationRequest notifRequest) throws NotificationServiceException {
@@ -140,9 +141,13 @@ public class JobCompletionService implements FalconNotificationService, Workflow
 
     private void onEnd(WorkflowExecutionContext context, WorkflowJob.Status status) throws FalconException {
         JobCompletedEvent event = new JobCompletedEvent(constructCallbackID(context), status, getEndTime(context));
-        for (NotificationHandler handler : listeners) {
-            LOG.debug("Notifying {} with event {}", handler, event.getTarget());
-            handler.onEvent(event);
+        synchronized (listeners) {
+            Iterator<NotificationHandler> iterator = listeners.iterator();
+            while(iterator.hasNext()) {
+                NotificationHandler handler = iterator.next();
+                LOG.debug("Notifying {} with event {}", handler, event.getTarget());
+                handler.onEvent(event);
+            }
         }
     }
 


Mime
View raw message