qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kp...@apache.org
Subject [1/2] qpid-interop-test git commit: QPIDIT-79: Fixed threading issues in shims, ctl+c now stops shims as they should. QPIDIT-110: Also added check for sender shim failure, if so, signals receiver shim to stop
Date Thu, 01 Feb 2018 15:21:51 GMT
Repository: qpid-interop-test
Updated Branches:
  refs/heads/master aea81083a -> f93886f54


http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f93886f5/src/python/qpid_interop_test/jms_hdrs_props_test.py
----------------------------------------------------------------------
diff --git a/src/python/qpid_interop_test/jms_hdrs_props_test.py b/src/python/qpid_interop_test/jms_hdrs_props_test.py
index 9059e19..ea5dfd6 100755
--- a/src/python/qpid_interop_test/jms_hdrs_props_test.py
+++ b/src/python/qpid_interop_test/jms_hdrs_props_test.py
@@ -23,6 +23,7 @@ Module to test JMS headers and properties on messages accross different
clients
 # under the License.
 #
 
+import signal
 import sys
 import unittest
 
@@ -32,6 +33,7 @@ from json import dumps
 import qpid_interop_test.broker_properties
 import qpid_interop_test.qit_common
 import qpid_interop_test.shims
+from qpid_interop_test.interop_test_errors import InteropTestError
 
 
 class JmsHdrPropTypes(qpid_interop_test.qit_common.QitTestTypeMap):
@@ -296,57 +298,55 @@ class JmsMessageHdrsPropsTestCase(qpid_interop_test.qit_common.QitTestCase):
         # Start the receiver shim
         receiver = receive_shim.create_receiver(receiver_addr, queue_name, jms_message_type,
                                                 dumps([num_test_values_map, flags_map]))
-        receiver.start()
 
         # Start the send shim
         sender = send_shim.create_sender(sender_addr, queue_name, jms_message_type,
                                          dumps([test_values, msg_hdrs, msg_props]))
-        sender.start()
 
-        # Wait for both shims to finish
-        sender.join_or_kill(qpid_interop_test.shims.THREAD_TIMEOUT)
-        receiver.join_or_kill(qpid_interop_test.shims.THREAD_TIMEOUT)
-
-        # Process return string from sender
-        send_obj = sender.get_return_object()
+        # Wait for sender, process return string
+        try:
+            send_obj = sender.wait_for_completion()
+        except KeyboardInterrupt as err:
+            receiver.send_signal(signal.SIGINT)
+            raise err
         if send_obj is not None:
             if isinstance(send_obj, str):
                 if send_obj: # len > 0
-                    self.fail('Send shim \'%s\':\n%s' % (send_shim.NAME, send_obj))
+                    receiver.send_signal(signal.SIGINT)
+                    raise InteropTestError('Send shim \'%s\':\n%s' % (send_shim.NAME, send_obj))
             else:
-                self.fail('Send shim \'%s\':\n%s' % (send_shim.NAME, str(send_obj)))
-
-        # Process return string from receiver
-        receive_obj = receiver.get_return_object()
-        if receive_obj is None:
-            self.fail('JmsReceiver shim returned None')
-        else:
-            if isinstance(receive_obj, tuple):
-                if len(receive_obj) == 2:
-                    return_jms_message_type, return_list = receive_obj
-                    if len(return_list) == 3:
-                        return_test_values = return_list[0]
-                        return_msg_hdrs = return_list[1]
-                        return_msg_props = return_list[2]
-                        self.assertEqual(return_jms_message_type, jms_message_type,
-                                         msg='JMS message type error:\n\n    sent:%s\n\n
   received:%s' % \
-                                         (jms_message_type, return_jms_message_type))
-                        self.assertEqual(return_test_values, test_values,
-                                         msg='JMS message body error:\n\n    sent:%s\n\n
   received:%s' % \
-                                         (test_values, return_test_values))
-                        self.assertEqual(return_msg_hdrs, msg_hdrs,
-                                         msg='JMS message headers error:\n\n    sent:%s\n\n
   received:%s' % \
-                                         (msg_hdrs, return_msg_hdrs))
-                        self.assertEqual(return_msg_props, msg_props,
-                                         msg='JMS message properties error:\n\n    sent:%s\n\n
   received:%s' % \
-                                         (msg_props, return_msg_props))
-                    else:
-                        self.fail('Return value list needs 3 items, found %d items: %s' %
(len(return_list),
-                                                                                        
  str(return_list)))
+                receiver.send_signal(signal.SIGINT)
+                raise InteropTestError('Send shim \'%s\':\n%s' % (send_shim.NAME, send_obj))
+
+        # Wait for receiver, process return string
+        receive_obj = receiver.wait_for_completion()
+        if isinstance(receive_obj, tuple):
+            if len(receive_obj) == 2:
+                return_jms_message_type, return_list = receive_obj
+                if len(return_list) == 3:
+                    return_test_values = return_list[0]
+                    return_msg_hdrs = return_list[1]
+                    return_msg_props = return_list[2]
+                    self.assertEqual(return_jms_message_type, jms_message_type,
+                                     msg='JMS message type error:\n\n    sent:%s\n\n    received:%s'
% \
+                                     (jms_message_type, return_jms_message_type))
+                    self.assertEqual(return_test_values, test_values,
+                                     msg='JMS message body error:\n\n    sent:%s\n\n    received:%s'
% \
+                                     (test_values, return_test_values))
+                    self.assertEqual(return_msg_hdrs, msg_hdrs,
+                                     msg='JMS message headers error:\n\n    sent:%s\n\n 
  received:%s' % \
+                                     (msg_hdrs, return_msg_hdrs))
+                    self.assertEqual(return_msg_props, msg_props,
+                                     msg='JMS message properties error:\n\n    sent:%s\n\n
   received:%s' % \
+                                     (msg_props, return_msg_props))
                 else:
-                    self.fail('Return tuple needs 2 items, found %d items: %s' % (len(receive_obj),
str(receive_obj)))
+                    raise InteropTestError('Receive shim \'%s\':\n' \
+                                           'Return value list needs 3 items, found %d items:
%s' % \
+                                           (receive_shim.NAME, len(return_list), str(return_list)))
             else:
-                self.fail(str(receive_obj))
+                raise InteropTestError('Receive shim \'%s\':\n%s' % (receive_shim.NAME, receive_obj))
+        else:
+            raise InteropTestError('Receive shim \'%s\':\n%s' % (receive_shim.NAME, receive_obj))
 
 
 class TestOptions(qpid_interop_test.qit_common.QitCommonTestOptions):

http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f93886f5/src/python/qpid_interop_test/jms_messages_test.py
----------------------------------------------------------------------
diff --git a/src/python/qpid_interop_test/jms_messages_test.py b/src/python/qpid_interop_test/jms_messages_test.py
index efd3885..9dc29b9 100755
--- a/src/python/qpid_interop_test/jms_messages_test.py
+++ b/src/python/qpid_interop_test/jms_messages_test.py
@@ -23,6 +23,7 @@ Module to test JMS message types across different clients
 # under the License.
 #
 
+import signal
 import sys
 import unittest
 
@@ -32,6 +33,7 @@ from json import dumps
 import qpid_interop_test.broker_properties
 import qpid_interop_test.qit_common
 import qpid_interop_test.shims
+from qpid_interop_test.interop_test_errors import InteropTestError
 
 
 class JmsMessageTypes(qpid_interop_test.qit_common.QitTestTypeMap):
@@ -208,44 +210,40 @@ class JmsMessageTypeTestCase(qpid_interop_test.qit_common.QitTestCase):
         # Start the receiver shim
         receiver = receive_shim.create_receiver(receiver_addr, queue_name, jms_message_type,
                                                 dumps(num_test_values_map))
-        receiver.start()
 
         # Start the send shim
-        sender = send_shim.create_sender(sender_addr, queue_name, jms_message_type,
-                                         dumps(test_values))
-        sender.start()
-
-        # Wait for both shims to finish
-        sender.join_or_kill(qpid_interop_test.shims.THREAD_TIMEOUT)
-        receiver.join_or_kill(qpid_interop_test.shims.THREAD_TIMEOUT)
-
-        # Process return string from sender
-        send_obj = sender.get_return_object()
+        sender = send_shim.create_sender(sender_addr, queue_name, jms_message_type, dumps(test_values))
+
+        # Wait for sender, process return string
+        try:
+            send_obj = sender.wait_for_completion()
+        except KeyboardInterrupt as err:
+            receiver.send_signal(signal.SIGINT)
+            raise err
         if send_obj is not None:
             if isinstance(send_obj, str):
                 if send_obj: # len > 0
-                    self.fail('Send shim \'%s\':\n%s' % (send_shim.NAME, send_obj))
+                    receiver.send_signal(signal.SIGINT)
+                    raise InteropTestError('Send shim \'%s\':\n%s' % (send_shim.NAME, send_obj))
             else:
-                self.fail('Send shim \'%s\':\n%s' % (send_shim.NAME, str(send_obj)))
-
-        # Process return string from receiver
-        receive_obj = receiver.get_return_object()
-        if receive_obj is None:
-            self.fail('JmsReceiver shim returned None')
-        else:
-            if isinstance(receive_obj, tuple):
-                if len(receive_obj) == 2:
-                    return_jms_message_type, return_test_values = receive_obj
-                    self.assertEqual(return_jms_message_type, jms_message_type,
-                                     msg='JMS message type error:\n\n    sent:%s\n\n    received:%s'
% \
-                                     (jms_message_type, return_jms_message_type))
-                    self.assertEqual(return_test_values, test_values,
-                                     msg='JMS message body error:\n\n    sent:%s\n\n    received:%s'
% \
-                                     (test_values, return_test_values))
-                else:
-                    self.fail('Received incorrect tuple format: %s' % str(receive_obj))
+                receiver.send_signal(signal.SIGINT)
+                raise InteropTestError('Send shim \'%s\':\n%s' % (send_shim.NAME, send_obj))
+
+        # Wait for receiver, process return string
+        receive_obj = receiver.wait_for_completion()
+        if isinstance(receive_obj, tuple):
+            if len(receive_obj) == 2:
+                return_amqp_type, return_test_value_list = receive_obj
+                self.assertEqual(return_amqp_type, jms_message_type,
+                                 msg='JMS message type error:\n\n    sent:%s\n\n    received:%s'
% \
+                                 (jms_message_type, return_amqp_type))
+                self.assertEqual(return_test_value_list, test_values,
+                                 msg='JMS message body error:\n\n    sent:%s\n\n    received:%s'
% \
+                                 (test_values, return_test_value_list))
             else:
-                self.fail('Received non-tuple: %s' % str(receive_obj))
+                raise InteropTestError('Receive shim \'%s\':\n%s' % (receive_shim.NAME, receive_obj))
+        else:
+            raise InteropTestError('Receive shim \'%s\':\n%s' % (receive_shim.NAME, receive_obj))
 
 
 class TestOptions(qpid_interop_test.qit_common.QitCommonTestOptions):

http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f93886f5/src/python/qpid_interop_test/qit_common.py
----------------------------------------------------------------------
diff --git a/src/python/qpid_interop_test/qit_common.py b/src/python/qpid_interop_test/qit_common.py
index be0d15b..43d2b58 100644
--- a/src/python/qpid_interop_test/qit_common.py
+++ b/src/python/qpid_interop_test/qit_common.py
@@ -207,6 +207,10 @@ class QitTestCase(unittest.TestCase):
         super(QitTestCase, self).__init__(methodName)
         self.duration = 0
 
+    def name(self):
+        """Return test name"""
+        return self._testMethodName
+
     def setUp(self):
         """Called when test starts"""
         self.start_time = time()
@@ -215,9 +219,13 @@ class QitTestCase(unittest.TestCase):
         """Called when test finishes"""
         self.duration = time() - self.start_time
 
-    def name(self):
-        """Return test name"""
-        return self._testMethodName
+
+#class QitTestRunner(unittest.TextTestRunner):
+#    """..."""
+#    def run(self, test):
+#        result = self._makeResult()
+#        unittest.registerResult(result)
+#        test(result)
 
 
 class QitTest(object):
@@ -236,6 +244,7 @@ class QitTest(object):
         self._generate_tests()
         self.test_result = None
         self.duration = 0
+#        unittest.installHandler()
 
     def get_result(self):
         """Get success of test run, True = success, False = failure/error"""
@@ -246,6 +255,7 @@ class QitTest(object):
     def run_test(self):
         """Run the test"""
         start_time = time()
+#        self.test_result = QitTestRunner(verbosity=2).run(self.test_suite)
         self.test_result = unittest.TextTestRunner(verbosity=2).run(self.test_suite)
         self.duration = time() - start_time
 

http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f93886f5/src/python/qpid_interop_test/shims.py
----------------------------------------------------------------------
diff --git a/src/python/qpid_interop_test/shims.py b/src/python/qpid_interop_test/shims.py
index 03b8dc0..e5977ee 100644
--- a/src/python/qpid_interop_test/shims.py
+++ b/src/python/qpid_interop_test/shims.py
@@ -20,154 +20,52 @@ Module containing worker thread classes and shims
 # under the License.
 #
 
-from copy import deepcopy
-from json import loads
-from os import environ, getenv, getpgid, killpg, path, setsid
-from signal import SIGKILL, SIGTERM
-from subprocess import Popen, PIPE, CalledProcessError
-from sys import stdout
-from threading import Thread
-from time import sleep
+import copy
+import json
+import os
+import signal
+import subprocess
 
 
-THREAD_TIMEOUT = 800.0 # seconds to complete before join is forced
-
-
-class ShimWorkerThread(Thread):
-    """Parent class for shim worker threads and return a string once the thread has ended"""
-    def __init__(self, thread_name):
-        super(ShimWorkerThread, self).__init__(name=thread_name)
-        self.arg_list = []
-        self.return_obj = None
-        self.proc = None
-
-    def get_return_object(self):
-        """Get the return object from the completed thread"""
-        return self.return_obj
-
-    def join_or_kill(self, timeout):
-        """
-        Wait for thread to join after timeout (seconds). If still alive, it is then terminated,
then if still alive,
-        killed
-        """
-        self.join(timeout)
-        self.kill()
-
-    def kill(self):
-        """
-        First try terminating, then killing this thread
-        """
-        if self.is_alive():
-            if self.proc is not None:
-                if self._terminate_pg_loop():
-                    if self._kill_pg_loop():
-                        print('\n  ERROR: Thread %s (pid=%d) alive after kill' % (self.name,
self.proc.pid))
-                    else:
-                        print('Killed')
-                        stdout.flush()
-                else:
-                    print('Terminated')
-                    stdout.flush()
-            else:
-                print('ERROR: shims.join_or_kill(): Process joined and is alive, yet proc
is None.')
-
-    def _terminate_pg_loop(self, num_attempts=2, wait_time=2):
-        cnt = 0
-        while cnt < num_attempts and self.is_alive():
-            cnt += 1
-            print('\n  Thread %s (pid=%d) alive after timeout, terminating (try #%d)...'
% (self.name, self.proc.pid,
-                                                                                        
   cnt),)
-            stdout.flush()
-            killpg(getpgid(self.proc.pid), SIGTERM)
-            sleep(wait_time)
-        return self.is_alive()
-
-    def _kill_pg_loop(self, num_attempts=2, wait_time=5):
-        cnt = 0
-        while cnt < num_attempts and self.is_alive():
-            cnt += 1
-            print('\n  Thread %s (pid=%d) alive after terminate, killing (try #%d)...' %
(self.name, self.proc.pid,
-                                                                                        
 cnt),)
-            stdout.flush()
-            killpg(getpgid(self.proc.pid), SIGKILL)
-            sleep(wait_time)
-        return self.is_alive()
-
-
-class Sender(ShimWorkerThread):
-    """Sender class for multi-threaded send"""
-    def __init__(self, use_shell_flag, send_shim_args, broker_addr, queue_name, test_key,
json_test_str, python3_flag):
-        super(Sender, self).__init__('sender_thread_%s' % queue_name)
-        if send_shim_args is None:
-            print('ERROR: Sender: send_shim_args == None')
-        self.use_shell_flag = use_shell_flag
-        self.arg_list.extend(send_shim_args)
-        self.arg_list.extend([broker_addr, queue_name, test_key, json_test_str])
-        self.env = deepcopy(environ)
-        if python3_flag:
-            self.env['PYTHONPATH'] = self.env['PYTHON3PATH']
-
-    def run(self):
-        """Thread starts here"""
-        try:
-            #print str('\n>>SNDR>>' + str(self.arg_list)) # DEBUG - useful to
see command-line sent to shim
-            self.proc = Popen(self.arg_list, stdout=PIPE, stderr=PIPE, shell=self.use_shell_flag,
preexec_fn=setsid,
-                              env=self.env)
-            (stdoutdata, stderrdata) = self.proc.communicate()
-            if stderrdata: # length > 0
-                #print '<<SNDR ERROR<<', stderrdata # DEBUG - useful to see shim's
failure message
-                self.return_obj = (stdoutdata, stderrdata)
-            else:
-                #print '<<SNDR<<', stdoutdata # DEBUG - useful to see text received
from shim
-                str_tvl = stdoutdata.split('\n')[0:-1] # remove trailing \n
-                if len(str_tvl) == 2:
-                    try:
-                        self.return_obj = (str_tvl[0], loads(str_tvl[1]))
-                    except ValueError:
-                        self.return_obj = stdoutdata
-                else: # Make a single line of all the bits and return that
-                    self.return_obj = stdoutdata
-        except OSError as exc:
-            self.return_obj = str(exc) + ': shim=' + self.arg_list[0]
-        except CalledProcessError as exc:
-            self.return_obj = str(exc) + '\n\nOutput:\n' + exc.output
-
-
-class Receiver(ShimWorkerThread):
-    """Receiver class for multi-threaded receive"""
-    def __init__(self, receive_shim_args, broker_addr, queue_name, test_key, json_test_str,
python3_flag):
-        super(Receiver, self).__init__('receiver_thread_%s' % queue_name)
-        if receive_shim_args is None:
-            print('ERROR: Receiver: receive_shim_args == None')
-        self.arg_list.extend(receive_shim_args)
-        self.arg_list.extend([broker_addr, queue_name, test_key, json_test_str])
-        self.env = deepcopy(environ)
+class ShimProcess(subprocess.Popen):
+    """Abstract parent class for Sender and Receiver shim process"""
+    def __init__(self, args, python3_flag):
+        self.env = copy.deepcopy(os.environ)
         if python3_flag:
             self.env['PYTHONPATH'] = self.env['PYTHON3PATH']
+        super(ShimProcess, self).__init__(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
preexec_fn=os.setsid,
+                                          env=self.env)
 
-    def run(self):
-        """Thread starts here"""
+    def wait_for_completion(self):
+        """Wait for process to end and return tuple containing (stdout, stderr) from process"""
         try:
-            #print(str('\n>>RCVR>>' + str(self.arg_list))) # DEBUG - useful to
see command-line sent to shim
-            self.proc = Popen(self.arg_list, stdout=PIPE, stderr=PIPE, preexec_fn=setsid,
env=self.env)
-            (stdoutdata, stderrdata) = self.proc.communicate()
+            (stdoutdata, stderrdata) = self.communicate()
             if stderrdata: # length > 0
-                #print('<<RCVR ERROR<<', stderrdata) # DEBUG - useful to see
shim's failure message
-                self.return_obj = (stdoutdata, stderrdata)
-            else:
-                #print('<<RCVR<<', stdoutdata) # DEBUG - useful to see text received
from shim
-                str_tvl = stdoutdata.split('\n')[0:-1] # remove trailing \n
-                if len(str_tvl) == 2:
-                    try:
-                        self.return_obj = (str_tvl[0], loads(str_tvl[1]))
-                    except ValueError:
-                        self.return_obj = stdoutdata
-                else: # Make a single line of all the bits and return that
-                    self.return_obj = stdoutdata
-        except OSError as exc:
-            self.return_obj = str(exc) + ': shim=' + self.arg_list[0]
-        except CalledProcessError as exc:
-            self.return_obj = str(exc) + '\n\n' + exc.output
+                return stderrdata # ERROR: return single string
+            if not stdoutdata: # zero length
+                return None
+            type_value_list = stdoutdata.split('\n')[0:-1] # remove trailing '\n', split
by only remaining '\n'
+            if len(type_value_list) == 2:
+                try:
+                    return (type_value_list[0], json.loads(type_value_list[1])) # Return
tuple
+                except ValueError:
+                    return stdoutdata # ERROR: return single string
+            return stdoutdata # ERROR: return single string
+        except KeyboardInterrupt as err:
+            self.send_signal(signal.SIGINT)
+            raise err
+
+class Sender(ShimProcess):
+    """Sender shim process"""
+    def __init__(self, params, python3_flag):
+        #print('\n>>>SNDR>>> %s python3_flag=%s' % (params, python3_flag))
+        super(Sender, self).__init__(params, python3_flag)
+
+class Receiver(ShimProcess):
+    """Receiver shim process"""
+    def __init__(self, params, python3_flag):
+        #print('\n>>>RCVR>>> %s python3_flag=%s' % (params, python3_flag))
+        super(Receiver, self).__init__(params, python3_flag)
 
 class Shim(object):
     """Abstract shim class, parent of all shims."""
@@ -182,17 +80,17 @@ class Shim(object):
 
     def create_sender(self, broker_addr, queue_name, test_key, json_test_str):
         """Create a new sender instance"""
-        sender = Sender(self.use_shell_flag, self.send_params, broker_addr, queue_name, test_key,
json_test_str,
-                        'Python3' in self.NAME)
-        sender.daemon = True
-        return sender
+        args = []
+        args.extend(self.send_params)
+        args.extend([broker_addr, queue_name, test_key, json_test_str])
+        return Sender(args, 'Python3' in self.NAME)
 
     def create_receiver(self, broker_addr, queue_name, test_key, json_test_str):
         """Create a new receiver instance"""
-        receiver = Receiver(self.receive_params, broker_addr, queue_name, test_key, json_test_str,
-                            'Python3' in self.NAME)
-        receiver.daemon = True
-        return receiver
+        args = []
+        args.extend(self.receive_params)
+        args.extend([broker_addr, queue_name, test_key, json_test_str])
+        return Receiver(args, 'Python3' in self.NAME)
 
 
 class ProtonPython2Shim(Shim):
@@ -236,8 +134,8 @@ class QpidJmsShim(Shim):
     NAME = 'QpidJms'
     JMS_CLIENT = True
 
-    JAVA_HOME = getenv('JAVA_HOME', '/usr/bin') # Default only works in Linux
-    JAVA_EXEC = path.join(JAVA_HOME, 'java')
+    JAVA_HOME = os.getenv('JAVA_HOME', '/usr/bin') # Default only works in Linux
+    JAVA_EXEC = os.path.join(JAVA_HOME, 'java')
 
     def __init__(self, dependency_class_path, sender_shim, receiver_shim):
         super(QpidJmsShim, self).__init__(sender_shim, receiver_shim)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message