qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kp...@apache.org
Subject svn commit: r1705310 - in /qpid/trunk/qpid/tools/src/py: qlslibs/analyze.py qlslibs/jrnl.py qlslibs/utils.py qpid-qls-analyze
Date Fri, 25 Sep 2015 14:16:41 GMT
Author: kpvdr
Date: Fri Sep 25 14:16:41 2015
New Revision: 1705310

URL: http://svn.apache.org/viewvc?rev=1705310&view=rev
Log:
QPID-6765 [linearstore] Add qpid-txtest mode to qpid-qls-analyze which extracts message number
from message body

Modified:
    qpid/trunk/qpid/tools/src/py/qlslibs/analyze.py
    qpid/trunk/qpid/tools/src/py/qlslibs/jrnl.py
    qpid/trunk/qpid/tools/src/py/qlslibs/utils.py
    qpid/trunk/qpid/tools/src/py/qpid-qls-analyze

Modified: qpid/trunk/qpid/tools/src/py/qlslibs/analyze.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qlslibs/analyze.py?rev=1705310&r1=1705309&r2=1705310&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qlslibs/analyze.py (original)
+++ qpid/trunk/qpid/tools/src/py/qlslibs/analyze.py Fri Sep 25 14:16:41 2015
@@ -52,12 +52,12 @@ class JournalRecoveryManager(object):
         self.journals = {}
         self.high_rid_counter = HighCounter()
         self.prepared_list = None
-    def report(self, print_stats_flag):
+    def report(self):
         self._reconcile_transactions(self.prepared_list, self.args.txn)
         if self.tpl is not None:
-            self.tpl.report(print_stats_flag, self.args.show_recovered_recs)
+            self.tpl.report(self.args)
         for queue_name in sorted(self.journals.keys()):
-            self.journals[queue_name].report(print_stats_flag, self.args.show_recovered_recs)
+            self.journals[queue_name].report(self.args)
     def run(self):
         tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME)
         if os.path.exists(tpl_dir):
@@ -78,7 +78,7 @@ class JournalRecoveryManager(object):
     def _reconcile_transactions(self, prepared_list, txn_flag):
         print 'Transaction reconciliation report:'
         print '=================================='
-        print len(prepared_list), 'open transaction(s) found in prepared transaction list:'
+        print 'Transaction Prepared List (TPL) contains %d open transaction(s):' % len(prepared_list)
         for xid in prepared_list.keys():
             commit_flag = prepared_list[xid]
             if commit_flag is None:
@@ -97,18 +97,22 @@ class JournalRecoveryManager(object):
                                                              enqueue_record.record_id, xid, None)
                 if txn_flag:
                     self.tpl.add_record(dequeue_record)
+        print
+        print 'Open transactions found in queues:'
+        print '----------------------------------'
         for queue_name in sorted(self.journals.keys()):
             self.journals[queue_name].reconcile_transactions(prepared_list, txn_flag)
-        if len(prepared_list) > 0:
-            print 'Completing prepared transactions in prepared transaction list:'
-        for xid in prepared_list.keys():
-            print ' ', qlslibs.utils.format_xid(xid)
-            transaction_record = qlslibs.utils.create_record(qlslibs.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \
-                                                             self.tpl.current_journal_file, \
-                                                             self.high_rid_counter.get_next(), None, xid, None)
-            if txn_flag:
-                self.tpl.add_record(transaction_record)
         print
+        if len(prepared_list) > 0:
+            print 'Creating commit records for the following prepared transactions in TPL:'
+            for xid in prepared_list.keys():
+                print ' ', qlslibs.utils.format_xid(xid)
+                transaction_record = qlslibs.utils.create_record(qlslibs.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \
+                                                                 self.tpl.current_journal_file, \
+                                                                 self.high_rid_counter.get_next(), None, xid, None)
+                if txn_flag:
+                    self.tpl.add_record(transaction_record)
+            print
 
 class EnqueueMap(object):
     """
@@ -139,22 +143,21 @@ class EnqueueMap(object):
         if dequeue_record.dequeue_record_id not in self.enq_map:
             raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record)
         self.enq_map[dequeue_record.dequeue_record_id][2] = True
-    def report_str(self, _, show_records):
+    def report_str(self, args):
         """Return a string containing a text report for all records in the map"""
         if len(self.enq_map) == 0:
             return 'No enqueued records found.'
         rstr = '%d enqueued records found' % len(self.enq_map)
-        if show_records:
+        if args.show_recovered_recs:
             rstr += ":"
             rid_list = self.enq_map.keys()
             rid_list.sort()
             for rid in rid_list:
                 journal_file, record, locked_flag = self.enq_map[rid]
+                rstr += '\n  0x%x:' % journal_file.file_header.file_num
+                rstr += record.to_string(args.show_xids, args.show_data, args.txtest)
                 if locked_flag:
-                    lock_str = '[LOCKED]'
-                else:
-                    lock_str = ''
-                rstr += '\n  0x%x:%s %s' % (journal_file.file_header.file_num, record, lock_str)
+                    rstr += ' [LOCKED]'
         else:
             rstr += '.'
         return rstr
@@ -248,17 +251,18 @@ class TransactionMap(object):
         return prepared_list
     def get_xid_list(self):
         return self.txn_map.keys()
-    def report_str(self, _, show_records):
+    def report_str(self, args):
         """Return a string containing a text report for all records in the map"""
         if len(self.txn_map) == 0:
             return 'No outstanding transactions found.'
         rstr = '%d outstanding transaction(s)' % len(self.txn_map)
-        if show_records:
+        if args.show_recovered_recs:
             rstr += ':'
             for xid, op_list in self.txn_map.iteritems():
                 rstr += '\n  %s containing %d operations:' % (qlslibs.utils.format_xid(xid), len(op_list))
                 for journal_file, record, _ in op_list:
-                    rstr += '\n    0x%x:%s' % (journal_file.file_header.file_num, record)
+                    rstr += '\n    0x%x:' % journal_file.file_header.file_num
+                    rstr += record.to_string(args.show_xids, args.show_data, args.txtest)
         else:
             rstr += '.'
         return rstr
@@ -334,6 +338,7 @@ class Journal(object):
         self.num_filler_records_required = None # TODO: Move into JournalFile
         self.fill_to_offset = None
     def add_record(self, record):
+        """Used for reconciling transactions only - called from JournalRecoveryManager._reconcile_transactions()"""
         if isinstance(record, qlslibs.jrnl.EnqueueRecord) or isinstance(record, qlslibs.jrnl.DequeueRecord):
             if record.xid_size > 0:
                 self.txn_map.add(self.current_journal_file, record)
@@ -385,16 +390,16 @@ class Journal(object):
                     if txn_flag:
                         self.txn_map.abort(xid)
             else:
-                print '  ', qlslibs.utils.format_xid(xid), '- Ignoring, not in prepared transaction list'
+                print ' ', qlslibs.utils.format_xid(xid), '- Ignoring, not in prepared transaction list'
                 if txn_flag:
                     self.txn_map.abort(xid)
-    def report(self, print_stats_flag, show_recovered_records):
+    def report(self, args):
         print 'Journal "%s":' % self.queue_name
         print '=' * (11 + len(self.queue_name))
-        if print_stats_flag:
+        if args.stats:
             print str(self.statistics)
-        print self.enq_map.report_str(True, show_recovered_records)
-        print self.txn_map.report_str(True, show_recovered_records)
+        print self.enq_map.report_str(args)
+        print self.txn_map.report_str(args)
         JournalFile.report_header()
         for file_num in sorted(self.files.keys()):
             self.files[file_num].report()
@@ -485,17 +490,17 @@ class Journal(object):
             high_rid_counter.check(this_record.record_id)
             if self.args.show_recovery_recs or self.args.show_all_recs:
                 print '0x%x:%s' % (start_journal_file.file_header.file_num, \
-                                   this_record.to_string(self.args.show_xids, self.args.show_data))
+                                   this_record.to_string(self.args.show_xids, self.args.show_data, self.args.txtest))
         elif isinstance(this_record, qlslibs.jrnl.DequeueRecord):
             ok_flag = self._handle_dequeue_record(this_record, start_journal_file)
             high_rid_counter.check(this_record.record_id)
             if self.args.show_recovery_recs or self.args.show_all_recs:
-                print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids))
+                print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids, None, None))
         elif isinstance(this_record, qlslibs.jrnl.TransactionRecord):
             ok_flag = self._handle_transaction_record(this_record, start_journal_file)
             high_rid_counter.check(this_record.record_id)
             if self.args.show_recovery_recs or self.args.show_all_recs:
-                print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids))
+                print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids, None, None))
         else:
             self.statistics.filler_record_count += 1
             ok_flag = True
@@ -555,10 +560,12 @@ class Journal(object):
                 return False
         if not transaction_record.is_valid(start_journal_file):
             return False
-        if transaction_record.magic[-1] == 'a':
+        if transaction_record.magic[-1] == 'a': # Abort
             self.statistics.transaction_abort_count += 1
-        else:
+        elif transaction_record.magic[-1] == 'c': # Commit
             self.statistics.transaction_commit_count += 1
+        else:
+            raise InvalidRecordTypeError('Unknown transaction record magic \'%s\'' % transaction_record.magic)
         if self.txn_map.contains(transaction_record.xid):
             self.txn_map.delete(self.current_journal_file, transaction_record)
         else:

Modified: qpid/trunk/qpid/tools/src/py/qlslibs/jrnl.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qlslibs/jrnl.py?rev=1705310&r1=1705309&r2=1705310&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qlslibs/jrnl.py (original)
+++ qpid/trunk/qpid/tools/src/py/qlslibs/jrnl.py Fri Sep 25 14:16:41 2015
@@ -78,9 +78,6 @@ class RecordHeader(object):
         for warn in self.warnings:
             warn_str += '<%s>' % warn
         return warn_str
-    def __str__(self):
-        """Return string representation of this header"""
-        return self.to_rh_string()
 
 class RecordTail(object):
     FORMAT = '<4sL2Q'
@@ -119,9 +116,6 @@ class RecordTail(object):
         magic = qlslibs.utils.inv_str(self.xmagic)
         magic_char = magic[-1].upper() if magic[-1] in string.printable else '?'
         return '[%c cs=0x%08x rid=0x%x]' % (magic_char, self.checksum, self.record_id)
-    def __str__(self):
-        """Return a string representation of the this RecordTail instance"""
-        return self.to_string()
 
 class FileHeader(RecordHeader):
     FORMAT = '<2H4x5QH'
@@ -185,9 +179,6 @@ class FileHeader(RecordHeader):
                                                              self.first_record_offset, self.partition_num,
                                                              self.efp_data_size_kb, self.timestamp_str(),
                                                              self._get_warnings())
-    def __str__(self):
-        """Return a string representation of the this FileHeader instance"""
-        return self.to_string()
 
 class EnqueueRecord(RecordHeader):
     FORMAT = '<2Q'
@@ -249,7 +240,7 @@ class EnqueueRecord(RecordHeader):
             else:
                 return True
         return False
-    def to_string(self, show_xid_flag, show_data_flag):
+    def to_string(self, show_xid_flag, show_data_flag, txtest_flag):
         """Return a string representation of the this EnqueueRecord instance"""
         if self.truncated_flag:
             return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self),
@@ -260,7 +251,7 @@ class EnqueueRecord(RecordHeader):
             record_tail_str = self.record_tail.to_string()
         return '%s %s %s %s %s %s' % (self.to_rh_string(),
                                       qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag),
-                                      qlslibs.utils.format_data(self.data, self.data_size, show_data_flag),
+                                      qlslibs.utils.format_data(self.data, self.data_size, show_data_flag, txtest_flag),
                                       record_tail_str, self._print_flags(), self._get_warnings())
     def _print_flags(self):
         """Utility function to decode the flags field in the header and print a string representation"""
@@ -275,9 +266,6 @@ class EnqueueRecord(RecordHeader):
         if len(fstr) > 0:
             fstr += ']'
         return fstr
-    def __str__(self):
-        """Return a string representation of the this EnqueueRecord instance"""
-        return self.to_string(False, False)
 
 class DequeueRecord(RecordHeader):
     FORMAT = '<2Q'
@@ -323,7 +311,7 @@ class DequeueRecord(RecordHeader):
             else:
                 return True
         return False
-    def to_string(self, show_xid_flag):
+    def to_string(self, show_xid_flag, _u1, _u2):
         """Return a string representation of the this DequeueRecord instance"""
         if self.truncated_flag:
             return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self),
@@ -344,9 +332,6 @@ class DequeueRecord(RecordHeader):
             else:
                 return '[ABORT]'
         return ''
-    def __str__(self):
-        """Return a string representation of the this DequeueRecord instance"""
-        return self.to_string(False)
 
 class TransactionRecord(RecordHeader):
     FORMAT = '<Q'
@@ -384,7 +369,7 @@ class TransactionRecord(RecordHeader):
             else:
                 return True
         return False
-    def to_string(self, show_xid_flag):
+    def to_string(self, show_xid_flag, _u1, _u2):
         """Return a string representation of the this TransactionRecord instance"""
         if self.truncated_flag:
             return '%s xid(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), self.xid_size)
@@ -395,9 +380,6 @@ class TransactionRecord(RecordHeader):
         return '%s %s %s %s' % (self.to_rh_string(),
                                 qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag),
                                 record_tail_str, self._get_warnings())
-    def __str__(self):
-        """Return a string representation of the this TransactionRecord instance"""
-        return self.to_string(False)
 
 # =============================================================================
 

Modified: qpid/trunk/qpid/tools/src/py/qlslibs/utils.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qlslibs/utils.py?rev=1705310&r1=1705309&r2=1705310&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qlslibs/utils.py (original)
+++ qpid/trunk/qpid/tools/src/py/qlslibs/utils.py Fri Sep 25 14:16:41 2015
@@ -73,13 +73,13 @@ def efp_directory_size(directory_name):
         pass
     return 0
 
-def format_data(data, data_size=None, show_data_flag=True):
+def format_data(data, data_size=None, show_data_flag=True, txtest_flag=False):
     """Format binary data for printing"""
-    return _format_binary(data, data_size, show_data_flag, 'data', qlslibs.err.DataSizeError, False)
+    return _format_binary(data, data_size, show_data_flag, 'data', qlslibs.err.DataSizeError, False, txtest_flag)
 
 def format_xid(xid, xid_size=None, show_xid_flag=True):
     """Format binary XID for printing"""
-    return _format_binary(xid, xid_size, show_xid_flag, 'xid', qlslibs.err.XidSizeError, True)
+    return _format_binary(xid, xid_size, show_xid_flag, 'xid', qlslibs.err.XidSizeError, True, False)
 
 def get_avail_disk_space(path):
     df_proc = subprocess.Popen(["df", path], stdout=subprocess.PIPE)
@@ -133,7 +133,7 @@ def skip(file_handle, boundary):
 
 #--- protected functions ---
 
-def _format_binary(bin_str, bin_size, show_bin_flag, prefix, err_class, hex_num_flag):
+def _format_binary(bin_str, bin_size, show_bin_flag, prefix, err_class, hex_num_flag, txtest_flag):
     """Format binary XID for printing"""
     if bin_str is None and bin_size is not None:
         if bin_size > 0:
@@ -144,14 +144,16 @@ def _format_binary(bin_str, bin_size, sh
     elif bin_size != len(bin_str):
         raise err_class(bin_size, len(bin_str), bin_str)
     out_str = '%s(%d)' % (prefix, bin_size)
-    if show_bin_flag:
+    if txtest_flag:
+        out_str += '=\'%s\'' % _txtest_msg_str(bin_str)
+    elif show_bin_flag:
         if _is_printable(bin_str):
             binstr = '"%s"' % _split_str(bin_str)
         elif hex_num_flag:
             binstr = '0x%s' % _str_to_hex_num(bin_str)
         else:
-            binstr = _hex_split_str(bin_str)
-        out_str += '=%s' % binstr
+            binstr = _hex_split_str(bin_str, 50, 10, 10)
+        out_str += '=\'%s\'' % binstr
     return out_str
 
 def _hex_str(in_str, begin, end):
@@ -164,12 +166,20 @@ def _hex_str(in_str, begin, end):
             hstr += '\\%02x' % ord(in_str[index])
     return hstr
 
-def _hex_split_str(in_str, split_size = 50):
+def _hex_split_str(in_str, split_size, head_size, tail_size):
     """Split a hex string into two parts separated by an ellipsis"""
     if len(in_str) <= split_size:
         return _hex_str(in_str, 0, len(in_str))
-    return _hex_str(in_str, 0, 10) + ' ... ' + _hex_str(in_str, len(in_str)-10, len(in_str))
-    #return ''.join(x.encode('hex') for x in reversed(in_str))
+    return _hex_str(in_str, 0, head_size) + ' ... ' + _hex_str(in_str, len(in_str)-tail_size, len(in_str))
+
+def _txtest_msg_str(bin_str):
+    """Extract the message number used in qpid-txtest"""
+    msg_index = bin_str.find('msg')
+    if msg_index >= 0:
+        end_index = bin_str.find('\x00', msg_index)
+        assert end_index >= 0
+        return bin_str[msg_index:end_index]
+    return None
 
 def _is_printable(in_str):
     """Return True if in_str in printable; False otherwise."""

Modified: qpid/trunk/qpid/tools/src/py/qpid-qls-analyze
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-qls-analyze?rev=1705310&r1=1705309&r2=1705310&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-qls-analyze (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-qls-analyze Fri Sep 25 14:16:41 2015
@@ -52,11 +52,17 @@ class QlsAnalyzerArgParser(argparse.Argu
         self.add_argument('--show-all-recs', action='store_true',
                           help='Show all records (including fillers) found during recovery')
         self.add_argument('--show-xids', action='store_true',
-                          help='Show xid as hex number, otherwise show only xid length')
+                          help='Show xid as hex number, otherwise show only xid length. Only has effect when records are shown')
+# TODO: Add ability to show xid as an index rather than a value, helps analysis when xid is a long value with
+# small differences which cannot easily be seen when looking at an output. Also prints a table of indeces vs xid values.
+#        self.add_argument('--show-xid-index', action='store_true',
+#                          help='Show xids by index rather than by their value. Useful for long xids. Prints xid index table')
         self.add_argument('--show-data', action='store_true',
-                          help='Show data, otherwise show only data length')
+                          help='Show data, otherwise show only data length. Only has effect when records are shown')
         self.add_argument('--stats', action='store_true',
                           help='Print journal record stats')
+        self.add_argument('--txtest', action='store_true',
+                          help='Show qpid-txtest message number as the message content when viewing records. Only has effect when records are shown')
         self.add_argument('--txn', action='store_true',
                           help='Reconcile incomplete transactions')
         self.add_argument('--version', action='version',
@@ -91,7 +97,7 @@ class QqpdLinearStoreAnalyzer(object):
         """ Create a report on the linear store previously analyzed using analyze() """
         if self.args.efp:
             self.efp_manager.report()
-        self.jrnl_recovery_mgr.report(self.args.stats)
+        self.jrnl_recovery_mgr.report()
     def run(self):
         """ Run the analyzer, which reads and analyzes the linear store """
         if self.args.efp:



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message