openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From japet...@apache.org
Subject [openwhisk-package-kafka] branch master updated: Ensure proper encoding for improper encoded keys (#353)
Date Tue, 15 Oct 2019 17:16:14 GMT
This is an automated email from the ASF dual-hosted git repository.

japetrsn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ea0c7e  Ensure proper encoding for improper encoded keys (#353)
9ea0c7e is described below

commit 9ea0c7ed8ec1ee6382e1a93baa0ae009693a7ded
Author: James Dubee <jwdubee@us.ibm.com>
AuthorDate: Tue Oct 15 13:16:06 2019 -0400

    Ensure proper encoding for improper encoded keys (#353)
    
    * Ensure proper encoding for invalid JSON messages
    
    * Fix formatting
---
 provider/consumer.py | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/provider/consumer.py b/provider/consumer.py
index 5c1de24..74b243a 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -485,8 +485,7 @@ class ConsumerProcess (Process):
 
         return offsets
 
-    def __encodeMessageIfNeeded(self, value):
-        # let's make sure whatever data we're getting is utf-8 encoded
+    def __getUTF8Encoding(self, value):
         try:
             value.decode('utf-8')
         except UnicodeDecodeError:
@@ -497,8 +496,13 @@ class ConsumerProcess (Process):
                 logging.warn('[{}] Value contains non-unicode bytes. Replacing invalid bytes.'.format(self.trigger))
                 value = unicode(value, errors='replace').encode('utf-8')
         except AttributeError:
-           logging.warn('[{}] Cannot decode a NoneType message value'.format(self.trigger))
-           return value
+            logging.warn('[{}] Cannot decode a NoneType message value'.format(self.trigger))
+
+        return value
+
+
+    def __encodeMessageIfNeeded(self, value):
+        value = self.__getUTF8Encoding(value)
 
         if self.encodeValueAsJSON:
             try:
@@ -506,10 +510,10 @@ class ConsumerProcess (Process):
                 logging.debug('[{}] Successfully encoded a message as JSON.'.format(self.trigger))
                 return parsed
             except ValueError as e:
-                # no big deal, just return the original value
+                # message is not a JSON object, return the message as a JSON value
                 logging.warn('[{}] I was asked to encode a message as JSON, but I failed
with "{}".'.format(self.trigger, e))
                 value = "\"{}\"".format(value)
-                pass
+                return value
         elif self.encodeValueAsBase64:
             try:
                 parsed = value.encode("base64").strip()
@@ -532,6 +536,8 @@ class ConsumerProcess (Process):
                 logging.warn('[{}] Unable to encode a binary key.'.format(self.trigger))
                 pass
 
+        key = self.__getUTF8Encoding(key)
+
         logging.debug('[{}] Returning un-encoded message'.format(self.trigger))
         return key
 


Mime
View raw message