Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5EEEB18DF8 for ; Fri, 25 Sep 2015 14:16:57 +0000 (UTC) Received: (qmail 6439 invoked by uid 500); 25 Sep 2015 14:16:57 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 6409 invoked by uid 500); 25 Sep 2015 14:16:57 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 6400 invoked by uid 99); 25 Sep 2015 14:16:57 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Sep 2015 14:16:57 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id CD357C0C0C for ; Fri, 25 Sep 2015 14:16:56 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 3.42 X-Spam-Level: *** X-Spam-Status: No, score=3.42 tagged_above=-999 required=6.31 tests=[HK_RANDOM_ENVFROM=0.626, HK_RANDOM_FROM=0.999, KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.006, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id mFI2CwUcEyhi for ; Fri, 25 Sep 2015 14:16:44 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with ESMTP id 39F43202AB for ; Fri, 25 Sep 2015 14:16:43 +0000 (UTC) Received: from svn01-us-west.apache.org (svn.apache.org [10.41.0.6]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 18F5EE00B0 for ; Fri, 25 Sep 2015 14:16:42 +0000 (UTC) Received: from svn01-us-west.apache.org (localhost [127.0.0.1]) by svn01-us-west.apache.org (ASF Mail Server at svn01-us-west.apache.org) with ESMTP id E93CE3A0233 for ; Fri, 25 Sep 2015 14:16:41 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1705310 - in /qpid/trunk/qpid/tools/src/py: qlslibs/analyze.py qlslibs/jrnl.py qlslibs/utils.py qpid-qls-analyze Date: Fri, 25 Sep 2015 14:16:41 -0000 To: commits@qpid.apache.org From: kpvdr@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20150925141641.E93CE3A0233@svn01-us-west.apache.org> Author: kpvdr Date: Fri Sep 25 14:16:41 2015 New Revision: 1705310 URL: http://svn.apache.org/viewvc?rev=1705310&view=rev Log: QPID-6765 [linearstore] Add qpid-txtest mode to qpid-qls-analyze which extracts message number from message body Modified: qpid/trunk/qpid/tools/src/py/qlslibs/analyze.py qpid/trunk/qpid/tools/src/py/qlslibs/jrnl.py qpid/trunk/qpid/tools/src/py/qlslibs/utils.py qpid/trunk/qpid/tools/src/py/qpid-qls-analyze Modified: qpid/trunk/qpid/tools/src/py/qlslibs/analyze.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qlslibs/analyze.py?rev=1705310&r1=1705309&r2=1705310&view=diff ============================================================================== --- qpid/trunk/qpid/tools/src/py/qlslibs/analyze.py (original) +++ qpid/trunk/qpid/tools/src/py/qlslibs/analyze.py Fri Sep 25 14:16:41 2015 @@ -52,12 +52,12 @@ class JournalRecoveryManager(object): self.journals = {} self.high_rid_counter = HighCounter() self.prepared_list = None - def report(self, print_stats_flag): + def report(self): self._reconcile_transactions(self.prepared_list, self.args.txn) if self.tpl is not None: - self.tpl.report(print_stats_flag, self.args.show_recovered_recs) + self.tpl.report(self.args) for queue_name in sorted(self.journals.keys()): - self.journals[queue_name].report(print_stats_flag, self.args.show_recovered_recs) + self.journals[queue_name].report(self.args) def run(self): tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME) if os.path.exists(tpl_dir): @@ -78,7 +78,7 @@ class JournalRecoveryManager(object): def _reconcile_transactions(self, prepared_list, txn_flag): print 'Transaction reconciliation report:' print '==================================' - print len(prepared_list), 'open transaction(s) found in prepared transaction list:' + print 'Transaction Prepared List (TPL) contains %d open transaction(s):' % len(prepared_list) for xid in prepared_list.keys(): commit_flag = prepared_list[xid] if commit_flag is None: @@ -97,18 +97,22 @@ class JournalRecoveryManager(object): enqueue_record.record_id, xid, None) if txn_flag: self.tpl.add_record(dequeue_record) + print + print 'Open transactions found in queues:' + print '----------------------------------' for queue_name in sorted(self.journals.keys()): self.journals[queue_name].reconcile_transactions(prepared_list, txn_flag) - if len(prepared_list) > 0: - print 'Completing prepared transactions in prepared transaction list:' - for xid in prepared_list.keys(): - print ' ', qlslibs.utils.format_xid(xid) - transaction_record = qlslibs.utils.create_record(qlslibs.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \ - self.tpl.current_journal_file, \ - self.high_rid_counter.get_next(), None, xid, None) - if txn_flag: - self.tpl.add_record(transaction_record) print + if len(prepared_list) > 0: + print 'Creating commit records for the following prepared transactions in TPL:' + for xid in prepared_list.keys(): + print ' ', qlslibs.utils.format_xid(xid) + transaction_record = qlslibs.utils.create_record(qlslibs.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \ + self.tpl.current_journal_file, \ + self.high_rid_counter.get_next(), None, xid, None) + if txn_flag: + self.tpl.add_record(transaction_record) + print class EnqueueMap(object): """ @@ -139,22 +143,21 @@ class EnqueueMap(object): if dequeue_record.dequeue_record_id not in self.enq_map: raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) self.enq_map[dequeue_record.dequeue_record_id][2] = True - def report_str(self, _, show_records): + def report_str(self, args): """Return a string containing a text report for all records in the map""" if len(self.enq_map) == 0: return 'No enqueued records found.' rstr = '%d enqueued records found' % len(self.enq_map) - if show_records: + if args.show_recovered_recs: rstr += ":" rid_list = self.enq_map.keys() rid_list.sort() for rid in rid_list: journal_file, record, locked_flag = self.enq_map[rid] + rstr += '\n 0x%x:' % journal_file.file_header.file_num + rstr += record.to_string(args.show_xids, args.show_data, args.txtest) if locked_flag: - lock_str = '[LOCKED]' - else: - lock_str = '' - rstr += '\n 0x%x:%s %s' % (journal_file.file_header.file_num, record, lock_str) + rstr += ' [LOCKED]' else: rstr += '.' return rstr @@ -248,17 +251,18 @@ class TransactionMap(object): return prepared_list def get_xid_list(self): return self.txn_map.keys() - def report_str(self, _, show_records): + def report_str(self, args): """Return a string containing a text report for all records in the map""" if len(self.txn_map) == 0: return 'No outstanding transactions found.' rstr = '%d outstanding transaction(s)' % len(self.txn_map) - if show_records: + if args.show_recovered_recs: rstr += ':' for xid, op_list in self.txn_map.iteritems(): rstr += '\n %s containing %d operations:' % (qlslibs.utils.format_xid(xid), len(op_list)) for journal_file, record, _ in op_list: - rstr += '\n 0x%x:%s' % (journal_file.file_header.file_num, record) + rstr += '\n 0x%x:' % journal_file.file_header.file_num + rstr += record.to_string(args.show_xids, args.show_data, args.txtest) else: rstr += '.' return rstr @@ -334,6 +338,7 @@ class Journal(object): self.num_filler_records_required = None # TODO: Move into JournalFile self.fill_to_offset = None def add_record(self, record): + """Used for reconciling transactions only - called from JournalRecoveryManager._reconcile_transactions()""" if isinstance(record, qlslibs.jrnl.EnqueueRecord) or isinstance(record, qlslibs.jrnl.DequeueRecord): if record.xid_size > 0: self.txn_map.add(self.current_journal_file, record) @@ -385,16 +390,16 @@ class Journal(object): if txn_flag: self.txn_map.abort(xid) else: - print ' ', qlslibs.utils.format_xid(xid), '- Ignoring, not in prepared transaction list' + print ' ', qlslibs.utils.format_xid(xid), '- Ignoring, not in prepared transaction list' if txn_flag: self.txn_map.abort(xid) - def report(self, print_stats_flag, show_recovered_records): + def report(self, args): print 'Journal "%s":' % self.queue_name print '=' * (11 + len(self.queue_name)) - if print_stats_flag: + if args.stats: print str(self.statistics) - print self.enq_map.report_str(True, show_recovered_records) - print self.txn_map.report_str(True, show_recovered_records) + print self.enq_map.report_str(args) + print self.txn_map.report_str(args) JournalFile.report_header() for file_num in sorted(self.files.keys()): self.files[file_num].report() @@ -485,17 +490,17 @@ class Journal(object): high_rid_counter.check(this_record.record_id) if self.args.show_recovery_recs or self.args.show_all_recs: print '0x%x:%s' % (start_journal_file.file_header.file_num, \ - this_record.to_string(self.args.show_xids, self.args.show_data)) + this_record.to_string(self.args.show_xids, self.args.show_data, self.args.txtest)) elif isinstance(this_record, qlslibs.jrnl.DequeueRecord): ok_flag = self._handle_dequeue_record(this_record, start_journal_file) high_rid_counter.check(this_record.record_id) if self.args.show_recovery_recs or self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids)) + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids, None, None)) elif isinstance(this_record, qlslibs.jrnl.TransactionRecord): ok_flag = self._handle_transaction_record(this_record, start_journal_file) high_rid_counter.check(this_record.record_id) if self.args.show_recovery_recs or self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids)) + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids, None, None)) else: self.statistics.filler_record_count += 1 ok_flag = True @@ -555,10 +560,12 @@ class Journal(object): return False if not transaction_record.is_valid(start_journal_file): return False - if transaction_record.magic[-1] == 'a': + if transaction_record.magic[-1] == 'a': # Abort self.statistics.transaction_abort_count += 1 - else: + elif transaction_record.magic[-1] == 'c': # Commit self.statistics.transaction_commit_count += 1 + else: + raise InvalidRecordTypeError('Unknown transaction record magic \'%s\'' % transaction_record.magic) if self.txn_map.contains(transaction_record.xid): self.txn_map.delete(self.current_journal_file, transaction_record) else: Modified: qpid/trunk/qpid/tools/src/py/qlslibs/jrnl.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qlslibs/jrnl.py?rev=1705310&r1=1705309&r2=1705310&view=diff ============================================================================== --- qpid/trunk/qpid/tools/src/py/qlslibs/jrnl.py (original) +++ qpid/trunk/qpid/tools/src/py/qlslibs/jrnl.py Fri Sep 25 14:16:41 2015 @@ -78,9 +78,6 @@ class RecordHeader(object): for warn in self.warnings: warn_str += '<%s>' % warn return warn_str - def __str__(self): - """Return string representation of this header""" - return self.to_rh_string() class RecordTail(object): FORMAT = '<4sL2Q' @@ -119,9 +116,6 @@ class RecordTail(object): magic = qlslibs.utils.inv_str(self.xmagic) magic_char = magic[-1].upper() if magic[-1] in string.printable else '?' return '[%c cs=0x%08x rid=0x%x]' % (magic_char, self.checksum, self.record_id) - def __str__(self): - """Return a string representation of the this RecordTail instance""" - return self.to_string() class FileHeader(RecordHeader): FORMAT = '<2H4x5QH' @@ -185,9 +179,6 @@ class FileHeader(RecordHeader): self.first_record_offset, self.partition_num, self.efp_data_size_kb, self.timestamp_str(), self._get_warnings()) - def __str__(self): - """Return a string representation of the this FileHeader instance""" - return self.to_string() class EnqueueRecord(RecordHeader): FORMAT = '<2Q' @@ -249,7 +240,7 @@ class EnqueueRecord(RecordHeader): else: return True return False - def to_string(self, show_xid_flag, show_data_flag): + def to_string(self, show_xid_flag, show_data_flag, txtest_flag): """Return a string representation of the this EnqueueRecord instance""" if self.truncated_flag: return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), @@ -260,7 +251,7 @@ class EnqueueRecord(RecordHeader): record_tail_str = self.record_tail.to_string() return '%s %s %s %s %s %s' % (self.to_rh_string(), qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag), - qlslibs.utils.format_data(self.data, self.data_size, show_data_flag), + qlslibs.utils.format_data(self.data, self.data_size, show_data_flag, txtest_flag), record_tail_str, self._print_flags(), self._get_warnings()) def _print_flags(self): """Utility function to decode the flags field in the header and print a string representation""" @@ -275,9 +266,6 @@ class EnqueueRecord(RecordHeader): if len(fstr) > 0: fstr += ']' return fstr - def __str__(self): - """Return a string representation of the this EnqueueRecord instance""" - return self.to_string(False, False) class DequeueRecord(RecordHeader): FORMAT = '<2Q' @@ -323,7 +311,7 @@ class DequeueRecord(RecordHeader): else: return True return False - def to_string(self, show_xid_flag): + def to_string(self, show_xid_flag, _u1, _u2): """Return a string representation of the this DequeueRecord instance""" if self.truncated_flag: return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self), @@ -344,9 +332,6 @@ class DequeueRecord(RecordHeader): else: return '[ABORT]' return '' - def __str__(self): - """Return a string representation of the this DequeueRecord instance""" - return self.to_string(False) class TransactionRecord(RecordHeader): FORMAT = ' 0: @@ -144,14 +144,16 @@ def _format_binary(bin_str, bin_size, sh elif bin_size != len(bin_str): raise err_class(bin_size, len(bin_str), bin_str) out_str = '%s(%d)' % (prefix, bin_size) - if show_bin_flag: + if txtest_flag: + out_str += '=\'%s\'' % _txtest_msg_str(bin_str) + elif show_bin_flag: if _is_printable(bin_str): binstr = '"%s"' % _split_str(bin_str) elif hex_num_flag: binstr = '0x%s' % _str_to_hex_num(bin_str) else: - binstr = _hex_split_str(bin_str) - out_str += '=%s' % binstr + binstr = _hex_split_str(bin_str, 50, 10, 10) + out_str += '=\'%s\'' % binstr return out_str def _hex_str(in_str, begin, end): @@ -164,12 +166,20 @@ def _hex_str(in_str, begin, end): hstr += '\\%02x' % ord(in_str[index]) return hstr -def _hex_split_str(in_str, split_size = 50): +def _hex_split_str(in_str, split_size, head_size, tail_size): """Split a hex string into two parts separated by an ellipsis""" if len(in_str) <= split_size: return _hex_str(in_str, 0, len(in_str)) - return _hex_str(in_str, 0, 10) + ' ... ' + _hex_str(in_str, len(in_str)-10, len(in_str)) - #return ''.join(x.encode('hex') for x in reversed(in_str)) + return _hex_str(in_str, 0, head_size) + ' ... ' + _hex_str(in_str, len(in_str)-tail_size, len(in_str)) + +def _txtest_msg_str(bin_str): + """Extract the message number used in qpid-txtest""" + msg_index = bin_str.find('msg') + if msg_index >= 0: + end_index = bin_str.find('\x00', msg_index) + assert end_index >= 0 + return bin_str[msg_index:end_index] + return None def _is_printable(in_str): """Return True if in_str in printable; False otherwise.""" Modified: qpid/trunk/qpid/tools/src/py/qpid-qls-analyze URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-qls-analyze?rev=1705310&r1=1705309&r2=1705310&view=diff ============================================================================== --- qpid/trunk/qpid/tools/src/py/qpid-qls-analyze (original) +++ qpid/trunk/qpid/tools/src/py/qpid-qls-analyze Fri Sep 25 14:16:41 2015 @@ -52,11 +52,17 @@ class QlsAnalyzerArgParser(argparse.Argu self.add_argument('--show-all-recs', action='store_true', help='Show all records (including fillers) found during recovery') self.add_argument('--show-xids', action='store_true', - help='Show xid as hex number, otherwise show only xid length') + help='Show xid as hex number, otherwise show only xid length. Only has effect when records are shown') +# TODO: Add ability to show xid as an index rather than a value, helps analysis when xid is a long value with +# small differences which cannot easily be seen when looking at an output. Also prints a table of indeces vs xid values. +# self.add_argument('--show-xid-index', action='store_true', +# help='Show xids by index rather than by their value. Useful for long xids. Prints xid index table') self.add_argument('--show-data', action='store_true', - help='Show data, otherwise show only data length') + help='Show data, otherwise show only data length. Only has effect when records are shown') self.add_argument('--stats', action='store_true', help='Print journal record stats') + self.add_argument('--txtest', action='store_true', + help='Show qpid-txtest message number as the message content when viewing records. Only has effect when records are shown') self.add_argument('--txn', action='store_true', help='Reconcile incomplete transactions') self.add_argument('--version', action='version', @@ -91,7 +97,7 @@ class QqpdLinearStoreAnalyzer(object): """ Create a report on the linear store previously analyzed using analyze() """ if self.args.efp: self.efp_manager.report() - self.jrnl_recovery_mgr.report(self.args.stats) + self.jrnl_recovery_mgr.report() def run(self): """ Run the analyzer, which reads and analyzes the linear store """ if self.args.efp: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org