falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject falcon git commit: FALCON-1572 Only one instance is running in a process when run using Native Scheduler. Contributed by Pallavi Rao.
Date Mon, 14 Dec 2015 13:59:56 GMT
Repository: falcon
Updated Branches:
  refs/heads/master f9c8feac1 -> 3059980c6


FALCON-1572 Only one instance is running in a process when run using Native Scheduler. Contributed
by Pallavi Rao.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3059980c
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3059980c
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3059980c

Branch: refs/heads/master
Commit: 3059980c65400bc2dc0100cdf3b1a781927a3203
Parents: f9c8fea
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Mon Dec 14 17:50:22 2015 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Mon Dec 14 17:50:22 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 scheduler/pom.xml                               |  6 ++
 .../apache/falcon/execution/EntityExecutor.java |  5 ++
 .../falcon/execution/ExecutionInstance.java     |  5 ++
 .../execution/FalconExecutionService.java       |  5 ++
 .../falcon/execution/NotificationHandler.java   | 24 +++++++
 .../service/impl/JobCompletionService.java      | 11 +++-
 .../service/impl/SchedulerService.java          | 69 ++++++++++----------
 .../service/SchedulerServiceTest.java           | 10 +++
 9 files changed, 100 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c1f09d3..4d42881 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -62,6 +62,8 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1572 Only one instance is running in a process when run using Native Scheduler(Pallavi
Rao via Ajay Yadava)
+
     FALCON-1660 Examples directory missing in distributed mode(Praveen Adlakha via Ajay Yadava)
 
     FALCON-1647 Unable to create feed : FilePermission error under cluster staging directory(Balu
Vellanki via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/scheduler/pom.xml
----------------------------------------------------------------------
diff --git a/scheduler/pom.xml b/scheduler/pom.xml
index 336997d..c305651 100644
--- a/scheduler/pom.xml
+++ b/scheduler/pom.xml
@@ -137,6 +137,12 @@
             <artifactId>derby</artifactId>
             <version>10.10.1.1</version>
         </dependency>
+
+        <dependency>
+            <groupId>commons-dbcp</groupId>
+            <artifactId>commons-dbcp</artifactId>
+            <version>${commons-dbcp.version}</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
index 88d88c1..c9c0f42 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
@@ -108,4 +108,9 @@ public abstract class EntityExecutor implements NotificationHandler, InstanceSta
     public EntityClusterID getId() {
         return id;
     }
+
+    @Override
+    public PRIORITY getPriority() {
+        return PRIORITY.MEDIUM;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
index 5f96d3f..3cc8a25 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
@@ -202,4 +202,9 @@ public abstract class ExecutionInstance implements NotificationHandler
{
      * @throws FalconException
      */
     public abstract void destroy() throws FalconException;
+
+    @Override
+    public PRIORITY getPriority() {
+        return PRIORITY.MEDIUM;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
index b6741a4..01208d6 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
@@ -133,6 +133,11 @@ public final class FalconExecutionService implements FalconService, EntityStateC
     }
 
     @Override
+    public PRIORITY getPriority() {
+        return PRIORITY.HIGH;
+    }
+
+    @Override
     public void onSubmit(Entity entity) throws FalconException {
         // Do nothing
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
index b071f5f..2a2589e 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
@@ -24,6 +24,23 @@ import org.apache.falcon.notification.service.event.Event;
  * An interface that every class that handles notifications from notification services must
implement.
  */
 public interface NotificationHandler {
+
+    /**
+     * When there are multiple notification handlers for the same event,
+     * the priority determines which handler gets notified first.
+     */
+    enum PRIORITY {HIGH(5), MEDIUM(3), LOW(0);
+
+        private final int priority;
+
+        PRIORITY(int i) {
+            this.priority = i;
+        }
+
+        public int getPriority() {
+            return priority;
+        }
+    }
     /**
      * The method a notification service calls to onEvent an event.
      *
@@ -31,4 +48,11 @@ public interface NotificationHandler {
      * @throws FalconException
      */
     void onEvent(Event event) throws FalconException;
+
+    /**
+     * When there are multiple notification handlers for the same event,
+     * the priority determines which handler gets notified first.
+     * @return
+     */
+    PRIORITY getPriority();
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
index 23f2b4e..4278d3f 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
@@ -17,6 +17,8 @@
  */
 package org.apache.falcon.notification.service.impl;
 
+import java.util.Comparator;
+import java.util.TreeSet;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
@@ -43,7 +45,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
@@ -58,7 +59,13 @@ public class JobCompletionService implements FalconNotificationService,
Workflow
     private static final Logger LOG = LoggerFactory.getLogger(JobCompletionService.class);
     private static final DateTimeZone UTC = DateTimeZone.UTC;
 
-    private Set<NotificationHandler> listeners = Collections.synchronizedSet(new HashSet<NotificationHandler>());
+    private Set<NotificationHandler> listeners = Collections.synchronizedSet(new TreeSet<>(
+            new Comparator<NotificationHandler>() {
+                @Override
+                public int compare(NotificationHandler o1, NotificationHandler o2) {
+                    return Integer.compare(o1.getPriority().getPriority(), o2.getPriority().getPriority());
+                }
+            }));
 
     @Override
     public void register(NotificationRequest notifRequest) throws NotificationServiceException
{

http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
index ace8444..fb11091 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
@@ -23,6 +23,9 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
+import java.util.Collections;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
@@ -55,9 +58,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Comparator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -85,7 +86,7 @@ public class SchedulerService implements FalconNotificationService, Notification
 
     private Cache<InstanceID, Object> instancesToIgnore;
     // TODO : limit the no. of awaiting instances per entity
-    private LoadingCache<EntityClusterID, List<ExecutionInstance>> executorAwaitedInstances;
+    private LoadingCache<EntityClusterID, SortedMap<Integer, ExecutionInstance>>
executorAwaitedInstances;
 
     @Override
     public void register(NotificationRequest notifRequest) throws NotificationServiceException
{
@@ -108,7 +109,6 @@ public class SchedulerService implements FalconNotificationService, Notification
             // Not efficient to iterate over elements to remove this. Add to ignore list.
             instancesToIgnore.put((InstanceID) listenerID, new Object());
         }
-
     }
 
     @Override
@@ -130,15 +130,16 @@ public class SchedulerService implements FalconNotificationService,
Notification
         PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20,
new PriorityComparator());
         runQueue = new ThreadPoolExecutor(1, numThreads, 0L, TimeUnit.MILLISECONDS, pq);
 
-        CacheLoader instanceCacheLoader = new CacheLoader<EntityClusterID, Collection<ExecutionInstance>>()
{
+        CacheLoader instanceCacheLoader = new CacheLoader<EntityClusterID, SortedMap<Integer,
ExecutionInstance>>() {
             @Override
-            public Collection<ExecutionInstance> load(EntityClusterID id) throws Exception
{
+            public SortedMap<Integer, ExecutionInstance> load(EntityClusterID id) throws
Exception {
                 List<InstanceState.STATE> states = new ArrayList<InstanceState.STATE>();
                 states.add(InstanceState.STATE.READY);
-                List<ExecutionInstance> readyInstances = new ArrayList<>();
+                SortedMap<Integer, ExecutionInstance> readyInstances = Collections.synchronizedSortedMap(
+                        new TreeMap<Integer, ExecutionInstance>());
                 // TODO : Limit it to no. of instances that can be run in parallel.
                 for (InstanceState state : STATE_STORE.getExecutionInstances(id, states))
{
-                    readyInstances.add(state.getInstance());
+                    readyInstances.put(state.getInstance().getInstanceSequence(), state.getInstance());
                 }
                 return readyInstances;
             }
@@ -184,7 +185,7 @@ public class SchedulerService implements FalconNotificationService, Notification
         if (event.getType() == EventType.JOB_COMPLETED) {
             try {
                 ID targetID = event.getTarget();
-                List<ExecutionInstance> instances = null;
+                SortedMap<Integer, ExecutionInstance> instances = null;
                 // Check if the instance is awaited.
                 if (targetID instanceof EntityClusterID) {
                     EntityClusterID id = (EntityClusterID) event.getTarget();
@@ -197,19 +198,22 @@ public class SchedulerService implements FalconNotificationService,
Notification
                     instances = executorAwaitedInstances.get(id.getEntityClusterID());
                 }
                 if (instances != null && !instances.isEmpty()) {
-                    ExecutionInstance instance = instances.get(0);
-                    if (instance != null && instance.getAwaitingPredicates() != null)
{
-                        for (Predicate predicate : instance.getAwaitingPredicates()) {
-                            if (predicate.getType() == Predicate.TYPE.JOB_COMPLETION) {
-                                // Construct a request object
-                                NotificationHandler handler = ReflectionUtils
-                                        .getInstanceByClassName(predicate.getClauseValue("handler").toString());
-                                JobScheduleRequestBuilder requestBuilder = new JobScheduleRequestBuilder(
-                                        handler, instance.getId());
-                                requestBuilder.setInstance(instance);
-                                InstanceRunner runner = new InstanceRunner(requestBuilder.build());
-                                runQueue.execute(runner);
-                                instances.remove(instance);
+                    synchronized (instances) {
+                        // Order is FIFO..
+                        ExecutionInstance instance = instances.get(instances.firstKey());
+                        if (instance != null && instance.getAwaitingPredicates()
!= null) {
+                            for (Predicate predicate : instance.getAwaitingPredicates())
{
+                                if (predicate.getType() == Predicate.TYPE.JOB_COMPLETION)
{
+                                    // Construct a request object
+                                    NotificationHandler handler = ReflectionUtils
+                                            .getInstanceByClassName(predicate.getClauseValue("handler").toString());
+                                    JobScheduleRequestBuilder requestBuilder = new JobScheduleRequestBuilder(
+                                            handler, instance.getId());
+                                    requestBuilder.setInstance(instance);
+                                    InstanceRunner runner = new InstanceRunner(requestBuilder.build());
+                                    runQueue.execute(runner);
+                                    instances.remove(instance.getInstanceSequence());
+                                }
                             }
                         }
                     }
@@ -221,6 +225,11 @@ public class SchedulerService implements FalconNotificationService, Notification
     }
 
     @Override
+    public PRIORITY getPriority() {
+        return PRIORITY.MEDIUM;
+    }
+
+    @Override
     public void destroy() throws FalconException {
         runQueue.shutdownNow();
         instancesToIgnore.invalidateAll();
@@ -244,10 +253,6 @@ public class SchedulerService implements FalconNotificationService, Notification
             allowedParallelInstances = EntityUtil.getParallel(instance.getEntity());
         }
 
-        public int incrementAllowedInstances() {
-            return ++allowedParallelInstances;
-        }
-
         private EntityUtil.JOBPRIORITY getPriority(Entity entity) {
             switch(entity.getEntityType()) {
             case PROCESS :
@@ -319,15 +324,9 @@ public class SchedulerService implements FalconNotificationService, Notification
         }
 
         private void updateExecutorAwaitedInstances(EntityClusterID id) throws ExecutionException
{
-            synchronized (id) {
-                List<ExecutionInstance> instances = executorAwaitedInstances.get(id);
-                if (instances == null) {
-                    // Order is FIFO.
-                    instances = new LinkedList<>();
-                    executorAwaitedInstances.put(id, instances);
-                }
-                instances.add(instance);
-            }
+            SortedMap<Integer, ExecutionInstance> instances = executorAwaitedInstances.get(id);
+            // instances will never be null as it is initialized in the loading cache.
+            instances.put(instance.getInstanceSequence(), instance);
         }
 
         private boolean dependencyCheck() throws FalconException, ExecutionException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
index c43ccf0..5a66518 100644
--- a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
@@ -287,6 +287,11 @@ public class SchedulerServiceTest extends AbstractTestBase {
                     failed = true;
                 }
             }
+
+            @Override
+            public PRIORITY getPriority() {
+                return PRIORITY.MEDIUM;
+            }
         };
         SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder)
                 scheduler.createRequestBuilder(failureHandler, instance1.getId());
@@ -317,6 +322,11 @@ public class SchedulerServiceTest extends AbstractTestBase {
                 stateStore.updateExecutionInstance(state);
             }
         }
+
+        @Override
+        public PRIORITY getPriority() {
+            return PRIORITY.MEDIUM;
+        }
     }
 }
 


Mime
View raw message