openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From japet...@apache.org
Subject [incubator-openwhisk-package-kafka] branch master updated: Catch Doctor exceptions and do not persist consumer database connections (#343)
Date Wed, 05 Jun 2019 01:38:00 GMT
This is an automated email from the ASF dual-hosted git repository.

japetrsn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 02a88dd  Catch Doctor exceptions and do not persist consumer database connections (#343)
02a88dd is described below

commit 02a88dd0de5f635f1a9cdbceffb2a5703b35bc49
Author: James Dubee <jwdubee@us.ibm.com>
AuthorDate: Tue Jun 4 21:37:55 2019 -0400

    Catch Doctor exceptions and do not persist consumer database connections (#343)
    
    * Catch Doctor exceptions
    
    * Ensure consumer gets restarted for database connection failures
    
    * A persistent DB connection is not needed for the consumer
---
 provider/consumer.py  | 14 +++++++++++---
 provider/thedoctor.py | 53 +++++++++++++++++++++++++++------------------------
 2 files changed, 39 insertions(+), 28 deletions(-)

diff --git a/provider/consumer.py b/provider/consumer.py
index 0647450..5c1de24 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -179,8 +179,6 @@ class ConsumerProcess (Process):
         else:
             self.encodeKeyAsBase64 = False
 
-        self.database = Database()
-
         # always init consumer to None in case the consumer needs to shut down
         # before the KafkaConsumer is fully initialized/assigned
         self.consumer = None
@@ -435,7 +433,17 @@ class ConsumerProcess (Process):
                         # abandon all hope?
                         self.setDesiredState(Consumer.State.Disabled)
                         # mark it disabled in the DB
-                        self.database.disableTrigger(self.trigger, status_code)
+
+                        # when failing to establish a database connection, mark the consumer as dead to restart the consumer
+                        try:
+                            self.database = Database()
+                            self.database.disableTrigger(self.trigger, status_code)
+                        except Exception as e:
+                            logging.error('[{}] Uncaught exception: {}'.format(self.trigger, e))
+                            self.__recordState(Consumer.State.Dead)
+                        finally:
+                            self.database.destroy()
+
                         retry = False
                 except requests.exceptions.RequestException as e:
                    logging.error('[{}] Error talking to OpenWhisk: {}'.format(self.trigger, e))
diff --git a/provider/thedoctor.py b/provider/thedoctor.py
index 42fa9e7..42b4232 100644
--- a/provider/thedoctor.py
+++ b/provider/thedoctor.py
@@ -43,32 +43,35 @@ class TheDoctor (Thread):
         logging.info('[Doctor] The Doctor is in!')
 
         while True:
-            consumers = self.consumerCollection.getCopyForRead()
+            try:
+                consumers = self.consumerCollection.getCopyForRead()
 
-            for consumerId in consumers:
-                consumer = consumers[consumerId]
-                logging.debug('[Doctor] [{}] Consumer is in state: {}'.format(consumerId, consumer.currentState()))
+                for consumerId in consumers:
+                    consumer = consumers[consumerId]
+                    logging.debug('[Doctor] [{}] Consumer is in state: {}'.format(consumerId, consumer.currentState()))
 
-                if consumer.currentState() == Consumer.State.Dead and consumer.desiredState() == Consumer.State.Running:
-                    # well this is unexpected...
-                    logging.error('[Doctor][{}] Consumer is dead, but should be alive!'.format(consumerId))
-                    consumer.restart()
-                elif consumer.currentState() == Consumer.State.Dead and consumer.desiredState() == Consumer.State.Dead:
-                    # Bring out yer dead...
-                    if consumer.process.is_alive():
-                        logging.info('[{}] Joining dead process.'.format(consumer.trigger))
-                        # if you don't first join the process, it'll be left hanging around as a "defunct" process
-                        consumer.process.join(1)
-                    else:
-                        logging.info('[{}] Process is already dead.'.format(consumer.trigger))
+                    if consumer.currentState() == Consumer.State.Dead and consumer.desiredState() == Consumer.State.Running:
+                        # well this is unexpected...
+                        logging.error('[Doctor][{}] Consumer is dead, but should be alive!'.format(consumerId))
+                        consumer.restart()
+                    elif consumer.currentState() == Consumer.State.Dead and consumer.desiredState() == Consumer.State.Dead:
+                        # Bring out yer dead...
+                        if consumer.process.is_alive():
+                            logging.info('[{}] Joining dead process.'.format(consumer.trigger))
+                            # if you don't first join the process, it'll be left hanging around as a "defunct" process
+                            consumer.process.join(1)
+                        else:
+                            logging.info('[{}] Process is already dead.'.format(consumer.trigger))
 
-                    logging.info('[{}] Removing dead consumer from the collection.'.format(consumer.trigger))
-                    self.consumerCollection.removeConsumerForTrigger(consumer.trigger)
-                elif consumer.secondsSinceLastPoll() > self.poll_timeout_seconds and consumer.desiredState() == Consumer.State.Running:
-                    # there seems to be an issue with the kafka-python client where it gets into an
-                    # error-handling loop. This causes poll() to never complete, but also does not
-                    # throw an exception.
-                    logging.error('[Doctor][{}] Consumer timed-out, but should be alive! Restarting consumer.'.format(consumerId))
-                    consumer.restart()
+                        logging.info('[{}] Removing dead consumer from the collection.'.format(consumer.trigger))
+                        self.consumerCollection.removeConsumerForTrigger(consumer.trigger)
+                    elif consumer.secondsSinceLastPoll() > self.poll_timeout_seconds and consumer.desiredState() == Consumer.State.Running:
+                        # there seems to be an issue with the kafka-python client where it gets into an
+                        # error-handling loop. This causes poll() to never complete, but also does not
+                        # throw an exception.
+                        logging.error('[Doctor][{}] Consumer timed-out, but should be alive! Restarting consumer.'.format(consumerId))
+                        consumer.restart()
 
-            time.sleep(self.sleepy_time_seconds)
+                time.sleep(self.sleepy_time_seconds)
+            except Exception as e:
+                logging.error("[Doctor] Uncaught exception: {}".format(e))


Mime
View raw message