openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From csantan...@apache.org
Subject [incubator-openwhisk-package-cloudant] branch master updated: Applying filter on all db changes since 0 takes too long (#121)
Date Mon, 07 Aug 2017 18:55:21 GMT
This is an automated email from the ASF dual-hosted git repository.

csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-cloudant.git


The following commit(s) were added to refs/heads/master by this push:
     new 5da607a  Applying filter on all db changes since 0 takes too long (#121)
5da607a is described below

commit 5da607ae78082acb0732a0387a3f15e01a032226
Author: Jason Peterson <jasonpet@us.ibm.com>
AuthorDate: Mon Aug 7 14:55:20 2017 -0400

    Applying filter on all db changes since 0 takes too long (#121)
---
 provider/app.js           | 160 ++++++++++++++++++++++++++++------------------
 provider/lib/constants.js |  10 +--
 provider/lib/utils.js     |  52 ++++++++-------
 3 files changed, 131 insertions(+), 91 deletions(-)

diff --git a/provider/app.js b/provider/app.js
index 4e37f85..2746a54 100644
--- a/provider/app.js
+++ b/provider/app.js
@@ -23,7 +23,7 @@ app.use(bodyParser.urlencoded({ extended: false }));
 app.set('port', process.env.PORT || 8080);
 
 // Allow invoking servers with self-signed certificates.
-process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
+process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
 
 // If it does not already exist, create the triggers database.  This is the database that
will
 // store the managed triggers.
@@ -34,7 +34,8 @@ var dbProtocol = process.env.DB_PROTOCOL;
 var dbPrefix = process.env.DB_PREFIX;
 var databaseName = dbPrefix + constants.TRIGGER_DB_SUFFIX;
 var redisUrl = process.env.REDIS_URL;
-var ddname = '_design/' + constants.DESIGN_DOC_NAME;
+var filterDDName = '_design/' + constants.FILTERS_DESIGN_DOC;
+var viewDDName = '_design/' + constants.VIEWS_DESIGN_DOC;
 
 // Create the Provider Server
 var server = http.createServer(app);
@@ -42,56 +43,88 @@ server.listen(app.get('port'), function() {
     logger.info('server.listen', 'Express server listening on port ' + app.get('port'));
 });
 
-function createDatabase(nanop) {
+function createDatabase() {
     var method = 'createDatabase';
     logger.info(method, 'creating the trigger database');
 
-    return new Promise(function(resolve, reject) {
-        nanop.db.create(databaseName, function (err, body) {
-            if (!err) {
-                logger.info(method, 'created trigger database:', databaseName);
-            }
-            else if (err.statusCode !== 412) {
-                logger.info(method, 'failed to create trigger database:', databaseName, err);
-            }
-            var db = nanop.db.use(databaseName);
+    var nano = require('nano')(dbProtocol + '://' + dbUsername + ':' + dbPassword + '@' +
dbHost);
+
+    if (nano !== null) {
+        return new Promise(function (resolve, reject) {
+            nano.db.create(databaseName, function (err, body) {
+                if (!err) {
+                    logger.info(method, 'created trigger database:', databaseName);
+                }
+                else if (err.statusCode !== 412) {
+                    logger.info(method, 'failed to create trigger database:', databaseName,
err);
+                }
 
-            var only_triggers_by_worker = function(doc, req) {
-                return doc.maxTriggers && ((!doc.worker && req.query.worker
=== 'worker0') || (doc.worker === req.query.worker));
-            }.toString();
+                var viewDD = {
+                    views: {
+                        triggers_by_worker: {
+                            map: function (doc) {
+                                if (doc.maxTriggers) {
+                                    emit(doc.worker || 'worker0', 1);
+                                }
+                            }.toString(),
+                            reduce: '_count'
+                        }
+                    }
+                };
 
-            db.get(ddname, function (error, body) {
-                if (error) {
-                    //new design doc
-                    db.insert({
+                createDesignDoc(nano.db.use(databaseName), viewDDName, viewDD)
+                .then((db) => {
+                    var filterDD = {
                         filters: {
-                            only_triggers_by_worker: only_triggers_by_worker
-                        },
-                    }, ddname, function (error, body) {
-                        if (error && error.statusCode !== 409) {
-                            reject("filter could not be created: " + error);
+                            triggers_by_worker:
+                                function (doc, req) {
+                                    return doc.maxTriggers && ((!doc.worker &&
req.query.worker === 'worker0') ||
+                                            (doc.worker === req.query.worker));
+                                }.toString()
                         }
-                        resolve(db);
-                    });
-                }
-                else {
+                    };
+                    return createDesignDoc(db, filterDDName, filterDD);
+                })
+                .then((db) => {
                     resolve(db);
-                }
+                })
+                .catch(err => {
+                    reject(err);
+                });
+
             });
         });
-    });
-}
-
-function createTriggerDb() {
-    var nanop = require('nano')(dbProtocol + '://' + dbUsername + ':' + dbPassword + '@'
+ dbHost);
-    if (nanop !== null) {
-        return createDatabase(nanop);
     }
     else {
         Promise.reject('nano provider did not get created.  check db URL: ' + dbHost);
     }
 }
 
+function createDesignDoc(db, ddName, designDoc) {
+    var method = 'createDesignDoc';
+
+    return new Promise(function(resolve, reject) {
+
+        db.get(ddName, function (error, body) {
+            if (error) {
+                //new design doc
+                db.insert(designDoc, ddName, function (error, body) {
+                    if (error && error.statusCode !== 409) {
+                        logger.error(method, error);
+                        reject('design doc could not be created: ' + error);
+                    }
+                    else {
+                        resolve(db);
+                    }
+                });
+            }
+            else {
+                resolve(db);
+            }
+        });
+    });
+}
+
 function createRedisClient() {
     var method = 'createRedisClient';
 
@@ -101,11 +134,11 @@ function createRedisClient() {
             bluebird.promisifyAll(redis.RedisClient.prototype);
             var client = redis.createClient(redisUrl);
 
-            client.on("connect", function () {
+            client.on('connect', function () {
                 resolve(client);
             });
 
-            client.on("error", function (err) {
+            client.on('error', function (err) {
                 logger.error(method, 'Error connecting to redis', err);
                 reject(err);
             });
@@ -130,31 +163,32 @@ function init(server) {
         }
     }
 
-    createTriggerDb()
-        .then(db => {
-            nanoDb = db;
-            return createRedisClient();
-        })
-        .then(client => {
-            providerUtils = new ProviderUtils(logger, nanoDb, client);
-            return providerUtils.initRedis();
-        })
-        .then(() => {
-            var providerRAS = new ProviderRAS();
-            var providerHealth = new ProviderHealth(providerUtils);
-            var providerActivation = new ProviderActivation(logger, providerUtils);
-
-            // RAS Endpoint
-            app.get(providerRAS.endPoint, providerRAS.ras);
-
-            // Health Endpoint
-            app.get(providerHealth.endPoint, providerUtils.authorize, providerHealth.health);
-
-            // Activation Endpoint
-            app.get(providerActivation.endPoint, providerUtils.authorize, providerActivation.active);
-
-            providerUtils.initAllTriggers();
-        }).catch(err => {
+    createDatabase()
+    .then(db => {
+        nanoDb = db;
+        return createRedisClient();
+    })
+    .then(client => {
+        providerUtils = new ProviderUtils(logger, nanoDb, client);
+        return providerUtils.initRedis();
+    })
+    .then(() => {
+        var providerRAS = new ProviderRAS();
+        var providerHealth = new ProviderHealth(providerUtils);
+        var providerActivation = new ProviderActivation(logger, providerUtils);
+
+        // RAS Endpoint
+        app.get(providerRAS.endPoint, providerRAS.ras);
+
+        // Health Endpoint
+        app.get(providerHealth.endPoint, providerUtils.authorize, providerHealth.health);
+
+        // Activation Endpoint
+        app.get(providerActivation.endPoint, providerUtils.authorize, providerActivation.active);
+
+        providerUtils.initAllTriggers();
+    })
+    .catch(err => {
         logger.error(method, 'an error occurred creating database:', err);
     });
 
diff --git a/provider/lib/constants.js b/provider/lib/constants.js
index 2f2eca1..0203027 100644
--- a/provider/lib/constants.js
+++ b/provider/lib/constants.js
@@ -3,8 +3,9 @@ const DEFAULT_MAX_TRIGGERS = -1;
 const RETRY_ATTEMPTS = 12;
 const RETRY_DELAY = 1000; //in milliseconds
 const REDIS_KEY = 'active';
-const DESIGN_DOC_NAME = 'triggers';
-const FILTER_FUNCTION = 'only_triggers_by_worker';
+const FILTERS_DESIGN_DOC = 'triggerFilters';
+const VIEWS_DESIGN_DOC = 'triggerViews';
+const TRIGGERS_BY_WORKER = 'triggers_by_worker';
 
 
 module.exports = {
@@ -13,6 +14,7 @@ module.exports = {
     RETRY_ATTEMPTS: RETRY_ATTEMPTS,
     RETRY_DELAY: RETRY_DELAY,
     REDIS_KEY: REDIS_KEY,
-    DESIGN_DOC_NAME: DESIGN_DOC_NAME,
-    FILTER_FUNCTION: FILTER_FUNCTION
+    FILTERS_DESIGN_DOC: FILTERS_DESIGN_DOC,
+    VIEWS_DESIGN_DOC: VIEWS_DESIGN_DOC,
+    TRIGGERS_BY_WORKER: TRIGGERS_BY_WORKER
 };
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index 31ba55b..5c631ff 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -3,6 +3,7 @@ var request = require('request');
 var HttpStatus = require('http-status-codes');
 var constants = require('./constants.js');
 
+
 module.exports = function(
   logger,
   triggerDB,
@@ -12,7 +13,7 @@ module.exports = function(
     this.triggers = {};
     this.endpointAuth = process.env.ENDPOINT_AUTH;
     this.routerHost = process.env.ROUTER_HOST || 'localhost';
-    this.worker = process.env.WORKER || "worker0";
+    this.worker = process.env.WORKER || 'worker0';
     this.host = process.env.HOST_INDEX || 'host0';
     this.hostMachine = process.env.HOST_MACHINE;
     this.activeHost = 'host0'; //default value on init (will be updated for existing redis)
@@ -21,8 +22,9 @@ module.exports = function(
     this.redisKey = constants.REDIS_KEY;
 
     var retryAttempts = constants.RETRY_ATTEMPTS;
-    var ddname = constants.DESIGN_DOC_NAME;
-    var filter = constants.FILTER_FUNCTION;
+    var filterDDName = constants.FILTERS_DESIGN_DOC;
+    var viewDDName = constants.VIEWS_DESIGN_DOC;
+    var triggersByWorker = constants.TRIGGERS_BY_WORKER;
     var utils = this;
 
     // Add a trigger: listen for changes and dispatch.
@@ -102,12 +104,12 @@ module.exports = function(
             id: newTrigger.id,
             host: newTrigger.host,
             port: newTrigger.port,
-            protocol: newTrigger.protocol || "https",
+            protocol: newTrigger.protocol || 'https',
             dbname: newTrigger.dbname,
             user: newTrigger.user,
             pass: newTrigger.pass,
             apikey: newTrigger.apikey,
-            since: newTrigger.since || "now",
+            since: newTrigger.since || 'now',
             maxTriggers: maxTriggers,
             triggersLeft: maxTriggers,
             filter: newTrigger.filter,
@@ -193,7 +195,8 @@ module.exports = function(
                 utils.disableTrigger(dataTrigger.id, undefined, 'Automatically disabled after
reaching max triggers');
                 logger.error(method, 'no more triggers left, disabled', dataTrigger.id);
             }
-        }).catch(err => {
+        })
+        .catch(err => {
             logger.error(method, err);
         });
     };
@@ -230,7 +233,8 @@ module.exports = function(
                                     utils.postTrigger(dataTrigger, form, uri, auth, (retryCount
+ 1))
                                     .then(triggerId => {
                                         resolve(triggerId);
-                                    }).catch(err => {
+                                    })
+                                    .catch(err => {
                                         reject(err);
                                     });
                                 }, timeout);
@@ -257,15 +261,17 @@ module.exports = function(
     this.initAllTriggers = function() {
         var method = 'initAllTriggers';
 
-        logger.info(method, 'resetting system from last state');
+        //follow the trigger DB
+        utils.setupFollow('now');
 
-        triggerDB.changes({ since: 0, include_docs: true, filter: ddname + '/' + filter,
worker: utils.worker }, (err, changes) => {
+        logger.info(method, 'resetting system from last state');
+        triggerDB.view(viewDDName, triggersByWorker, {reduce: false, include_docs: true,
key: utils.worker}, function(err, body) {
             if (!err) {
-                changes.results.forEach(function (change) {
-                    var triggerIdentifier = change.id;
-                    var doc = change.doc;
+                body.rows.forEach(function (trigger) {
+                    var triggerIdentifier = trigger.id;
+                    var doc = trigger.doc;
 
-                    if (!doc.status || doc.status.active === true) {
+                    if ((!doc.status || doc.status.active === true) && !(triggerIdentifier
in utils.triggers)) {
                         //check if trigger still exists in whisk db
                         var triggerObj = utils.parseQName(triggerIdentifier);
                         var host = 'https://' + utils.routerHost + ':' + 443;
@@ -290,8 +296,9 @@ module.exports = function(
                             else {
                                 utils.createTrigger(utils.initTrigger(doc))
                                 .then(triggerIdentifier => {
-                                    logger.info(method, 'Trigger was added.', triggerIdentifier);
-                                }).catch(err => {
+                                    logger.info(method, triggerIdentifier, 'created successfully');
+                                })
+                                .catch(err => {
                                     var message = 'Automatically disabled after receiving
exception on init trigger: ' + err;
                                     utils.disableTrigger(triggerIdentifier, undefined, message);
                                     logger.error(method, 'Disabled trigger', triggerIdentifier,
'due to exception:', err);
@@ -299,11 +306,7 @@ module.exports = function(
                             }
                         });
                     }
-                    else {
-                        logger.info(method, 'ignoring trigger', triggerIdentifier, 'since
it is disabled.');
-                    }
                 });
-                utils.setupFollow(changes.last_seq);
             } else {
                 logger.error(method, 'could not get latest state from database', err);
             }
@@ -317,7 +320,7 @@ module.exports = function(
             var feed = triggerDB.follow({
                 since: seq,
                 include_docs: true,
-                filter: ddname + '/' + filter,
+                filter: filterDDName + '/' + triggersByWorker,
                 query_params: {worker: utils.worker}
             });
 
@@ -338,7 +341,8 @@ module.exports = function(
                         utils.createTrigger(utils.initTrigger(doc))
                         .then(triggerIdentifier => {
                             logger.info(method, triggerIdentifier, 'created successfully');
-                        }).catch(err => {
+                        })
+                        .catch(err => {
                             var message = 'Automatically disabled after receiving exception
on create trigger: ' + err;
                             utils.disableTrigger(triggerIdentifier, undefined, message);
                             logger.error(method, 'Disabled trigger', triggerIdentifier, 'due
to exception:', err);
@@ -425,14 +429,14 @@ module.exports = function(
                 var subscriber = redisClient.duplicate();
 
                 //create a subscriber client that listens for requests to perform swap
-                subscriber.on("message", function (channel, message) {
+                subscriber.on('message', function (channel, message) {
                     if (message === 'host0' || message === 'host1') {
-                        logger.info(method, message, "set to active host in channel", channel);
+                        logger.info(method, message, 'set to active host in channel', channel);
                         utils.activeHost = message;
                     }
                 });
 
-                subscriber.on("error", function (err) {
+                subscriber.on('error', function (err) {
                     logger.error(method, 'Error connecting to redis', err);
                     reject(err);
                 });

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].

Mime
View raw message