falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject falcon git commit: FALCON-1607 Native Scheduler - Code refactoring: Refactor ID into more specific sub classes. Contributed by Ajay Yadava.
Date Mon, 23 Nov 2015 14:54:24 GMT
Repository: falcon
Updated Branches:
  refs/heads/master f3b781dec -> c4958773d


FALCON-1607 Native Scheduler - Code refactoring: Refactor ID into more specific subclasses. Contributed by Ajay Yadava.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c4958773
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c4958773
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c4958773

Branch: refs/heads/master
Commit: c4958773db9f7abc771c99b6b704951083020cc8
Parents: f3b781d
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Mon Nov 23 19:53:33 2015 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Mon Nov 23 19:53:33 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +
 .../apache/falcon/execution/EntityExecutor.java |   6 +-
 .../falcon/execution/ExecutionInstance.java     |   4 +-
 .../execution/FalconExecutionService.java       |  34 ++--
 .../execution/ProcessExecutionInstance.java     |   8 +-
 .../falcon/execution/ProcessExecutor.java       |  44 ++---
 .../service/impl/JobCompletionService.java      |  15 +-
 .../service/impl/SchedulerService.java          |  57 +++---
 .../apache/falcon/state/EntityClusterID.java    |  51 ++++++
 .../java/org/apache/falcon/state/EntityID.java  |  51 ++++++
 .../main/java/org/apache/falcon/state/ID.java   | 177 +++----------------
 .../org/apache/falcon/state/InstanceID.java     |  83 +++++++++
 .../org/apache/falcon/state/InstanceState.java  |   5 +
 .../org/apache/falcon/state/StateService.java   |   4 +-
 .../falcon/state/store/AbstractStateStore.java  |   8 +-
 .../falcon/state/store/EntityStateStore.java    |   8 +-
 .../falcon/state/store/InMemoryStateStore.java  |  62 +++----
 .../falcon/state/store/InstanceStateStore.java  |  16 +-
 .../workflow/engine/FalconWorkflowEngine.java   |   6 +-
 .../execution/FalconExecutionServiceTest.java   |  33 ++--
 .../notification/service/AlarmServiceTest.java  |   6 +-
 .../service/SchedulerServiceTest.java           |  17 +-
 .../apache/falcon/predicate/PredicateTest.java  |   6 +-
 .../falcon/state/InstanceStateServiceTest.java  |   2 +-
 24 files changed, 398 insertions(+), 309 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 700f7e0..e4ee8f8 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,10 @@ Trunk (Unreleased)
     FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
 
   IMPROVEMENTS
+    FALCON-1607 Native Scheduler - Code refactoring: Refactor ID into more specific sub classes(Ajay Yadava)
+
+    FALCON-1587 Divide FalconCLI.twiki into sub sections for different modules on the lines of REST Api(Praveen Adlakha via Ajay Yadava)
+
     FALCON-1552 Migration of  ProcessInstanceManagerIT to use falcon unit (Narayan Periwal via Pallavi Rao)
 
     FALCON-1486 Add Unit Test cases for HiveDR(Peeyush Bishnoi via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
index 9b07b9e..88d88c1 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
@@ -20,7 +20,7 @@ package org.apache.falcon.execution;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.state.ID;
+import org.apache.falcon.state.EntityClusterID;
 import org.apache.falcon.state.InstanceStateChangeHandler;
 import org.apache.falcon.state.store.AbstractStateStore;
 import org.apache.falcon.state.store.StateStore;
@@ -37,7 +37,7 @@ public abstract class EntityExecutor implements NotificationHandler, InstanceSta
     public static final String DEFAULT_CACHE_SIZE = "20";
     protected String cluster;
     protected static final StateStore STATE_STORE = AbstractStateStore.get();
-    protected ID id;
+    protected EntityClusterID id;
 
     /**
      * Schedules execution instances for the entity. Idempotent operation.
@@ -105,7 +105,7 @@ public abstract class EntityExecutor implements NotificationHandler, InstanceSta
     /**
      * @return ID of the entity
      */
-    public ID getId() {
+    public EntityClusterID getId() {
         return id;
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
index 3869ff2..2d6b67d 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
@@ -21,7 +21,7 @@ package org.apache.falcon.execution;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.predicate.Predicate;
-import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceID;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
@@ -84,7 +84,7 @@ public abstract class ExecutionInstance implements NotificationHandler {
     /**
      * @return The unique ID of this instance. The instance is referred using this ID inside the system.
      */
-    public abstract ID getId();
+    public abstract InstanceID getId();
 
     /**
      * @return - The entity to which this instance belongs.

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
index b959320..b48a65b 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
@@ -24,9 +24,10 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.notification.service.event.Event;
 import org.apache.falcon.service.FalconService;
+import org.apache.falcon.state.EntityClusterID;
 import org.apache.falcon.state.EntityState;
 import org.apache.falcon.state.EntityStateChangeHandler;
-import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.state.StateService;
 import org.apache.falcon.state.store.AbstractStateStore;
 import org.slf4j.Logger;
@@ -45,7 +46,7 @@ public final class FalconExecutionService implements FalconService, EntityStateC
     private static final Logger LOG = LoggerFactory.getLogger(FalconExecutionService.class);
 
     // Stores all entity executors in memory
-    private ConcurrentMap<ID, EntityExecutor> executors = new ConcurrentHashMap<ID, EntityExecutor>();
+    private ConcurrentMap<EntityClusterID, EntityExecutor> executors = new ConcurrentHashMap<>();
 
     private static FalconExecutionService executionService = new FalconExecutionService();
 
@@ -61,7 +62,7 @@ public final class FalconExecutionService implements FalconService, EntityStateC
             try {
                 for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
                     EntityExecutor executor = createEntityExecutor(entity, cluster);
-                    executors.put(new ID(entity, cluster), executor);
+                    executors.put(new EntityClusterID(entity, cluster), executor);
                     executor.schedule();
                 }
             } catch (FalconException e) {
@@ -108,12 +109,21 @@ public final class FalconExecutionService implements FalconService, EntityStateC
     @Override
     public void onEvent(Event event) throws FalconException {
         // Currently, simply passes along the event to the appropriate executor
-        EntityExecutor executor = executors.get(event.getTarget().getEntityID());
-        if (executor == null) {
-            // The executor has gone away, throw an exception so the notification service knows
-            throw new FalconException("Target executor for " + event.getTarget().getEntityID() + " does not exist.");
+        EntityClusterID id = null;
+        if (event.getTarget() instanceof EntityClusterID) {
+            id = (EntityClusterID) event.getTarget();
+        } else if (event.getTarget() instanceof InstanceID) {
+            id = ((InstanceID) event.getTarget()).getEntityClusterID();
+        }
+
+        if (id != null) {
+            EntityExecutor executor = executors.get(id);
+            if (executor == null) {
+                // The executor has gone away, throw an exception so the notification service knows
+                throw new FalconException("Target executor for " + event.getTarget() + " does not exist.");
+            }
+            executor.onEvent(event);
         }
-        executor.onEvent(event);
     }
 
     @Override
@@ -125,7 +135,7 @@ public final class FalconExecutionService implements FalconService, EntityStateC
     public void onSchedule(Entity entity) throws FalconException {
         for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
             EntityExecutor executor = createEntityExecutor(entity, cluster);
-            ID id = new ID(entity, cluster);
+            EntityClusterID id = new EntityClusterID(entity, cluster);
             executors.put(id, executor);
             LOG.info("Scheduling entity {}.", id);
             executor.schedule();
@@ -144,7 +154,7 @@ public final class FalconExecutionService implements FalconService, EntityStateC
     public void onResume(Entity entity) throws FalconException {
         for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
             EntityExecutor executor = createEntityExecutor(entity, cluster);
-            executors.put(new ID(entity, cluster), executor);
+            executors.put(new EntityClusterID(entity, cluster), executor);
             LOG.info("Resuming entity, {} of type {} on cluster {}.", entity.getName(),
                     entity.getEntityType(), cluster);
             executor.resumeAll();
@@ -204,11 +214,11 @@ public final class FalconExecutionService implements FalconService, EntityStateC
      * @throws FalconException
      */
     public EntityExecutor getEntityExecutor(Entity entity, String cluster) throws FalconException {
-        ID id = new ID(entity, cluster);
+        EntityClusterID id = new EntityClusterID(entity, cluster);
         if (executors.containsKey(id)) {
             return executors.get(id);
         } else {
-            throw new FalconException("Entity executor for entity : " + id.getEntityKey() + " does not exist.");
+            throw new FalconException("Entity executor for entity cluster key : " + id.getKey() + " does not exist.");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
index 8c84f2b..434f168 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
@@ -36,7 +36,7 @@ import org.apache.falcon.notification.service.event.JobCompletedEvent;
 import org.apache.falcon.notification.service.event.JobScheduledEvent;
 import org.apache.falcon.notification.service.impl.DataAvailabilityService;
 import org.apache.falcon.predicate.Predicate;
-import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.workflow.engine.DAGEngine;
 import org.apache.falcon.workflow.engine.DAGEngineFactory;
@@ -60,7 +60,7 @@ public class ProcessExecutionInstance extends ExecutionInstance {
     private List<Predicate> awaitedPredicates = new ArrayList<Predicate>();
     private DAGEngine dagEngine = null;
     private boolean hasTimedOut = false;
-    private ID id;
+    private InstanceID id;
     private int instanceSequence;
     private final FalconExecutionService executionService = FalconExecutionService.get();
 
@@ -75,7 +75,7 @@ public class ProcessExecutionInstance extends ExecutionInstance {
     public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster) throws FalconException {
         super(instanceTime, cluster);
         this.process = process;
-        this.id = new ID(process, cluster, getInstanceTime());
+        this.id = new InstanceID(process, cluster, getInstanceTime());
         computeInstanceSequence();
         dagEngine = DAGEngineFactory.getDAGEngine(cluster);
         registerForNotifications(false);
@@ -210,7 +210,7 @@ public class ProcessExecutionInstance extends ExecutionInstance {
     }
 
     @Override
-    public ID getId() {
+    public InstanceID getId() {
         return id;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
index d10d2fd..e446069 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
@@ -32,12 +32,13 @@ import org.apache.falcon.notification.service.event.Event;
 import org.apache.falcon.notification.service.event.EventType;
 import org.apache.falcon.notification.service.event.JobCompletedEvent;
 import org.apache.falcon.notification.service.event.TimeElapsedEvent;
+import org.apache.falcon.notification.service.impl.AlarmService;
 import org.apache.falcon.notification.service.impl.JobCompletionService;
 import org.apache.falcon.notification.service.impl.SchedulerService;
-import org.apache.falcon.notification.service.impl.AlarmService;
 import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.state.EntityClusterID;
 import org.apache.falcon.state.EntityState;
-import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.StateService;
 import org.apache.falcon.util.StartupProperties;
@@ -58,7 +59,7 @@ import java.util.TimeZone;
  */
 public class ProcessExecutor extends EntityExecutor {
     private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutor.class);
-    protected LoadingCache<ID, ProcessExecutionInstance> instances;
+    protected LoadingCache<InstanceID, ProcessExecutionInstance> instances;
     private Predicate triggerPredicate;
     private final Process process;
     private final StateService stateService = StateService.get();
@@ -74,7 +75,7 @@ public class ProcessExecutor extends EntityExecutor {
     public ProcessExecutor(Process proc, String clusterName) throws FalconException {
         process = proc;
         cluster = clusterName;
-        id = new ID(proc, clusterName);
+        id = new EntityClusterID(proc, clusterName);
     }
 
     @Override
@@ -84,7 +85,7 @@ public class ProcessExecutor extends EntityExecutor {
             initInstances();
         }
         // Check to handle restart and restoration from state store.
-        if (STATE_STORE.getEntity(id).getCurrentState() != EntityState.STATE.SCHEDULED) {
+        if (STATE_STORE.getEntity(id.getEntityID()).getCurrentState() != EntityState.STATE.SCHEDULED) {
             dryRun();
         } else {
             LOG.info("Process, {} was already scheduled on cluster, {}.", process.getName(), cluster);
@@ -105,9 +106,9 @@ public class ProcessExecutor extends EntityExecutor {
 
         instances = CacheBuilder.newBuilder()
                 .maximumSize(cacheSize)
-                .build(new CacheLoader<ID, ProcessExecutionInstance>() {
+                .build(new CacheLoader<InstanceID, ProcessExecutionInstance>() {
                     @Override
-                    public ProcessExecutionInstance load(ID id) throws Exception {
+                    public ProcessExecutionInstance load(InstanceID id) throws Exception {
                         return (ProcessExecutionInstance) STATE_STORE.getExecutionInstance(id).getInstance();
                     }
                 });
@@ -289,13 +290,16 @@ public class ProcessExecutor extends EntityExecutor {
                 handleEvent(event);
             } else {
                 // Else, pass it along to the execution instance
-                ProcessExecutionInstance instance = instances.get(event.getTarget());
-                if (instance != null) {
-                    instance.onEvent(event);
-                    if (instance.isReady()) {
-                        stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this);
-                    } else if (instance.hasTimedout()) {
-                        stateService.handleStateChange(instance, InstanceState.EVENT.TIME_OUT, this);
+                if (event.getTarget() instanceof InstanceID) {
+                    InstanceID instanceID = (InstanceID) event.getTarget();
+                    ProcessExecutionInstance instance = instances.get(instanceID);
+                    if (instance != null) {
+                        instance.onEvent(event);
+                        if (instance.isReady()) {
+                            stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this);
+                        } else if (instance.hasTimedout()) {
+                            stateService.handleStateChange(instance, InstanceState.EVENT.TIME_OUT, this);
+                        }
                     }
                 }
             }
@@ -307,16 +311,17 @@ public class ProcessExecutor extends EntityExecutor {
 
     private void handleEvent(Event event) throws FalconException {
         ProcessExecutionInstance instance;
+        InstanceID instanceID;
         try {
             switch (event.getType()) {
             // TODO : Handle cases where scheduling fails.
             case JOB_SCHEDULED:
-                instance = instances.get(event.getTarget());
+                instance = instances.get((InstanceID)event.getTarget());
                 instance.onEvent(event);
                 stateService.handleStateChange(instance, InstanceState.EVENT.SCHEDULE, this);
                 break;
             case JOB_COMPLETED:
-                instance = instances.get(event.getTarget());
+                instance = instances.get((InstanceID)event.getTarget());
                 instance.onEvent(event);
                 switch (((JobCompletedEvent) event).getStatus()) {
                 case SUCCEEDED:
@@ -387,11 +392,6 @@ public class ProcessExecutor extends EntityExecutor {
         triggerPredicate = Predicate.createTimePredicate(startTime.getTime(), endTime.getTime(), -1);
     }
 
-    @Override
-    public ID getId() {
-        return id;
-    }
-
     // This executor must handle any events intended for itself.
     // Or, if it is job run or job complete notifications, so it can handle the instance's state transition.
     private boolean shouldHandleEvent(Event event) {
@@ -402,7 +402,7 @@ public class ProcessExecutor extends EntityExecutor {
 
     @Override
     public void onTrigger(ExecutionInstance instance) throws FalconException {
-        instances.put(new ID(instance), (ProcessExecutionInstance) instance);
+        instances.put(new InstanceID(instance), (ProcessExecutionInstance) instance);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
index 73a4199..501c6aa 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
@@ -29,6 +29,7 @@ import org.apache.falcon.notification.service.request.JobCompletionNotificationR
 import org.apache.falcon.notification.service.request.NotificationRequest;
 import org.apache.falcon.service.Services;
 import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.WorkflowExecutionListener;
@@ -46,7 +47,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.TimeZone;
 
 /**
  * This notification service notifies {@link NotificationHandler} when an external job
@@ -55,7 +55,7 @@ import java.util.TimeZone;
 public class JobCompletionService implements FalconNotificationService, WorkflowExecutionListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobCompletionService.class);
-    private static DateTimeZone utc = DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"));
+    private static final DateTimeZone UTC = DateTimeZone.UTC;
 
     private List<NotificationHandler> listeners = Collections.synchronizedList(new ArrayList<NotificationHandler>());
 
@@ -152,11 +152,12 @@ public class JobCompletionService implements FalconNotificationService, Workflow
     }
 
     // Constructs the callback ID from the details available in the context.
-    private ID constructCallbackID(WorkflowExecutionContext context) throws FalconException {
-        ID id = new ID(EntityType.valueOf(context.getEntityType()), context.getEntityName());
-        id.setCluster(context.getClusterName());
-        id.setInstanceTime(new DateTime(EntityUtil.parseDateUTC(context.getNominalTimeAsISO8601()), utc));
-        return id;
+    private InstanceID constructCallbackID(WorkflowExecutionContext context) throws FalconException {
+        EntityType entityType = EntityType.valueOf(context.getEntityType());
+        String entityName = context.getEntityName();
+        String clusterName = context.getClusterName();
+        DateTime instanceTime = new DateTime(EntityUtil.parseDateUTC(context.getNominalTimeAsISO8601()), UTC);
+        return new InstanceID(entityType, entityName, clusterName, instanceTime);
     }
 
     private WorkflowExecutionContext createContext(Properties props) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
index a70bc3c..ace8444 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
@@ -40,7 +40,9 @@ import org.apache.falcon.notification.service.request.JobCompletionNotificationR
 import org.apache.falcon.notification.service.request.JobScheduleNotificationRequest;
 import org.apache.falcon.notification.service.request.NotificationRequest;
 import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.state.EntityClusterID;
 import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.store.AbstractStateStore;
 import org.apache.falcon.state.store.StateStore;
@@ -81,9 +83,9 @@ public class SchedulerService implements FalconNotificationService, Notification
 
     private static final StateStore STATE_STORE = AbstractStateStore.get();
 
-    private Cache<ID, Object> instancesToIgnore;
+    private Cache<InstanceID, Object> instancesToIgnore;
     // TODO : limit the no. of awaiting instances per entity
-    private LoadingCache<ID, List<ExecutionInstance>> awaitedInstances;
+    private LoadingCache<EntityClusterID, List<ExecutionInstance>> executorAwaitedInstances;
 
     @Override
     public void register(NotificationRequest notifRequest) throws NotificationServiceException {
@@ -102,11 +104,10 @@ public class SchedulerService implements FalconNotificationService, Notification
     @Override
     public void unregister(NotificationHandler handler, ID listenerID) {
         // If ID is that of an entity, do nothing
-        if (listenerID.getInstanceTime() == null) {
-            return;
+        if (listenerID instanceof InstanceID) {
+            // Not efficient to iterate over elements to remove this. Add to ignore list.
+            instancesToIgnore.put((InstanceID) listenerID, new Object());
         }
-        // Not efficient to iterate over elements to remove this. Add to ignore list.
-        instancesToIgnore.put(listenerID, new Object());
 
     }
 
@@ -129,21 +130,21 @@ public class SchedulerService implements FalconNotificationService, Notification
         PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new PriorityComparator());
         runQueue = new ThreadPoolExecutor(1, numThreads, 0L, TimeUnit.MILLISECONDS, pq);
 
-        CacheLoader instanceCacheLoader = new CacheLoader<ID, Collection<ExecutionInstance>>() {
+        CacheLoader instanceCacheLoader = new CacheLoader<EntityClusterID, Collection<ExecutionInstance>>() {
             @Override
-            public Collection<ExecutionInstance> load(ID id) throws Exception {
+            public Collection<ExecutionInstance> load(EntityClusterID id) throws Exception {
                 List<InstanceState.STATE> states = new ArrayList<InstanceState.STATE>();
                 states.add(InstanceState.STATE.READY);
                 List<ExecutionInstance> readyInstances = new ArrayList<>();
                 // TODO : Limit it to no. of instances that can be run in parallel.
-                for (InstanceState state : STATE_STORE.getExecutionInstances(id.getEntityID(), states)) {
+                for (InstanceState state : STATE_STORE.getExecutionInstances(id, states)) {
                     readyInstances.add(state.getInstance());
                 }
                 return readyInstances;
             }
         };
 
-        awaitedInstances = CacheBuilder.newBuilder()
+        executorAwaitedInstances = CacheBuilder.newBuilder()
                 .maximumSize(100)
                 .concurrencyLevel(1)
                 .removalListener(this)
@@ -182,13 +183,18 @@ public class SchedulerService implements FalconNotificationService, Notification
         // Interested only in job completion events.
         if (event.getType() == EventType.JOB_COMPLETED) {
             try {
+                ID targetID = event.getTarget();
+                List<ExecutionInstance> instances = null;
                 // Check if the instance is awaited.
-                ID id = event.getTarget();
-                List<ExecutionInstance> instances = awaitedInstances.get(id);
-                // Else, check if the entity is awaited.
-                if (instances == null) {
-                    id = id.getEntityID();
-                    instances = awaitedInstances.get(id);
+                if (targetID instanceof EntityClusterID) {
+                    EntityClusterID id = (EntityClusterID) event.getTarget();
+                    instances = executorAwaitedInstances.get(id);
+                    if (instances != null && instances.isEmpty()) {
+                        executorAwaitedInstances.invalidate(id);
+                    }
+                } else if (targetID instanceof InstanceID) {
+                    InstanceID id = (InstanceID) event.getTarget();
+                    instances = executorAwaitedInstances.get(id.getEntityClusterID());
                 }
                 if (instances != null && !instances.isEmpty()) {
                     ExecutionInstance instance = instances.get(0);
@@ -202,19 +208,12 @@ public class SchedulerService implements FalconNotificationService, Notification
                                         handler, instance.getId());
                                 requestBuilder.setInstance(instance);
                                 InstanceRunner runner = new InstanceRunner(requestBuilder.build());
-                                // Since an instance just finished of the same entity just finished
-                                if (id.equals(instance.getId())) {
-                                    runner.incrementAllowedInstances();
-                                }
                                 runQueue.execute(runner);
                                 instances.remove(instance);
                             }
                         }
                     }
                 }
-                if (instances != null && instances.isEmpty()) {
-                    awaitedInstances.invalidate(id);
-                }
             } catch (Exception e) {
                 throw new FalconException(e);
             }
@@ -304,11 +303,11 @@ public class SchedulerService implements FalconNotificationService, Notification
                 if (instanceCheck() && dependencyCheck()) {
                     return true;
                 } else {
-                    ID entityID = instance.getId().getEntityID();
+                    EntityClusterID entityID = instance.getId().getEntityClusterID();
                     // Instance is awaiting scheduling conditions to be met. Add predicate to that effect.
                     instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate(request.getHandler(),
                             entityID));
-                    updateAwaitedInstances(entityID);
+                    updateExecutorAwaitedInstances(entityID);
                     LOG.debug("Schedule conditions not met for instance {}. Awaiting on {}",
                             instance.getId(), entityID);
                 }
@@ -319,13 +318,13 @@ public class SchedulerService implements FalconNotificationService, Notification
             return false;
         }
 
-        private void updateAwaitedInstances(ID id) throws ExecutionException {
+        private void updateExecutorAwaitedInstances(EntityClusterID id) throws ExecutionException {
             synchronized (id) {
-                List<ExecutionInstance> instances = awaitedInstances.get(id);
+                List<ExecutionInstance> instances = executorAwaitedInstances.get(id);
                 if (instances == null) {
                     // Order is FIFO.
                     instances = new LinkedList<>();
-                    awaitedInstances.put(id, instances);
+                    executorAwaitedInstances.put(id, instances);
                 }
                 instances.add(instance);
             }
@@ -340,7 +339,7 @@ public class SchedulerService implements FalconNotificationService, Notification
                 // Dependants should wait for this instance to complete. Add predicate to that effect.
                 instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate(
                         request.getHandler(), execInstance.getId()));
-                updateAwaitedInstances(execInstance.getId());
+                updateExecutorAwaitedInstances(execInstance.getId().getEntityClusterID());
             }
             return false;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/EntityClusterID.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityClusterID.java b/scheduler/src/main/java/org/apache/falcon/state/EntityClusterID.java
new file mode 100644
index 0000000..b25a547
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityClusterID.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.state;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+
+/**
+ * A unique ID for an (entity, cluster) pair, keyed as entityType/entityName/clusterName.
+ * This is required in scenarios like scheduling an entity per cluster.
+ */
+public class EntityClusterID extends ID {
+
+    private final String clusterName;
+
+    public EntityClusterID(Entity entity, String clusterName) {
+        this(entity.getEntityType(), entity.getName(), clusterName);
+    }
+
+    public EntityClusterID(EntityType entityType, String entityName, String clusterName) {
+        this.entityName = entityName;
+        this.entityType = entityType;
+        this.clusterName = clusterName;
+        this.key = this.entityType + KEY_SEPARATOR + this.entityName + KEY_SEPARATOR + clusterName;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    @Override
+    public EntityID getEntityID() {
+        return new EntityID(entityType, entityName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/EntityID.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityID.java b/scheduler/src/main/java/org/apache/falcon/state/EntityID.java
new file mode 100644
index 0000000..cf37986
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityID.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.state;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.datanucleus.util.StringUtils;
+
+/**
+ * A unique ID for a (wrapped) entity, keyed as entityType/entityName.
+ */
+public class EntityID extends ID {
+
+    public EntityID(EntityType entityType, String entityName) {
+        assert entityType != null : "Entity type must be present.";
+        assert !StringUtils.isEmpty(entityName) : "Entity name must be present.";
+        this.entityName = entityName;
+        this.entityType = entityType;
+        this.key = this.entityType + KEY_SEPARATOR + this.entityName;
+    }
+
+    public EntityID(Entity entity) {
+        this(entity.getEntityType(), entity.getName());
+    }
+
+    @Override
+    public String toString() {
+        return key;
+    }
+
+    @Override
+    public EntityID getEntityID() {
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/ID.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/ID.java b/scheduler/src/main/java/org/apache/falcon/state/ID.java
index 420c856..e93dbd3 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/ID.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/ID.java
@@ -17,172 +17,30 @@
  */
 package org.apache.falcon.state;
 
-import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.execution.ExecutionInstance;
-import org.datanucleus.util.StringUtils;
-import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
 
 import java.io.Serializable;
 
 /**
  * A serializable, comparable ID used to uniquely represent an entity or an instance.
  */
-public final class ID implements Serializable, Comparable<ID> {
+public abstract class ID implements Serializable, Comparable<ID> {
     public static final String KEY_SEPARATOR = "/";
-    public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm";
-
-    private String entityName;
-    private EntityType entityType;
-    private String entityKey;
-    private String cluster;
-    private DateTime instanceTime;
-
-    /**
-     * Default Constructor.
-     */
-    public ID(){}
-
-    /**
-     * Constructor.
-     *
-     * @param type
-     * @param name
-     */
-    public ID(EntityType type, String name) {
-        assert type != null : "Entity type must be present.";
-        assert !StringUtils.isEmpty(name) : "Entity name must be present.";
-        this.entityName = name;
-        this.entityType = type;
-        this.entityKey = entityType + KEY_SEPARATOR + entityName;
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param entity
-     */
-    public ID(Entity entity) {
-        this(entity.getEntityType(), entity.getName());
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param entity
-     * @param cluster
-     */
-    public ID(Entity entity, String cluster) {
-        this(entity.getEntityType(), entity.getName());
-        assert !StringUtils.isEmpty(cluster) : "Cluster cannot be empty.";
-        this.cluster = cluster;
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param instance
-     */
-    public ID(ExecutionInstance instance) {
-        this(instance.getEntity(), instance.getCluster());
-        assert instance.getInstanceTime() != null : "Nominal time cannot be null.";
-        this.instanceTime = instance.getInstanceTime();
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param entity
-     * @param cluster
-     * @param instanceTime
-     */
-    public ID(Entity entity, String cluster, DateTime instanceTime) {
-        this(entity, cluster);
-        assert instanceTime != null : "Nominal time cannot be null.";
-        this.instanceTime = instanceTime;
-    }
-
-    /**
-     * @return cluster name
-     */
-    public String getCluster() {
-        return cluster;
-    }
-
-    /**
-     * @param cluster name
-     */
-    public void setCluster(String cluster) {
-        this.cluster = cluster;
-    }
-
-    /**
-     * @return nominal time
-     */
-    public DateTime getInstanceTime() {
-        return instanceTime;
-    }
-
-    /**
-     * @param instanceTime
-     */
-    public void setInstanceTime(DateTime instanceTime) {
-        this.instanceTime = instanceTime;
-    }
-
-    /**
-     * @return entity name
-     */
-    public String getEntityName() {
-        return entityName;
-    }
-
-    /**
-     * @return entity type
-     */
-    public EntityType getEntityType() {
-        return entityType;
-    }
-
-    @Override
-    public String toString() {
-        String val = entityKey;
-        if (!StringUtils.isEmpty(cluster)) {
-            val = val + KEY_SEPARATOR + cluster;
-        }
-
-        if (instanceTime != null) {
-            DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT);
-            val = val + KEY_SEPARATOR + fmt.print(instanceTime);
-        }
-        return val;
-    }
-
-    /**
-     * @return An ID without the cluster name
-     */
-    public String getEntityKey() {
-        return entityKey;
-    }
-
-    /**
-     * @return ID without the instance information
-     */
-    public ID getEntityID() {
-        ID newID = new ID(this.entityType, this.entityName);
-        newID.setCluster(this.cluster);
-        newID.setInstanceTime(null);
-        return newID;
-    }
+    protected String entityName;
+    protected EntityType entityType;
+    protected String key;
 
     @Override
     public boolean equals(Object id) {
         if (id == null || id.getClass() != getClass()) {
             return false;
         }
-        return compareTo((ID)id) == 0;
+        return compareTo((ID) id) == 0;
+    }
+
+    @Override
+    public String toString() {
+        return key;
     }
 
     @Override
@@ -197,4 +55,19 @@ public final class ID implements Serializable, Comparable<ID> {
         }
         return this.toString().compareTo(id.toString());
     }
+
+    public String getEntityName() {
+        return entityName;
+    }
+
+    public EntityType getEntityType() {
+        return entityType;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public abstract EntityID getEntityID();
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java
new file mode 100644
index 0000000..a722be9
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.state;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * A unique ID for a given (wrapped) instance.
+ * An instance is the execution unit of an entity and can be uniquely defined by
+ * (entity, cluster, instanceTime).
+ */
+public class InstanceID extends ID {
+    public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm";
+
+    /**
+     * Name of the cluster for the instance.
+     */
+    private String clusterName;
+
+    /**
+     * Nominal time of the instance; rendered into the key using INSTANCE_FORMAT.
+     */
+    private DateTime instanceTime;
+
+    public InstanceID(EntityType entityType, String entityName, String clusterName, DateTime instanceTime) {
+        this.entityType = entityType;
+        this.entityName = entityName;
+        this.clusterName = clusterName;
+        this.instanceTime = new DateTime(instanceTime);
+        DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT);
+        this.key = this.entityType + KEY_SEPARATOR + this.entityName + KEY_SEPARATOR + this.clusterName
+                + KEY_SEPARATOR + fmt.print(instanceTime);
+    }
+
+    public InstanceID(Entity entity, String clusterName, DateTime instanceTime) {
+        this(entity.getEntityType(), entity.getName(), clusterName, instanceTime);
+    }
+
+
+    public InstanceID(ExecutionInstance instance) {
+        this(instance.getEntity(), instance.getCluster(), instance.getInstanceTime());
+        assert instance.getInstanceTime() != null : "Instance time cannot be null.";
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public DateTime getInstanceTime() {
+        return instanceTime;
+    }
+
+    @Override
+    public EntityID getEntityID() {
+        return new EntityID(entityType, entityName);
+    }
+
+
+    public EntityClusterID getEntityClusterID() {
+        return new EntityClusterID(entityType, entityName, clusterName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
index 8cf24ee..ada9d2b 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
@@ -247,4 +247,9 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance
         states.add(STATE.SUCCEEDED);
         return states;
     }
+
+    @Override
+    public String toString() { // Renders as "<instance id> STATE: <current state>".
+        return instance.getId().toString() + " STATE: " + currentState.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/StateService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
index 81357a4..c1671ac 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/StateService.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
@@ -64,7 +64,7 @@ public final class StateService {
      */
     public void handleStateChange(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler)
         throws FalconException {
-        ID id = new ID(entity);
+        EntityID id = new EntityID(entity);
         if (!stateStore.entityExists(id)) {
             // New entity
             if (event == EntityState.EVENT.SUBMIT) {
@@ -122,7 +122,7 @@ public final class StateService {
      */
     public void handleStateChange(ExecutionInstance instance, InstanceState.EVENT event,
                                   InstanceStateChangeHandler handler) throws FalconException {
-        ID id = new ID(instance);
+        InstanceID id = new InstanceID(instance);
         if (!stateStore.executionInstanceExists(id)) {
             // New instance
             if (event == InstanceState.EVENT.TRIGGER) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
index ba3d5fd..e36f85c 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
@@ -21,8 +21,8 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.service.ConfigurationChangeListener;
+import org.apache.falcon.state.EntityID;
 import org.apache.falcon.state.EntityState;
-import org.apache.falcon.state.ID;
 import org.apache.falcon.util.ReflectionUtils;
 import org.apache.falcon.util.StartupProperties;
 import org.slf4j.Logger;
@@ -46,14 +46,14 @@ public abstract class AbstractStateStore implements StateStore, ConfigurationCha
     public void onRemove(Entity entity) throws FalconException {
         // Delete entity should remove its instances too.
         if (entity.getEntityType() != EntityType.CLUSTER) {
-            deleteEntity(new ID(entity));
+            deleteEntity(new EntityID(entity));
         }
     }
 
     @Override
     public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
         if (newEntity.getEntityType() != EntityType.CLUSTER) {
-            EntityState entityState = getEntity(new ID(oldEntity));
+            EntityState entityState = getEntity(new EntityID(oldEntity));
             if (entityState == null) {
                 onAdd(newEntity);
             } else {
@@ -67,7 +67,7 @@ public abstract class AbstractStateStore implements StateStore, ConfigurationCha
     public void onReload(Entity entity) throws FalconException {
         if (entity.getEntityType() != EntityType.CLUSTER) {
             // To ensure the config store and state store are in sync
-            if (!entityExists(new ID(entity))) {
+            if (!entityExists(new EntityID(entity))) {
                 LOG.info("State store missing entity {}. Adding it.", entity.getName());
                 onAdd(entity);
             }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
index 4aa6fdb..113f4c5 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
@@ -19,8 +19,8 @@ package org.apache.falcon.state.store;
 
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.state.EntityID;
 import org.apache.falcon.state.EntityState;
-import org.apache.falcon.state.ID;
 
 import java.util.Collection;
 
@@ -39,13 +39,13 @@ public interface EntityStateStore {
      * @return Entity corresponding to the key
      * @throws StateStoreException - If entity does not exist.
      */
-    EntityState getEntity(ID entityId) throws StateStoreException;
+    EntityState getEntity(EntityID entityId) throws StateStoreException;
 
     /**
      * @param entityId
      * @return true, if entity exists in store.
      */
-    boolean entityExists(ID entityId);
+    boolean entityExists(EntityID entityId);
 
     /**
      * @param state
@@ -72,5 +72,5 @@ public interface EntityStateStore {
      * @param entityId
      * @throws StateStoreException
      */
-    void deleteEntity(ID entityId) throws StateStoreException;
+    void deleteEntity(EntityID entityId) throws StateStoreException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
index 64c5a59..52b3bb8 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
@@ -21,8 +21,10 @@ import com.google.common.collect.Lists;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.state.EntityClusterID;
+import org.apache.falcon.state.EntityID;
 import org.apache.falcon.state.EntityState;
-import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.state.InstanceState;
 import org.joda.time.DateTime;
 
@@ -57,7 +59,7 @@ public final class InMemoryStateStore extends AbstractStateStore {
 
     @Override
     public void putEntity(EntityState entityState) throws StateStoreException {
-        String key = new ID(entityState.getEntity()).getEntityKey();
+        String key = new EntityID(entityState.getEntity()).getKey();
         if (entityStates.containsKey(key)) {
             throw new StateStoreException("Entity with key, " + key + " already exists.");
         }
@@ -65,16 +67,16 @@ public final class InMemoryStateStore extends AbstractStateStore {
     }
 
     @Override
-    public EntityState getEntity(ID entityId) throws StateStoreException {
-        if (!entityStates.containsKey(entityId.getEntityKey())) {
+    public EntityState getEntity(EntityID entityId) throws StateStoreException {
+        if (!entityStates.containsKey(entityId.getKey())) {
             throw new StateStoreException("Entity with key, " + entityId + " does not exist.");
         }
-        return entityStates.get(entityId.getEntityKey());
+        return entityStates.get(entityId.getKey());
     }
 
     @Override
-    public boolean entityExists(ID entityId) {
-        return entityStates.containsKey(entityId.getEntityKey());
+    public boolean entityExists(EntityID entityId) {
+        return entityStates.containsKey(entityId.getKey());
     }
 
     @Override
@@ -95,7 +97,7 @@ public final class InMemoryStateStore extends AbstractStateStore {
 
     @Override
     public void updateEntity(EntityState entityState) throws StateStoreException {
-        String key = new ID(entityState.getEntity()).getEntityKey();
+        String key = new EntityID(entityState.getEntity()).getKey();
         if (!entityStates.containsKey(key)) {
             throw new StateStoreException("Entity with key, " + key + " does not exist.");
         }
@@ -103,17 +105,17 @@ public final class InMemoryStateStore extends AbstractStateStore {
     }
 
     @Override
-    public void deleteEntity(ID entityId) throws StateStoreException {
-        if (!entityStates.containsKey(entityId.getEntityKey())) {
+    public void deleteEntity(EntityID entityId) throws StateStoreException {
+        if (!entityStates.containsKey(entityId.getKey())) {
             throw new StateStoreException("Entity with key, " + entityId + " does not exist.");
         }
         deleteExecutionInstances(entityId);
-        entityStates.remove(entityId.getEntityKey());
+        entityStates.remove(entityId.getKey());
     }
 
     @Override
     public void putExecutionInstance(InstanceState instanceState) throws StateStoreException {
-        String key = new ID(instanceState.getInstance()).toString();
+        String key = new InstanceID(instanceState.getInstance()).getKey();
         if (instanceStates.containsKey(key)) {
             throw new StateStoreException("Instance with key, " + key + " already exists.");
         }
@@ -121,8 +123,8 @@ public final class InMemoryStateStore extends AbstractStateStore {
     }
 
     @Override
-    public InstanceState getExecutionInstance(ID instanceId) throws StateStoreException {
-        if (!instanceStates.containsKey(instanceId.toString())) {
+    public InstanceState getExecutionInstance(InstanceID instanceId) throws StateStoreException {
+        if (!instanceStates.containsKey(instanceId.getKey())) {
             throw new StateStoreException("Instance with key, " + instanceId + " does not exist.");
         }
         return instanceStates.get(instanceId.toString());
@@ -130,7 +132,7 @@ public final class InMemoryStateStore extends AbstractStateStore {
 
     @Override
     public void updateExecutionInstance(InstanceState instanceState) throws StateStoreException {
-        String key = new ID(instanceState.getInstance()).toString();
+        String key = new InstanceID(instanceState.getInstance()).getKey();
         if (!instanceStates.containsKey(key)) {
             throw new StateStoreException("Instance with key, " + key + " does not exist.");
         }
@@ -140,11 +142,11 @@ public final class InMemoryStateStore extends AbstractStateStore {
     @Override
     public Collection<InstanceState> getAllExecutionInstances(Entity entity, String cluster)
         throws StateStoreException {
-        ID id = new ID(entity, cluster);
-        if (!entityStates.containsKey(id.getEntityKey())) {
-            throw new StateStoreException("Entity with key, " + id.getEntityKey() + " does not exist.");
+        EntityClusterID id = new EntityClusterID(entity, cluster);
+        if (!entityStates.containsKey(id.getEntityID().getKey())) {
+            throw new StateStoreException("Entity with key, " + id.getEntityID().getKey() + " does not exist.");
         }
-        Collection<InstanceState> instances = new ArrayList<InstanceState>();
+        Collection<InstanceState> instances = new ArrayList<>();
         for (Map.Entry<String, InstanceState> instanceState : instanceStates.entrySet()) {
             if (instanceState.getKey().startsWith(id.toString())) {
                 instances.add(instanceState.getValue());
@@ -156,7 +158,7 @@ public final class InMemoryStateStore extends AbstractStateStore {
     @Override
     public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
             Collection<InstanceState.STATE> states) throws StateStoreException {
-        ID id = new ID(entity, cluster);
+        EntityClusterID id = new EntityClusterID(entity, cluster);
         return getExecutionInstances(id, states);
     }
 
@@ -164,7 +166,7 @@ public final class InMemoryStateStore extends AbstractStateStore {
     public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
             Collection<InstanceState.STATE> states, DateTime start, DateTime end) throws StateStoreException {
         List<InstanceState> instancesToReturn = new ArrayList<>();
-        ID id = new ID(entity, cluster);
+        EntityClusterID id = new EntityClusterID(entity, cluster);
         for (InstanceState state : getExecutionInstances(id, states)) {
             ExecutionInstance instance = state.getInstance();
             DateTime instanceTime = instance.getInstanceTime();
@@ -179,9 +181,9 @@ public final class InMemoryStateStore extends AbstractStateStore {
     }
 
     @Override
-    public Collection<InstanceState> getExecutionInstances(ID entityId, Collection<InstanceState.STATE> states)
-        throws StateStoreException {
-        Collection<InstanceState> instances = new ArrayList<InstanceState>();
+    public Collection<InstanceState> getExecutionInstances(EntityClusterID entityId,
+                                       Collection<InstanceState.STATE> states) throws StateStoreException {
+        Collection<InstanceState> instances = new ArrayList<>();
         for (Map.Entry<String, InstanceState> instanceState : instanceStates.entrySet()) {
             if (instanceState.getKey().startsWith(entityId.toString())
                     && states.contains(instanceState.getValue().getCurrentState())) {
@@ -193,9 +195,9 @@ public final class InMemoryStateStore extends AbstractStateStore {
 
     @Override
     public InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException {
-        ID id = new ID(entity, cluster);
-        if (!entityStates.containsKey(id.getEntityKey())) {
-            throw new StateStoreException("Entity with key, " + id.getEntityKey() + " does not exist.");
+        EntityClusterID id = new EntityClusterID(entity, cluster);
+        if (!entityStates.containsKey(id.getEntityID().getKey())) {
+            throw new StateStoreException("Entity with key, " + id.getEntityID().getKey() + " does not exist.");
         }
         InstanceState latestState = null;
         // TODO : Very crude. Iterating over all entries and getting the last one.
@@ -208,14 +210,14 @@ public final class InMemoryStateStore extends AbstractStateStore {
     }
 
     @Override
-    public boolean executionInstanceExists(ID instanceId) {
+    public boolean executionInstanceExists(InstanceID instanceId) {
         return instanceStates.containsKey(instanceId.toString());
     }
 
     @Override
-    public void deleteExecutionInstances(ID entityId) {
+    public void deleteExecutionInstances(EntityID entityId) {
         for (String instanceKey : Lists.newArrayList(instanceStates.keySet())) {
-            if (instanceKey.startsWith(entityId.getEntityKey())) {
+            if (instanceKey.startsWith(entityId.getKey())) {
                 instanceStates.remove(instanceKey);
             }
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
index d6a4b49..483d9e6 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
@@ -19,7 +19,9 @@ package org.apache.falcon.state.store;
 
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.exception.StateStoreException;
-import org.apache.falcon.state.ID;
+import org.apache.falcon.state.EntityClusterID;
+import org.apache.falcon.state.EntityID;
+import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.state.InstanceState;
 import org.joda.time.DateTime;
 
@@ -43,7 +45,7 @@ public interface InstanceStateStore {
      * @return Execution instance corresponding to the name.
      * @throws StateStoreException - When instance does not exist
      */
-    InstanceState getExecutionInstance(ID instanceId) throws StateStoreException;
+    InstanceState getExecutionInstance(InstanceID instanceId) throws StateStoreException;
 
     /**
      * Updates an execution instance in the store.
@@ -83,13 +85,13 @@ public interface InstanceStateStore {
                                                     DateTime start, DateTime end) throws StateStoreException;
 
     /**
-     * @param entityId
+     * @param entityClusterID
      * @param states
      * @return - All execution instance for an given entityKey (that includes the cluster name)
      * @throws StateStoreException
      */
-    Collection<InstanceState> getExecutionInstances(ID entityId, Collection<InstanceState.STATE> states)
-        throws StateStoreException;
+    Collection<InstanceState> getExecutionInstances(EntityClusterID entityClusterID,
+                                                    Collection<InstanceState.STATE> states) throws StateStoreException;
     /**
      * @param entity
      * @param cluster
@@ -102,12 +104,12 @@ public interface InstanceStateStore {
      * @param instanceId
      * @return true, if instance exists.
      */
-    boolean executionInstanceExists(ID instanceId);
+    boolean executionInstanceExists(InstanceID instanceId);
 
     /**
      * Delete instances of a given entity.
      *
      * @param entityId
      */
-    void deleteExecutionInstances(ID entityId);
+    void deleteExecutionInstances(EntityID entityId);
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index 3a5024a..d7d157f 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -32,8 +32,8 @@ import org.apache.falcon.execution.FalconExecutionService;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
+import org.apache.falcon.state.EntityID;
 import org.apache.falcon.state.EntityState;
-import org.apache.falcon.state.ID;
 import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.store.AbstractStateStore;
 import org.apache.falcon.state.store.StateStore;
@@ -86,12 +86,12 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
 
     @Override
     public boolean isActive(Entity entity) throws FalconException {
-        return STATE_STORE.getEntity(new ID(entity)).getCurrentState() != EntityState.STATE.SUBMITTED;
+        return STATE_STORE.getEntity(new EntityID(entity)).getCurrentState() != EntityState.STATE.SUBMITTED;
     }
 
     @Override
     public boolean isSuspended(Entity entity) throws FalconException {
-        return STATE_STORE.getEntity(new ID(entity))
+        return STATE_STORE.getEntity(new EntityID(entity))
                 .getCurrentState().equals(EntityState.STATE.SUSPENDED);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
index b2f9e59..bff92c9 100644
--- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
@@ -36,8 +36,11 @@ import org.apache.falcon.notification.service.impl.DataAvailabilityService;
 import org.apache.falcon.notification.service.impl.JobCompletionService;
 import org.apache.falcon.notification.service.impl.SchedulerService;
 import org.apache.falcon.service.Services;
+import org.apache.falcon.state.EntityClusterID;
+import org.apache.falcon.state.EntityID;
 import org.apache.falcon.state.EntityState;
 import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.store.AbstractStateStore;
 import org.apache.falcon.state.store.InMemoryStateStore;
@@ -135,7 +138,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         storeEntity(EntityType.PROCESS, "summarize1");
         Process process = getStore().get(EntityType.PROCESS, "summarize1");
         Assert.assertNotNull(process);
-        ID processKey = new ID(process);
+        EntityID processKey = new EntityID(process);
         String clusterName = dfsCluster.getCluster().getName();
 
         // Schedule a process
@@ -183,10 +186,12 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         Process process = getStore().get(EntityType.PROCESS, "summarize2");
         Assert.assertNotNull(process);
         String clusterName = dfsCluster.getCluster().getName();
-        ID processID = new ID(process, clusterName);
+        EntityID processID = new EntityID(process);
+        EntityClusterID executorID = new EntityClusterID(process, clusterName);
 
         // Schedule a process
-        Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
+        Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(),
+                EntityState.STATE.SUBMITTED);
         FalconExecutionService.get().schedule(process);
 
         // Simulate two time notifications
@@ -218,7 +223,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
                 instance1.getInstance().getId());
         Mockito.verify(mockDataService).unregister(FalconExecutionService.get(),
                 instance2.getInstance().getId());
-        Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), processID);
+        Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), executorID);
 
         FalconExecutionService.get().resume(process);
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY);
@@ -260,8 +265,8 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         Process process = getStore().get(EntityType.PROCESS, "summarize4");
         Assert.assertNotNull(process);
         String clusterName = dfsCluster.getCluster().getName();
-        ID processID = new ID(process, clusterName);
-
+        EntityID processID = new EntityID(process);
+        EntityClusterID executorID = new EntityClusterID(process, clusterName);
         // Schedule a process
         Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
         FalconExecutionService.get().schedule(process);
@@ -296,7 +301,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         FalconExecutionService.get().delete(process);
 
         // Deregister from notification services
-        Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), processID);
+        Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), executorID);
     }
 
     @Test
@@ -305,7 +310,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         Process process = getStore().get(EntityType.PROCESS, "summarize3");
         Assert.assertNotNull(process);
         String clusterName = dfsCluster.getCluster().getName();
-        ID processID = new ID(process, clusterName);
+        EntityID processID = new EntityID(process);
 
         // Schedule a process
         Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
@@ -334,7 +339,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         Process process = getStore().get(EntityType.PROCESS, "summarize6");
         Assert.assertNotNull(process);
         String clusterName = dfsCluster.getCluster().getName();
-        ID processID = new ID(process, clusterName);
+        EntityID processID = new EntityID(process);
 
         // Schedule a process
         Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
@@ -356,7 +361,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         Process process = getStore().get(EntityType.PROCESS, "summarize5");
         Assert.assertNotNull(process);
         String clusterName = dfsCluster.getCluster().getName();
-        ID processID = new ID(process, clusterName);
+        EntityID processID = new EntityID(process);
 
         // Schedule a process
         Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
@@ -434,7 +439,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         Process process = getStore().get(EntityType.PROCESS, "summarize7");
         Assert.assertNotNull(process);
         String clusterName = dfsCluster.getCluster().getName();
-        ID processID = new ID(process, clusterName);
+        EntityID processID = new EntityID(process);
 
         // Store couple of instances in store
         stateStore.getEntity(processID).setCurrentState(EntityState.STATE.SCHEDULED);
@@ -468,7 +473,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         Process process = getStore().get(EntityType.PROCESS, name);
         Assert.assertNotNull(process);
         String clusterName = dfsCluster.getCluster().getName();
-        ID processID = new ID(process, clusterName);
+        EntityID processID = new EntityID(process);
 
         // Schedule the process
         FalconExecutionService.get().schedule(process);
@@ -517,7 +522,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
     }
 
     private Event createEvent(NotificationServicesRegistry.SERVICE type, Process process, String cluster) {
-        ID id = new ID(process, cluster);
+        EntityClusterID id = new EntityClusterID(process, cluster);
         switch (type) {
         case TIME:
             Date start = process.getClusters().getClusters().get(0).getValidity().getStart();
@@ -536,7 +541,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
     }
 
     private Event createEvent(NotificationServicesRegistry.SERVICE type, ExecutionInstance instance) {
-        ID id = new ID(instance);
+        ID id = new InstanceID(instance);
         switch (type) {
         case DATA:
             DataEvent dataEvent = new DataEvent(id, new Path("/projects/falcon/clicks"), LocationType.DATA,

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java
index 36f1fd1..34965f2 100644
--- a/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java
@@ -23,7 +23,7 @@ import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.execution.NotificationHandler;
 import org.apache.falcon.notification.service.event.Event;
 import org.apache.falcon.notification.service.impl.AlarmService;
-import org.apache.falcon.state.ID;
+import org.apache.falcon.state.EntityID;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.mockito.Mockito;
@@ -57,8 +57,8 @@ public class AlarmServiceTest {
 
         Process mockProcess = new Process();
         mockProcess.setName("test");
-        ID id = new ID(mockProcess);
-        id.setCluster("testCluster");
+        EntityID id = new EntityID(mockProcess);
+//        id.setCluster("testCluster");
 
         AlarmService.AlarmRequestBuilder request =
                 new AlarmService.AlarmRequestBuilder(handler, id);

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
index b4a0f35..001f466 100644
--- a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
@@ -35,7 +35,9 @@ import org.apache.falcon.notification.service.impl.DataAvailabilityService;
 import org.apache.falcon.notification.service.impl.JobCompletionService;
 import org.apache.falcon.notification.service.impl.SchedulerService;
 import org.apache.falcon.service.Services;
+import org.apache.falcon.state.EntityClusterID;
 import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.store.AbstractStateStore;
 import org.apache.falcon.state.store.InMemoryStateStore;
@@ -147,16 +149,16 @@ public class SchedulerServiceTest extends AbstractTestBase {
         Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
         // Simulate the completion of previous instance.
         stateStore.getExecutionInstance(instance1.getId()).setCurrentState(STATE.SUCCEEDED);
-        scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster), WorkflowJob.Status.SUCCEEDED,
+        scheduler.onEvent(new JobCompletedEvent(new EntityClusterID(mockProcess, cluster), WorkflowJob.Status.SUCCEEDED,
                 DateTime.now()));
         // When an instance completes instance2 should get scheduled next iteration
-        Thread.sleep(100);
+        Thread.sleep(300);
         Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1));
         // Simulate another completion and ensure instance3 runs.
         stateStore.getExecutionInstance(instance2.getId()).setCurrentState(STATE.SUCCEEDED);
-        scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster), WorkflowJob.Status.SUCCEEDED,
+        scheduler.onEvent(new JobCompletedEvent(new EntityClusterID(mockProcess, cluster), WorkflowJob.Status.SUCCEEDED,
                 DateTime.now()));
-        Thread.sleep(100);
+        Thread.sleep(300);
         Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance3), new Integer(1));
     }
 
@@ -193,7 +195,7 @@ public class SchedulerServiceTest extends AbstractTestBase {
         Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1));
         Mockito.verify(handler, Mockito.times(1)).onEvent(Mockito.any(JobScheduledEvent.class));
         stateStore.getExecutionInstance(instance2.getId()).setCurrentState(STATE.SUCCEEDED);
-        scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster, instance2.getInstanceTime()),
+        scheduler.onEvent(new JobCompletedEvent(new InstanceID(mockProcess, cluster, instance2.getInstanceTime()),
                 WorkflowJob.Status.SUCCEEDED, DateTime.now()));
         // Dependency now satisfied. Now, the first instance should get scheduled after retry delay.
         Thread.sleep(100);
@@ -300,8 +302,9 @@ public class SchedulerServiceTest extends AbstractTestBase {
             JobScheduledEvent scheduledEvent = ((JobScheduledEvent) event);
             Process p = (Process) process.copy();
             p.setName(scheduledEvent.getTarget().getEntityName());
-            ProcessExecutionInstance instance = new ProcessExecutionInstance(p,
-                    scheduledEvent.getTarget().getInstanceTime(), cluster);
+            InstanceID instanceID = (InstanceID) scheduledEvent.getTarget();
+            DateTime instanceTime = new DateTime(instanceID.getInstanceTime());
+            ProcessExecutionInstance instance = new ProcessExecutionInstance(p, instanceTime, cluster);
             InstanceState state = new InstanceState(instance).setCurrentState(STATE.RUNNING);
             if (!stateStore.executionInstanceExists(instance.getId())) {
                 stateStore.putExecutionInstance(state);

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java b/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java
index 95dd5ae..073f73e 100644
--- a/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java
@@ -18,12 +18,12 @@
 package org.apache.falcon.predicate;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.notification.service.event.TimeElapsedEvent;
-import org.apache.falcon.state.ID;
+import org.apache.falcon.state.EntityID;
 import org.joda.time.DateTime;
 import org.testng.Assert;
 import org.testng.annotations.Test;
-import org.apache.falcon.entity.v0.process.Process;
 
 /**
  * Tests the predicate class.
@@ -35,7 +35,7 @@ public class PredicateTest {
         Process process = new Process();
         process.setName("test");
         DateTime now = DateTime.now();
-        TimeElapsedEvent te = new TimeElapsedEvent(new ID(process), now, now, now);
+        TimeElapsedEvent te = new TimeElapsedEvent(new EntityID(process), now, now, now);
         Predicate.getPredicate(te);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
index d27ac7e..43c3c54 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
@@ -62,7 +62,7 @@ public class InstanceStateServiceTest {
     public void testLifeCycle() throws FalconException {
         StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.TRIGGER, listener);
         InstanceState instanceFromStore = AbstractStateStore.get()
-                .getExecutionInstance(new ID(mockInstance));
+                .getExecutionInstance(new InstanceID(mockInstance));
         Mockito.verify(listener).onTrigger(mockInstance);
         Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.WAITING));
         StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.CONDITIONS_MET, listener);


Mime
View raw message