qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From astitc...@apache.org
Subject svn commit: r1530301 [8/8] - in /qpid/trunk/qpid: cpp/src/tests/legacystore/ cpp/src/tests/legacystore/federation/ cpp/src/tests/legacystore/jrnl/ cpp/src/tests/legacystore/jrnl/jtt/ cpp/src/tests/legacystore/python_tests/ tools/src/py/ tools/src/py/qp...
Date Tue, 08 Oct 2013 15:09:01 GMT
Added: qpid/trunk/qpid/tools/src/py/qpidstore/janal.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpidstore/janal.py?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpidstore/janal.py (added)
+++ qpid/trunk/qpid/tools/src/py/qpidstore/janal.py Tue Oct  8 15:09:00 2013
@@ -0,0 +1,608 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import jerr, jrnl
+import os.path, sys
+
+
+#== class EnqMap ==============================================================
+
+class EnqMap(object):
+    """Class for maintaining a map of enqueued records, indexing the rid against hdr, fid and transaction lock"""
+    
+    def __init__(self):
+        """Constructor"""
+        self.__map = {}
+    
+    def __str__(self):
+        """Print the contents of the map"""
+        return self.report(True, True)
+        
+    def add(self, fid, hdr, lock = False):
+        """Add a new record into the map"""
+        if hdr.rid in self.__map:
+            raise jerr.DuplicateRidError(hdr.rid)
+        self.__map[hdr.rid] = [fid, hdr, lock]
+    
+    def contains(self, rid):
+        """Return True if the map contains the given rid"""
+        return rid in self.__map
+    
+    def delete(self, rid):
+        """Delete the rid and its associated data from the map"""
+        if rid in self.__map:
+            if self.get_lock(rid):
+                raise jerr.DeleteLockedRecordError(rid)
+            del self.__map[rid]
+        else:
+            raise jerr.JWarning("ERROR: Deleting non-existent rid from EnqMap: rid=0x%x" % rid)
+    
+    def get(self, rid):
+        """Return a list [fid, hdr, lock] for the given rid"""
+        if self.contains(rid):
+            return self.__map[rid]
+        return None
+    
+    def get_fid(self, rid):
+        """Return the fid for the given rid"""
+        if self.contains(rid):
+            return self.__map[rid][0]
+        return None
+    
+    def get_hdr(self, rid):
+        """Return the header record for the given rid"""
+        if self.contains(rid):
+            return self.__map[rid][1]
+        return None
+    
+    def get_lock(self, rid):
+        """Return the transaction lock value for the given rid""" 
+        if self.contains(rid):
+            return self.__map[rid][2]
+        return None
+    
+    def get_rec_list(self):
+        """Return a list of tuples (fid, hdr, lock) for all entries in the map"""
+        return self.__map.values()
+    
+    def lock(self, rid):
+        """Set the transaction lock for a given rid to True"""
+        if rid in self.__map:
+            if not self.__map[rid][2]: # locked
+                self.__map[rid][2] = True
+            else:
+                raise jerr.AlreadyLockedError(rid)
+        else:
+            raise jerr.JWarning("ERROR: Locking non-existent rid in EnqMap: rid=0x%x" % rid)
+        
+    def report(self, show_stats, show_records):
+        """Return a string containing a text report for all records in the map"""
+        if len(self.__map) == 0:
+            return "No enqueued records found."
+        rstr = "%d enqueued records found" % len(self.__map)
+        if show_records:
+            rstr += ":"
+            rid_list = self.__map.keys()
+            rid_list.sort()
+            for rid in rid_list:
+                if self.__map[rid][2]:
+                    lock_str = " [LOCKED]"
+                else:
+                    lock_str = ""
+                rstr += "\n  lfid=%d %s %s" % (rec[0], rec[1], lock_str)
+        else:
+            rstr += "."
+        return rstr
+    
+    def rids(self):
+        """Return a list of rids in the map"""
+        return self.__map.keys()
+    
+    def size(self):
+        """Return the number of entries in the map"""
+        return len(self.__map)
+    
+    def unlock(self, rid):
+        """Set the transaction lock for a given rid to False"""
+        if rid in self.__map:
+            if self.__map[rid][2]:
+                self.__map[rid][2] = False
+            else:
+                raise jerr.NotLockedError(rid)
+        else:
+            raise jerr.NonExistentRecordError("unlock", rid)
+
+
+#== class TxnMap ==============================================================
+
+class TxnMap(object):
+    """Transaction map, which maps xids to a list of outstanding actions"""
+    
+    def __init__(self, emap):
+        """Constructor, requires an existing EnqMap instance"""
+        self.__emap = emap
+        self.__map = {}
+    
+    def __str__(self):
+        """Print the contents of the map"""
+        return self.report(True, True)
+    
+    def add(self, fid, hdr):
+        """Add a new transactional record into the map"""
+        if isinstance(hdr, jrnl.DeqRec):
+            try:
+                self.__emap.lock(hdr.deq_rid)
+            except jerr.JWarning:
+                # Not in emap, look for rid in tmap
+                l = self.find_rid(hdr.deq_rid, hdr.xid)
+                if l != None:
+                    if l[2]:
+                        raise jerr.AlreadyLockedError(hdr.deq_rid)
+                    l[2] = True
+        if hdr.xid in self.__map:
+            self.__map[hdr.xid].append([fid, hdr, False]) # append to existing list
+        else:
+            self.__map[hdr.xid] = [[fid, hdr, False]] # create new list
+    
+    def contains(self, xid):
+        """Return True if the xid exists in the map; False otherwise"""
+        return xid in self.__map
+    
+    def delete(self, hdr):
+        """Remove a transaction record from the map using either a commit or abort header"""
+        if hdr.magic[-1] == "c":
+            return self._commit(hdr.xid)
+        if hdr.magic[-1] == "a":
+            self._abort(hdr.xid)
+        else:
+            raise jerr.InvalidRecordTypeError("delete from TxnMap", hdr.magic, hdr.rid)
+    
+    def find_rid(self, rid, xid_hint = None):
+        """ Search for and return map list with supplied rid. If xid_hint is supplied, try that xid first"""
+        if xid_hint != None and self.contains(xid_hint):
+            for l in self.__map[xid_hint]:
+                if l[1].rid == rid:
+                    return l
+        for xid in self.__map.iterkeys():
+            if xid_hint == None or xid != xid_hint:
+                for l in self.__map[xid]:
+                    if l[1].rid == rid:
+                        return l
+        
+    def get(self, xid):
+        """Return a list of operations for the given xid"""
+        if self.contains(xid):
+            return self.__map[xid]
+        
+    def report(self, show_stats, show_records):
+        """Return a string containing a text report for all records in the map"""
+        if len(self.__map) == 0:
+            return "No outstanding transactions found."
+        rstr = "%d outstanding transactions found" % len(self.__map)
+        if show_records:
+            rstr += ":"
+            for xid, tup in self.__map.iteritems():
+                rstr += "\n  xid=%s:" % jrnl.Utils.format_xid(xid)
+                for i in tup:
+                    rstr += "\n   %s" % str(i[1])
+        else:
+            rstr += "."
+        return rstr
+    
+    def size(self):
+        """Return the number of xids in the map"""
+        return len(self.__map)
+    
+    def xids(self):
+        """Return a list of xids in the map"""
+        return self.__map.keys()
+    
+    def _abort(self, xid):
+        """Perform an abort operation for the given xid record"""
+        for fid, hdr, lock in self.__map[xid]:
+            if isinstance(hdr, jrnl.DeqRec):
+                self.__emap.unlock(hdr.deq_rid)
+        del self.__map[xid]
+    
+    def _commit(self, xid):
+        """Perform a commit operation for the given xid record"""
+        mismatch_list = []
+        for fid, hdr, lock in self.__map[xid]:
+            if isinstance(hdr, jrnl.EnqRec):
+                self.__emap.add(fid, hdr, lock) # Transfer enq to emap
+            else:
+                if self.__emap.contains(hdr.deq_rid):
+                    self.__emap.unlock(hdr.deq_rid)
+                    self.__emap.delete(hdr.deq_rid)
+                else:
+                    mismatch_list.append("0x%x" % hdr.deq_rid)
+        del self.__map[xid]
+        return mismatch_list
+
+#== class JrnlAnalyzer ========================================================
+
+class JrnlAnalyzer(object):
+    """
+    This class analyzes a set of journal files and determines which is the last to be written
+    (the newest file),  and hence which should be the first to be read for recovery (the oldest
+    file).
+    
+    The analysis is performed on construction; the contents of the JrnlInfo object passed provide
+    the recovery details.
+    """
+
+    def __init__(self, jinf):
+        """Constructor"""
+        self.__oldest = None
+        self.__jinf = jinf
+        self.__flist = self._analyze()
+                
+    def __str__(self):
+        """String representation of this JrnlAnalyzer instance, will print out results of analysis."""
+        ostr = "Journal files analyzed in directory %s (* = earliest full):\n" % self.__jinf.get_current_dir()
+        if self.is_empty():
+            ostr += "  <All journal files are empty>\n"
+        else:
+            for tup in self.__flist:
+                tmp = " "
+                if tup[0] == self.__oldest[0]:
+                    tmp = "*"
+                ostr += "  %s %s: owi=%-5s rid=0x%x, fro=0x%x ts=%s\n" % (tmp, os.path.basename(tup[1]), tup[2],
+                                                                          tup[3], tup[4], tup[5])
+            for i in range(self.__flist[-1][0] + 1, self.__jinf.get_num_jrnl_files()):
+                ostr += "    %s.%04x.jdat: <empty>\n" % (self.__jinf.get_jrnl_base_name(), i) 
+        return ostr
+        
+    # Analysis
+    
+    def get_oldest_file(self):
+        """Return a tuple (ordnum, jfn, owi, rid, fro, timestamp) for the oldest data file found in the journal"""
+        return self.__oldest
+
+    def get_oldest_file_index(self):
+        """Return the ordinal number of the oldest data file found in the journal"""
+        if self.is_empty():
+            return None
+        return self.__oldest[0]
+
+    def is_empty(self):
+        """Return true if the analysis found that the journal file has never been written to"""
+        return len(self.__flist) == 0
+    
+    def _analyze(self):
+        """Perform the journal file analysis by reading and comparing the file headers of each journal data file"""
+        owi_found = False
+        flist = []
+        for i in range(0, self.__jinf.get_num_jrnl_files()):
+            jfn = os.path.join(self.__jinf.get_current_dir(), "%s.%04x.jdat" % (self.__jinf.get_jrnl_base_name(), i))
+            fhandle = open(jfn)
+            fhdr = jrnl.Utils.load(fhandle, jrnl.Hdr)
+            if fhdr.empty():
+                break
+            this_tup = (i, jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
+            flist.append(this_tup)
+            if i == 0:
+                init_owi = fhdr.owi()
+                self.__oldest = this_tup
+            elif fhdr.owi() != init_owi and not owi_found:
+                self.__oldest = this_tup
+                owi_found = True
+        return flist
+        
+
+#== class JrnlReader ====================================================
+
+class JrnlReader(object):
+    """
+    This class contains an Enqueue Map (emap), a transaction map (tmap) and a transaction
+    object list (txn_obj_list) which are populated by reading the journals from the oldest
+    to the newest and analyzing each record. The JrnlInfo and JrnlAnalyzer
+    objects supplied on construction provide the information used for the recovery.
+    
+    The analysis is performed on construction.
+    """
+    
+    def __init__(self, jinfo, jra, qflag = False, rflag = False, vflag = False):
+        """Constructor, which reads all """
+        self._jinfo = jinfo
+        self._jra = jra
+        self._qflag = qflag
+        self._rflag = rflag
+        self._vflag = vflag
+        
+        # test callback functions for CSV tests
+        self._csv_store_chk = None
+        self._csv_start_cb = None
+        self._csv_enq_cb = None
+        self._csv_deq_cb = None
+        self._csv_txn_cb = None
+        self._csv_end_cb = None
+        
+        self._emap = EnqMap()
+        self._tmap = TxnMap(self._emap)
+        self._txn_obj_list = {}
+        
+        self._file = None
+        self._file_hdr = None
+        self._file_num = None
+        self._first_rec_flag = None
+        self._fro = None
+        self._last_file_flag = None
+        self._start_file_num = None
+        self._file_hdr_owi = None
+        self._warning = []
+        
+        self._abort_cnt = 0
+        self._commit_cnt = 0
+        self._msg_cnt = 0
+        self._rec_cnt = 0
+        self._txn_msg_cnt = 0
+    
+    def __str__(self):
+        """Print out all the undequeued records"""
+        return self.report(True, self._rflag)
+    
+    def emap(self):
+        """Get the enqueue map"""
+        return self._emap
+
+    def get_abort_cnt(self):
+        """Get the cumulative number of transactional aborts found"""
+        return self._abort_cnt
+
+    def get_commit_cnt(self):
+        """Get the cumulative number of transactional commits found"""
+        return self._commit_cnt
+
+    def get_msg_cnt(self):
+        """Get the cumulative number of messages found"""
+        return self._msg_cnt
+    
+    def get_rec_cnt(self):
+        """Get the cumulative number of journal records (including fillers) found"""
+        return self._rec_cnt
+
+    def is_last_file(self):
+        """Return True if the last file is being read"""
+        return self._last_file_flag
+    
+    def report(self, show_stats = True, show_records = False):
+        """Return a string containing a report on the file analysis"""
+        rstr = self._emap.report(show_stats, show_records) + "\n" + self._tmap.report(show_stats, show_records)
+        #TODO - print size analysis here - ie how full, sparse, est. space remaining before enq threshold
+        return rstr
+    
+    def run(self):
+        """Perform the read of the journal"""
+        if self._csv_start_cb != None and self._csv_start_cb(self._csv_store_chk):
+            return
+        if self._jra.is_empty():
+            return
+        stop = self._advance_jrnl_file(*self._jra.get_oldest_file())
+        while not stop and not self._get_next_record():
+            pass
+        if self._csv_end_cb != None and self._csv_end_cb(self._csv_store_chk):
+            return
+        if not self._qflag:
+            print
+    
+    def set_callbacks(self, csv_store_chk, csv_start_cb = None, csv_enq_cb = None, csv_deq_cb = None, csv_txn_cb = None,
+                      csv_end_cb = None):
+        """Set callbacks for checks to be made at various points while reading the journal"""
+        self._csv_store_chk = csv_store_chk
+        self._csv_start_cb = csv_start_cb
+        self._csv_enq_cb = csv_enq_cb
+        self._csv_deq_cb = csv_deq_cb
+        self._csv_txn_cb = csv_txn_cb
+        self._csv_end_cb = csv_end_cb
+    
+    def tmap(self):
+        """Return the transaction map"""
+        return self._tmap
+    
+    def get_txn_msg_cnt(self):
+        """Get the cumulative transactional message count"""
+        return self._txn_msg_cnt
+    
+    def txn_obj_list(self):
+        """Get a cumulative list of transaction objects (commits and aborts)"""
+        return self._txn_obj_list
+    
+    def _advance_jrnl_file(self, *oldest_file_info):
+        """Rotate to using the next journal file. Return False if the operation was successful, True if there are no
+        more files to read."""
+        fro_seek_flag = False
+        if len(oldest_file_info) > 0:
+            self._start_file_num = self._file_num = oldest_file_info[0]
+            self._fro = oldest_file_info[4]
+            fro_seek_flag = True # jump to fro to start reading
+            if not self._qflag and not self._rflag:
+                if self._vflag:
+                    print "Recovering journals..."
+                else:
+                    print "Recovering journals",
+        if self._file != None and self._is_file_full():
+            self._file.close()
+            self._file_num = self._incr_file_num()
+            if self._file_num == self._start_file_num:
+                return True
+            if self._start_file_num == 0:
+                self._last_file_flag = self._file_num == self._jinfo.get_num_jrnl_files() - 1
+            else:
+                self._last_file_flag = self._file_num == self._start_file_num - 1
+        if self._file_num < 0 or self._file_num >= self._jinfo.get_num_jrnl_files():
+            raise jerr.BadFileNumberError(self._file_num)
+        jfn = os.path.join(self._jinfo.get_current_dir(), "%s.%04x.jdat" %
+                           (self._jinfo.get_jrnl_base_name(), self._file_num))
+        self._file = open(jfn)
+        self._file_hdr = jrnl.Utils.load(self._file, jrnl.Hdr)
+        if fro_seek_flag and self._file.tell() != self._fro:
+            self._file.seek(self._fro)
+        self._first_rec_flag = True
+        if not self._qflag:
+            if self._rflag:
+                print jfn, ": ", self._file_hdr
+            elif self._vflag:
+                print "* Reading %s" % jfn
+            else:
+                print ".",
+                sys.stdout.flush()
+        return False
+
+    def _check_owi(self, hdr):
+        """Return True if the header's owi indicator matches that of the file header record; False otherwise. This can
+        indicate whether the last record in a file has been read and now older records which have not yet been
+        overwritten are now being read."""
+        return self._file_hdr_owi == hdr.owi()
+    
+    def _is_file_full(self):
+        """Return True if the current file is full (no more write space); false otherwise"""
+        return self._file.tell() >= self._jinfo.get_jrnl_file_size_bytes()
+    
+    def _get_next_record(self):
+        """Get the next record in the file for analysis"""
+        if self._is_file_full():
+            if self._advance_jrnl_file():
+                return True
+        try:
+            hdr = jrnl.Utils.load(self._file, jrnl.Hdr)
+        except:
+            return True
+        if hdr.empty():
+            return True
+        if hdr.check():
+            return True
+        self._rec_cnt += 1
+        self._file_hdr_owi = self._file_hdr.owi()
+        if self._first_rec_flag:
+            if self._file_hdr.fro != hdr.foffs:
+                raise jerr.FirstRecordOffsetMismatch(self._file_hdr.fro, hdr.foffs)
+            else:
+                if self._rflag:
+                    print " * fro ok: 0x%x" % self._file_hdr.fro
+                self._first_rec_flag = False
+        stop = False
+        if   isinstance(hdr, jrnl.EnqRec):
+            stop = self._handle_enq_rec(hdr)
+        elif isinstance(hdr, jrnl.DeqRec):
+            stop = self._handle_deq_rec(hdr)
+        elif isinstance(hdr, jrnl.TxnRec):
+            stop = self._handle_txn_rec(hdr)
+        wstr = ""
+        for warn in self._warning:
+            wstr += " (%s)" % warn
+        if self._rflag:
+            print " > %s  %s" % (hdr, wstr)
+        self._warning = []
+        return stop
+     
+    def _handle_deq_rec(self, hdr):
+        """Process a dequeue ("RHMd") record"""
+        if self._load_rec(hdr):
+            return True
+        
+        # Check OWI flag
+        if not self._check_owi(hdr):
+            self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
+            return True
+        # Test hook
+        if self._csv_deq_cb != None and self._csv_deq_cb(self._csv_store_chk, hdr):
+            return True
+        
+        try:
+            if hdr.xid == None:
+                self._emap.delete(hdr.deq_rid)
+            else:
+                self._tmap.add(self._file_hdr.fid, hdr)
+        except jerr.JWarning, warn:
+            self._warning.append(str(warn))
+        return False
+    
+    def _handle_enq_rec(self, hdr):
+        """Process a dequeue ("RHMe") record"""
+        if self._load_rec(hdr):
+            return True
+        
+        # Check extern flag
+        if hdr.extern and hdr.data != None:
+            raise jerr.ExternFlagDataError(hdr)
+        # Check OWI flag
+        if not self._check_owi(hdr):
+            self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
+            return True
+        # Test hook
+        if self._csv_enq_cb != None and self._csv_enq_cb(self._csv_store_chk, hdr):
+            return True
+        
+        if hdr.xid == None:
+            self._emap.add(self._file_hdr.fid, hdr)
+        else:
+            self._txn_msg_cnt += 1
+            self._tmap.add(self._file_hdr.fid, hdr)
+        self._msg_cnt += 1
+        return False
+    
+    def _handle_txn_rec(self, hdr):
+        """Process a transaction ("RHMa or RHMc") record"""
+        if self._load_rec(hdr):
+            return True
+        
+        # Check OWI flag
+        if not self._check_owi(hdr):
+            self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
+            return True
+        # Test hook
+        if self._csv_txn_cb != None and self._csv_txn_cb(self._csv_store_chk, hdr):
+            return True
+               
+        if hdr.magic[-1] == "a":
+            self._abort_cnt += 1
+        else:
+            self._commit_cnt += 1
+        
+        if self._tmap.contains(hdr.xid):
+            mismatched_rids = self._tmap.delete(hdr)
+            if mismatched_rids != None and len(mismatched_rids) > 0:
+                self._warning.append("WARNING: transactional dequeues not found in enqueue map; rids=%s" %
+                                     mismatched_rids)
+        else:
+            self._warning.append("WARNING: %s not found in transaction map" % jrnl.Utils.format_xid(hdr.xid))
+        if hdr.magic[-1] == "c": # commits only
+            self._txn_obj_list[hdr.xid] = hdr
+        return False
+
+    def _incr_file_num(self):
+        """Increment the number of files read with wraparound (ie after file n-1, go to 0)"""
+        self._file_num += 1
+        if self._file_num >= self._jinfo.get_num_jrnl_files():
+            self._file_num = 0
+        return self._file_num
+    
+    def _load_rec(self, hdr):
+        """Load a single record for the given header. There may be arbitrarily large xids and data components."""
+        while not hdr.complete():
+            if self._advance_jrnl_file():
+                return True
+            hdr.load(self._file)
+        return False
+
+# =============================================================================
+
+if __name__ == "__main__":
+    print "This is a library, and cannot be executed."

Added: qpid/trunk/qpid/tools/src/py/qpidstore/jerr.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpidstore/jerr.py?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpidstore/jerr.py (added)
+++ qpid/trunk/qpid/tools/src/py/qpidstore/jerr.py Tue Oct  8 15:09:00 2013
@@ -0,0 +1,219 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# == Warnings =================================================================
+
+class JWarning(Exception):
+    """Class to convey a warning"""
+    def __init__(self, err):
+        """Constructor"""
+        Exception.__init__(self, err)
+
+# == Errors ===================================================================
+
+class AllJrnlFilesEmptyCsvError(Exception):
+    """All journal files are empty (never been written)"""
+    def __init__(self, tnum, exp_num_msgs):
+        """Constructor"""
+        Exception.__init__(self, "[CSV %d] All journal files are empty, but test expects %d msg(s)." %
+                           (tnum, exp_num_msgs))
+
+class AlreadyLockedError(Exception):
+    """Error class for trying to lock a record that is already locked"""
+    def __init__(self, rid):
+        """Constructor"""
+        Exception.__init__(self, "Locking record which is already locked in EnqMap: rid=0x%x" % rid)
+
+class BadFileNumberError(Exception):
+    """Error class for incorrect or unexpected file number"""
+    def __init__(self, file_num):
+        """Constructor"""
+        Exception.__init__(self, "Bad file number %d" % file_num)
+
+class DataSizeError(Exception):
+    """Error class for data size mismatch"""
+    def __init__(self, exp_size, act_size, data_str):
+        """Constructor"""
+        Exception.__init__(self, "Inconsistent data size: expected:%d; actual:%d; data=\"%s\"" %
+                           (exp_size, act_size, data_str))
+
+class DeleteLockedRecordError(Exception):
+    """Error class for deleting a locked record from the enqueue map"""
+    def __init__(self, rid):
+        """Constructor"""
+        Exception.__init__(self, "Deleting locked record from EnqMap: rid=0x%s" % rid)
+
+class DequeueNonExistentEnqueueError(Exception):
+    """Error class for attempting to dequeue a non-existent enqueue record (rid)"""
+    def __init__(self, deq_rid):
+        """Constructor"""
+        Exception.__init__(self, "Dequeuing non-existent enqueue record: rid=0x%s" % deq_rid)
+
+class DuplicateRidError(Exception):
+    """Error class for placing duplicate rid into enqueue map"""
+    def __init__(self, rid):
+        """Constructor"""
+        Exception.__init__(self, "Adding duplicate record to EnqMap: rid=0x%x" % rid)
+            
+class EndianMismatchError(Exception):
+    """Error class mismatched record header endian flag"""
+    def __init__(self, exp_endianness):
+        """Constructor"""
+        Exception.__init__(self, "Endian mismatch: expected %s, but current record is %s" %
+                           self.endian_str(exp_endianness))
+    #@staticmethod
+    def endian_str(endianness):
+        """Return a string tuple for the endianness error message"""
+        if endianness:
+            return "big", "little"
+        return "little", "big"
+    endian_str = staticmethod(endian_str)
+ 
+class ExternFlagDataError(Exception):
+    """Error class for the extern flag being set and the internal size > 0"""
+    def __init__(self, hdr):
+        """Constructor"""
+        Exception.__init__(self, "Message data found (msg size > 0) on record with external flag set: hdr=%s" % hdr)
+       
+class ExternFlagCsvError(Exception):
+    """External flag mismatch between record and CSV test file"""
+    def __init__(self, tnum, exp_extern_flag):
+        """Constructor"""
+        Exception.__init__(self, "[CSV %d] External flag mismatch: expected %s" % (tnum, exp_extern_flag))
+
+class ExternFlagWithDataCsvError(Exception):
+    """External flag set and Message data found"""
+    def __init__(self, tnum):
+        """Constructor"""
+        Exception.__init__(self, "[CSV %d] Message data found on record with external flag set" % tnum)
+
+class FillExceedsFileSizeError(Exception):
+    """Internal error from a fill operation which will exceed the specified file size"""
+    def __init__(self, cur_size, file_size):
+        """Constructor"""
+        Exception.__init__(self, "Filling to size %d > max file size %d" % (cur_size, file_size))
+
+class FillSizeError(Exception):
+    """Internal error from a fill operation that did not match the calculated end point in the file"""
+    def __init__(self, cur_posn, exp_posn):
+        """Constructor"""
+        Exception.__init__(self, "Filled to size %d > expected file posn %d" % (cur_posn, exp_posn))
+
+class FirstRecordOffsetMismatch(Exception):
+    """Error class for file header fro mismatch with actual record"""
+    def __init__(self, fro, actual_offs):
+        """Constructor"""
+        Exception.__init__(self, "File header first record offset mismatch: fro=0x%x; actual offs=0x%x" %
+                           (fro, actual_offs))
+
+class InvalidHeaderVersionError(Exception):
+    """Error class for invalid record header version"""
+    def __init__(self, exp_ver, act_ver):
+        """Constructor"""
+        Exception.__init__(self, "Invalid header version: expected:%d, actual:%d." % (exp_ver, act_ver))
+
+class InvalidRecordTypeError(Exception):
+    """Error class for any operation using an invalid record type"""
+    def __init__(self, operation, magic, rid):
+        """Constructor"""
+        Exception.__init__(self, "Invalid record type for operation: operation=%s record magic=%s, rid=0x%x" %
+                           (operation, magic, rid))
+
+class InvalidRecordTailError(Exception):
+    """Error class for invalid record tail"""
+    def __init__(self, magic_err, rid_err, rec):
+        """Constructor"""
+        Exception.__init__(self, " > %s *INVALID TAIL RECORD (%s)*" % (rec, self.tail_err_str(magic_err, rid_err)))
+    #@staticmethod
+    def tail_err_str(magic_err, rid_err):
+        """Return a string indicating the tail record error(s)"""
+        estr = ""
+        if magic_err:
+            estr = "magic bad"
+            if rid_err:
+                estr += ", "
+        if rid_err:
+            estr += "rid mismatch"
+        return estr
+    tail_err_str = staticmethod(tail_err_str)
+
+class NonExistentRecordError(Exception):
+    """Error class for any operation on an non-existent record"""
+    def __init__(self, operation, rid):
+        """Constructor"""
+        Exception.__init__(self, "Operation on non-existent record: operation=%s; rid=0x%x" % (operation, rid))
+
+class NotLockedError(Exception):
+    """Error class for unlocking a record which is not locked in the first place"""
+    def __init__(self, rid):
+        """Constructor"""
+        Exception.__init__(self, "Unlocking record which is not locked in EnqMap: rid=0x%x" % rid)
+
+class JournalSpaceExceededError(Exception):
+    """Error class for when journal space of resized journal is too small to contain the transferred records"""
+    def __init__(self):
+        """Constructor"""
+        Exception.__init__(self, "Ran out of journal space while writing records")
+
+class MessageLengthCsvError(Exception):
+    """Message length mismatch between record and CSV test file"""
+    def __init__(self, tnum, exp_msg_len, actual_msg_len):
+        """Constructor"""
+        Exception.__init__(self, "[CSV %d] Message length mismatch: expected %d; found %d" %
+                           (tnum, exp_msg_len, actual_msg_len))
+
+class NumMsgsCsvError(Exception):
+    """Number of messages found mismatched with CSV file"""
+    def __init__(self, tnum, exp_num_msgs, actual_num_msgs):
+        """Constructor"""
+        Exception.__init__(self, "[CSV %s] Incorrect number of messages: expected %d, found %d" %
+                           (tnum, exp_num_msgs, actual_num_msgs))
+
+class TransactionCsvError(Exception):
+    """Transaction mismatch between record and CSV file"""
+    def __init__(self, tnum, exp_transactional):
+        """Constructor"""
+        Exception.__init__(self, "[CSV %d] Transaction mismatch: expected %s" % (tnum, exp_transactional))
+
+class UnexpectedEndOfFileError(Exception):
+    """Error class for unexpected end-of-file during reading"""
+    def __init__(self, exp_size, curr_offs):
+        """Constructor"""
+        Exception.__init__(self, "Unexpected end-of-file: expected file size:%d; current offset:%d" %
+                           (exp_size, curr_offs))
+
+class XidLengthCsvError(Exception):
+    """Message Xid length mismatch between record and CSV file"""
+    def __init__(self, tnum, exp_xid_len, actual_msg_len):
+        """Constructor"""
+        Exception.__init__(self, "[CSV %d] Message XID mismatch: expected %d; found %d" %
+                           (tnum, exp_xid_len, actual_msg_len))
+
+class XidSizeError(Exception):
+    """Error class for Xid size mismatch"""
+    def __init__(self, exp_size, act_size, xid_str):
+        """Constructor"""
+        Exception.__init__(self, "Inconsistent xid size: expected:%d; actual:%d; xid=\"%s\"" %
+                           (exp_size, act_size, xid_str))
+
+# =============================================================================
+
+if __name__ == "__main__":
+    print "This is a library, and cannot be executed."
+

Added: qpid/trunk/qpid/tools/src/py/qpidstore/jrnl.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpidstore/jrnl.py?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpidstore/jrnl.py (added)
+++ qpid/trunk/qpid/tools/src/py/qpidstore/jrnl.py Tue Oct  8 15:09:00 2013
@@ -0,0 +1,794 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import jerr
+import os.path, sys, xml.parsers.expat
+from struct import pack, unpack, calcsize
+from time import gmtime, strftime
+
+# TODO: Get rid of these! Use jinf instance instead
+DBLK_SIZE = 128
+SBLK_SIZE = 4 * DBLK_SIZE
+
+# TODO - this is messy - find a better way to handle this
+# This is a global, but is set directly by the calling program
+JRNL_FILE_SIZE = None
+
+#== class Utils ======================================================================
+
+class Utils(object):
+    """Class containing utility functions for dealing with the journal"""
+
+    __printchars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"#$%&'()*+,-./:;<=>?@[\\]^_`{\|}~ "
+    
+    # The @staticmethod declarations are not supported in RHEL4 (python 2.3.x)
+    # When RHEL4 support ends, restore these declarations and remove the older
+    # staticmethod() declaration.
+
+    #@staticmethod
+    def format_data(dsize, data):
+        """Format binary data for printing"""
+        if data == None:
+            return ""
+        if Utils._is_printable(data):
+            datastr = Utils._split_str(data)
+        else:
+            datastr = Utils._hex_split_str(data)
+        if dsize != len(data):
+            raise jerr.DataSizeError(dsize, len(data), datastr)
+        return "data(%d)=\"%s\" " % (dsize, datastr)
+    format_data = staticmethod(format_data)
+
+    #@staticmethod
+    def format_xid(xid, xidsize=None):
+        """Format binary XID for printing"""
+        if xid == None and xidsize != None:
+            if xidsize > 0:
+                raise jerr.XidSizeError(xidsize, 0, None)
+            return ""
+        if Utils._is_printable(xid):
+            xidstr = Utils._split_str(xid)
+        else:
+            xidstr = Utils._hex_split_str(xid)
+        if xidsize == None:
+            xidsize = len(xid)
+        elif xidsize != len(xid):
+            raise jerr.XidSizeError(xidsize, len(xid), xidstr)
+        return "xid(%d)=\"%s\" " % (xidsize, xidstr)
+    format_xid = staticmethod(format_xid)
+    
+    #@staticmethod
+    def inv_str(string):
+        """Perform a binary 1's compliment (invert all bits) on a binary string"""
+        istr = ""
+        for index in range(0, len(string)):
+            istr += chr(~ord(string[index]) & 0xff)
+        return istr
+    inv_str = staticmethod(inv_str)
+
+    #@staticmethod
+    def load(fhandle, klass):
+        """Load a record of class klass from a file"""
+        args = Utils._load_args(fhandle, klass)
+        subclass = klass.discriminate(args)
+        result = subclass(*args) # create instance of record
+        if subclass != klass:
+            result.init(fhandle, *Utils._load_args(fhandle, subclass))
+        result.skip(fhandle)
+        return result
+    load = staticmethod(load)
+    
+    #@staticmethod
+    def load_file_data(fhandle, size, data):
+        """Load the data portion of a message from file"""
+        if size == 0:
+            return (data, True)
+        if data == None:
+            loaded = 0
+        else:
+            loaded = len(data)
+        foverflow = fhandle.tell() + size - loaded > JRNL_FILE_SIZE
+        if foverflow:
+            rsize = JRNL_FILE_SIZE - fhandle.tell()
+        else:
+            rsize = size - loaded
+        fbin = fhandle.read(rsize)
+        if data == None:
+            data = unpack("%ds" % (rsize), fbin)[0]
+        else:
+            data = data + unpack("%ds" % (rsize), fbin)[0]
+        return (data, not foverflow)
+    load_file_data = staticmethod(load_file_data)
+
+    #@staticmethod
+    def rem_bytes_in_blk(fhandle, blk_size):
+        """Return the remaining bytes in a block"""
+        foffs = fhandle.tell()
+        return Utils.size_in_bytes_to_blk(foffs, blk_size) - foffs
+    rem_bytes_in_blk = staticmethod(rem_bytes_in_blk)
+
+    #@staticmethod
+    def size_in_blks(size, blk_size):
+        """Return the size in terms of data blocks"""
+        return int((size + blk_size - 1) / blk_size)
+    size_in_blks = staticmethod(size_in_blks)
+
+    #@staticmethod
+    def size_in_bytes_to_blk(size, blk_size):
+        """Return the bytes remaining until the next block boundary"""
+        return Utils.size_in_blks(size, blk_size) * blk_size
+    size_in_bytes_to_blk = staticmethod(size_in_bytes_to_blk)
+
+    #@staticmethod
+    def _hex_split_str(in_str, split_size = 50):
+        """Split a hex string into two parts separated by an ellipsis"""
+        if len(in_str) <= split_size:
+            return Utils._hex_str(in_str, 0, len(in_str))
+#        if len(in_str) > split_size + 25:
+#            return Utils._hex_str(in_str, 0, 10) + " ... " + Utils._hex_str(in_str, 55, 65) + " ... " + \
+#                   Utils._hex_str(in_str, len(in_str)-10, len(in_str))
+        return Utils._hex_str(in_str, 0, 10) + " ... " + Utils._hex_str(in_str, len(in_str)-10, len(in_str))
+    _hex_split_str = staticmethod(_hex_split_str)
+
+    #@staticmethod
+    def _hex_str(in_str, begin, end):
+        """Return a binary string as a hex string"""
+        hstr = ""
+        for index in range(begin, end):
+            if Utils._is_printable(in_str[index]):
+                hstr += in_str[index]
+            else:
+                hstr += "\\%02x" % ord(in_str[index])
+        return hstr
+    _hex_str = staticmethod(_hex_str)
+
+    #@staticmethod
+    def _is_printable(in_str):
+        """Return True if in_str in printable; False otherwise."""
+        return in_str.strip(Utils.__printchars) == ""
+    _is_printable = staticmethod(_is_printable)
+
+    #@staticmethod
+    def _load_args(fhandle, klass):
+        """Load the arguments from class klass"""
+        size = calcsize(klass.FORMAT)
+        foffs = fhandle.tell(),
+        fbin = fhandle.read(size)
+        if len(fbin) != size:
+            raise jerr.UnexpectedEndOfFileError(size, len(fbin))
+        return foffs + unpack(klass.FORMAT, fbin)
+    _load_args = staticmethod(_load_args)
+
+    #@staticmethod
+    def _split_str(in_str, split_size = 50):
+        """Split a string into two parts separated by an ellipsis if it is longer than split_size"""
+        if len(in_str) < split_size:
+            return in_str
+        return in_str[:25] + " ... " + in_str[-25:]
+    _split_str = staticmethod(_split_str)
+
+
+#== class Hdr =================================================================
+
+class Hdr:
+    """Class representing the journal header records"""
+ 
+    FORMAT = "=4sBBHQ"
+    HDR_VER = 1
+    OWI_MASK = 0x01
+    BIG_ENDIAN = sys.byteorder == "big"
+    REC_BOUNDARY = DBLK_SIZE
+
+    def __init__(self, foffs, magic, ver, endn, flags, rid):
+        """Constructor"""
+#        Sizeable.__init__(self)
+        self.foffs = foffs
+        self.magic = magic
+        self.ver = ver
+        self.endn = endn
+        self.flags = flags
+        self.rid = long(rid)
+        
+    def __str__(self):
+        """Return string representation of this header"""
+        if self.empty():
+            return "0x%08x: <empty>" % (self.foffs)
+        if self.magic[-1] == "x":
+            return "0x%08x: [\"%s\"]" % (self.foffs, self.magic)
+        if self.magic[-1] in ["a", "c", "d", "e", "f", "x"]:
+            return "0x%08x: [\"%s\" v=%d e=%d f=0x%04x rid=0x%x]" % (self.foffs, self.magic, self.ver, self.endn,
+                                                                     self.flags, self.rid)
+        return "0x%08x: <error, unknown magic \"%s\" (possible overwrite boundary?)>" %  (self.foffs, self.magic)
+    
+    #@staticmethod
+    def discriminate(args):
+        """Use the last char in the header magic to determine the header type"""
+        return _CLASSES.get(args[1][-1], Hdr)
+    discriminate = staticmethod(discriminate)
+
+    def empty(self):
+        """Return True if this record is empty (ie has a magic of 0x0000"""
+        return self.magic == "\x00"*4
+    
+    def encode(self):
+        """Encode the header into a binary string"""
+        return pack(Hdr.FORMAT, self.magic, self.ver, self.endn, self.flags, self.rid)
+
+    def owi(self):
+        """Return the OWI (overwrite indicator) for this header"""
+        return self.flags & self.OWI_MASK != 0
+
+    def skip(self, fhandle):
+        """Read and discard the remainder of this record"""
+        fhandle.read(Utils.rem_bytes_in_blk(fhandle, self.REC_BOUNDARY))
+
+    def check(self):
+        """Check that this record is valid"""
+        if self.empty() or self.magic[:3] != "RHM" or self.magic[3] not in ["a", "c", "d", "e", "f", "x"]:
+            return True
+        if self.magic[-1] != "x":
+            if self.ver != self.HDR_VER:
+                raise jerr.InvalidHeaderVersionError(self.HDR_VER, self.ver)
+            if bool(self.endn) != self.BIG_ENDIAN:
+                raise jerr.EndianMismatchError(self.BIG_ENDIAN)
+        return False
+        
+
+#== class FileHdr =============================================================
+
+class FileHdr(Hdr):
+    """Class for file headers, found at the beginning of journal files"""
+
+    FORMAT = "=2H4x3Q"
+    REC_BOUNDARY = SBLK_SIZE
+        
+    def __str__(self):
+        """Return a string representation of the this FileHdr instance"""
+        return "%s fid=%d lid=%d fro=0x%08x t=%s" % (Hdr.__str__(self), self.fid, self.lid, self.fro,
+                                                     self.timestamp_str())
+    
+    def encode(self):
+        """Encode this class into a binary string"""
+        return Hdr.encode(self) + pack(FileHdr.FORMAT, self.fid, self.lid, self.fro, self.time_sec, self.time_ns)
+
+    def init(self, fhandle, foffs, fid, lid, fro, time_sec, time_ns):
+        """Initialize this instance to known values"""
+        self.fid = fid
+        self.lid = lid
+        self.fro = fro
+        self.time_sec = time_sec
+        self.time_ns = time_ns
+
+    def timestamp(self):
+        """Get the timestamp of this record as a tuple (secs, nsecs)"""
+        return (self.time_sec, self.time_ns)
+
+    def timestamp_str(self):
+        """Get the timestamp of this record in string format"""
+        time = gmtime(self.time_sec)
+        fstr = "%%a %%b %%d %%H:%%M:%%S.%09d %%Y" % (self.time_ns)
+        return strftime(fstr, time)
+
+
+#== class DeqRec ==============================================================
+
+class DeqRec(Hdr):
+    """Class for a dequeue record"""
+
+    FORMAT = "=QQ"
+
+    def __str__(self):
+        """Return a string representation of the this DeqRec instance"""
+        return "%s %sdrid=0x%x" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize), self.deq_rid)
+
+    def init(self, fhandle, foffs, deq_rid, xidsize):
+        """Initialize this instance to known values"""
+        self.deq_rid = deq_rid
+        self.xidsize = xidsize
+        self.xid = None
+        self.deq_tail = None
+        self.xid_complete = False
+        self.tail_complete = False
+        self.tail_bin = None
+        self.tail_offs = 0
+        self.load(fhandle)
+    
+    def encode(self):
+        """Encode this class into a binary string"""
+        buf = Hdr.encode(self) + pack(DeqRec.FORMAT, self.deq_rid, self.xidsize)
+        if self.xidsize > 0:
+            fmt = "%ds" % (self.xidsize)
+            buf += pack(fmt, self.xid)
+            buf += self.deq_tail.encode()
+        return buf
+
+    def load(self, fhandle):
+        """Load the remainder of this record (after the header has been loaded"""
+        if self.xidsize == 0:
+            self.xid_complete = True
+            self.tail_complete = True
+        else:
+            if not self.xid_complete:
+                (self.xid, self.xid_complete) = Utils.load_file_data(fhandle, self.xidsize, self.xid)
+            if self.xid_complete and not self.tail_complete:
+                ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin)
+                self.tail_bin = ret[0]
+                if ret[1]:
+                    self.deq_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin))
+                    magic_err = self.deq_tail.magic_inv != Utils.inv_str(self.magic)
+                    rid_err = self.deq_tail.rid != self.rid
+                    if magic_err or rid_err:
+                        raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
+                    self.skip(fhandle)
+                self.tail_complete = ret[1]
+        return self.complete()
+
+    def complete(self):
+        """Returns True if the entire record is loaded, False otherwise"""
+        return self.xid_complete and self.tail_complete
+
+
+#== class TxnRec ==============================================================
+
+class TxnRec(Hdr):
+    """Class for a transaction commit/abort record"""
+
+    FORMAT = "=Q"
+
+    def __str__(self):
+        """Return a string representation of the this TxnRec instance"""
+        return "%s %s" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize))
+
+    def init(self, fhandle, foffs, xidsize):
+        """Initialize this instance to known values"""
+        self.xidsize = xidsize
+        self.xid = None
+        self.tx_tail = None
+        self.xid_complete = False
+        self.tail_complete = False
+        self.tail_bin = None
+        self.tail_offs = 0
+        self.load(fhandle)
+    
+    def encode(self):
+        """Encode this class into a binary string"""
+        return Hdr.encode(self) + pack(TxnRec.FORMAT, self.xidsize) + pack("%ds" % self.xidsize, self.xid) + \
+               self.tx_tail.encode()
+
+    def load(self, fhandle):
+        """Load the remainder of this record (after the header has been loaded"""
+        if not self.xid_complete:
+            ret = Utils.load_file_data(fhandle, self.xidsize, self.xid)
+            self.xid = ret[0]
+            self.xid_complete = ret[1]
+        if self.xid_complete and not self.tail_complete:
+            ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin)
+            self.tail_bin = ret[0]
+            if ret[1]:
+                self.tx_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin))
+                magic_err = self.tx_tail.magic_inv != Utils.inv_str(self.magic)
+                rid_err = self.tx_tail.rid != self.rid
+                if magic_err or rid_err:
+                    raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
+                self.skip(fhandle)
+            self.tail_complete = ret[1]
+        return self.complete()
+
+    def complete(self):
+        """Returns True if the entire record is loaded, False otherwise"""
+        return self.xid_complete and self.tail_complete
+
+
+#== class EnqRec ==============================================================
+
+class EnqRec(Hdr):
+    """Class for a enqueue record"""
+
+    FORMAT = "=QQ"
+    TRANSIENT_MASK = 0x10
+    EXTERN_MASK = 0x20
+
+    def __str__(self):
+        """Return a string representation of the this EnqRec instance"""
+        return "%s %s%s %s %s" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize),
+                                  Utils.format_data(self.dsize, self.data), self.enq_tail, self.print_flags())
+    
+    def encode(self):
+        """Encode this class into a binary string"""
+        buf = Hdr.encode(self) + pack(EnqRec.FORMAT, self.xidsize, self.dsize)
+        if self.xidsize > 0:
+            buf += pack("%ds" % self.xidsize, self.xid)
+        if self.dsize > 0:
+            buf += pack("%ds" % self.dsize, self.data)
+        if self.xidsize > 0 or self.dsize > 0:
+            buf += self.enq_tail.encode()
+        return buf
+
+    def init(self, fhandle, foffs, xidsize, dsize):
+        """Initialize this instance to known values"""
+        self.xidsize = xidsize
+        self.dsize = dsize
+        self.transient = self.flags & self.TRANSIENT_MASK > 0
+        self.extern = self.flags & self.EXTERN_MASK > 0
+        self.xid = None
+        self.data = None
+        self.enq_tail = None
+        self.xid_complete = False
+        self.data_complete = False
+        self.tail_complete = False
+        self.tail_bin = None
+        self.tail_offs = 0
+        self.load(fhandle)
+
+    def load(self, fhandle):
+        """Load the remainder of this record (after the header has been loaded"""
+        if not self.xid_complete:
+            ret = Utils.load_file_data(fhandle, self.xidsize, self.xid)
+            self.xid = ret[0]
+            self.xid_complete = ret[1]
+        if self.xid_complete and not self.data_complete:
+            if self.extern:
+                self.data_complete = True
+            else:
+                ret = Utils.load_file_data(fhandle, self.dsize, self.data)
+                self.data = ret[0]
+                self.data_complete = ret[1]
+        if self.data_complete and not self.tail_complete:
+            ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin)
+            self.tail_bin = ret[0]
+            if ret[1]:
+                self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin))
+                magic_err = self.enq_tail.magic_inv != Utils.inv_str(self.magic)
+                rid_err = self.enq_tail.rid != self.rid
+                if magic_err or rid_err:
+                    raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
+                self.skip(fhandle)
+            self.tail_complete = ret[1]
+        return self.complete()
+
+    def complete(self):
+        """Returns True if the entire record is loaded, False otherwise"""
+        return self.xid_complete and self.data_complete and self.tail_complete
+
+    def print_flags(self):
+        """Utility function to decode the flags field in the header and print a string representation"""
+        fstr = ""
+        if self.transient:
+            fstr = "*TRANSIENT"
+        if self.extern:
+            if len(fstr) > 0:
+                fstr += ",EXTERNAL"
+            else:
+                fstr = "*EXTERNAL"
+        if len(fstr) > 0:
+            fstr += "*"
+        return fstr
+
+
+#== class RecTail =============================================================
+
+class RecTail:
+    """Class for a record tail - for all records where either an XID or data separate the header from the end of the
+    record"""
+
+    FORMAT = "=4sQ"
+
+    def __init__(self, foffs, magic_inv, rid):
+        """Initialize this instance to known values"""
+        self.foffs = foffs
+        self.magic_inv = magic_inv
+        self.rid = long(rid)
+
+    def __str__(self):
+        """Return a string representation of the this RecTail instance"""
+        magic = Utils.inv_str(self.magic_inv)
+        return "[\"%s\" rid=0x%x]" % (magic, self.rid)
+    
+    def encode(self):
+        """Encode this class into a binary string"""
+        return pack(RecTail.FORMAT, self.magic_inv, self.rid)                
+
+
+#== class JrnlInfo ============================================================
+
+class JrnlInfo(object):
+    """
+    This object reads and writes journal information files (<basename>.jinf). Methods are provided
+    to read a file, query its properties and reset just those properties necessary for normalizing
+    and resizing a journal.
+    
+    Normalizing: resetting the directory and/or base filename to different values. This is necessary
+    if a set of journal files is copied from one location to another before being restored, as the
+    value of the path in the file no longer matches the actual path.
+    
+    Resizing: If the journal geometry parameters (size and number of journal files) changes, then the
+    .jinf file must reflect these changes, as this file is the source of information for journal
+    recovery.
+    
+    NOTE: Data size vs File size: There are methods which return the data size and file size of the
+    journal files.
+    
+    +-------------+--------------------/ /----------+
+    | File header |           File data             |
+    +-------------+--------------------/ /----------+
+    |             |                                 |
+    |             |<---------- Data size ---------->|
+    |<------------------ File Size ---------------->|
+    
+    Data size: The size of the data content of the journal, ie that part which stores the data records.
+    
+    File size: The actual disk size of the journal including data and the file header which precedes the
+    data.
+    
+    The file header is fixed to 1 sblk, so  file size = jrnl size + sblk size.
+    """
+    
+    def __init__(self, jdir, bfn = "JournalData"):
+        """Constructor"""
+        self.__jdir = jdir
+        self.__bfn = bfn
+        self.__jinf_dict = {}
+        self._read_jinf()
+    
+    def __str__(self):
+        """Create a string containing all of the journal info contained in the jinf file"""
+        ostr = "Journal info file %s:\n" % os.path.join(self.__jdir, "%s.jinf" % self.__bfn)
+        for key, val in self.__jinf_dict.iteritems():
+            ostr += "  %s = %s\n" % (key, val)
+        return ostr
+    
+    def normalize(self, jdir = None, bfn = None):
+        """Normalize the directory (ie reset the directory path to match the actual current location) for this
+        jinf file"""
+        if jdir == None:
+            self.__jinf_dict["directory"] = self.__jdir
+        else:
+            self.__jdir = jdir
+            self.__jinf_dict["directory"] = jdir
+        if bfn != None:
+            self.__bfn = bfn
+            self.__jinf_dict["base_filename"] = bfn
+    
+    def resize(self, num_jrnl_files = None, jrnl_file_size = None):
+        """Reset the journal size information to allow for resizing the journal"""
+        if num_jrnl_files != None:
+            self.__jinf_dict["number_jrnl_files"] = num_jrnl_files
+        if jrnl_file_size != None:
+            self.__jinf_dict["jrnl_file_size_sblks"] = jrnl_file_size * self.get_jrnl_dblk_size_bytes()
+
+    def write(self, jdir = None, bfn = None):
+        """Write the .jinf file"""
+        self.normalize(jdir, bfn)
+        if not os.path.exists(self.get_jrnl_dir()):
+            os.makedirs(self.get_jrnl_dir())
+        fhandle = open(os.path.join(self.get_jrnl_dir(), "%s.jinf" % self.get_jrnl_base_name()), "w")
+        fhandle.write("<?xml version=\"1.0\" ?>\n")
+        fhandle.write("<jrnl>\n")
+        fhandle.write("  <journal_version value=\"%d\" />\n" % self.get_jrnl_version())
+        fhandle.write("  <journal_id>\n")
+        fhandle.write("    <id_string value=\"%s\" />\n" % self.get_jrnl_id())
+        fhandle.write("    <directory value=\"%s\" />\n" % self.get_jrnl_dir())
+        fhandle.write("    <base_filename value=\"%s\" />\n" % self.get_jrnl_base_name())
+        fhandle.write("  </journal_id>\n")
+        fhandle.write("  <creation_time>\n")
+        fhandle.write("    <seconds value=\"%d\" />\n" % self.get_creation_time()[0])
+        fhandle.write("    <nanoseconds value=\"%d\" />\n" % self.get_creation_time()[1])
+        fhandle.write("    <string value=\"%s\" />\n" % self.get_creation_time_str())
+        fhandle.write("  </creation_time>\n")
+        fhandle.write("  <journal_file_geometry>\n")
+        fhandle.write("    <number_jrnl_files value=\"%d\" />\n" % self.get_num_jrnl_files())
+        fhandle.write("    <auto_expand value=\"%s\" />\n" % str.lower(str(self.get_auto_expand())))
+        fhandle.write("    <jrnl_file_size_sblks value=\"%d\" />\n" % self.get_jrnl_data_size_sblks())
+        fhandle.write("    <JRNL_SBLK_SIZE value=\"%d\" />\n" % self.get_jrnl_sblk_size_dblks())
+        fhandle.write("    <JRNL_DBLK_SIZE value=\"%d\" />\n" % self.get_jrnl_dblk_size_bytes())
+        fhandle.write("  </journal_file_geometry>\n")
+        fhandle.write("  <cache_geometry>\n")
+        fhandle.write("    <wcache_pgsize_sblks value=\"%d\" />\n" % self.get_wr_buf_pg_size_sblks())
+        fhandle.write("    <wcache_num_pages value=\"%d\" />\n" % self.get_num_wr_buf_pgs())
+        fhandle.write("    <JRNL_RMGR_PAGE_SIZE value=\"%d\" />\n" % self.get_rd_buf_pg_size_sblks())
+        fhandle.write("    <JRNL_RMGR_PAGES value=\"%d\" />\n" % self.get_num_rd_buf_pgs())
+        fhandle.write("  </cache_geometry>\n")
+        fhandle.write("</jrnl>\n")
+        fhandle.close()
+    
+    # Journal ID
+    
+    def get_jrnl_version(self):
+        """Get the journal version"""
+        return self.__jinf_dict["journal_version"]
+    
+    def get_jrnl_id(self):
+        """Get the journal id"""
+        return self.__jinf_dict["id_string"]
+    
+    def get_current_dir(self):
+        """Get the current directory of the store (as opposed to that value saved in the .jinf file)"""
+        return self.__jdir
+    
+    def get_jrnl_dir(self):
+        """Get the journal directory stored in the .jinf file"""
+        return self.__jinf_dict["directory"]
+    
+    def get_jrnl_base_name(self):
+        """Get the base filename - that string used to name the journal files <basefilename>-nnnn.jdat and
+        <basefilename>.jinf"""
+        return self.__jinf_dict["base_filename"]
+    
+    # Journal creation time
+    
+    def get_creation_time(self):
+        """Get journal creation time as a tuple (secs, nsecs)"""
+        return (self.__jinf_dict["seconds"], self.__jinf_dict["nanoseconds"])
+    
+    def get_creation_time_str(self):
+        """Get journal creation time as a string"""
+        return self.__jinf_dict["string"]
+    
+    # --- Files and geometry ---
+    
+    def get_num_jrnl_files(self):
+        """Get number of data files in the journal"""
+        return self.__jinf_dict["number_jrnl_files"]
+    
+    def get_auto_expand(self):
+        """Return True if auto-expand is enabled; False otherwise"""
+        return self.__jinf_dict["auto_expand"]
+    
+    def get_jrnl_sblk_size_dblks(self):
+        """Get the journal softblock size in dblks"""
+        return self.__jinf_dict["JRNL_SBLK_SIZE"]
+     
+    def get_jrnl_sblk_size_bytes(self):
+        """Get the journal softblock size in bytes"""
+        return self.get_jrnl_sblk_size_dblks() * self.get_jrnl_dblk_size_bytes()
+   
+    def get_jrnl_dblk_size_bytes(self):
+        """Get the journal datablock size in bytes"""
+        return self.__jinf_dict["JRNL_DBLK_SIZE"]
+    
+    def get_jrnl_data_size_sblks(self):
+        """Get the data capacity (excluding the file headers) for one journal file in softblocks"""
+        return self.__jinf_dict["jrnl_file_size_sblks"]
+    
+    def get_jrnl_data_size_dblks(self):
+        """Get the data capacity (excluding the file headers) for one journal file in datablocks"""
+        return self.get_jrnl_data_size_sblks() * self.get_jrnl_sblk_size_dblks()
+    
+    def get_jrnl_data_size_bytes(self):
+        """Get the data capacity (excluding the file headers) for one journal file in bytes"""
+        return self.get_jrnl_data_size_dblks() * self.get_jrnl_dblk_size_bytes()
+    
+    def get_jrnl_file_size_sblks(self):
+        """Get the size of one journal file on disk (including the file headers) in softblocks"""
+        return self.get_jrnl_data_size_sblks() + 1
+    
+    def get_jrnl_file_size_dblks(self):
+        """Get the size of one journal file on disk (including the file headers) in datablocks"""
+        return self.get_jrnl_file_size_sblks() * self.get_jrnl_sblk_size_dblks()
+    
+    def get_jrnl_file_size_bytes(self):
+        """Get the size of one journal file on disk (including the file headers) in bytes"""
+        return self.get_jrnl_file_size_dblks() * self.get_jrnl_dblk_size_bytes()
+    
+    def get_tot_jrnl_data_size_sblks(self):
+        """Get the size of the entire jouranl's data capacity (excluding the file headers) for all files together in
+        softblocks"""
+        return self.get_num_jrnl_files() * self.get_jrnl_data_size_bytes()
+    
+    def get_tot_jrnl_data_size_dblks(self):
+        """Get the size of the entire jouranl's data capacity (excluding the file headers) for all files together in
+        datablocks"""
+        return self.get_num_jrnl_files() * self.get_jrnl_data_size_dblks()
+    
+    def get_tot_jrnl_data_size_bytes(self):
+        """Get the size of the entire jouranl's data capacity (excluding the file headers) for all files together in
+        bytes"""
+        return self.get_num_jrnl_files() * self.get_jrnl_data_size_bytes()
+    
+    # Read and write buffers
+    
+    def get_wr_buf_pg_size_sblks(self):
+        """Get the size of the write buffer pages in softblocks"""
+        return self.__jinf_dict["wcache_pgsize_sblks"]
+    
+    def get_wr_buf_pg_size_dblks(self):
+        """Get the size of the write buffer pages in datablocks"""
+        return self.get_wr_buf_pg_size_sblks() * self.get_jrnl_sblk_size_dblks()
+    
+    def get_wr_buf_pg_size_bytes(self):
+        """Get the size of the write buffer pages in bytes"""
+        return self.get_wr_buf_pg_size_dblks() * self.get_jrnl_dblk_size_bytes()
+    
+    def get_num_wr_buf_pgs(self):
+        """Get the number of write buffer pages"""
+        return self.__jinf_dict["wcache_num_pages"]
+    
+    def get_rd_buf_pg_size_sblks(self):
+        """Get the size of the read buffer pages in softblocks"""
+        return self.__jinf_dict["JRNL_RMGR_PAGE_SIZE"]
+    
+    def get_rd_buf_pg_size_dblks(self):
+        """Get the size of the read buffer pages in datablocks"""
+        return self.get_rd_buf_pg_size_sblks * self.get_jrnl_sblk_size_dblks()
+    
+    def get_rd_buf_pg_size_bytes(self):
+        """Get the size of the read buffer pages in bytes"""
+        return self.get_rd_buf_pg_size_dblks * self.get_jrnl_dblk_size_bytes()
+    
+    def get_num_rd_buf_pgs(self):
+        """Get the number of read buffer pages"""
+        return self.__jinf_dict["JRNL_RMGR_PAGES"]
+    
+    def _read_jinf(self):
+        """Read and initialize this instance from an existing jinf file located at the directory named in the
+        constructor - called by the constructor"""
+        fhandle = open(os.path.join(self.__jdir, "%s.jinf" % self.__bfn), "r")
+        parser = xml.parsers.expat.ParserCreate()
+        parser.StartElementHandler = self._handle_xml_start_elt
+        parser.CharacterDataHandler = self._handle_xml_char_data
+        parser.EndElementHandler = self._handle_xml_end_elt
+        parser.ParseFile(fhandle)
+        fhandle.close()
+
+    def _handle_xml_start_elt(self, name, attrs):
+        """Callback for handling XML start elements. Used by the XML parser."""
+        # bool values
+        if name == "auto_expand":
+            self.__jinf_dict[name] = attrs["value"] == "true"
+        # long values
+        elif name == "seconds" or \
+             name == "nanoseconds":
+            self.__jinf_dict[name] = long(attrs["value"])
+        # int values
+        elif name == "journal_version" or \
+             name == "number_jrnl_files" or \
+             name == "jrnl_file_size_sblks" or \
+             name == "JRNL_SBLK_SIZE" or \
+             name == "JRNL_DBLK_SIZE" or \
+             name == "wcache_pgsize_sblks" or \
+             name == "wcache_num_pages" or \
+             name == "JRNL_RMGR_PAGE_SIZE" or \
+             name == "JRNL_RMGR_PAGES":
+            self.__jinf_dict[name] = int(attrs["value"])
+        # strings
+        elif "value" in attrs:
+            self.__jinf_dict[name] = attrs["value"]
+
+    def _handle_xml_char_data(self, data):
+        """Callback for handling character data (ie within <elt>...</elt>). The jinf file does not use this in its
+        data. Used by the XML parser.""" 
+        pass
+
+    def _handle_xml_end_elt(self, name):
+        """Callback for handling XML end elements. Used by XML parser."""
+        pass
+        
+
+#==============================================================================
+
+_CLASSES = {
+    "a": TxnRec,
+    "c": TxnRec,
+    "d": DeqRec,
+    "e": EnqRec,
+    "f": FileHdr
+}
+
+if __name__ == "__main__":
+    print "This is a library, and cannot be executed."



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message