falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-2034 Make numThreads and timeOut configurable In ConfigurationStore init
Date Tue, 21 Jun 2016 04:51:23 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 037e6821b -> d3ebf0b3d


FALCON-2034 Make numThreads and timeOut configurable In ConfigurationStore init

Author: sandeep <sandysmdl@gmail.com>

Reviewers: @pallavi-rao, @peeyushb

Closes #192 from sandeepSamudrala/FALCON-2034 and squashes the following commits:

78b98d5 [sandeep] FALCON-2034. Make numThreads and timeOut configurable In ConfigurationStore init
9d00722 [sandeep] FALCON-2034. Make numThreads and timeOut configurable In ConfigurationStore init


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d3ebf0b3
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d3ebf0b3
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d3ebf0b3

Branch: refs/heads/master
Commit: d3ebf0b3d826c32e068c0c786d51dd4d39a7f08f
Parents: 037e682
Author: sandeep <sandysmdl@gmail.com>
Authored: Tue Jun 21 10:21:14 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Tue Jun 21 10:21:14 2016 +0530

----------------------------------------------------------------------
 .../falcon/entity/store/ConfigurationStore.java | 29 +++++++++++++++++---
 src/conf/startup.properties                     |  6 ++++
 2 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/d3ebf0b3/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index debf106..19e10bd 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -70,6 +70,10 @@ public final class ConfigurationStore implements FalconService {
     private static final Logger LOG = LoggerFactory.getLogger(ConfigurationStore.class);
     private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT");
     private static final String UTF_8 = CharEncoding.UTF_8;
+    private static final String LOAD_ENTITIES_THREADS = "config.store.num.threads.load.entities";
+    private static final String TIMEOUT_MINS_LOAD_ENTITIES = "config.store.start.timeout.minutes";
+    private int numThreads;
+    private int restoreTimeOutInMins;
     private final boolean shouldPersist;
 
     private static final FsPermission STORE_PERMISSION =
@@ -150,6 +154,21 @@ public final class ConfigurationStore implements FalconService {
 
     @Override
     public void init() throws FalconException {
+        try {
+            numThreads = Integer.parseInt(StartupProperties.get().getProperty(LOAD_ENTITIES_THREADS, "100"));
+            LOG.info("Number of threads used to restore entities: {}", restoreTimeOutInMins);
+        } catch (NumberFormatException nfe) {
+            throw new FalconException("Invalid value specified for start up property \""
+                    + LOAD_ENTITIES_THREADS + "\".Please provide an integer value");
+        }
+        try {
+            restoreTimeOutInMins = Integer.parseInt(StartupProperties.get().
+                    getProperty(TIMEOUT_MINS_LOAD_ENTITIES, "30"));
+            LOG.info("TimeOut to load Entities is taken as {} mins", restoreTimeOutInMins);
+        } catch (NumberFormatException nfe) {
+            throw new FalconException("Invalid value specified for start up property \""
+                    + TIMEOUT_MINS_LOAD_ENTITIES + "\".Please provide an integer value");
+        }
         String listenerClassNames = StartupProperties.get().
                 getProperty("configstore.listeners", "org.apache.falcon.entity.v0.EntityGraph");
         for (String listenerClassName : listenerClassNames.split(",")) {
@@ -173,7 +192,8 @@ public final class ConfigurationStore implements FalconService {
             final ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
             FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*"));
             if (files != null) {
-                final ExecutorService service = Executors.newFixedThreadPool(100);
+
+                final ExecutorService service = Executors.newFixedThreadPool(numThreads);
                 for (final FileStatus file : files) {
                     service.execute(new Runnable() {
                         @Override
@@ -184,6 +204,7 @@ public final class ConfigurationStore implements FalconService {
                                 // ".xml"
                                 String entityName = URLDecoder.decode(encodedEntityName, UTF_8);
                                 Entity entity = restore(type, entityName);
+                                LOG.info("Restored configuration {}/{}", type, entityName);
                                 entityMap.put(entityName, entity);
                             } catch (IOException | FalconException e) {
                                 LOG.error("Unable to restore entity of", file);
@@ -192,10 +213,10 @@ public final class ConfigurationStore implements FalconService {
                     });
                 }
                 service.shutdown();
-                if (service.awaitTermination(10, TimeUnit.MINUTES)) {
+                if (service.awaitTermination(restoreTimeOutInMins, TimeUnit.MINUTES)) {
                     LOG.info("Restored Configurations for entity type: {} ", type.name());
                 } else {
-                    LOG.warn("Time out happened while waiting for all threads to finish while restoring entities "
+                    LOG.warn("Timed out while waiting for all threads to finish while restoring entities "
                             + "for type: {}", type.name());
                 }
                 // Checking if all entities were loaded
@@ -341,6 +362,7 @@ public final class ConfigurationStore implements FalconService {
                 } catch (IOException e) {
                     throw new StoreAccessException(e);
                 }
+                LOG.info("Restored configuration {}/{}", type, name);
                 entityMap.put(name, entity);
                 return entity;
             } else {
@@ -450,7 +472,6 @@ public final class ConfigurationStore implements FalconService {
             throw new StoreAccessException("Unable to un-marshall xml definition for " + type + "/" + name, e);
         } finally {
             in.close();
-            LOG.info("Restored configuration {}/{}", type, name);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/d3ebf0b3/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index ae50d51..5ac3d5c 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -136,6 +136,12 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 
 *.falcon.cleanup.service.frequency=days(1)
 
+# Default number of threads to be used to restore entities.
+*.config.store.num.threads.load.entities=100
+
+# Default timeout in minutes to load entities
+*.config.store.start.timeout.minutes=30
+
 ######### Properties for Feed SLA Monitoring #########
 # frequency of serialization for the state of FeedSLAMonitoringService - 1 hour
 *.feed.sla.serialization.frequency.millis=3600000


Mime
View raw message