openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From japet...@apache.org
Subject [openwhisk-package-kafka] branch master updated: Disable triggers for invalid auth when using custom auth handler (#354)
Date Thu, 17 Oct 2019 14:18:13 GMT
This is an automated email from the ASF dual-hosted git repository.

japetrsn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new bdb2c9a  Disable triggers for invalid auth when using custom auth handler (#354)
bdb2c9a is described below

commit bdb2c9a98b693aadcb28a9ad89896655b544a653
Author: James Dubee <jwdubee@us.ibm.com>
AuthorDate: Thu Oct 17 10:18:03 2019 -0400

    Disable triggers for invalid auth when using custom auth handler (#354)
    
    * Disable triggers for invalid auth when using custom auth handler
    
    * Fix bad indentation
    
    * Call __shouldDisable on auth handler exception
    
    * Always dump response on auth handler exception
    
    * Use response.ok instead of checking status code
    
    * Fix typo
---
 provider/authHandler.py | 25 +++++++++++-----
 provider/consumer.py    | 80 +++++++++++++++++++++++++++----------------------
 2 files changed, 62 insertions(+), 43 deletions(-)

diff --git a/provider/authHandler.py b/provider/authHandler.py
index 0325161..58b1ef0 100644
--- a/provider/authHandler.py
+++ b/provider/authHandler.py
@@ -23,6 +23,9 @@ import time
 
 from requests.auth import AuthBase
 
+class AuthHandlerException(Exception):
+    def __init__(self, response):
+        self.response = response
 
 class IAMAuth(AuthBase):
 
@@ -35,18 +38,24 @@ class IAMAuth(AuthBase):
         r.headers['Authorization'] = 'Bearer {}'.format(self.__getToken())
         return r
 
-
     def __getToken(self):
         if 'expires_in' not in self.tokenInfo or self.__isRefreshTokenExpired():
-            self.tokenInfo = self.__requestToken()
-            return self.tokenInfo['access_token']
+            response = self.__requestToken()
+            if response.ok and 'access_token' in response.json():
+                self.tokenInfo = response.json()
+                return self.tokenInfo['access_token']
+            else:
+                raise AuthHandlerException(response)
         elif self.__isTokenExpired():
-            self.tokenInfo = self.__refreshToken()
-            return self.tokenInfo['access_token']
+            response = self.__refreshToken()
+            if response.ok and 'access_token' in response.json():
+                self.tokenInfo = response.json()
+                return self.tokenInfo['access_token']
+            else:
+                raise AuthHandlerException(response)
         else:
             return self.tokenInfo['access_token']
 
-
     def __requestToken(self):
         headers = {
             'Content-type': 'application/x-www-form-urlencoded',
@@ -86,7 +95,7 @@ class IAMAuth(AuthBase):
 
     def __isRefreshTokenExpired(self):
         if 'expiration' not in self.tokenInfo:
-            return true
+            return True
 
         sevenDays = 7 * 24 * 3600
         currentTime = int(time.time())
@@ -96,4 +105,4 @@ class IAMAuth(AuthBase):
 
     def __sendRequest(self, payload, headers):
         response = requests.post(self.endpoint, data=payload, headers=headers)
-        return response.json()
+        return response
diff --git a/provider/consumer.py b/provider/consumer.py
index 74b243a..8742af3 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -32,6 +32,7 @@ from datetime import datetime
 from datetimeutils import secondsSince
 from multiprocessing import Process, Manager
 from urlparse import urlparse
+from authHandler import AuthHandlerException
 from authHandler import IAMAuth
 from requests.auth import HTTPBasicAuth
 from datetime import datetime, timedelta
@@ -409,44 +410,19 @@ class ConsumerProcess (Process):
                         self.consumer.commit(offsets=self.__getOffsetList(messages), async=False)
                         retry = False
                     elif self.__shouldDisable(status_code):
-                        logging.error('[{}] Error talking to OpenWhisk, status code {}'.format(self.trigger, status_code))
-                        response_dump = {
-                            'request': {
-                                'method': response.request.method,
-                                'url': response.request.url,
-                                'path_url': response.request.path_url,
-                                'headers': response.request.headers,
-                                'body': response.request.body
-                            },
-                            'response': {
-                                'status_code': response.status_code,
-                                'ok': response.ok,
-                                'reason': response.reason,
-                                'url': response.url,
-                                'headers': response.headers,
-                                'content': response.content
-                            }
-                        }
-
-                        logging.error('[{}] Dumping the content of the request and response:\n{}'.format(self.trigger, response_dump))
-
-                        # abandon all hope?
-                        self.setDesiredState(Consumer.State.Disabled)
-                        # mark it disabled in the DB
-
-                        # when failing to establish a database connection, mark the consumer as dead to restart the consumer
-                        try:
-                            self.database = Database()
-                            self.database.disableTrigger(self.trigger, status_code)
-                        except Exception as e:
-                            logging.error('[{}] Uncaught exception: {}'.format(self.trigger, e))
-                            self.__recordState(Consumer.State.Dead)
-                        finally:
-                            self.database.destroy()
-
                         retry = False
+                        logging.error('[{}] Error talking to OpenWhisk, status code {}'.format(self.trigger, status_code))
+                        self.__dumpRequestResponse(response)
+                        self.__disableTrigger(status_code)
                 except requests.exceptions.RequestException as e:
                     logging.error('[{}] Error talking to OpenWhisk: {}'.format(self.trigger, e))
+                except AuthHandlerException as e:
+                    logging.error("[{}] Encountered an exception from auth handler, status code {}").format(self.trigger, e.response.status_code)
+                    self.__dumpRequestResponse(e.response)
+
+                    if self.__shouldDisable(e.response.status_code):
+                        retry = False
+                        self.__disableTrigger(e.response.status_code)
 
                 if retry:
                     retry_count += 1
@@ -460,6 +436,40 @@ class ConsumerProcess (Process):
                         self.consumer.commit(offsets=self.__getOffsetList(messages), async=False)
                         retry = False
 
+    def __disableTrigger(self, status_code):
+        self.setDesiredState(Consumer.State.Disabled)
+
+        # when failing to establish a database connection, mark the consumer as dead to restart the consumer
+        try:
+            self.database = Database()
+            self.database.disableTrigger(self.trigger, status_code)
+        except Exception as e:
+            logging.error('[{}] Uncaught exception: {}'.format(self.trigger, e))
+            self.__recordState(Consumer.State.Dead)
+        finally:
+            self.database.destroy()
+
+    def __dumpRequestResponse(self, response):
+        response_dump = {
+            'request': {
+                'method': response.request.method,
+                'url': response.request.url,
+                'path_url': response.request.path_url,
+                'headers': response.request.headers,
+                'body': response.request.body
+            },
+            'response': {
+                'status_code': response.status_code,
+                'ok': response.ok,
+                'reason': response.reason,
+                'url': response.url,
+                'headers': response.headers,
+                'content': response.content
+            }
+        }
+
+        logging.error('[{}] Dumping the content of the request and response:\n{}'.format(self.trigger, response_dump))
+
     # return the dict that will be sent as the trigger payload
     def __getMessagePayload(self, message):
         return {


Mime
View raw message