openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dube...@apache.org
Subject [openwhisk-package-kafka] branch master updated: Retry failed database changes (#365)
Date Wed, 08 Jan 2020 14:35:08 GMT
This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 507fa2e  Retry failed database changes (#365)
507fa2e is described below

commit 507fa2e614dc60903771ef973508dd3803977e83
Author: James Dubee <jwdubee@us.ibm.com>
AuthorDate: Wed Jan 8 09:34:59 2020 -0500

    Retry failed database changes (#365)
---
 provider/service.py | 147 ++++++++++++++++++++++++++++++----------------------
 1 file changed, 84 insertions(+), 63 deletions(-)

diff --git a/provider/service.py b/provider/service.py
index b8a272f..26e7909 100644
--- a/provider/service.py
+++ b/provider/service.py
@@ -68,69 +68,7 @@ class Service (Thread):
                     # check whether or not the feed is capable of detecting canary
                     # documents
                     if change != None:
-                        if "deleted" in change and change["deleted"] == True:
-                            logging.info('[changes] Found a delete')
-                            consumer = self.consumers.getConsumerForTrigger(change['id'])
-                            if consumer != None:
-                                if consumer.desiredState() == Consumer.State.Disabled:
-                                    # just remove it from memory
-                                    logging.info('[{}] Removing disabled trigger'.format(consumer.trigger))
-                                    self.consumers.removeConsumerForTrigger(consumer.trigger)
-                                else:
-                                    logging.info('[{}] Shutting down running trigger'.format(consumer.trigger))
-                                    consumer.shutdown()
-                        # since we can't use a filter function for the feed (then
-                        # you don't get deletes) we need to manually verify this
-                        # is a valid trigger doc that has changed
-                        elif 'triggerURL' in change['doc']:
-                            logging.info('[changes] Found a change in a trigger document')
-                            document = change['doc']
-                            triggerIsAssignedToMe = self.__isTriggerDocAssignedToMe(document)
-
-                            if not self.consumers.hasConsumerForTrigger(change["id"]):
-                                if triggerIsAssignedToMe:
-                                    logging.info('[{}] Found a new trigger to create'.format(change["id"]))
-                                    self.createAndRunConsumer(document)
-                                else:
-                                    logging.info("[{}] Found a new trigger, but is assigned
to another worker: {}".format(change["id"], document["worker"]))
-                            else:
-                                existingConsumer = self.consumers.getConsumerForTrigger(change["id"])
-
-                                if existingConsumer.desiredState() == Consumer.State.Running
and not self.__isTriggerDocActive(document):
-                                    # running trigger should become disabled
-                                    # this should be done regardless of which worker the
document claims to be assigned to
-                                    logging.info('[{}] Existing running trigger should become
disabled'.format(change["id"]))
-                                    existingConsumer.disable()
-                                elif triggerIsAssignedToMe:
-                                    logging.info('[{}] Found a change to an existing trigger'.format(change["id"]))
-
-                                    if existingConsumer.desiredState() == Consumer.State.Dead
and self.__isTriggerDocActive(document):
-                                        # if a delete occurs followed quickly by a create
the consumer might get stuck in a dead state,
-                                        # so we need to forcefully delete the process before
recreating it.
-                                        logging.info('[{}] A create event occurred for a
trigger that is shutting down'.format(change["id"]))
-
-                                        if existingConsumer.process.is_alive():
-                                            logging.info('[{}] Joining dead process.'.format(existingConsumer.trigger))
-                                            existingConsumer.process.join(1)
-                                        else:
-                                            logging.info('[{}] Process is already dead.'.format(existingConsumer.trigger))
-
-                                        self.consumers.removeConsumerForTrigger(existingConsumer.trigger)
-                                        self.createAndRunConsumer(document)
-                                    elif existingConsumer.desiredState() == Consumer.State.Disabled
and self.__isTriggerDocActive(document):
-                                        # disabled trigger has become active
-                                        logging.info('[{}] Existing disabled trigger should
become active'.format(change["id"]))
-                                        self.createAndRunConsumer(document)
-                                else:
-                                    # trigger has become reassigned to a different worker
-                                    logging.info("[{}] Shutting down trigger as it has been
re-assigned to {}".format(change["id"], document["worker"]))
-                                    existingConsumer.shutdown()
-                        elif 'canary-timestamp' in change['doc']:
-                            # found a canary - update lastCanaryTime
-                            logging.info('[canary] I found a canary. The last one was {}
seconds ago.'.format(secondsSince(self.lastCanaryTime)))
-                            self.lastCanaryTime = datetime.now()
-                        else:
-                            logging.debug('[changes] Found a change for a non-trigger document')
+                        self.__handleDocChange(change)
 
                         # Record the sequence in case the changes feed needs to be
                         # restarted. This way the new feed can pick up right where
@@ -143,6 +81,89 @@ class Service (Thread):
 
             logging.debug("[changes] I made it out of the changes loop!")
 
+    def __handleDocChange(self, change):
+        retry = True
+        retryCount = 0
+        maxRetries = 5
+
+        while retry:
+            try:
+                if "deleted" in change and change["deleted"] == True:
+                    logging.info('[changes] Found a delete')
+                    consumer = self.consumers.getConsumerForTrigger(change['id'])
+                    if consumer != None:
+                        if consumer.desiredState() == Consumer.State.Disabled:
+                            # just remove it from memory
+                            logging.info('[{}] Removing disabled trigger'.format(consumer.trigger))
+                            self.consumers.removeConsumerForTrigger(consumer.trigger)
+                        else:
+                            logging.info('[{}] Shutting down running trigger'.format(consumer.trigger))
+                            consumer.shutdown()
+                # since we can't use a filter function for the feed (then
+                # you don't get deletes) we need to manually verify this
+                # is a valid trigger doc that has changed
+                elif 'triggerURL' in change['doc']:
+                    logging.info('[changes] Found a change in a trigger document')
+                    document = change['doc']
+                    triggerIsAssignedToMe = self.__isTriggerDocAssignedToMe(document)
+
+                    if not self.consumers.hasConsumerForTrigger(change["id"]):
+                        if triggerIsAssignedToMe:
+                            logging.info('[{}] Found a new trigger to create'.format(change["id"]))
+                            self.createAndRunConsumer(document)
+                        else:
+                            logging.info("[{}] Found a new trigger, but is assigned to another
worker: {}".format(change["id"], document["worker"]))
+                    else:
+                        existingConsumer = self.consumers.getConsumerForTrigger(change["id"])
+
+                        if existingConsumer.desiredState() == Consumer.State.Running and
not self.__isTriggerDocActive(document):
+                            # running trigger should become disabled
+                            # this should be done regardless of which worker the document
claims to be assigned to
+                            logging.info('[{}] Existing running trigger should become disabled'.format(change["id"]))
+                            existingConsumer.disable()
+                        elif triggerIsAssignedToMe:
+                            logging.info('[{}] Found a change to an existing trigger'.format(change["id"]))
+
+                            if existingConsumer.desiredState() == Consumer.State.Dead and
self.__isTriggerDocActive(document):
+                                # if a delete occurs followed quickly by a create the consumer
might get stuck in a dead state,
+                                # so we need to forcefully delete the process before recreating
it.
+                                logging.info('[{}] A create event occurred for a trigger
that is shutting down'.format(change["id"]))
+
+                                if existingConsumer.process.is_alive():
+                                    logging.info('[{}] Joining dead process.'.format(existingConsumer.trigger))
+                                    existingConsumer.process.join(1)
+                                else:
+                                    logging.info('[{}] Process is already dead.'.format(existingConsumer.trigger))
+
+                                self.consumers.removeConsumerForTrigger(existingConsumer.trigger)
+                                self.createAndRunConsumer(document)
+                            elif existingConsumer.desiredState() == Consumer.State.Disabled
and self.__isTriggerDocActive(document):
+                                # disabled trigger has become active
+                                logging.info('[{}] Existing disabled trigger should become
active'.format(change["id"]))
+                                self.createAndRunConsumer(document)
+                        else:
+                            # trigger has become reassigned to a different worker
+                            logging.info("[{}] Shutting down trigger as it has been re-assigned
to {}".format(change["id"], document["worker"]))
+                            existingConsumer.shutdown()
+                elif 'canary-timestamp' in change['doc']:
+                    # found a canary - update lastCanaryTime
+                    logging.info('[canary] I found a canary. The last one was {} seconds
ago.'.format(secondsSince(self.lastCanaryTime)))
+                    self.lastCanaryTime = datetime.now()
+                else:
+                    logging.debug('[changes] Found a change for a non-trigger document')
+
+                retry = False
+            except Exception as e:
+                logging.error('[{}] Exception caught while handling change.'.format(change["id"]))
+                logging.error(e)
+
+                if retry:
+                    retryCount += 1
+
+                    if retryCount >= maxRetries:
+                        logging.warn('[{}] Maximum number of retries exceeded for failed
change.'.format(change["id"]))
+                        retry = False
+
     def __isTriggerDocAssignedToMe(self, doc):
         if "worker" in doc:
             return doc["worker"] == self.workerId


Mime
View raw message