falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject [4/4] falcon git commit: FALCON-1662 Ensure entity can be scheduled on multiple clusters on same colo (by Pallavi Rao)
Date Mon, 18 Jan 2016 06:12:09 GMT
FALCON-1662 Ensure entity can be scheduled on multiple clusters on same colo (by Pallavi Rao)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9864f001
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9864f001
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9864f001

Branch: refs/heads/master
Commit: 9864f00131eb6cb0329b502be6b4e5154fbe0f05
Parents: 32d2363
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Mon Jan 18 11:40:20 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Mon Jan 18 11:40:20 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../workflow/engine/OozieWorkflowEngine.java    |  7 ++++++-
 .../execution/FalconExecutionService.java       |  5 +++++
 .../falcon/state/store/jdbc/InstanceBean.java   |  2 +-
 .../falcon/state/store/jdbc/JDBCStateStore.java |  3 ++-
 .../workflow/engine/FalconWorkflowEngine.java   |  9 ++++++---
 .../state/service/store/TestJDBCStateStore.java | 21 ++++++++++++++++----
 7 files changed, 39 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/9864f001/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0b0a585..7fec9e1 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -59,6 +59,8 @@ Proposed Release Version: 0.9
     FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
 
   IMPROVEMENTS
+    FALCON-1662 Ensure entity can be scheduled on multiple clusters on same colo (Pallavi Rao)
+
     FALCON-1545 Add documentation for Hive replication job counters(Peeyush Bishnoi via Ajay Yadava)
 
     FALCON-1601 Make Falcon StateStore more secure by not disclosing imp params in startup.props(Pavan Kumar Kolamuri via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/9864f001/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 07eb47c..f4bb406 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -269,12 +269,17 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     private boolean isBundleInState(Map<String, BundleJob> bundles,
                                     BundleStatus status) throws FalconException {
 
+        // Need a separate list to avoid concurrent modification.
+        List<String> bundlesToRemove = new ArrayList<>();
         // After removing MISSING bundles for clusters, if bundles.size() == 0, entity is not scheduled. Return false.
         for (Map.Entry<String, BundleJob> clusterBundle : bundles.entrySet()) {
             if (clusterBundle.getValue() == MISSING) { // There is no active bundle for this cluster
-                bundles.remove(clusterBundle.getKey());
+                bundlesToRemove.add(clusterBundle.getKey());
             }
         }
+        for (String bundleToRemove : bundlesToRemove) {
+            bundles.remove(bundleToRemove);
+        }
         if (bundles.size() == 0) {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9864f001/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
index f45ec98..7bdcd6f 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
@@ -221,6 +221,11 @@ public final class FalconExecutionService implements FalconService, EntityStateC
      */
     public void delete(Entity entity) throws FalconException {
         for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+            EntityClusterID id = new EntityClusterID(entity, cluster);
+            if (!executors.containsKey(id)) {
+                LOG.info("Entity {} is already deleted on cluster {}.", id, cluster);
+                continue;
+            }
             EntityExecutor executor = getEntityExecutor(entity, cluster);
             executor.killAll();
             executors.remove(executor.getId());

http://git-wip-us.apache.org/repos/asf/falcon/blob/9864f001/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
index 5ed3ccd..7f7b966 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
@@ -48,7 +48,7 @@ import java.sql.Timestamp;
         @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster"),
         @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState)"),
         @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"),
-        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"),
         @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"),
         @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a")
 })

http://git-wip-us.apache.org/repos/asf/falcon/blob/9864f001/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
index e898247..2eafbce 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
@@ -362,7 +362,7 @@ public final class JDBCStateStore extends AbstractStateStore {
                                                            DateTime end) throws StateStoreException
{
         String entityKey = new EntityClusterID(entity, cluster).getEntityID().getKey();
         EntityManager entityManager = getEntityManager();
-        Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_FOR_STATES_WITH_RANGE");
+        Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE");
         q.setParameter("entityId", entityKey);
         List<String> instanceStates = new ArrayList<>();
         for (InstanceState.STATE state : states) {
@@ -371,6 +371,7 @@ public final class JDBCStateStore extends AbstractStateStore {
         q.setParameter("currentState", instanceStates);
         q.setParameter("startTime", new Timestamp(start.getMillis()));
         q.setParameter("endTime", new Timestamp(end.getMillis()));
+        q.setParameter("cluster", cluster);
         List result  = q.getResultList();
         entityManager.close();
         try {

http://git-wip-us.apache.org/repos/asf/falcon/blob/9864f001/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index bffdb0b..efe9049 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -319,6 +319,7 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
             for (String name : props.stringPropertyNames()) {
                 keyValuePairs[i++] = new InstancesResult.KeyValuePair(name, props.getProperty(name));
             }
+            instanceInfo.wfParams = keyValuePairs;
             break;
         default:
             throw new IllegalArgumentException("Unhandled action " + action);
@@ -416,9 +417,11 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
         boolean entityUpdated =
                 UpdateHelper.isEntityUpdated(oldEntity, newEntity, cluster,
                         EntityUtil.getLatestStagingPath(clusterEntity, oldEntity));
-
+        StringBuilder result = new StringBuilder();
         if (!entityUpdated) {
-            throw new FalconException("No relevant updates detected in the new entity definition!");
+            // Ideally should throw an exception, but, keeping it backward-compatible.
+            LOG.warn("No relevant updates detected in the new entity definition for entity {}!", newEntity.getName());
+            return result.toString();
         }
 
         Date oldEndTime = EntityUtil.getEndTime(oldEntity, cluster);
@@ -435,7 +438,7 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
         Collection<InstanceState> instances = new ArrayList<>();
         instances.add(STATE_STORE.getLastExecutionInstance(oldEntity, cluster));
         EXECUTION_SERVICE.getEntityExecutor(oldEntity, cluster).update(newEntity);
-        StringBuilder result = new StringBuilder();
+
         result.append(newEntity.toShortString()).append("/Effective Time: ")
                 .append(getEffectiveTime(newEntity, cluster, instances));
         return result.toString();

http://git-wip-us.apache.org/repos/asf/falcon/blob/9864f001/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
index bb8ff61..2a383cc 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
@@ -346,21 +346,34 @@ public class TestJDBCStateStore extends AbstractSchedulerTestBase {
         InstanceState instanceState2 = new InstanceState(processExecutionInstance2);
         instanceState2.setCurrentState(InstanceState.STATE.RUNNING);
 
+        ExecutionInstance processExecutionInstance3 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                instance2Time, "cluster2", instance2Time);
+        InstanceState instanceState3 = new InstanceState(processExecutionInstance3);
+        instanceState3.setCurrentState(InstanceState.STATE.RUNNING);
+
         stateStore.putExecutionInstance(instanceState1);
         stateStore.putExecutionInstance(instanceState2);
+        stateStore.putExecutionInstance(instanceState3);
 
         List<InstanceState.STATE> states = new ArrayList<>();
         states.add(InstanceState.STATE.RUNNING);
 
         Collection<InstanceState> actualInstances = stateStore.getExecutionInstances(entityState.getEntity(),
                 "cluster1", states, new DateTime(instance1Time), new DateTime(instance1Time + 60000));
-        Assert.assertEquals(1, actualInstances.size());
-        Assert.assertEquals(instanceState1, actualInstances.toArray()[0]);
+        Assert.assertEquals(actualInstances.size(), 1);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState1);
 
         actualInstances = stateStore.getExecutionInstances(entityState.getEntity(),
                 "cluster1", states, new DateTime(instance2Time), new DateTime(instance2Time + 60000));
-        Assert.assertEquals(1, actualInstances.size());
-        Assert.assertEquals(instanceState2, actualInstances.toArray()[0]);
+        Assert.assertEquals(actualInstances.size(), 1);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState2);
+
+        // Ensure we can get instances for a different cluster
+        actualInstances = stateStore.getExecutionInstances(entityState.getEntity(),
+                "cluster2", states, new DateTime(instance2Time), new DateTime(instance2Time + 60000));
+        Assert.assertEquals(actualInstances.size(), 1);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState3);
 
     }
 


Mime
View raw message