falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject falcon git commit: FALCON-1205 SLAService to keep track of missing SLAs for feeds. Contributed by Ajay Yadava.
Date Tue, 15 Sep 2015 16:43:59 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 3a8659611 -> 947ed13b7


FALCON-1205 SLAService to keep track of missing SLAs for feeds. Contributed by Ajay Yadava.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/947ed13b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/947ed13b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/947ed13b

Branch: refs/heads/master
Commit: 947ed13b71aa80236396cbe86549feeb1c509a1c
Parents: 3a86596
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Tue Sep 15 21:47:10 2015 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Tue Sep 15 21:47:10 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../src/main/java/org/apache/falcon/Pair.java   |   6 +-
 .../apache/falcon/entity/CatalogStorage.java    |   6 +
 .../org/apache/falcon/entity/FeedHelper.java    |   8 +
 .../apache/falcon/entity/FileSystemStorage.java |   9 +
 .../java/org/apache/falcon/entity/Storage.java  |   9 +
 common/src/main/resources/startup.properties    |   3 +
 .../apache/falcon/entity/AbstractTestBase.java  |   5 +-
 .../entity/store/FeedLocationStoreTest.java     |   7 +-
 .../service/FeedSLAMonitoringService.java       | 362 +++++++++++++++++++
 src/conf/startup.properties                     |   8 +
 11 files changed, 418 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bd24a86..3aa300c 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ Trunk (Unreleased)
     FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
 
   IMPROVEMENTS
+    FALCON-1205 SLAService to keep track of missing SLAs for feeds(Ajay Yadava)
+
     FALCON-1449 Move getEntityProperties method to EntityUtil.(Ajay Yadava)
 
     FALCON-1357 Update CHANGES.txt to change 0.7 branch to release.(Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/client/src/main/java/org/apache/falcon/Pair.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/Pair.java b/client/src/main/java/org/apache/falcon/Pair.java
index e1f8bc3..d4cea90 100644
--- a/client/src/main/java/org/apache/falcon/Pair.java
+++ b/client/src/main/java/org/apache/falcon/Pair.java
@@ -18,12 +18,16 @@
 
 package org.apache.falcon;
 
+import java.io.Serializable;
+
 /**
  * Simple pair class to hold a pair of object of specific class.
  * @param <A> - First element in pair.
  * @param <B> - Second element in pair
  */
-public class Pair<A, B> {
+public class Pair<A, B> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
 
     //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
     public final A first;

http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index 7930fba..143d9b4 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -387,6 +387,12 @@ public class CatalogStorage extends Configured implements Storage {
     }
 
     @Override
+    public FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed,
String clusterName,
+                                         LocationType locationType, Date instancetime) throws
FalconException {
+        throw new UnsupportedOperationException("getInstanceAvailabilityStatus"); //TODO
to be implemented later
+    }
+
+    @Override
     public StringBuilder evict(String retentionLimit, String timeZone, Path logFilePath)
throws FalconException {
         LOG.info("Applying retention on {}, Limit: {}, timezone: {}",
                 getTable(), retentionLimit, timeZone);

http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 894c370..572923b 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -740,4 +740,12 @@ public final class FeedHelper {
         }
         return result;
     }
+
+    public static FeedInstanceStatus.AvailabilityStatus getFeedInstanceStatus(Feed feed,
String clusterName,
+                                                                              Date instanceTime)
+        throws FalconException {
+        Storage storage = createStorage(clusterName, feed);
+        return storage.getInstanceAvailabilityStatus(feed, clusterName, LocationType.DATA,
instanceTime);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 9ff33e7..200f71f 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -460,6 +460,15 @@ public class FileSystemStorage extends Configured implements Storage
{
         }
     }
 
+    @Override
+    public FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed,
String clusterName,
+                                                                   LocationType locationType,
+                                                                   Date instanceTime) throws
FalconException {
+
+        List<FeedInstanceStatus> result = getListing(feed, clusterName, locationType,
instanceTime, instanceTime);
+        return result.get(0).getStatus();
+    }
+
     public FileStatus getFileStatus(FileSystem fileSystem, Path feedInstancePath) {
         FileStatus fileStatus = null;
         try {

http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/common/src/main/java/org/apache/falcon/entity/Storage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java
index 777cde8..3dc8f67 100644
--- a/common/src/main/java/org/apache/falcon/entity/Storage.java
+++ b/common/src/main/java/org/apache/falcon/entity/Storage.java
@@ -94,6 +94,15 @@ public interface Storage extends Configurable {
     List<FeedInstanceStatus> getListing(Feed feed, String cluster, LocationType locationType,
                                         Date start, Date end) throws FalconException;
 
+
+    /**
+     * Checks the availability status for a given feed instance.
+     */
+    FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed, String
clusterName,
+                                                                        LocationType locationType,
+                                                                        Date instanceTime)
throws FalconException;
+
+
     /**
      * Delete the instances of the feeds which are older than the retentionLimit specified.
      *

http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 0593b96..39a412d 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -46,6 +46,7 @@
                         org.apache.falcon.entity.ColoClusterRelation,\
                         org.apache.falcon.group.FeedGroupMap,\
                         org.apache.falcon.entity.store.FeedLocationStore,\
+                        org.apache.falcon.service.FeedSLAMonitoringService,\
                         org.apache.falcon.service.SharedLibraryHostingService
 
 ##### JMS MQ Broker Implementation class #####
@@ -69,6 +70,8 @@
 
 #Configurations used in UTs
 debug.config.store.uri=file://${user.dir}/target/store
+#Location to store state of Feed SLA monitoring service
+debug.pending.feed.instances.store.uri = file://${user.dir}/data/sla/pendingfeedinstances
 debug.config.oozie.conf.uri=${user.dir}/target/oozie
 debug.system.lib.location=${system.lib.location}
 debug.broker.url=vm://localhost

http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index a36623c..6179855 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -72,8 +72,9 @@ public class AbstractTestBase {
 
         cleanupStore();
         String listeners = StartupProperties.get().getProperty("configstore.listeners");
-        StartupProperties.get().setProperty("configstore.listeners",
-                listeners.replace("org.apache.falcon.service.SharedLibraryHostingService",
""));
+        listeners = listeners.replace("org.apache.falcon.service.SharedLibraryHostingService",
"");
+        listeners = listeners.replace("org.apache.falcon.service.FeedSLAMonitoringService",
"");
+        StartupProperties.get().setProperty("configstore.listeners", listeners);
         store = ConfigurationStore.get();
         store.init();
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
index b94c248..033a55b 100644
--- a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
@@ -60,10 +60,9 @@ public class FeedLocationStoreTest extends AbstractTestBase {
 
         cleanupStore();
         String listeners = StartupProperties.get().getProperty("configstore.listeners");
-        StartupProperties.get().setProperty("configstore.listeners",
-                listeners.replace("org.apache.falcon.service.SharedLibraryHostingService",
""));
-//        StartupProperties.get().setProperty("configstore.listeners",
-//                "org.apache.falcon.entity.store.FeedLocationStore");
+        listeners = listeners.replace("org.apache.falcon.service.SharedLibraryHostingService",
"");
+        listeners = listeners.replace("org.apache.falcon.service.FeedSLAMonitoringService",
"");
+        StartupProperties.get().setProperty("configstore.listeners", listeners);
         store = ConfigurationStore.get();
         store.init();
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
new file mode 100644
index 0000000..37aa9e6
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.service;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Pair;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.FeedInstanceStatus;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.util.DeploymentUtil;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service to monitor Feed SLAs.
+ */
+public class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService
{
+    private static final Logger LOG = LoggerFactory.getLogger(FeedSLAMonitoringService.class);
+
+    private static final String ONE_HOUR = String.valueOf(60 * 60 * 1000);
+
+    private static final int ONE_MS = 1;
+
+    /**
+     * Permissions for storePath.
+     */
+    private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE,
FsAction.NONE);
+
+    /**
+     * Feeds to be monitored.
+     */
+    private static Set<String> monitoredFeeds;
+
+
+    /**
+     * Map<Pair<feedName, clusterName>, Set<instanceTime> to store
+     * each missing instance of a feed.
+     */
+    private static Map<Pair<String, String>, Set<Date>> pendingInstances;
+
+
+    /**
+     * Used to store the last time when pending instances were checked for SLA.
+     */
+    private static Date lastCheckedAt;
+
+
+    /**
+     * Used to store last time when the state was serialized to the store.
+     */
+    private static Date lastSerializedAt;
+
+    /**
+     * Frequency in seconds of "status check" for pending feed instances.
+     */
+    private static final int STATUS_CHECK_FREQUENCY_SECS = 10 * 60; // 10 minutes
+
+
+    /**
+     * Time Duration (in milliseconds) in future for generating pending feed instances.
+     *
+     * In every cycle pending feed instances are added for monitoring, till this time in
future.
+     */
+    private static final int LOOKAHEAD_WINDOW_MILLIS = 15 * 60 * 1000; // 15 MINUTES
+
+
+    /**
+     * Frequency in milliseconds of serializing(for backup) monitoring service's state.
+     */
+    private int serializationFrequencyMillis;
+
+    /**
+     * Filesystem used for serializing and deserializing.
+     */
+    private FileSystem fileSystem;
+
+    /**
+     * Working directory for the feed sla monitoring service.
+     */
+    private Path storePath;
+
+    /**
+     * Path to store the state of the monitoring service.
+     */
+    private Path filePath;
+
+    @Override
+    public void onAdd(Entity entity) throws FalconException {
+        if (entity.getEntityType() == EntityType.FEED) {
+            Feed feed = (Feed) entity;
+            // currently sla service is enabled only for fileSystemStorage
+            if (feed.getLocations() != null && feed.getSla() != null) {
+                Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+                for (Cluster cluster : feed.getClusters().getClusters()) {
+                    if (currentClusters.contains(cluster.getName())) {
+                        LOG.debug("Adding feed:{} for monitoring", feed.getName());
+                        monitoredFeeds.add(feed.getName());
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onRemove(Entity entity) throws FalconException {
+        if (entity.getEntityType() == EntityType.FEED) {
+            Feed feed = (Feed) entity;
+            if (feed.getSla() != null) {
+                Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+                for (Cluster cluster : feed.getClusters().getClusters()) {
+                    if (currentClusters.contains(cluster.getName())) {
+                        monitoredFeeds.remove(feed.getName());
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
+        onRemove(oldEntity);
+        onAdd(newEntity);
+    }
+
+    @Override
+    public void onReload(Entity entity) throws FalconException {
+        onAdd(entity);
+    }
+
+    @Override
+    public String getName() {
+        return FeedSLAMonitoringService.class.getSimpleName();
+    }
+
+    @Override
+    public void init() throws FalconException {
+        String uri = StartupProperties.get().getProperty("feed.sla.service.store.uri");
+        storePath = new Path(uri);
+        filePath = new Path(storePath, "feedSLAMonitoringService");
+        fileSystem = initializeFileSystem();
+
+        String freq = StartupProperties.get().getProperty("feed.sla.serialization.frequency.millis",
ONE_HOUR);
+        try {
+            serializationFrequencyMillis = Integer.valueOf(freq);
+        } catch (NumberFormatException e) {
+            LOG.error("Invalid value : {} found in startup.properties for the property "
+                    + "feed.sla.serialization.frequency.millis Should be an integer", freq);
+            throw new FalconException("Invalid integer value for property ", e);
+        }
+        deserialize(filePath);
+        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+        executor.scheduleWithFixedDelay(new Monitor(), 0, STATUS_CHECK_FREQUENCY_SECS, TimeUnit.SECONDS);
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        serializeState(); // store the state of monitoring service to the disk.
+    }
+
+    private FileSystem initializeFileSystem() {
+        try {
+            fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
+            if (!fileSystem.exists(storePath)) {
+                LOG.info("Creating directory for pending feed instances: {}", storePath);
+                // set permissions so config store dir is owned by falcon alone
+                HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION);
+            }
+            return fileSystem;
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to bring up feed sla store for path: " + storePath,
e);
+        }
+    }
+
+    //Periodically update status of pending instances, add new instances and take backup.
+    private class Monitor implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                if (!monitoredFeeds.isEmpty()) {
+                    checkPendingInstanceAvailability();
+
+                    // add Instances from last checked time to 10 minutes from now(some buffer
for status check)
+                    Date now = new Date();
+                    Date newCheckPoint = new Date(now.getTime() + LOOKAHEAD_WINDOW_MILLIS);
+                    addNewPendingFeedInstances(lastCheckedAt, newCheckPoint);
+                    lastCheckedAt = newCheckPoint;
+
+                    //take backup
+                    if (now.getTime() - lastSerializedAt.getTime() > serializationFrequencyMillis)
{
+                        serializeState();
+                        lastSerializedAt = new Date();
+                    }
+                }
+            } catch (Throwable e) {
+                LOG.error("Feed SLA monitoring failed: ", e);
+            }
+        }
+    }
+
+
+    void addNewPendingFeedInstances(Date from, Date to) throws FalconException {
+        Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+        for (String feedName : monitoredFeeds) {
+
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
+            for (Cluster feedCluster : feed.getClusters().getClusters()) {
+                if (currentClusters.contains(feedCluster.getName())) {
+                    Date nextInstanceTime = from;
+                    Pair<String, String> key = new Pair<>(feed.getName(), feedCluster.getName());
+                    Set<Date> instances = pendingInstances.get(key);
+                    if (instances == null) {
+                        instances = new HashSet<>();
+                    }
+
+                    org.apache.falcon.entity.v0.cluster.Cluster currentCluster =
+                            EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName());
+                    while (nextInstanceTime.before(to)) {
+                        nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster,
nextInstanceTime);
+                        instances.add(nextInstanceTime);
+                        nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
+                    }
+                    pendingInstances.put(key, instances);
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Checks the availability of all the pendingInstances and removes the ones which have
become available.
+     */
+    private void checkPendingInstanceAvailability() throws FalconException {
+        for (Map.Entry<Pair<String, String>, Set<Date>> entry: pendingInstances.entrySet())
{
+            for (Date date : entry.getValue()) {
+                boolean status = checkFeedInstanceAvailability(entry.getKey().first, entry.getKey().second,
date);
+                if (status) {
+                    pendingInstances.get(entry.getKey()).remove(date);
+                }
+            }
+        }
+    }
+
+    // checks whether a given feed instance is available or not
+    private boolean checkFeedInstanceAvailability(String feedName, String clusterName, Date
nominalTime) throws
+        FalconException {
+        Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
+
+        try {
+            LOG.debug("Checking instance availability status for feed:{}, cluster:{}, instanceTime:{}",
feed.getName(),
+                    clusterName, nominalTime);
+            FeedInstanceStatus.AvailabilityStatus status = FeedHelper.getFeedInstanceStatus(feed,
clusterName,
+                    nominalTime);
+            if (status == FeedInstanceStatus.AvailabilityStatus.AVAILABLE) {
+                LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is available.",
feed.getName(),
+                    clusterName, nominalTime);
+                return true;
+            }
+        } catch (Throwable e) {
+            LOG.error("Couldn't find status for feed:{}, cluster:{}", feedName, clusterName,
e);
+        }
+        LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is not available.",
feed.getName(),
+            clusterName, nominalTime);
+        return false;
+    }
+
+    private void serializeState() throws FalconException{
+        LOG.info("Saving context to: [{}]", storePath);
+
+        //create a temporary file and rename it.
+        Path tmp = new Path(storePath , "tmp");
+        ObjectOutputStream oos = null;
+        try {
+            OutputStream out = fileSystem.create(tmp);
+            oos = new ObjectOutputStream(out);
+            Map<String, Object> state = new HashMap<>();
+            state.put("lastSerializedAt", lastSerializedAt.getTime());
+            state.put("lastCheckedAt", lastCheckedAt.getTime());
+            state.put("monitoredFeeds", monitoredFeeds);
+            state.put("pendingInstances", pendingInstances);
+            oos.writeObject(state);
+            fileSystem.rename(tmp, filePath);
+        } catch (IOException e) {
+            throw new FalconException("Error serializing context to : " + storePath.toUri(),
 e);
+        } finally {
+            IOUtils.closeQuietly(oos);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void deserialize(Path path) {
+        try {
+            Map<String, Object> state = deserializeInternal(path);
+            pendingInstances = (Map<Pair<String, String>, Set<Date>>) state.get("pendingInstances");
+            lastCheckedAt = (Date) state.get("lastCheckedAt");
+            lastSerializedAt = (Date) state.get("lastSerializedAt");
+            monitoredFeeds = (Set<String>) state.get("monitoredFeeds");
+
+        } catch (Throwable throwable) {
+            LOG.error("Couldn't restore the state of feed sla monitoring service. Resetting
it", throwable);
+            pendingInstances = new HashMap<>();
+            lastCheckedAt = new Date();
+            lastSerializedAt = new Date();
+            monitoredFeeds = new HashSet<>();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> deserializeInternal(Path path) throws IOException,
ClassNotFoundException {
+        Map<String, Object> state;
+        InputStream in = fileSystem.open(path);
+        ObjectInputStream ois = new ObjectInputStream(in);
+        try {
+            state = (Map<String, Object>) ois.readObject();
+        } finally {
+            ois.close();
+        }
+        return state;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/947ed13b/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index ca55689..305ac36 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -37,6 +37,7 @@
 *.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
                         org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
                         org.apache.falcon.service.ProcessSubscriberService,\
+                        org.apache.falcon.service.FeedSLAMonitoringService,\
                         org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.rerun.service.RetryService,\
                         org.apache.falcon.rerun.service.LateRunService,\
@@ -53,6 +54,7 @@ prism.application.services=org.apache.falcon.entity.store.ConfigurationStore
                         org.apache.falcon.entity.ColoClusterRelation,\
                         org.apache.falcon.group.FeedGroupMap,\
                         org.apache.falcon.entity.store.FeedLocationStore,\
+                        org.apache.falcon.service.FeedSLAMonitoringService,\
                         org.apache.falcon.service.SharedLibraryHostingService
 
 ##### Prism Configuration Store Change listeners #####
@@ -77,6 +79,9 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 # Location to store user entity configurations
 *.config.store.uri=file://${falcon.home}/data/${falcon.app.type}-store
 
+#Location to store state of Feed SLA monitoring service
+*.feed.sla.service.store.uri = file://${falcon.home}/data/sla/pendingfeedinstances
+
 # Location of libraries that is shipped to Hadoop
 *.system.lib.location=${falcon.home}/server/webapp/${falcon.app.type}/WEB-INF/lib
 
@@ -84,6 +89,9 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 
 *.falcon.cleanup.service.frequency=days(1)
 
+# frequency of serialization for the state of FeedSLAMonitoringService - 1 hour
+*.feed.sla.serialization.frequency.millis=3600000
+
 
 ######### Properties for configuring JMS provider - activemq #########
 # Default Active MQ url


Mime
View raw message