falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject [4/6] falcon git commit: FALCON-1213 Base framework of the native scheduler
Date Tue, 20 Oct 2015 12:10:01 GMT
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
new file mode 100644
index 0000000..4883fe7
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.event;
+
+
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * An event generated by {@link org.apache.falcon.notification.service.impl.DataAvailabilityService}
+ * indicating availability or non-availability of a dataset.
+ */
+public class DataEvent implements Event {
+    private final ID callbackID;
+    private Path dataLocation;
+    private LocationType dataType;
+    private STATUS status;
+
+    /**
+     * Enumerates the status of data.
+     */
+    public enum STATUS {
+        AVAILABLE,
+        UNAVAILABLE
+    }
+
+    public DataEvent(ID callbackID, Path location, LocationType locType, STATUS availability) {
+        this.callbackID = callbackID;
+        this.dataLocation = location;
+        this.dataType = locType;
+        this.status = availability;
+    }
+
+    public STATUS getStatus() {
+        return status;
+    }
+
+    public void setStatus(STATUS availability) {
+        this.status = availability;
+    }
+
+    public Path getDataLocation() {
+        return dataLocation;
+    }
+
+    public LocationType getDataType() {
+        return dataType;
+    }
+
+    @Override
+    public NotificationServicesRegistry.SERVICE getSource() {
+        return NotificationServicesRegistry.SERVICE.DATA;
+    }
+
+    @Override
+    public ID getTarget() {
+        return callbackID;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
new file mode 100644
index 0000000..140973b
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.event;
+
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+
+/**
+ * All events that are generated by notification services must implement this interface.
+ */
+public interface Event {
+
+    /**
+     * @return The service that generated this event
+     */
+    NotificationServicesRegistry.SERVICE getSource();
+
+    /**
+     * @return ID of the notification handler for which this event is meant.
+     */
+    ID getTarget();
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
new file mode 100644
index 0000000..c587343
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.event;
+
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.apache.oozie.client.WorkflowJob;
+import org.joda.time.DateTime;
+
+/**
+ * An event generated by {@link org.apache.falcon.notification.service.impl.JobCompletionService}
+ * indicating completion of a Job.
+ */
+public class JobCompletedEvent implements Event {
+
+    private WorkflowJob.Status status;
+    private final ID callbackID;
+    private DateTime endTime;
+
+    public JobCompletedEvent(ID callbackID, WorkflowJob.Status jobStatus, DateTime end) {
+        this.callbackID = callbackID;
+        this.status = jobStatus;
+        this.endTime = end;
+    }
+
+    public WorkflowJob.Status getStatus() {
+        return status;
+    }
+
+    @Override
+    public NotificationServicesRegistry.SERVICE getSource() {
+        return NotificationServicesRegistry.SERVICE.JOB_COMPLETION;
+    }
+
+    @Override
+    public ID getTarget() {
+        return callbackID;
+    }
+
+    public DateTime getEndTime() {
+        return endTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
new file mode 100644
index 0000000..55023e7
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.event;
+
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+
+/**
+ * An event generated by {@link org.apache.falcon.notification.service.impl.SchedulerService}
+ * indicating if an instance was scheduled for execution.
+ */
+public class JobScheduledEvent implements Event {
+    private final ID callbackID;
+    private String externalID;
+    private STATUS status;
+    private DateTime startTime;
+
+    public JobScheduledEvent(ID callbackID, STATUS status) {
+        this.callbackID = callbackID;
+        this.status = status;
+    }
+
+    public String getExternalID() {
+        return externalID;
+    }
+
+    public void setExternalID(String externalID) {
+        this.externalID = externalID;
+    }
+
+    @Override
+    public NotificationServicesRegistry.SERVICE getSource() {
+        return NotificationServicesRegistry.SERVICE.JOB_SCHEDULE;
+    }
+
+    @Override
+    public ID getTarget() {
+        return callbackID;
+    }
+
+    /**
+     * @return - The status of the scheduled DAG/Job.
+     */
+    public STATUS getStatus() {
+        return status;
+    }
+
+
+    public DateTime getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(DateTime startTime) {
+        this.startTime = startTime;
+    }
+
+    /**
+     * Enumeration of possible statuses of a DAG/Job.
+     */
+    public enum STATUS {
+        FAILED,
+        SUCCESSFUL
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
new file mode 100644
index 0000000..7ec4de6
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.event;
+
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+
+/**
+ * An event generated by {@link org.apache.falcon.notification.service.impl.AlarmService}
+ * indicating that a given time duration has elapsed.
+ */
+public class TimeElapsedEvent implements Event {
+    private DateTime startTime;
+    private DateTime endTime;
+    private DateTime instanceTime;
+    private final ID callbackID;
+
+    public DateTime getInstanceTime() {
+        return instanceTime;
+    }
+
+    public DateTime getStartTime() {
+        return startTime;
+    }
+
+    public DateTime getEndTime() {
+        return endTime;
+    }
+
+    public TimeElapsedEvent(ID callbackID, DateTime start, DateTime end, DateTime instTime) {
+        this.callbackID = callbackID;
+        this.startTime = start;
+        this.endTime = end;
+        this.instanceTime = instTime;
+    }
+
+    @Override
+    public NotificationServicesRegistry.SERVICE getSource() {
+        return NotificationServicesRegistry.SERVICE.TIME;
+    }
+
+    @Override
+    public ID getTarget() {
+        return callbackID;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java
new file mode 100644
index 0000000..cccdeac
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.impl;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.exception.NotificationServiceException;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.execution.SchedulerUtil;
+import org.apache.falcon.notification.service.FalconNotificationService;
+import org.apache.falcon.notification.service.event.TimeElapsedEvent;
+import org.apache.falcon.notification.service.request.NotificationRequest;
+import org.apache.falcon.notification.service.request.AlarmRequest;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.quartz.CalendarIntervalTrigger;
+import org.quartz.DateBuilder;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.TriggerKey;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.quartz.CalendarIntervalScheduleBuilder.calendarIntervalSchedule;
+import static org.quartz.JobBuilder.newJob;
+import static org.quartz.TriggerBuilder.newTrigger;
+
+/**
+ * This notification service notifies {@link NotificationHandler} when a requested time event has occurred.
+ * The class allows users to subscribe to frequency based, cron based or some calendar based time events.
+ */
+public class AlarmService implements FalconNotificationService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AlarmService.class);
+
+    private Map<ID, TriggerKey> notifications = new HashMap<ID, TriggerKey>();
+    private static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);
+    private Scheduler scheduler;
+
+    @Override
+    public void init() throws FalconException {
+        try {
+            scheduler = StdSchedulerFactory.getDefaultScheduler();
+            scheduler.start();
+        } catch (SchedulerException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    @Override
+    public void register(NotificationRequest notificationRequest) throws NotificationServiceException {
+        LOG.info("Registering alarm notification for " + notificationRequest.getCallbackId());
+        AlarmRequest request = (AlarmRequest) notificationRequest;
+        DateTime currentTime = DateTime.now();
+        DateTime nextStartTime = request.getStartTime();
+        DateTime endTime;
+        if (request.getEndTime().isBefore(currentTime)) {
+            endTime = request.getEndTime();
+        } else {
+            endTime = currentTime;
+        }
+        // Handle past events.
+        // TODO : Quartz doesn't seem to support running jobs for past events.
+        // TODO : Remove the handling of past events when that support is added.
+        if (request.getStartTime().isBefore(currentTime)) {
+
+            List<Date> instanceTimes = EntityUtil.getInstanceTimes(request.getStartTime().toDate(),
+                    request.getFrequency(), request.getTimeZone(), request.getStartTime().toDate(),
+                    endTime.toDate());
+            if (instanceTimes != null && !instanceTimes.isEmpty()) {
+                Date lastInstanceTime = instanceTimes.get(instanceTimes.size() - 1);
+                nextStartTime = new DateTime(lastInstanceTime.getTime()
+                        + SchedulerUtil.getFrequencyInMillis(new DateTime(lastInstanceTime), request.getFrequency()));
+                // Introduce some delay to allow the rest of the registration to complete.
+                LOG.debug("Triggering events for past from {} till {}", instanceTimes.get(0), lastInstanceTime);
+                executor.schedule(new CatchupJob(request, instanceTimes), 1, TimeUnit.SECONDS);
+            }
+        }
+        // All past events have been scheduled. Nothing to schedule in the future.
+        if (request.getEndTime().isBefore(currentTime)) {
+            return;
+        }
+        LOG.debug("Scheduling to trigger events from {} to {} with frequency {}", nextStartTime, request.getEndTime(),
+                request.getFrequency());
+        // Schedule future events using Quartz
+        CalendarIntervalTrigger trigger = newTrigger()
+                .withIdentity(notificationRequest.getCallbackId().toString(), "Falcon")
+                .startAt(nextStartTime.toDate())
+                .endAt(request.getEndTime().toDate())
+                .withSchedule(
+                        calendarIntervalSchedule()
+                                .withInterval(request.getFrequency().getFrequencyAsInt(),
+                                       getTimeUnit(request.getFrequency().getTimeUnit()))
+                                .withMisfireHandlingInstructionFireAndProceed())
+                .build();
+
+        // define the job and tie it to our Job class
+        JobDetail job = newJob(FalconProcessJob.class)
+                .withIdentity(getJobKey(notificationRequest.getCallbackId().toString()))
+                .setJobData(getJobDataMap((AlarmRequest) notificationRequest))
+                .build();
+        notifications.put(notificationRequest.getCallbackId(), trigger.getKey());
+        // Tell quartz to run the job using our trigger
+        try {
+            scheduler.scheduleJob(job, trigger);
+        } catch (SchedulerException e) {
+            LOG.error("Error scheduling entity {}", trigger.getKey());
+            throw new NotificationServiceException(e);
+        }
+    }
+
+    // Maps the timeunit in entity specification to the one in Quartz DateBuilder
+    private DateBuilder.IntervalUnit getTimeUnit(Frequency.TimeUnit timeUnit) {
+        switch (timeUnit) {
+        case minutes:
+            return DateBuilder.IntervalUnit.MINUTE;
+        case hours:
+            return DateBuilder.IntervalUnit.HOUR;
+        case days:
+            return DateBuilder.IntervalUnit.DAY;
+        case months:
+            return DateBuilder.IntervalUnit.MONTH;
+        default:
+            throw new IllegalArgumentException("Invalid time unit " + timeUnit.name());
+        }
+    }
+
+    private JobKey getJobKey(String key) {
+        return new JobKey(key, "Falcon");
+    }
+
+    private JobDataMap getJobDataMap(AlarmRequest request) {
+        JobDataMap jobProps = new JobDataMap();
+        jobProps.put("request", request);
+
+        return jobProps;
+    }
+
+    @Override
+    public void unregister(NotificationHandler handler, ID listenerID) throws NotificationServiceException {
+        try {
+            LOG.info("Removing time notification for handler {} with callbackID {}", handler, listenerID);
+            scheduler.unscheduleJob(notifications.get(listenerID));
+            notifications.remove(listenerID);
+        } catch (SchedulerException e) {
+            throw new NotificationServiceException("Unable to deregister " + listenerID, e);
+        }
+    }
+
+    @Override
+    public RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID) {
+        return new AlarmRequestBuilder(handler, callbackID);
+    }
+
+    @Override
+    public String getName() {
+        return "AlarmService";
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        try {
+            scheduler.shutdown();
+        } catch (SchedulerException e) {
+            LOG.warn("Quartz Scheduler shutdown failed.", e);
+        }
+
+    }
+
+    // Generates a time elapsed event and invokes onEvent on the handler.
+    private static void notifyHandler(AlarmRequest request, DateTime instanceTime) throws NotificationServiceException {
+        TimeElapsedEvent event = new TimeElapsedEvent(request.getCallbackId(), request.getStartTime(),
+                request.getEndTime(), instanceTime);
+        try {
+            LOG.info("Sending notification to {} with nominal time {} ", request.getCallbackId(),
+                    event.getInstanceTime());
+            request.getHandler().onEvent(event);
+        } catch (FalconException e) {
+            LOG.error("Unable to onEvent " + request.getCallbackId() + " for nominal time, " + instanceTime, e);
+            throw new NotificationServiceException(e);
+        }
+    }
+
+    /**
+     * The Job that runs when a time trigger happens.
+     */
+    public static class FalconProcessJob implements Job {
+        public FalconProcessJob() {
+        }
+
+        @Override
+        public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+            LOG.debug("Quartz job called at : {}, Next fire time: {}", jobExecutionContext.getFireTime(),
+                    jobExecutionContext.getNextFireTime());
+
+            AlarmRequest request = (AlarmRequest) jobExecutionContext.getJobDetail()
+                    .getJobDataMap().get("request");
+            DateTime instanceTime = new DateTime(jobExecutionContext.getScheduledFireTime(),
+                    DateTimeZone.forTimeZone(request.getTimeZone()));
+
+            try {
+                notifyHandler(request, instanceTime);
+            } catch (NotificationServiceException e) {
+                throw new JobExecutionException(e);
+            }
+        }
+    }
+
+    // Quartz doesn't seem to be able to schedule past events. This job specifically handles that.
+    private static class CatchupJob implements Runnable {
+
+        private final AlarmRequest request;
+        private final List<Date> instanceTimes;
+
+        public CatchupJob(AlarmRequest request, List<Date> triggerTimes) {
+            this.request = request;
+            this.instanceTimes = triggerTimes;
+        }
+
+        @Override
+        public void run() {
+            if (instanceTimes == null) {
+                return;
+            }
+            // Immediate notification for all past events.
+            for(Date instanceTime : instanceTimes) {
+                DateTime nominalDateTime = new DateTime(instanceTime, DateTimeZone.forTimeZone(request.getTimeZone()));
+                try {
+                    notifyHandler(request, nominalDateTime);
+                } catch (NotificationServiceException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Builder that builds {@link AlarmRequest}.
+     */
+    public static class AlarmRequestBuilder extends RequestBuilder<AlarmRequest> {
+
+        private DateTime startTime;
+        private DateTime endTime;
+        private Frequency frequency;
+        private TimeZone timeZone;
+
+        public AlarmRequestBuilder(NotificationHandler handler, ID callbackID) {
+            super(handler, callbackID);
+        }
+
+        /**
+         * @param start of the timer
+         * @return This instance
+         */
+        public AlarmRequestBuilder setStartTime(DateTime start) {
+            this.startTime = start;
+            return this;
+        }
+
+        /**
+         * @param end of the timer
+         * @return This instance
+         */
+        public AlarmRequestBuilder setEndTime(DateTime end) {
+            this.endTime = end;
+            return this;
+        }
+
+        /**
+         * @param freq of the timer
+         * @return This instance
+         */
+        public AlarmRequestBuilder setFrequency(Frequency freq) {
+            this.frequency = freq;
+            return this;
+        }
+
+        /**
+         * @param timeZone of the timer
+         */
+        public void setTimeZone(TimeZone timeZone) {
+            this.timeZone = timeZone;
+        }
+
+        @Override
+        public AlarmRequest build() {
+            if (callbackId == null || startTime == null || endTime == null || frequency == null) {
+                throw new IllegalArgumentException("Missing one or more of the mandatory arguments:"
+                        + " callbackId, startTime, endTime, frequency");
+            }
+            return new AlarmRequest(handler, callbackId, startTime, endTime, frequency, timeZone);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
new file mode 100644
index 0000000..7ffb351
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.impl;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.exception.NotificationServiceException;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.FalconNotificationService;
+import org.apache.falcon.notification.service.request.DataNotificationRequest;
+import org.apache.falcon.notification.service.request.NotificationRequest;
+import org.apache.falcon.state.ID;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This notification service notifies {@link NotificationHandler} when requested data
+ * becomes available. This class also supports time out, in which case it notifies about the unavailability.
+ * TODO : Complete/Modify this skeletal class
+ */
+public class DataAvailabilityService implements FalconNotificationService {
+
+    @Override
+    public void register(NotificationRequest request) throws NotificationServiceException {
+        // TODO : Implement this
+    }
+
+    @Override
+    public void unregister(NotificationHandler handler, ID listenerID) {
+        // TODO : Implement this
+    }
+
+    @Override
+    public RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID) {
+        return new DataRequestBuilder(handler, callbackID);
+    }
+
+    @Override
+    public String getName() {
+        return "DataAvailabilityService";
+    }
+
+    @Override
+    public void init() throws FalconException {
+        // TODO : Implement this
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+
+    }
+
+    /**
+     * Builds {@link DataNotificationRequest}.
+     */
+    public static class DataRequestBuilder extends RequestBuilder<DataNotificationRequest> {
+        private Path dataLocation;
+
+        public DataRequestBuilder(NotificationHandler handler, ID callbackID) {
+            super(handler, callbackID);
+        }
+
+        /**
+         * @param location of the data
+         * @return This instance
+         */
+        public DataRequestBuilder setDataLocation(Path location) {
+            this.dataLocation = location;
+            return this;
+        }
+
+        @Override
+        public DataNotificationRequest build() {
+            if (callbackId == null  || dataLocation == null) {
+                throw new IllegalArgumentException("Missing one or more of the mandatory arguments:"
+                        + " callbackId, dataLocation");
+            }
+            return new DataNotificationRequest(handler, callbackId, dataLocation);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
new file mode 100644
index 0000000..73a4199
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.exception.NotificationServiceException;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.FalconNotificationService;
+import org.apache.falcon.notification.service.event.JobCompletedEvent;
+import org.apache.falcon.notification.service.request.JobCompletionNotificationRequest;
+import org.apache.falcon.notification.service.request.NotificationRequest;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowExecutionListener;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.oozie.client.WorkflowJob;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TimeZone;
+
+/**
+ * This notification service notifies {@link NotificationHandler} when an external job
+ * completes.
+ */
+public class JobCompletionService implements FalconNotificationService, WorkflowExecutionListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobCompletionService.class);
+    private static DateTimeZone utc = DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"));
+
+    private List<NotificationHandler> listeners = Collections.synchronizedList(new ArrayList<NotificationHandler>());
+
+    /**
+     * Adds the handler of the request to the listeners of this service.
+     * When the request names both an external job id and a cluster, the job's configuration is
+     * read from the DAG engine and, if it shows that the workflow already failed or succeeded,
+     * {@link #onFailure} or {@link #onSuccess} is invoked synchronously.
+     *
+     * @param notifRequest the request; cast to {@link JobCompletionNotificationRequest}
+     * @throws NotificationServiceException if the request is null or checking the job fails
+     */
+    @Override
+    public void register(NotificationRequest notifRequest) throws NotificationServiceException {
+        if (notifRequest == null) {
+            throw new NotificationServiceException("Request object cannot be null");
+        }
+        listeners.add(notifRequest.getHandler());
+        JobCompletionNotificationRequest jobRequest = (JobCompletionNotificationRequest) notifRequest;
+        if (jobRequest.getExternalId() == null || jobRequest.getCluster() == null) {
+            // The job cannot be looked up. This class is a listener of
+            // WorkflowJobEndNotificationService and expects to be told when the job completes.
+            return;
+        }
+        // The job may already be complete; if so, notify right away.
+        try {
+            WorkflowExecutionContext context = createContext(
+                    DAGEngineFactory.getDAGEngine(jobRequest.getCluster())
+                            .getConfiguration(jobRequest.getExternalId()));
+            if (context.hasWorkflowFailed()) {
+                onFailure(context);
+            } else if (context.hasWorkflowSucceeded()) {
+                onSuccess(context);
+            }
+        } catch (FalconException e) {
+            throw new NotificationServiceException(e);
+        }
+    }
+
+    /**
+     * Removes the handler from the listeners of this service.
+     *
+     * @param handler handler to remove
+     * @param listenerID not used by this implementation
+     */
+    @Override
+    public void unregister(NotificationHandler handler, ID listenerID) {
+        listeners.remove(handler);
+    }
+
+    /**
+     * Creates a builder for job completion notification requests.
+     *
+     * @param handler handler handed to the new builder
+     * @param callbackID callback id handed to the new builder
+     * @return a new {@link JobCompletionRequestBuilder}
+     */
+    @Override
+    public RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID) {
+        return new JobCompletionRequestBuilder(handler, callbackID);
+    }
+
+    /**
+     * @return the name of this service, {@code "JobCompletionService"}.
+     */
+    @Override
+    public String getName() {
+        return "JobCompletionService";
+    }
+
+    @Override
+    public void init() throws FalconException {
+        LOG.debug("Registering to job end notification service");
+        Services.get().<WorkflowJobEndNotificationService>getService(
+                WorkflowJobEndNotificationService.SERVICE_NAME).registerListener(this);
+    }
+
+    /**
+     * Shutdown hook of the service. No clean-up is performed.
+     * NOTE(review): the listener registered in {@link #init()} is not removed here - confirm
+     * that this is intended.
+     *
+     * @throws FalconException declared by the signature; the current body never throws it
+     */
+    @Override
+    public void destroy() throws FalconException {
+
+    }
+
+    /**
+     * Notifies the listeners that the workflow of the given context ended with
+     * status {@link WorkflowJob.Status#SUCCEEDED}.
+     *
+     * @param context execution context of the workflow
+     * @throws FalconException propagated from {@code onEnd}
+     */
+    @Override
+    public void onSuccess(WorkflowExecutionContext context) throws FalconException {
+        onEnd(context, WorkflowJob.Status.SUCCEEDED);
+    }
+
+    /**
+     * Notifies the listeners that the workflow of the given context ended with
+     * status {@link WorkflowJob.Status#FAILED}.
+     *
+     * @param context execution context of the workflow
+     * @throws FalconException propagated from {@code onEnd}
+     */
+    @Override
+    public void onFailure(WorkflowExecutionContext context) throws FalconException {
+        onEnd(context, WorkflowJob.Status.FAILED);
+    }
+
+    /**
+     * Workflow start events are ignored by this service.
+     *
+     * @param context execution context of the workflow; unused
+     */
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    /**
+     * Workflow suspend events are ignored by this service.
+     *
+     * @param context execution context of the workflow; unused
+     */
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    /**
+     * Workflow wait events are ignored by this service.
+     *
+     * @param context execution context of the workflow; unused
+     */
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    /**
+     * Creates a {@link JobCompletedEvent} for the finished workflow and hands it to every
+     * registered handler.
+     *
+     * @param context execution context of the finished workflow
+     * @param status status the workflow ended with
+     * @throws FalconException if the event cannot be constructed or a handler fails
+     */
+    private void onEnd(WorkflowExecutionContext context, WorkflowJob.Status status) throws FalconException {
+        JobCompletedEvent event = new JobCompletedEvent(constructCallbackID(context), status, getEndTime(context));
+        // A list from Collections.synchronizedList must be locked by the caller while it is
+        // iterated. Copy it under the lock and notify from the copy, so that register/unregister
+        // calls made concurrently (or by a handler during its callback) cannot cause a
+        // ConcurrentModificationException, and no handler code runs while the lock is held.
+        List<NotificationHandler> snapshot;
+        synchronized (listeners) {
+            snapshot = new ArrayList<NotificationHandler>(listeners);
+        }
+        for (NotificationHandler handler : snapshot) {
+            LOG.debug("Notifying {} with event {}", handler, event.getTarget());
+            handler.onEvent(event);
+        }
+    }
+
+    /**
+     * Asks the DAG engine of the context's cluster for information on the workflow and
+     * wraps the reported end time in a {@link DateTime}.
+     * NOTE(review): no time zone is passed to the DateTime constructor here, whereas
+     * constructCallbackID passes UTC explicitly - confirm that the default zone is intended.
+     *
+     * @param context execution context providing the cluster name and the workflow id
+     * @return end time of the workflow
+     * @throws FalconException declared by the signature (the DAG engine calls are made here)
+     */
+    private DateTime getEndTime(WorkflowExecutionContext context) throws FalconException {
+        return new DateTime(DAGEngineFactory.getDAGEngine(context.getClusterName())
+                .info(context.getWorkflowId()).getEndTime());
+    }
+
+    // Builds the callback ID out of the entity type, entity name, cluster name and
+    // nominal time (interpreted as UTC) found in the context.
+    private ID constructCallbackID(WorkflowExecutionContext context) throws FalconException {
+        EntityType entityType = EntityType.valueOf(context.getEntityType());
+        ID callbackId = new ID(entityType, context.getEntityName());
+        callbackId.setCluster(context.getClusterName());
+        DateTime nominalTime = new DateTime(EntityUtil.parseDateUTC(context.getNominalTimeAsISO8601()), utc);
+        callbackId.setInstanceTime(nominalTime);
+        return callbackId;
+    }
+
+    /**
+     * Creates a {@link WorkflowExecutionContext} from job properties.
+     * Each {@link WorkflowExecutionArgs} whose name maps to a non-empty property value is
+     * carried over; all other arguments are left out.
+     *
+     * @param props properties of the job
+     * @return context created from the arguments found in the properties
+     */
+    private WorkflowExecutionContext createContext(Properties props) {
+        // for backwards compatibility, read all args from properties
+        Map<WorkflowExecutionArgs, String> argValues = new HashMap<WorkflowExecutionArgs, String>();
+        for (WorkflowExecutionArgs executionArg : WorkflowExecutionArgs.values()) {
+            String value = props.getProperty(executionArg.getName());
+            if (StringUtils.isNotEmpty(value)) {
+                argValues.put(executionArg, value);
+            }
+        }
+        return WorkflowExecutionContext.create(argValues);
+    }
+
+    /**
+     * Builds {@link JobCompletionNotificationRequest}.
+     * Cluster and external id may be left unset; {@link #build()} performs no validation.
+     */
+    public static class JobCompletionRequestBuilder extends RequestBuilder<JobCompletionNotificationRequest> {
+        private String cluster;
+        private String externalId;
+
+        public JobCompletionRequestBuilder(NotificationHandler handler, ID callbackID) {
+            super(handler, callbackID);
+        }
+
+        /**
+         * @param clusterName name of the cluster to place in the request
+         * @return this builder, to allow chaining
+         */
+        public JobCompletionRequestBuilder setCluster(String clusterName) {
+            cluster = clusterName;
+            return this;
+        }
+
+        /**
+         * @param id - The external job id for which job completion notification is requested.
+         * @return this builder, to allow chaining
+         */
+        public JobCompletionRequestBuilder setExternalId(String id) {
+            externalId = id;
+            return this;
+        }
+
+        /**
+         * @return a request carrying the handler, callback id, cluster and external id of this builder
+         */
+        @Override
+        public JobCompletionNotificationRequest build() {
+            return new JobCompletionNotificationRequest(handler, callbackId, cluster, externalId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
new file mode 100644
index 0000000..848f89c
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.impl;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.exception.NotificationServiceException;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.FalconNotificationService;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.JobScheduledEvent;
+import org.apache.falcon.notification.service.request.JobCompletionNotificationRequest;
+import org.apache.falcon.notification.service.request.JobScheduleNotificationRequest;
+import org.apache.falcon.notification.service.request.NotificationRequest;
+import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.apache.falcon.util.ReflectionUtils;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This notification service notifies {@link NotificationHandler} when an execution
+ * instance is scheduled on a DAG Engine.
+ * Current implementation of scheduler handles parallel scheduling of instances,
+ * dependencies (an instance depending on completion of another) and priority.
+ */
+public class SchedulerService implements FalconNotificationService, NotificationHandler,
+        RemovalListener<ID, List<ExecutionInstance>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SchedulerService.class);
+
+    public static final String DEFAULT_NUM_OF_SCHEDULER_THREADS = "5";
+    public static final String NUM_OF_SCHEDULER_THREADS_PROP = "scheduler.threads.count";
+
+    // Once scheduling conditions are met, it goes to run queue to be run on DAGEngine, based on priority.
+    private ThreadPoolExecutor runQueue;
+
+    private static final StateStore STATE_STORE = AbstractStateStore.get();
+
+    private Cache<ID, Object> instancesToIgnore;
+    // TODO : limit the no. of awaiting instances per entity
+    private LoadingCache<ID, List<ExecutionInstance>> awaitedInstances;
+
+    /**
+     * Queues the instance of the request for execution on the run queue.
+     *
+     * @param notifRequest the request; cast to {@link JobScheduleNotificationRequest}
+     * @throws NotificationServiceException if the request carries no instance
+     */
+    @Override
+    public void register(NotificationRequest notifRequest) throws NotificationServiceException {
+        JobScheduleNotificationRequest scheduleRequest = (JobScheduleNotificationRequest) notifRequest;
+        ExecutionInstance requested = scheduleRequest.getInstance();
+        if (requested == null) {
+            throw new NotificationServiceException("Request must contain an instance.");
+        }
+        // The instance may be getting rescheduled, as in the case of suspend and resume.
+        // Drop a marker left by an earlier unregister so that the new runner is not skipped.
+        ID instanceId = requested.getId();
+        if (instancesToIgnore.getIfPresent(instanceId) != null) {
+            instancesToIgnore.invalidate(instanceId);
+        }
+        runQueue.execute(new InstanceRunner(scheduleRequest));
+    }
+
+    /**
+     * Marks the instance identified by the given ID so that a queued runner for it is skipped.
+     * IDs without an instance time (entity IDs) are ignored.
+     *
+     * @param handler not used by this implementation
+     * @param listenerID ID of the instance to stop scheduling
+     */
+    @Override
+    public void unregister(NotificationHandler handler, ID listenerID) {
+        if (listenerID.getInstanceTime() != null) {
+            // Searching the queue for the runner would not be efficient; record the ID in the
+            // ignore list instead and let the runner check it when it is picked up.
+            instancesToIgnore.put(listenerID, new Object());
+        }
+    }
+
+    /**
+     * Creates a builder for job schedule notification requests.
+     *
+     * @param handler handler handed to the new builder
+     * @param callbackID callback id handed to the new builder
+     * @return a new {@link JobScheduleRequestBuilder}
+     */
+    @Override
+    public RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID) {
+        return new JobScheduleRequestBuilder(handler, callbackID);
+    }
+
+    /**
+     * @return the name of this service, {@code "JobSchedulerService"}.
+     */
+    @Override
+    public String getName() {
+        return "JobSchedulerService";
+    }
+
+    /**
+     * Creates the run queue and the caches, and subscribes this service to job completion events.
+     * The number of scheduler threads is read from the runtime property
+     * {@link #NUM_OF_SCHEDULER_THREADS_PROP} and defaults to {@link #DEFAULT_NUM_OF_SCHEDULER_THREADS}.
+     *
+     * @throws FalconException if the configured thread count is not a valid integer
+     */
+    @Override
+    public void init() throws FalconException {
+        String configuredThreads = RuntimeProperties.get().getProperty(NUM_OF_SCHEDULER_THREADS_PROP,
+                DEFAULT_NUM_OF_SCHEDULER_THREADS);
+        int numThreads;
+        try {
+            numThreads = Integer.parseInt(configuredThreads);
+        } catch (NumberFormatException e) {
+            throw new FalconException("Invalid value for " + NUM_OF_SCHEDULER_THREADS_PROP + " : "
+                    + configuredThreads, e);
+        }
+
+        // Uses a priority queue to ensure instances with higher priority gets run first.
+        PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new PriorityComparator());
+        // A ThreadPoolExecutor only adds threads beyond its core size when the work queue rejects
+        // a task, which an unbounded queue never does. The core size must therefore be the
+        // configured thread count, otherwise a single thread would run all instances.
+        runQueue = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, pq);
+
+        // Loads the READY instances of the entity the given id belongs to from the state store.
+        // NOTE(review): declared as a raw CacheLoader over Collection values although the cache
+        // holds List values; the loaded value is an ArrayList, so this works at runtime.
+        CacheLoader instanceCacheLoader = new CacheLoader<ID, Collection<ExecutionInstance>>() {
+            @Override
+            public Collection<ExecutionInstance> load(ID id) throws Exception {
+                List<InstanceState.STATE> states = new ArrayList<InstanceState.STATE>();
+                states.add(InstanceState.STATE.READY);
+                List<ExecutionInstance> readyInstances = new ArrayList<>();
+                // TODO : Limit it to no. of instances that can be run in parallel.
+                for (InstanceState state : STATE_STORE.getExecutionInstances(id.getEntityID(), states)) {
+                    readyInstances.add(state.getInstance());
+                }
+                return readyInstances;
+            }
+        };
+
+        awaitedInstances = CacheBuilder.newBuilder()
+                .maximumSize(100)
+                .concurrencyLevel(1)
+                .removalListener(this)
+                .build(instanceCacheLoader);
+
+        instancesToIgnore = CacheBuilder.newBuilder()
+                .expireAfterWrite(1, TimeUnit.HOURS)
+                .concurrencyLevel(1)
+                .build();
+        // Interested in all job completion events.
+        JobCompletionNotificationRequest completionRequest = (JobCompletionNotificationRequest)
+                NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_COMPLETION)
+                        .createRequestBuilder(this, null).build();
+        NotificationServicesRegistry.register(completionRequest);
+    }
+
+    /**
+     * Cache removal callback for the awaited instances.
+     * When an entry was evicted, every instance it held is written to the state store
+     * in state {@code READY}. Other kinds of removal are not acted upon.
+     *
+     * @param removalNotification describes the removed entry
+     */
+    @Override
+    public void onRemoval(RemovalNotification<ID, List<ExecutionInstance>> removalNotification) {
+        if (!removalNotification.wasEvicted()) {
+            return;
+        }
+        // Instances dropped from the cache (e.g. due to its size limit) must not be lost:
+        // persist them in the state store as ready instances.
+        for (ExecutionInstance evicted : removalNotification.getValue()) {
+            InstanceState readyState = new InstanceState(evicted);
+            readyState.setCurrentState(InstanceState.STATE.READY);
+            try {
+                STATE_STORE.updateExecutionInstance(readyState);
+            } catch (StateStoreException e) {
+                throw new RuntimeException("Unable to persist the ready instance " + evicted.getId(), e);
+            }
+        }
+    }
+
+    /**
+     * Handles job completion events: looks up the instances awaiting the completed target and
+     * queues the first of them for execution if it has a job completion predicate.
+     * Events from any other source are ignored.
+     * NOTE(review): awaitedInstances is a LoadingCache, whose get() loads a missing entry through
+     * the CacheLoader configured in init(); confirm whether the {@code instances == null}
+     * branches below can ever be taken.
+     *
+     * @param event the event received
+     * @throws FalconException wrapping any exception raised while handling the event
+     */
+    @Override
+    public void onEvent(Event event) throws FalconException {
+        // Interested only in job completion events.
+        if (event.getSource() == NotificationServicesRegistry.SERVICE.JOB_COMPLETION) {
+            try {
+                // Check if the instance is awaited.
+                ID id = event.getTarget();
+                List<ExecutionInstance> instances = awaitedInstances.get(id);
+                // Else, check if the entity is awaited.
+                if (instances == null) {
+                    id = id.getEntityID();
+                    instances = awaitedInstances.get(id);
+                }
+                if (instances != null && !instances.isEmpty()) {
+                    // Only the first awaiting instance is considered per event.
+                    ExecutionInstance instance = instances.get(0);
+                    if (instance != null && instance.getAwaitingPredicates() != null) {
+                        for (Predicate predicate : instance.getAwaitingPredicates()) {
+                            if (predicate.getType() == Predicate.TYPE.JOB_COMPLETION) {
+                                // Construct a request object
+                                NotificationHandler handler = ReflectionUtils
+                                        .getInstanceByClassName(predicate.getClauseValue("handler").toString());
+                                JobScheduleRequestBuilder requestBuilder = new JobScheduleRequestBuilder(
+                                        handler, instance.getId());
+                                requestBuilder.setInstance(instance);
+                                InstanceRunner runner = new InstanceRunner(requestBuilder.build());
+                                // An instance of the same entity just finished, so allow one more to run.
+                                if (id.equals(instance.getId())) {
+                                    runner.incrementAllowedInstances();
+                                }
+                                runQueue.execute(runner);
+                                // A runner is queued for every JOB_COMPLETION predicate of the instance.
+                                instances.remove(instance);
+                            }
+                        }
+                    }
+                }
+                // Nothing is awaiting this id any more; drop the cache entry.
+                if (instances != null && instances.isEmpty()) {
+                    awaitedInstances.invalidate(id);
+                }
+            } catch (Exception e) {
+                throw new FalconException(e);
+            }
+        }
+    }
+
+    /**
+     * Shuts down the run queue immediately and clears the list of instances to ignore.
+     *
+     * @throws FalconException declared by the signature; the current body never throws it
+     */
+    @Override
+    public void destroy() throws FalconException {
+        runQueue.shutdownNow();
+        instancesToIgnore.invalidateAll();
+    }
+
+    // Sends a JobScheduledEvent with status FAILED, addressed to the callback id of the
+    // request, to the handler of the request.
+    private void notifyFailureEvent(JobScheduleNotificationRequest request) throws FalconException {
+        JobScheduledEvent failure = new JobScheduledEvent(request.getCallbackId(), JobScheduledEvent.STATUS.FAILED);
+        NotificationHandler target = request.getHandler();
+        target.onEvent(failure);
+    }
+
+    /**
+     * Task placed on the run queue for one execution instance.
+     * When run, it checks the scheduling conditions and, if they are met, runs the instance on
+     * the DAG engine (unless it already has an external id) and notifies the request's handler.
+     * If the conditions are not met, the instance is added to the awaited instances.
+     */
+    private class InstanceRunner implements Runnable {
+        private final ExecutionInstance instance;
+        private final JobScheduleNotificationRequest request;
+        // Priority of the instance's entity; read by PriorityComparator through getPriority().
+        private short priority;
+        // Upper bound (exclusive) on running instances used by instanceCheck().
+        private int allowedParallelInstances = 1;
+
+        /**
+         * @param request request holding the instance to run; priority and parallelism are
+         *                taken from the instance's entity
+         */
+        public InstanceRunner(JobScheduleNotificationRequest request) {
+            this.request = request;
+            this.instance = request.getInstance();
+            this.priority = getPriority(instance.getEntity()).getPriority();
+            allowedParallelInstances = EntityUtil.getParallel(instance.getEntity());
+        }
+
+        /**
+         * Raises the number of allowed parallel instances by one.
+         * @return the new value
+         */
+        public int incrementAllowedInstances() {
+            return ++allowedParallelInstances;
+        }
+
+        // Resolves the job priority of the entity. Only process entities are supported.
+        private EntityUtil.JOBPRIORITY getPriority(Entity entity) {
+            switch(entity.getEntityType()) {
+            case PROCESS :
+                return EntityUtil.getPriority((Process)entity);
+            default :
+                throw new UnsupportedOperationException("Scheduling of entities other "
+                        + "than process is not supported yet.");
+            }
+        }
+
+        /**
+         * Runs the instance if it has not been de-registered and its scheduling conditions hold.
+         * On a {@link FalconException} the handler is sent a failure event.
+         * NOTE(review): only FalconException is caught; an unchecked exception escapes without
+         * a failure event being sent - confirm that this is acceptable.
+         */
+        @Override
+        public void run() {
+            try {
+                // If de-registered
+                if (instancesToIgnore.getIfPresent(instance.getId()) != null) {
+                    LOG.debug("Instance {} has been deregistered. Ignoring.", instance.getId());
+                    instancesToIgnore.invalidate(instance.getId());
+                    return;
+                }
+                LOG.debug("Received request to run instance {}", instance.getId());
+                if (checkConditions()) {
+                    // If instance not already scheduled.
+                    String externalId = instance.getExternalID();
+                    if (externalId == null) {
+                        externalId = DAGEngineFactory.getDAGEngine(instance.getCluster()).run(instance);
+                        LOG.info("Scheduled job {} for instance {}", externalId, instance.getId());
+                    }
+                    // Report success with the external id and the start time known to the DAG engine.
+                    JobScheduledEvent event = new JobScheduledEvent(instance.getId(),
+                            JobScheduledEvent.STATUS.SUCCESSFUL);
+                    event.setExternalID(externalId);
+                    event.setStartTime(new DateTime(DAGEngineFactory.getDAGEngine(instance.getCluster())
+                            .info(externalId).getStartTime()));
+                    request.getHandler().onEvent(event);
+                }
+            } catch (FalconException e) {
+                LOG.error("Error running the instance : " + instance.getId(), e);
+                try {
+                    notifyFailureEvent(request);
+                } catch (FalconException fe) {
+                    throw new RuntimeException("Unable to onEvent : " + request.getCallbackId(), fe);
+                }
+            }
+        }
+
+        /**
+         * @return priority of the instance's entity, as resolved in the constructor
+         */
+        public short getPriority() {
+            return priority;
+        }
+
+        /**
+         * Checks whether the instance may run now.
+         * If not, a job completion predicate on the instance's entity is added to the instance
+         * and the instance is recorded as awaiting that entity.
+         *
+         * @return true if both the instance check and the dependency check pass
+         * @throws FalconException wrapping any exception raised by the checks
+         */
+        private boolean checkConditions() throws FalconException {
+            try {
+                // TODO : If and when the no. of scheduling conditions increase, consider chaining condition checks.
+                // Run if all conditions are met.
+                // dependencyCheck() is only evaluated when instanceCheck() passes.
+                if (instanceCheck() && dependencyCheck()) {
+                    return true;
+                } else {
+                    ID entityID = instance.getId().getEntityID();
+                    // Instance is awaiting scheduling conditions to be met. Add predicate to that effect.
+                    instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate(request.getHandler(),
+                            entityID));
+                    updateAwaitedInstances(entityID);
+                    LOG.debug("Schedule conditions not met for instance {}. Awaiting on {}",
+                            instance.getId(), entityID);
+                }
+            } catch (Exception e) {
+                LOG.error("Instance run failed with error : ", e);
+                throw new FalconException("Instance run failed", e);
+            }
+            return false;
+        }
+
+        // Adds this runner's instance to the instances awaiting the given id.
+        // NOTE(review): the lock is taken on the ID object passed in. If callers pass a distinct
+        // ID object on every call, this gives no mutual exclusion - confirm how IDs are created.
+        private void updateAwaitedInstances(ID id) throws ExecutionException {
+            synchronized (id) {
+                List<ExecutionInstance> instances = awaitedInstances.get(id);
+                if (instances == null) {
+                    // Order is FIFO.
+                    instances = new LinkedList<>();
+                    awaitedInstances.put(id, instances);
+                }
+                instances.add(instance);
+            }
+        }
+
+        // Passes when the request has no dependencies. Otherwise a job completion predicate is
+        // added for every dependency, the instance is recorded as awaiting each of them, and
+        // the check fails.
+        private boolean dependencyCheck() throws FalconException, ExecutionException {
+            if (request.getDependencies() == null || request.getDependencies().isEmpty()) {
+                return true;
+            }
+
+            for (ExecutionInstance execInstance : request.getDependencies()) {
+                // Dependants should wait for this instance to complete. Add predicate to that effect.
+                instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate(
+                        request.getHandler(), execInstance.getId()));
+                updateAwaitedInstances(execInstance.getId());
+            }
+            return false;
+        }
+
+        // Ensure no. of instances running in parallel is per entity specification.
+        // Counts the instances of the entity on the cluster that are in a running state.
+        private boolean instanceCheck() throws StateStoreException {
+            return STATE_STORE.getExecutionInstances(instance.getEntity(), instance.getCluster(),
+                    InstanceState.getRunningStates()).size() < allowedParallelInstances;
+        }
+    }
+
+    // A priority based comparator to be used by the {@link java.util.concurrent.PriorityBlockingQueue}
+    private static class PriorityComparator<T extends InstanceRunner> implements Comparator<T>, Serializable {
+        @Override
+        public int compare(T o1, T o2) {
+            return o1.getPriority() - o2.getPriority();
+        }
+    }
+
+    /**
+     * Builds {@link JobScheduleNotificationRequest}.
+     * The callback id and the execution instance are mandatory; dependencies may be left unset.
+     */
+    public static class JobScheduleRequestBuilder extends RequestBuilder<JobScheduleNotificationRequest> {
+        private List<ExecutionInstance> dependencies;
+        private ExecutionInstance instance;
+
+        public JobScheduleRequestBuilder(NotificationHandler handler, ID callbackID) {
+            super(handler, callbackID);
+        }
+
+        /**
+         * @param execInstance that needs to be scheduled
+         * @return this builder, to allow chaining
+         */
+        public JobScheduleRequestBuilder setInstance(ExecutionInstance execInstance) {
+            instance = execInstance;
+            return this;
+        }
+
+        /**
+         * Dependencies to wait for before scheduling.
+         * @param dependencies instances that the scheduled instance depends on
+         */
+        public void setDependencies(List<ExecutionInstance> dependencies) {
+            this.dependencies = dependencies;
+        }
+
+        /**
+         * @return a request carrying the handler, callback id, instance and dependencies of this builder
+         * @throws IllegalArgumentException if the callback id or the instance is missing
+         */
+        @Override
+        public JobScheduleNotificationRequest build() {
+            if (callbackId != null && instance != null) {
+                return new JobScheduleNotificationRequest(handler, callbackId, instance, dependencies);
+            }
+            throw new IllegalArgumentException("Missing one or more of the mandatory arguments:"
+                    + " callbackId, execInstance");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/AlarmRequest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/AlarmRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/AlarmRequest.java
new file mode 100644
index 0000000..2628dc8
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/AlarmRequest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.request;
+
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+
+import java.util.TimeZone;
+
+/**
+ * Request intended for {@link org.apache.falcon.notification.service.impl.AlarmService}
+ * for time based notifications.
+ * All values are supplied through the constructor and exposed through read-only accessors.
+ * TODO : Might need a separate builder too.
+ */
+public class AlarmRequest extends NotificationRequest {
+
+    // Assigned once in the constructor; there are no setters.
+    private final DateTime startTime;
+    private final DateTime endTime;
+    private final Frequency frequency;
+    private final TimeZone timeZone;
+
+    /**
+     * Constructor. Marks the request as intended for the
+     * {@link NotificationServicesRegistry.SERVICE#TIME} service.
+     * @param notifHandler handler of the request
+     * @param callbackId callback id of the request
+     * @param start start time of the timer
+     * @param end end time of the timer
+     * @param freq frequency of the timer
+     * @param tz timezone of the request
+     */
+    public AlarmRequest(NotificationHandler notifHandler, ID callbackId, DateTime start,
+                        DateTime end, Frequency freq, TimeZone tz) {
+        this.handler = notifHandler;
+        this.callbackId = callbackId;
+        this.service = NotificationServicesRegistry.SERVICE.TIME;
+        this.startTime = start;
+        this.endTime = end;
+        this.frequency = freq;
+        this.timeZone = tz;
+    }
+
+    /**
+     * @return frequency of the timer
+     */
+    public Frequency getFrequency() {
+        return frequency;
+    }
+
+    /**
+     * @return start time of the timer
+     */
+    public DateTime getStartTime() {
+        return startTime;
+    }
+
+    /**
+     * @return end time of the timer
+     */
+    public DateTime getEndTime() {
+        return endTime;
+    }
+
+    /**
+     * @return timezone of the request.
+     */
+    public TimeZone getTimeZone() {
+        return timeZone;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
new file mode 100644
index 0000000..8393de0
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.request;
+
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Request intended for {@link org.apache.falcon.notification.service.impl.DataAvailabilityService}
+ * for data notifications.
+ * The setter methods of the class support chaining similar to a builder class.
+ * TODO : Complete/modify this skeletal class
+ */
+public class DataNotificationRequest extends NotificationRequest {
+    /**
+     * How a given number of instances is to be interpreted by the service:
+     * wait for exactly that many, at least that many or at most that many instances.
+     */
+    public enum INSTANCELIMIT {
+        EXACTLY_N,
+        AT_LEAST_N,
+        AT_MOST_N
+    }
+
+    private final Path dataLocation;
+    private String cluster;
+
+    /**
+     * Builds a request addressed to the DATA notification service.
+     *
+     * @param notifHandler handler exposed through {@link #getHandler()}
+     * @param callbackId id exposed through {@link #getCallbackId()}
+     * @param location data location to be watched
+     */
+    public DataNotificationRequest(NotificationHandler notifHandler, ID callbackId, Path location) {
+        this.service = NotificationServicesRegistry.SERVICE.DATA;
+        this.handler = notifHandler;
+        this.callbackId = callbackId;
+        this.dataLocation = location;
+    }
+
+    /** @return data location to be watched. */
+    public Path getDataLocation() {
+        return this.dataLocation;
+    }
+
+    /** @return cluster name, or null if {@link #setCluster(String)} was never called */
+    public String getCluster() {
+        return this.cluster;
+    }
+
+    /**
+     * Records the cluster name of this request.
+     *
+     * @param clusterName cluster name to store
+     * @return This instance, so that calls can be chained
+     */
+    public DataNotificationRequest setCluster(String clusterName) {
+        this.cluster = clusterName;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobCompletionNotificationRequest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobCompletionNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobCompletionNotificationRequest.java
new file mode 100644
index 0000000..1d35476
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobCompletionNotificationRequest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.request;
+
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+
+/**
+ * Request intended for {@link org.apache.falcon.notification.service.impl.JobCompletionService}
+ * for job completion notifications.
+ * All values are supplied through the constructor; the class declares no setter methods.
+ */
+public class JobCompletionNotificationRequest extends NotificationRequest {
+
+    // Assigned only in the constructor, so both fields are final.
+    private final String externalId;
+    private final String cluster;
+
+    /**
+     * Creates a request addressed to the JOB_COMPLETION notification service.
+     * The handler, callback id and service go into the fields inherited from NotificationRequest.
+     *
+     * @param notifHandler handler exposed through {@link #getHandler()}
+     * @param callbackId id exposed through {@link #getCallbackId()}
+     * @param clstr cluster name
+     * @param jobId external job id for which job completion notification is requested
+     */
+    public JobCompletionNotificationRequest(NotificationHandler notifHandler, ID callbackId, String clstr,
+                                            String jobId) {
+        this.handler = notifHandler;
+        this.service = NotificationServicesRegistry.SERVICE.JOB_COMPLETION;
+        this.callbackId = callbackId;
+        this.cluster = clstr;
+        this.externalId = jobId;
+    }
+
+    /** @return - The external job id for which job completion notification is requested. */
+    public String getExternalId() {
+        return externalId;
+    }
+
+    /** @return cluster name */
+    public String getCluster() {
+        return cluster;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobScheduleNotificationRequest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobScheduleNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobScheduleNotificationRequest.java
new file mode 100644
index 0000000..80133bd
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobScheduleNotificationRequest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.request;
+
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+
+import java.util.List;
+
+/**
+ * Request intended for {@link org.apache.falcon.notification.service.impl.SchedulerService}
+ * for job run notifications.
+ * All values are supplied through the constructor; the class declares no setter methods.
+ */
+public class JobScheduleNotificationRequest extends NotificationRequest {
+    private final ExecutionInstance instance;
+    private final List<ExecutionInstance> dependencies;
+    /**
+     * Creates a request addressed to the JOB_SCHEDULE notification service.
+     * @param notifHandler handler exposed through {@link #getHandler()}
+     * @param id id exposed through {@link #getCallbackId()}
+     * @param inst execution instance that will be scheduled
+     * @param deps list handed back unchanged by {@link #getDependencies()}
+     */
+    public JobScheduleNotificationRequest(NotificationHandler notifHandler, ID id, ExecutionInstance inst,
+                                          List<ExecutionInstance> deps) {
+        this.handler = notifHandler;
+        this.service = NotificationServicesRegistry.SERVICE.JOB_SCHEDULE;
+        this.callbackId = id;
+        this.instance = inst;
+        this.dependencies = deps;
+    }
+
+    /** @return execution instance that will be scheduled. */
+    public ExecutionInstance getInstance() {
+        return instance;
+    }
+
+    /** @return the dependency list exactly as it was passed to the constructor (no copy is made). */
+    public List<ExecutionInstance> getDependencies() {
+        return dependencies;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/NotificationRequest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/NotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/NotificationRequest.java
new file mode 100644
index 0000000..c89668d
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/NotificationRequest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.request;
+
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+
+/**
+ * An abstract class that all notification requests of services must extend.
+ * TODO : Complete/modify this skeleton class
+ */
+public abstract class NotificationRequest {
+    protected NotificationHandler handler;
+    protected ID callbackId;
+    protected NotificationServicesRegistry.SERVICE service;
+
+    /**
+     * @return the notification handler carried by this request.
+     */
+    public NotificationHandler getHandler() {
+        return this.handler;
+    }
+
+    /**
+     * @return the entity that needs to be notified once this request is satisfied.
+     */
+    public ID getCallbackId() {
+        return this.callbackId;
+    }
+
+    /**
+     * @return the service that this request is intended for.
+     */
+    public NotificationServicesRegistry.SERVICE getService() {
+        return this.service;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
new file mode 100644
index 0000000..fb4ce82
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.predicate;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.notification.service.event.DataEvent;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.TimeElapsedEvent;
+import org.apache.falcon.state.ID;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents the gating condition for which an instance is waiting before it is scheduled.
+ * This will be serialized and stored in state store.
+ */
+public class Predicate implements Serializable {
+    /**
+     * Type of predicate, currently data and time are supported.
+     */
+    public enum TYPE {
+        DATA,
+        TIME,
+        JOB_COMPLETION
+    }
+
+    private final TYPE type;
+
+    // A key-value pair of clauses that need make this predicate.
+    private Map<String, Comparable> clauses = new HashMap<String, Comparable>();
+
+    // A generic "any" object that can be used when a particular key is allowed to have any value.
+    public static final Comparable<? extends Serializable> ANY = new Any();
+
+    /**
+     * @return type of predicate
+     */
+    public TYPE getType() {
+        return type;
+    }
+
+    /**
+     * @param key
+     * @return the value corresponding to the key
+     */
+    public Comparable getClauseValue(String key) {
+        return clauses.get(key);
+    }
+
+    /**
+     * Compares this predicate with the supplied predicate.
+     *
+     * @param suppliedPredicate
+     * @return true, if the clauses of the predicates match. false, otherwise.
+     */
+    public boolean evaluate(Predicate suppliedPredicate) {
+        if (type != suppliedPredicate.getType()) {
+            return false;
+        }
+        boolean eval = true;
+        // Iterate over each clause and ensure it matches the clauses of this predicate.
+        for (Map.Entry<String, Comparable> entry : suppliedPredicate.getClauses().entrySet()) {
+            eval = eval && matches(entry.getKey(), entry.getValue());
+            if (!eval) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // Compares the two values of a key.
+    private boolean matches(String lhs, Comparable<? extends Serializable> rhs) {
+        if (clauses.containsKey(lhs) && clauses.get(lhs) != null
+                && rhs != null) {
+            if (clauses.get(lhs).equals(ANY) || rhs.equals(ANY)) {
+                return true;
+            } else {
+                return clauses.get(lhs).compareTo(rhs) == 0;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @param type of predicate
+     */
+    public Predicate(TYPE type) {
+        this.type = type;
+    }
+
+    /**
+     * @return the name-value pairs that make up the clauses of this predicate.
+     */
+    public Map<String, Comparable> getClauses() {
+        return clauses;
+    }
+
+    /**
+     * @param lhs - The key in the key-value pair of a clause
+     * @param rhs - The value in the key-value pair of a clause
+     * @return This instance
+     */
+    public Predicate addClause(String lhs, Comparable<? extends Serializable> rhs) {
+        clauses.put(lhs, rhs);
+        return this;
+    }
+
+    /**
+     * Creates a Predicate of Type TIME.
+     *
+     * @param start
+     * @param end
+     * @param instanceTime
+     * @return
+     */
+    public static Predicate createTimePredicate(long start, long end, long instanceTime) {
+        return new Predicate(TYPE.TIME)
+                .addClause("start", (start < 0) ? ANY : start)
+                .addClause("end", (end < 0) ? ANY : end)
+                .addClause("instanceTime", (instanceTime < 0) ? ANY : instanceTime);
+    }
+
+    /**
+     * Creates a predicate of type DATA.
+     *
+     * @param location
+     * @return
+     */
+    public static Predicate createDataPredicate(Location location) {
+        return new Predicate(TYPE.DATA)
+                .addClause("path", (location == null) ? ANY : location.getPath())
+                .addClause("type", (location == null) ? ANY : location.getType());
+    }
+
+    /**
+     * Creates a predicate of type JOB_COMPLETION.
+     *
+     * @param handler
+     * @param id
+     * @return
+     */
+    public static Predicate createJobCompletionPredicate(NotificationHandler handler, ID id) {
+        return new Predicate(TYPE.JOB_COMPLETION)
+                .addClause("instanceId", id.toString())
+                .addClause("handler", handler.getClass().getName());
+    }
+
+    /**
+     * Creates a predicate from an event based on the event source and values in the event.
+     *
+     * @param event
+     * @return
+     * @throws FalconException
+     */
+    public static Predicate getPredicate(Event event) throws FalconException {
+        if (event.getSource() == NotificationServicesRegistry.SERVICE.DATA) {
+            DataEvent dataEvent = (DataEvent) event;
+            if (dataEvent.getDataLocation() != null && dataEvent.getDataType() != null) {
+                Location loc = new Location();
+                loc.setPath(dataEvent.getDataLocation().toString());
+                loc.setType(dataEvent.getDataType());
+                return createDataPredicate(loc);
+            } else {
+                throw new FalconException("Event does not have enough data to create a predicate");
+            }
+        } else if (event.getSource() == NotificationServicesRegistry.SERVICE.TIME) {
+            TimeElapsedEvent timeEvent = (TimeElapsedEvent) event;
+            if (timeEvent.getStartTime() != null && timeEvent.getEndTime() != null) {
+                long instanceTime = (timeEvent.getInstanceTime() == null)? -1 : timeEvent.getInstanceTime().getMillis();
+                return Predicate.createTimePredicate(timeEvent.getStartTime().getMillis(),
+                        timeEvent.getEndTime().getMillis(), instanceTime);
+            } else {
+                throw new FalconException("Event does not have enough data to create a predicate");
+            }
+
+        } else {
+            throw new FalconException("Unhandled event type " + event.getSource());
+        }
+    }
+
+    /**
+     * An "Any" class that returns '0' when compared to any other object.
+     */
+    private static class Any implements Comparable, Serializable {
+        @Override
+        public int compareTo(Object o) {
+            return 0;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return super.equals(o);
+        }
+
+        @Override
+        public int hashCode() {
+            return super.hashCode();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
new file mode 100644
index 0000000..15aea9a
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.InvalidStateTransitionException;
+
+/**
+ * Represents the state of a schedulable entity.
+ * Implements {@link org.apache.falcon.state.StateMachine} for an entity.
+ */
+public class EntityState implements StateMachine<EntityState.STATE, EntityState.EVENT> {
+    /**
+     * The events that can be passed to the state machine of an entity.
+     */
+    public enum EVENT {
+        SUBMIT,
+        SCHEDULE,
+        SUSPEND,
+        RESUME
+    }
+
+    /**
+     * The states of a schedulable entity; each state decides which events it accepts.
+     */
+    public enum STATE implements StateMachine<EntityState.STATE, EntityState.EVENT> {
+        SUBMITTED {
+            @Override
+            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+                switch (event) {
+                case SUBMIT:
+                    return STATE.SUBMITTED;
+                case SCHEDULE:
+                    return STATE.SCHEDULED;
+                default:
+                    throw new InvalidStateTransitionException("Submitted entities can only be scheduled.");
+                }
+            }
+        },
+        SCHEDULED {
+            @Override
+            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+                switch (event) {
+                case SCHEDULE:
+                    return STATE.SCHEDULED;
+                case SUSPEND:
+                    return STATE.SUSPENDED;
+                default:
+                    throw new InvalidStateTransitionException("Scheduled entities can only be suspended.");
+                }
+            }
+        },
+        SUSPENDED {
+            @Override
+            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+                switch (event) {
+                case SUSPEND:
+                    return STATE.SUSPENDED;
+                case RESUME:
+                    return STATE.SCHEDULED;
+                default:
+                    throw new InvalidStateTransitionException("Suspended entities can only be resumed.");
+                }
+            }
+        }
+    }
+
+    private static final STATE INITIAL_STATE = STATE.SUBMITTED;
+    private Entity entity;
+    private STATE currentState;
+
+    /**
+     * Creates the state holder of an entity, starting in the SUBMITTED state.
+     * @param e - Entity
+     */
+    public EntityState(Entity e) {
+        currentState = INITIAL_STATE;
+        this.entity = e;
+    }
+
+    @Override
+    public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+        // Delegates to the current state; currentState itself is not modified here.
+        return currentState.nextTransition(event);
+    }
+
+    /**
+     * @return - The entity whose state is tracked
+     */
+    public Entity getEntity() {
+        return this.entity;
+    }
+
+    /**
+     * @param e - entity to track from now on
+     * @return - This instance
+     */
+    public EntityState setEntity(Entity e) {
+        this.entity = e;
+        return this;
+    }
+
+    /**
+     * @return - The state the entity is currently in.
+     */
+    public STATE getCurrentState() {
+        return this.currentState;
+    }
+
+    /**
+     * @param state - state to record as the current one
+     * @return - This instance
+     */
+    public EntityState setCurrentState(STATE state) {
+        this.currentState = state;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java b/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
new file mode 100644
index 0000000..44ec3fc
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Entity;
+
+/**
+ * Any handler interested in handling state changes of entities must implement this interface.
+ */
+public interface EntityStateChangeHandler {
+
+    /**
+     * Invoked when an entity is submitted.
+     *
+     * @param entity the entity that was submitted
+     * @throws FalconException declared so that implementations can report a failure while handling the change
+     */
+    void onSubmit(Entity entity) throws FalconException;
+
+    /**
+     * Invoked when an entity is scheduled.
+     *
+     * @param entity the entity that was scheduled
+     * @throws FalconException declared so that implementations can report a failure while handling the change
+     */
+    void onSchedule(Entity entity) throws FalconException;
+
+    /**
+     * Invoked when an entity is suspended.
+     *
+     * @param entity the entity that was suspended
+     * @throws FalconException declared so that implementations can report a failure while handling the change
+     */
+    void onSuspend(Entity entity) throws FalconException;
+
+    /**
+     * Invoked when an entity is resumed.
+     *
+     * @param entity the entity that was resumed
+     * @throws FalconException declared so that implementations can report a failure while handling the change
+     */
+    void onResume(Entity entity) throws FalconException;
+}


Mime
View raw message