qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kp...@apache.org
Subject svn commit: r1530024 [3/4] - in /qpid/branches/linearstore/qpid/cpp/src: ./ qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/ tests/linearstore/
Date Mon, 07 Oct 2013 18:39:25 GMT
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp Mon Oct  7 18:39:24 2013
@@ -30,10 +30,12 @@
 #include <iomanip>
 #include <iostream>
 #include <qpid/linearstore/jrnl/EmptyFilePool.h>
+#include <qpid/linearstore/jrnl/EmptyFilePoolManager.h>
 //#include "qpid/linearstore/jrnl/file_hdr.h"
 #include "qpid/linearstore/jrnl/jerrno.h"
 //#include "qpid/linearstore/jrnl/jinf.h"
-#include "qpid/linearstore/jrnl/JournalFileController.h"
+//#include "qpid/linearstore/jrnl/JournalFileController.h"
+#include "qpid/linearstore/jrnl/utils/enq_hdr.h"
 #include <limits>
 #include <sstream>
 #include <unistd.h>
@@ -72,15 +74,16 @@ jcntl::jcntl(const std::string& jid, con
     _stop_flag(false),
     _readonly_flag(false),
 //    _autostop(true),
-    _jfcp(0),
+    _linearFileController(*this),
+    _emptyFilePoolPtr(0),
 //    _jfsize_sblks(0),
 //    _lpmgr(),
     _emap(),
     _tmap(),
 //    _rrfc(&_lpmgr),
 //    _wrfc(&_lpmgr),
-    _rmgr(this, _emap, _tmap/*, _rrfc*/),
-    _wmgr(this, _emap, _tmap/*, _wrfc*/),
+//    _rmgr(this, _emap, _tmap/*, _rrfc*/),
+    _wmgr(this, _emap, _tmap, _linearFileController/*, _wrfc*/),
     _rcvdat()
 {}
 
@@ -90,11 +93,7 @@ jcntl::~jcntl()
         try { stop(true); }
         catch (const jexception& e) { std::cerr << e << std::endl; }
 //    _lpmgr.finalize();
-    if (_jfcp) {
-        _jfcp->finalize();
-        delete _jfcp;
-        _jfcp = 0;
-    }
+    _linearFileController.finalize();
 }
 
 void
@@ -109,11 +108,7 @@ jcntl::initialize(/*const uint16_t num_j
     _emap.clear();
     _tmap.clear();
 
-    if (_jfcp) {
-        _jfcp->finalize();
-        delete _jfcp;
-        _jfcp = 0;
-    }
+    _linearFileController.finalize();
 
 //    _lpmgr.finalize();
 
@@ -129,14 +124,15 @@ jcntl::initialize(/*const uint16_t num_j
 
     // Clear any existing journal files
     _jdir.clear_dir();
-//    _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl);
+//    _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl); // Creates new journal files
 
-    _jfcp = new JournalFileController(_jdir.dirname(), efpp);
-    _jfcp->pullEmptyFileFromEfp(1, 4096, _jid);
+    _linearFileController.initialize(_jdir.dirname(), efpp);
+    _linearFileController.pullEmptyFileFromEfp();
+    std::cout << _linearFileController.status(2);
 //    _wrfc.initialize(_jfsize_sblks);
 //    _rrfc.initialize();
 //    _rrfc.set_findex(0);
-    _rmgr.initialize(cbp);
+//    _rmgr.initialize(cbp);
     _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
 
     // Write info file (<basename>.jinf) to disk
@@ -146,11 +142,12 @@ jcntl::initialize(/*const uint16_t num_j
 }
 
 void
-jcntl::recover(/*const uint16_t num_jfiles, const bool ae, const uint16_t ae_max_jfiles,
-        const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
-//         const rd_aio_cb rd_cb, const wr_aio_cb wr_cb, const std::vector<std::string>* prep_txn_list_ptr,
-        aio_callback* const cbp, const std::vector<std::string>* prep_txn_list_ptr,
-        uint64_t& highest_rid)
+jcntl::recover(EmptyFilePoolManager* efpm,
+               const uint16_t wcache_num_pages,
+               const uint32_t wcache_pgsize_sblks,
+               aio_callback* const cbp,
+               const std::vector<std::string>* prep_txn_list_ptr,
+               uint64_t& highest_rid)
 {
     _init_flag = false;
     _stop_flag = false;
@@ -159,6 +156,8 @@ jcntl::recover(/*const uint16_t num_jfil
     _emap.clear();
     _tmap.clear();
 
+    _linearFileController.finalize();
+
 //    _lpmgr.finalize();
 
 //    assert(num_jfiles >= JRNL_MIN_NUM_FILES);
@@ -171,18 +170,19 @@ jcntl::recover(/*const uint16_t num_jfil
     _jdir.verify_dir();
 //    _rcvdat.reset(num_jfiles/*, ae, ae_max_jfiles*/);
 
-    rcvr_janalyze(_rcvdat, prep_txn_list_ptr);
+    rcvr_janalyze(prep_txn_list_ptr, efpm);
     highest_rid = _rcvdat._h_rid;
-    if (_rcvdat._jfull)
-        throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
-    this->log(LOG_DEBUG, _jid, _rcvdat.to_log(_jid));
+//    if (_rcvdat._jfull)
+//        throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
+    this->log(/*LOG_DEBUG*/LOG_INFO, _jid, _rcvdat.to_log(_jid));
 
 //    _lpmgr.recover(_rcvdat, this, &new_fcntl);
 
+    _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr);
 //    _wrfc.initialize(_jfsize_sblks, &_rcvdat);
 //    _rrfc.initialize();
 //    _rrfc.set_findex(_rcvdat.ffid());
-    _rmgr.initialize(cbp);
+//    _rmgr.initialize(cbp);
     _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS,
             (_rcvdat._lffull ? 0 : _rcvdat._eo));
 
@@ -200,7 +200,7 @@ jcntl::recover_complete()
 //    _wrfc.initialize(_jfsize_sblks, &_rcvdat);
 //    _rrfc.initialize();
 //    _rrfc.set_findex(_rcvdat.ffid());
-    _rmgr.recover_complete();
+//    _rmgr.recover_complete();
     _readonly_flag = false;
 }
 
@@ -208,7 +208,7 @@ void
 jcntl::delete_jrnl_files()
 {
     stop(true); // wait for AIO to complete
-    _jfcp->purgeFilesToEfp();
+    _linearFileController.purgeFilesToEfp();
     _jdir.delete_dir();
 }
 
@@ -287,8 +287,55 @@ jcntl::discard_data_record(data_tok* con
 
 iores
 jcntl::read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize,
-        bool& transient, bool& external, data_tok* const dtokp, bool ignore_pending_txns)
+        bool& transient, bool& external, data_tok* const dtokp, bool /*ignore_pending_txns*/)
 {
+    if (!dtokp->is_readable()) {
+        std::ostringstream oss;
+        oss << std::hex << std::setfill('0');
+        oss << "dtok_id=0x" << std::setw(8) << dtokp->id();
+        oss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid();
+        oss << "; dtok_wstate=" << dtokp->wstate_str();
+        throw jexception(jerrno::JERR_JCNTL_ENQSTATE, oss.str(), "jcntl", "read_data_record");
+    }
+    std::vector<uint64_t> ridl;
+    _emap.rid_list(ridl);
+    enq_map::emap_data_struct_t eds;
+    for (std::vector<uint64_t>::const_iterator i=ridl.begin(); i!=ridl.end(); ++i) {
+        short res = _emap.get_data(*i, eds);
+        if (res == enq_map::EMAP_OK) {
+            std::ifstream ifs(_rcvdat._fm[eds._pfid].c_str(), std::ifstream::in | std::ifstream::binary);
+            if (!ifs.good()) {
+                std::ostringstream oss;
+                oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _rcvdat._fm[eds._pfid] << " file_posn=" << eds._file_posn;
+                throw jexception(jerrno::JERR_JCNTL_OPENRD, oss.str(), "jcntl", "read_data_record");
+            }
+            ifs.seekg(eds._file_posn, std::ifstream::beg);
+            ::enq_hdr_t eh;
+            ifs.read((char*)&eh, sizeof(::enq_hdr_t));
+            if (!::validate_enq_hdr(&eh, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, *i)) {
+                std::ostringstream oss;
+                oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _rcvdat._fm[eds._pfid] << " file_posn=" << eds._file_posn;
+                throw jexception(jerrno::JERR_JCNTL_INVALIDENQHDR, oss.str(), "jcntl", "read_data_record");
+            }
+            dsize = eh._dsize;
+            xidsize = eh._xidsize;
+            transient = ::is_enq_transient(&eh);
+            external = ::is_enq_external(&eh);
+            if (xidsize) {
+                *xidpp = ::malloc(xidsize);
+                ifs.read((char*)(*xidpp), xidsize);
+            } else {
+                *xidpp = 0;
+            }
+            if (dsize) {
+                *datapp = ::malloc(dsize);
+                ifs.read((char*)(*datapp), dsize);
+            } else {
+                *datapp = 0;
+            }
+        }
+    }
+/*
     check_rstatus("read_data");
     iores res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
     if (res == RHM_IORES_RCINVALID)
@@ -302,6 +349,8 @@ jcntl::read_data_record(void** const dat
         res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
     }
     return res;
+*/
+    return RHM_IORES_SUCCESS;
 }
 
 iores
@@ -370,11 +419,13 @@ jcntl::get_wr_events(timespec* const tim
     return res;
 }
 
+/*
 int32_t
 jcntl::get_rd_events(timespec* const timeout)
 {
     return _rmgr.get_events(pmgr::AIO_COMPLETE, timeout);
 }
+*/
 
 void
 jcntl::stop(const bool block_till_aio_cmpl)
@@ -386,26 +437,13 @@ jcntl::stop(const bool block_till_aio_cm
     _stop_flag = true;
     if (!_readonly_flag)
         flush(block_till_aio_cmpl);
-//    _rrfc.finalize();
-//    _lpmgr.finalize();
+    _linearFileController.finalize();
 }
 
-/*
-uint16_t
-jcntl::get_earliest_fid()
-{
-    uint16_t ffid = _wrfc.earliest_index();
-    uint16_t fid = _wrfc.index();
-    while ( _emap.get_enq_cnt(ffid) == 0 && _tmap.get_txn_pfid_cnt(ffid) == 0 && ffid != fid)
-    {
-        if (++ffid >= _lpmgr.num_jfiles())
-            ffid = 0;
-    }
-    if (!_rrfc.is_active())
-        _rrfc.set_findex(ffid);
-    return ffid;
+LinearFileController&
+jcntl::getLinearFileControllerRef() {
+    return _linearFileController;
 }
-*/
 
 iores
 jcntl::flush(const bool block_till_aio_cmpl)
@@ -496,22 +534,6 @@ jcntl::check_rstatus(const char* fn_name
         throw jexception(jerrno::JERR_JCNTL_STOPPED, "jcntl", fn_name);
 }
 
-/*
-void
-jcntl::write_infofile() const
-{
-    timespec ts;
-    if (::clock_gettime(CLOCK_REALTIME, &ts))
-    {
-        std::ostringstream oss;
-        oss << FORMAT_SYSERR(errno);
-        throw jexception(jerrno::JERR__RTCLOCK, oss.str(), "jcntl", "write_infofile");
-    }
-    jinf ji(_jid, _jdir.dirname(), _base_filename, _lpmgr.num_jfiles(), _lpmgr.is_ae(), _lpmgr.ae_max_jfiles(),
-            _jfsize_sblks, _wmgr.cache_pgsize_sblks(), _wmgr.cache_num_pages(), ts);
-    ji.write();
-}
-*/
 
 void
 jcntl::aio_cmpl_wait()
@@ -530,6 +552,7 @@ jcntl::aio_cmpl_wait()
     }
 }
 
+
 bool
 jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp)
 {
@@ -569,64 +592,71 @@ jcntl::handle_aio_wait(const iores res, 
     return false;
 }
 
-void
-jcntl::rcvr_janalyze(rcvdat& /*rd*/, const std::vector<std::string>* /*prep_txn_list_ptr*/)
-{
-/*
-    jinf ji(_jdir.dirname() + "/" + _base_filename + "." + JRNL_INFO_EXTENSION, true);
 
-    // If the number of files does not tie up with the jinf file from the journal being recovered,
-    // use the jinf data.
-    if (rd._njf != ji.num_jfiles())
-    {
-        std::ostringstream oss;
-        oss << "Recovery found " << ji.num_jfiles() <<
-                " files (different from --num-jfiles value of " << rd._njf << ").";
-        this->log(LOG_INFO, oss.str());
-        rd._njf = ji.num_jfiles();
-        _rcvdat._enq_cnt_list.resize(rd._njf);
-    }
-    _emap.set_num_jfiles(rd._njf);
-    _tmap.set_num_jfiles(rd._njf);
-    if (_jfsize_sblks != ji.jfsize_sblks())
-    {
+// static
+void
+jcntl::rcvr_read_jfile(const std::string& jfn, ::file_hdr_t* fh, std::string& queueName) {
+    const std::size_t headerBlockSize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024;
+    char buffer[headerBlockSize];
+    std::ifstream ifs(jfn.c_str(), std::ifstream::in | std::ifstream::binary);
+    if (!ifs.good()) {
         std::ostringstream oss;
-        oss << "Recovery found file size = " << (ji.jfsize_sblks() / JRNL_RMGR_PAGE_SIZE) <<
-                " (different from --jfile-size-pgs value of " <<
-                (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) << ").";
-        this->log(LOG_INFO, oss.str());
-        _jfsize_sblks = ji.jfsize_sblks();
+        oss << "File=" << jfn;
+        throw jexception(jerrno::JERR_JCNTL_OPENRD, oss.str(), "jcntl", "rcvr_read_jfile");
     }
-    if (_jdir.dirname().compare(ji.jdir()))
-    {
+    ifs.read(buffer, headerBlockSize);
+    if (!ifs) {
+        std::streamsize s = ifs.gcount();
+        ifs.close();
         std::ostringstream oss;
-        oss << "Journal file location change: original = \"" << ji.jdir() <<
-                "\"; current = \"" << _jdir.dirname() << "\"";
-        this->log(LOG_WARN, oss.str());
-        ji.set_jdir(_jdir.dirname());
+        oss << "File=" << jfn << "; attempted_read_size=" << headerBlockSize << "; actual_read_size=" << s;
+        throw jexception(jerrno::JERR_JCNTL_READ, oss.str(), "jcntl", "rcvr_read_jfile");
     }
+    ifs.close();
+    ::memcpy(fh, buffer, sizeof(::file_hdr_t));
+    queueName.assign(buffer + sizeof(::file_hdr_t), fh->_queue_name_len);
+}
 
-    try
-    {
-        rd._ffid = ji.get_first_pfid();
-        rd._lfid = ji.get_last_pfid();
-        rd._owi = ji.get_initial_owi();
-        rd._frot = ji.get_frot();
-        rd._jempty = false;
-        ji.get_normalized_pfid_list(rd._fid_list); // _pfid_list
+
+void jcntl::rcvr_analyze_fhdrs(EmptyFilePoolManager* efpmp) {
+    std::string headerQueueName;
+    ::file_hdr_t fh;
+    efpIdentity_t efpid;
+//    std::map<uint64_t, std::string> fileMap;
+    std::vector<std::string> dirList;
+    jdir::read_dir(_jdir.dirname(), dirList, false, true, false, true);
+    for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
+        rcvr_read_jfile(*i, &fh, headerQueueName);
+        if (headerQueueName.compare(_jid) != 0) {
+            std::ostringstream oss;
+            oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring";
+            log(LOG_WARN, _jid, oss.str());
+        } else {
+            _rcvdat._fm[fh._file_number] = *i;
+            efpid.first = fh._efp_partition;
+            efpid.second = fh._file_size_kib;
+        }
     }
-    catch (const jexception& e)
-    {
-        if (e.err_code() != jerrno::JERR_JINF_JDATEMPTY) throw;
+    _rcvdat._jfl.clear();
+    for (std::map<uint64_t, std::string>::iterator i=_rcvdat._fm.begin(); i!=_rcvdat._fm.end(); ++i) {
+        _rcvdat._jfl.push_back(i->second);
     }
+    _rcvdat._enq_cnt_list.resize(_rcvdat._jfl.size(), 0);
+    _emptyFilePoolPtr = efpmp->getEmptyFilePool(efpid);
+}
+
+
+void jcntl::rcvr_janalyze(const std::vector<std::string>* prep_txn_list_ptr, EmptyFilePoolManager* efpmp) {
+    // Analyze file headers of existing journal files
+    rcvr_analyze_fhdrs(efpmp);
 
     // Restore all read and write pointers and transactions
-    if (!rd._jempty)
+    if (!_rcvdat._jempty)
     {
-        uint16_t fid = rd._ffid;
+        uint16_t fid = 0;
         std::ifstream ifs;
-        bool lowi = rd._owi; // local copy of owi to be used during analysis
-        while (rcvr_get_next_record(fid, &ifs, lowi, rd)) ;
+        //bool lowi = rd._owi; // local copy of owi to be used during analysis
+        while (rcvr_get_next_record(fid, &ifs)) ;
         if (ifs.is_open()) ifs.close();
 
         // Remove all txns from tmap that are not in the prepared list
@@ -645,7 +675,7 @@ jcntl::rcvr_janalyze(rcvdat& /*rd*/, con
                     for (tdl_itr i=tdl.begin(); i<tdl.end(); i++)
                     {
                         if (i->_enq_flag) // enq op - decrement enqueue count
-                            rd._enq_cnt_list[i->_pfid]--;
+                            _rcvdat._enq_cnt_list[i->_pfid]--;
                         else if (_emap.is_enqueued(i->_drid, true)) // deq op - unlock enq record
                         {
                             int16_t ret = _emap.unlock(i->_drid);
@@ -662,18 +692,14 @@ jcntl::rcvr_janalyze(rcvdat& /*rd*/, con
             }
         }
 
-        // Check for file full condition - add one to _jfsize_sblks to account for file header
-        rd._lffull = rd._eo == (1 + _jfsize_sblks) * JRNL_SBLK_SIZE;
-
-        // Check for journal full condition
-        uint16_t next_wr_fid = (rd._lfid + 1) % rd._njf;
-        rd._jfull = rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid] && rd._lffull;
+        // Check for file full condition
+        _rcvdat._lffull = _rcvdat._eo == _emptyFilePoolPtr->fileSize_kib() * 1024;
     }
-*/
 }
 
+
 bool
-jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& rd)
+jcntl::rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp)
 {
     std::size_t cum_size_read = 0;
     void* xidp = 0;
@@ -685,7 +711,7 @@ jcntl::rcvr_get_next_record(uint16_t& fi
     {
         if (!ifsp->is_open())
         {
-            if (!jfile_cycle(fid, ifsp/*, lowi*/, rd, true))
+            if (!jfile_cycle(fid, ifsp, true))
                 return false;
         }
         file_pos = ifsp->tellg();
@@ -694,7 +720,7 @@ jcntl::rcvr_get_next_record(uint16_t& fi
             hdr_ok = true;
         else
         {
-            if (!jfile_cycle(fid, ifsp/*, lowi*/, rd, true))
+            if (!jfile_cycle(fid, ifsp, true))
                 return false;
         }
     }
@@ -703,13 +729,14 @@ jcntl::rcvr_get_next_record(uint16_t& fi
     {
         case QLS_ENQ_MAGIC:
             {
+                std::cout << " e" << std::flush;
                 enq_rec er;
                 uint16_t start_fid = fid; // fid may increment in decode() if record folds over file boundary
-                if (!decode(er, fid, ifsp, cum_size_read, h/*, lowi*/, rd, file_pos))
+                if (!decode(er, fid, ifsp, cum_size_read, h, file_pos))
                     return false;
                 if (!er.is_transient()) // Ignore transient msgs
                 {
-                    rd._enq_cnt_list[start_fid]++;
+                    _rcvdat._enq_cnt_list[start_fid]++;
                     if (er.xid_size())
                     {
                         er.get_xid(&xidp);
@@ -726,7 +753,7 @@ jcntl::rcvr_get_next_record(uint16_t& fi
                     }
                     else
                     {
-                        if (_emap.insert_pfid(h._rid, start_fid) < enq_map::EMAP_OK) // fail
+                        if (_emap.insert_pfid(h._rid, start_fid, file_pos) < enq_map::EMAP_OK) // fail
                         {
                             // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
                             std::ostringstream oss;
@@ -739,9 +766,10 @@ jcntl::rcvr_get_next_record(uint16_t& fi
             break;
         case QLS_DEQ_MAGIC:
             {
+                std::cout << " d" << std::flush;
                 deq_rec dr;
                 uint16_t start_fid = fid; // fid may increment in decode() if record folds over file boundary
-                if (!decode(dr, fid, ifsp, cum_size_read, h/*, lowi*/, rd, file_pos))
+                if (!decode(dr, fid, ifsp, cum_size_read, h, file_pos))
                     return false;
                 if (dr.xid_size())
                 {
@@ -762,16 +790,17 @@ jcntl::rcvr_get_next_record(uint16_t& fi
                 }
                 else
                 {
-                    int16_t enq_fid = _emap.get_remove_pfid(dr.deq_rid(), true);
-                    if (enq_fid >= enq_map::EMAP_OK) // ignore not found error
-                        rd._enq_cnt_list[enq_fid]--;
+                    int16_t enq_fid;
+                    if (_emap.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
+                        _rcvdat._enq_cnt_list[enq_fid]--;
                 }
             }
             break;
         case QLS_TXA_MAGIC:
             {
+                std::cout << " a" << std::flush;
                 txn_rec ar;
-                if (!decode(ar, fid, ifsp, cum_size_read, h/*, lowi*/, rd, file_pos))
+                if (!decode(ar, fid, ifsp, cum_size_read, h, file_pos))
                     return false;
                 // Delete this txn from tmap, unlock any locked records in emap
                 ar.get_xid(&xidp);
@@ -781,7 +810,7 @@ jcntl::rcvr_get_next_record(uint16_t& fi
                 for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
                 {
                     if (itr->_enq_flag)
-                        rd._enq_cnt_list[itr->_pfid]--;
+                        _rcvdat._enq_cnt_list[itr->_pfid]--;
                     else
                         _emap.unlock(itr->_drid); // ignore not found error
                 }
@@ -790,8 +819,9 @@ jcntl::rcvr_get_next_record(uint16_t& fi
             break;
         case QLS_TXC_MAGIC:
             {
+                std::cout << " t" << std::flush;
                 txn_rec cr;
-                if (!decode(cr, fid, ifsp, cum_size_read, h/*, lowi*/, rd, file_pos))
+                if (!decode(cr, fid, ifsp, cum_size_read, h, file_pos))
                     return false;
                 // Delete this txn from tmap, process records into emap
                 cr.get_xid(&xidp);
@@ -802,7 +832,7 @@ jcntl::rcvr_get_next_record(uint16_t& fi
                 {
                     if (itr->_enq_flag) // txn enqueue
                     {
-                        if (_emap.insert_pfid(itr->_rid, itr->_pfid) < enq_map::EMAP_OK) // fail
+                        if (_emap.insert_pfid(itr->_rid, itr->_pfid, file_pos) < enq_map::EMAP_OK) // fail
                         {
                             // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
                             std::ostringstream oss;
@@ -812,9 +842,9 @@ jcntl::rcvr_get_next_record(uint16_t& fi
                     }
                     else // txn dequeue
                     {
-                        int16_t enq_fid = _emap.get_remove_pfid(itr->_drid, true);
-                        if (enq_fid >= enq_map::EMAP_OK)
-                            rd._enq_cnt_list[enq_fid]--;
+                        int16_t enq_fid;
+                        if (_emap.get_remove_pfid(itr->_drid, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
+                            _rcvdat._enq_cnt_list[enq_fid]--;
                     }
                 }
                 std::free(xidp);
@@ -822,32 +852,40 @@ jcntl::rcvr_get_next_record(uint16_t& fi
             break;
         case QLS_EMPTY_MAGIC:
             {
+                std::cout << " x" << std::flush;
                 uint32_t rec_dblks = jrec::size_dblks(sizeof(rec_hdr_t));
-                ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(rec_hdr_t));
+                ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE_BYTES - sizeof(rec_hdr_t));
                 assert(!ifsp->fail() && !ifsp->bad());
-                if (!jfile_cycle(fid, ifsp/*, lowi*/, rd, false))
+                if (!jfile_cycle(fid, ifsp, false))
                     return false;
             }
             break;
         case 0:
-            check_journal_alignment(fid, file_pos, rd);
+            std::cout << " 0" << std::endl << std::flush;
+            check_journal_alignment(fid, file_pos);
             return false;
         default:
+            std::cout << " ?" << std::endl << std::flush;
             // Stop as this is the overwrite boundary.
-            check_journal_alignment(fid, file_pos, rd);
+            check_journal_alignment(fid, file_pos);
             return false;
     }
     return true;
 }
 
+
 bool
 jcntl::decode(jrec& rec, uint16_t& fid, std::ifstream* ifsp, std::size_t& cum_size_read,
-        rec_hdr_t& h/*, bool& lowi*/, rcvdat& rd, std::streampos& file_offs)
+        rec_hdr_t& h, std::streampos& file_offs)
 {
     uint16_t start_fid = fid;
     std::streampos start_file_offs = file_offs;
-//    if (!check_owi(fid, h, lowi, rd, file_offs))
-//        return false;
+
+    if (_rcvdat._h_rid == 0)
+        _rcvdat._h_rid = h._rid;
+    else if (h._rid - _rcvdat._h_rid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
+        _rcvdat._h_rid = h._rid;
+
     bool done = false;
     while (!done)
     {
@@ -860,64 +898,60 @@ jcntl::decode(jrec& rec, uint16_t& fid, 
 //                     fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw;
 // Tried this, but did not work
 //             if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL || h._magic != 0) throw;
-            check_journal_alignment(start_fid, start_file_offs, rd);
+            check_journal_alignment(start_fid, start_file_offs);
 //             rd._lfid = start_fid;
             return false;
         }
-        if (!done && !jfile_cycle(fid, ifsp/*, lowi*/, rd, false))
+        if (!done && !jfile_cycle(fid, ifsp, /*lowi, rd,*/ false))
         {
-            check_journal_alignment(start_fid, start_file_offs, rd);
+            check_journal_alignment(start_fid, start_file_offs);
             return false;
         }
     }
     return true;
 }
 
+
 bool
-jcntl::jfile_cycle(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& rd, const bool jump_fro)
+jcntl::jfile_cycle(uint16_t& fid, std::ifstream* ifsp, const bool jump_fro)
 {
     if (ifsp->is_open())
     {
         if (ifsp->eof() || !ifsp->good())
         {
             ifsp->clear();
-            rd._eo = ifsp->tellg(); // remember file offset before closing
-            assert(rd._eo != std::numeric_limits<std::size_t>::max()); // Check for error code -1
+            _rcvdat._eo = ifsp->tellg(); // remember file offset before closing
+            assert(_rcvdat._eo != std::numeric_limits<std::size_t>::max()); // Check for error code -1
             ifsp->close();
-            if (++fid >= rd._njf)
-            {
-                fid = 0;
-//                lowi = !lowi; // Flip local owi
-            }
-            if (fid == rd._ffid) // used up all journal files
+            if (++fid == _rcvdat._jfl.size()) // used up all known journal files
                 return false;
         }
     }
     if (!ifsp->is_open())
     {
-        std::ostringstream oss;
-        oss << _jdir.dirname() << "/" /*<< _base_filename*/ << "."; // TODO - linear journal name
-        oss << std::hex << std::setfill('0') << std::setw(4) << fid << QLS_JRNL_FILE_EXTENSION;
         ifsp->clear(); // clear eof flag, req'd for older versions of c++
-        ifsp->open(oss.str().c_str(), std::ios_base::in | std::ios_base::binary);
+        ifsp->open(_rcvdat._jfl[fid].c_str(), std::ios_base::in | std::ios_base::binary);
         if (!ifsp->good())
-            throw jexception(jerrno::JERR__FILEIO, oss.str(), "jcntl", "jfile_cycle");
+            throw jexception(jerrno::JERR__FILEIO, _rcvdat._jfl[fid], "jcntl", "jfile_cycle");
 
         // Read file header
+        std::cout << " F" << fid << std::flush;
         file_hdr_t fhdr;
         ifsp->read((char*)&fhdr, sizeof(fhdr));
         assert(ifsp->good());
         if (fhdr._rhdr._magic == QLS_FILE_MAGIC)
         {
-//            assert(fhdr._lfid == fid);
-            if (!rd._fro)
-                rd._fro = fhdr._fro;
-            std::streamoff foffs = jump_fro ? fhdr._fro : JRNL_SBLK_SIZE;
+            if (!_rcvdat._fro)
+                _rcvdat._fro = fhdr._fro;
+            std::streamoff foffs = jump_fro ? fhdr._fro : JRNL_SBLK_SIZE_BYTES;
             ifsp->seekg(foffs);
         }
         else
         {
             ifsp->close();
+            if (fid == 0) {
+                _rcvdat._jempty = true;
+            }
             return false;
         }
     }
@@ -925,46 +959,17 @@ jcntl::jfile_cycle(uint16_t& fid, std::i
 }
 
 
-/*
-bool
-jcntl::check_owi(const uint16_t fid, rec_hdr& h, bool& lowi, rcvdat& rd, std::streampos& file_pos)
-{
-    if (rd._ffid ? h.get_owi() == lowi : h.get_owi() != lowi) // Overwrite indicator changed
-    {
-        uint16_t expected_fid = rd._ffid ? rd._ffid - 1 : rd._njf - 1;
-        if (fid == expected_fid)
-        {
-            check_journal_alignment(fid, file_pos, rd);
-            return false;
-        }
-        std::ostringstream oss;
-        oss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
-        oss << " fid=0x" << std::setw(4) << fid << " rid=0x" << std::setw(8) << h._rid;
-        oss << " foffs=0x" << std::setw(8) << file_pos;
-        oss << " expected_fid=0x" << std::setw(4) << expected_fid;
-        throw jexception(jerrno::JERR_JCNTL_OWIMISMATCH, oss.str(), "jcntl",
-                "check_owi");
-    }
-    if (rd._h_rid == 0)
-        rd._h_rid = h._rid;
-    else if (h._rid - rd._h_rid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
-        rd._h_rid = h._rid;
-    return true;
-}
-*/
-
-
 void
-jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos, rcvdat& rd)
+jcntl::check_journal_alignment(const uint16_t fid, std::streampos& file_pos/*, rcvdat& rd*/)
 {
-    unsigned sblk_offs = file_pos % JRNL_SBLK_SIZE;
+    unsigned sblk_offs = file_pos % JRNL_SBLK_SIZE_BYTES;
     if (sblk_offs)
     {
         {
             std::ostringstream oss;
             oss << std::hex << "Bad record alignment found at fid=0x" << fid;
             oss << " offs=0x" << file_pos << " (likely journal overwrite boundary); " << std::dec;
-            oss << (JRNL_SBLK_SIZE_DBLKS - (sblk_offs/JRNL_DBLK_SIZE)) << " filler record(s) required.";
+            oss << (JRNL_SBLK_SIZE_DBLKS - (sblk_offs/JRNL_DBLK_SIZE_BYTES)) << " filler record(s) required.";
             this->log(LOG_WARN, _jid, oss.str());
         }
         const uint32_t xmagic = QLS_EMPTY_MAGIC;
@@ -976,17 +981,17 @@ jcntl::check_journal_alignment(const uin
         if (!ofsp.good())
             throw jexception(jerrno::JERR__FILEIO, oss.str(), "jcntl", "check_journal_alignment");
         ofsp.seekp(file_pos);
-        void* buff = std::malloc(JRNL_DBLK_SIZE);
+        void* buff = std::malloc(JRNL_DBLK_SIZE_BYTES);
         assert(buff != 0);
         std::memcpy(buff, (const void*)&xmagic, sizeof(xmagic));
         // Normally, RHM_CLEAN must be set before these fills are done, but this is a recover
         // situation (i.e. performance is not an issue), and it makes the location of the write
         // clear should inspection of the file be required.
-        std::memset((char*)buff + sizeof(xmagic), QLS_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
+        std::memset((char*)buff + sizeof(xmagic), QLS_CLEAN_CHAR, JRNL_DBLK_SIZE_BYTES - sizeof(xmagic));
 
-        while (file_pos % JRNL_SBLK_SIZE)
+        while (file_pos % JRNL_SBLK_SIZE_BYTES)
         {
-            ofsp.write((const char*)buff, JRNL_DBLK_SIZE);
+            ofsp.write((const char*)buff, JRNL_DBLK_SIZE_BYTES);
             assert(!ofsp.fail());
             std::ostringstream oss;
             oss << std::hex << "Recover phase write: Wrote filler record: fid=0x" << fid << " offs=0x" << file_pos;
@@ -995,12 +1000,9 @@ jcntl::check_journal_alignment(const uin
         }
         ofsp.close();
         std::free(buff);
-        rd._lfid = fid;
-//        if (!rd._frot)
-//            rd._ffid = (fid + 1) % rd._njf;
         this->log(LOG_INFO, _jid, "Bad record alignment fixed.");
     }
-    rd._eo = file_pos;
+    _rcvdat._eo = file_pos;
 }
 
 }}

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.h Mon Oct  7 18:39:24 2013
@@ -19,8 +19,8 @@
  *
  */
 
-#ifndef QPID_LEGACYSTORE_JRNL_JCNTL_H
-#define QPID_LEGACYSTORE_JRNL_JCNTL_H
+#ifndef QPID_LINEARSTORE_JRNL_JCNTL_H
+#define QPID_LINEARSTORE_JRNL_JCNTL_H
 
 namespace qpid
 {
@@ -31,6 +31,7 @@ namespace qls_jrnl
 
 #include <cstddef>
 #include <deque>
+#include <qpid/linearstore/jrnl/LinearFileController.h>
 #include <qpid/linearstore/jrnl/JournalLog.h>
 #include "qpid/linearstore/jrnl/jdir.h"
 //#include "qpid/linearstore/jrnl/fcntl.h"
@@ -46,8 +47,8 @@ namespace qpid
 {
 namespace qls_jrnl
 {
-class EmptyFilePool;
-class JournalFileController;
+    class EmptyFilePool;
+    class EmptyFilePoolManager;
 
     /**
     * \brief Access and control interface for the journal. This is the top-level class for the
@@ -127,17 +128,14 @@ class JournalFileController;
         //bool _autostop;             ///< Autostop flag - stops journal when overrun occurs
 
         // Journal control structures
-        JournalFileController* _jfcp;///< Journal File Controller
-        //uint32_t _jfsize_sblks;    ///< Journal file size in sblks
-        //lpmgr _lpmgr;               ///< LFID-PFID manager tracks inserted journal files
-        enq_map _emap;              ///< Enqueue map for low water mark management
-        txn_map _tmap;              ///< Transaction map open transactions
-        //rrfc _rrfc;                 ///< Read journal rotating file controller
-        //wrfc _wrfc;                 ///< Write journal rotating file controller
-        rmgr _rmgr;                 ///< Read page manager which manages AIO
-        wmgr _wmgr;                 ///< Write page manager which manages AIO
-        rcvdat _rcvdat;             ///< Recovery data used for recovery
-        smutex _wr_mutex;           ///< Mutex for journal writes
+        LinearFileController _linearFileController; ///< Linear File Controller
+        EmptyFilePool* _emptyFilePoolPtr;           ///< Pointer to Empty File Pool for this queue
+        enq_map _emap;                              ///< Enqueue map for low water mark management
+        txn_map _tmap;                              ///< Transaction map open transactions
+        //rmgr _rmgr;                                 ///< Read page manager which manages AIO
+        wmgr _wmgr;                                 ///< Write page manager which manages AIO
+        rcvdat _rcvdat;                             ///< Recovery data used for recovery
+        smutex _wr_mutex;                           ///< Mutex for journal writes
 
     public:
         static timespec _aio_cmpl_timeout; ///< Timeout for blocking libaio returns
@@ -230,9 +228,12 @@ class JournalFileController;
         *
         * \exception TODO
         */
-        void recover(/*const uint16_t num_jfiles, const bool auto_expand, const uint16_t ae_max_jfiles,
-                const uint32_t jfsize_sblks,*/ const uint16_t wcache_num_pages, const uint32_t wcache_pgsize_sblks,
-                aio_callback* const cbp, const std::vector<std::string>* prep_txn_list_ptr, uint64_t& highest_rid);
+        void recover(EmptyFilePoolManager* efpm,
+                     const uint16_t wcache_num_pages,
+                     const uint32_t wcache_pgsize_sblks,
+                     aio_callback* const cbp,
+                     const std::vector<std::string>* prep_txn_list_ptr,
+                     uint64_t& highest_rid);
 
         /**
         * \brief Notification to the journal that recovery is complete and that normal operation
@@ -532,7 +533,7 @@ class JournalFileController;
         * operations, but if these operations cease, then this call needs to be made to force the
         * processing of any outstanding AIO operations.
         */
-        int32_t get_rd_events(timespec* const timeout);
+//        int32_t get_rd_events(timespec* const timeout);
 
         /**
         * \brief Stop the journal from accepting any further requests to read or write data.
@@ -554,11 +555,11 @@ class JournalFileController;
         */
         iores flush(const bool block_till_aio_cmpl = false);
 
-        inline uint32_t get_enq_cnt() const { return _emap.size(); }
+        inline uint32_t get_enq_cnt() const { return _emap.size(); } // TODO: Thread safe?
 
         inline uint32_t get_wr_aio_evt_rem() const { slock l(_wr_mutex); return _wmgr.get_aio_evt_rem(); }
 
-        inline uint32_t get_rd_aio_evt_rem() const { return _rmgr.get_aio_evt_rem(); }
+//        inline uint32_t get_rd_aio_evt_rem() const { return _rmgr.get_aio_evt_rem(); }
 
         inline uint32_t get_wr_outstanding_aio_dblks() const;
                 /*{ return _wrfc.aio_outstanding_dblks(); }*/
@@ -575,6 +576,7 @@ class JournalFileController;
 //        inline uint16_t get_rd_fid() const { return _rrfc.index(); }
 //        inline uint16_t get_wr_fid() const { return _wrfc.index(); }
 //        uint16_t get_earliest_fid();
+        LinearFileController& getLinearFileControllerRef();
 
         /**
         * \brief Check if a particular rid is enqueued. Note that this function will return
@@ -692,22 +694,25 @@ class JournalFileController;
         /**
         * \brief Analyze journal for recovery.
         */
-        void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>* prep_txn_list_ptr);
+        static void rcvr_read_jfile(const std::string& jfn, ::file_hdr_t* fh, std::string& queueName);
 
-        bool rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& rd);
+        void rcvr_analyze_fhdrs(EmptyFilePoolManager* efpmp);
+
+        void rcvr_janalyze(const std::vector<std::string>* prep_txn_list_ptr, EmptyFilePoolManager* efpmp);
+
+        bool rcvr_get_next_record(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi, rcvdat& rd*/);
 
         bool decode(jrec& rec, uint16_t& fid, std::ifstream* ifsp, std::size_t& cum_size_read,
-                rec_hdr_t& h/*, bool& lowi*/, rcvdat& rd, std::streampos& rec_offset);
+                rec_hdr_t& h, /*bool& lowi, rcvdat& rd,*/ std::streampos& rec_offset);
 
-        bool jfile_cycle(uint16_t& fid, std::ifstream* ifsp/*, bool& lowi*/, rcvdat& rd,
-                const bool jump_fro);
+        bool jfile_cycle(uint16_t& fid, std::ifstream* ifsp, /*bool& lowi, rcvdat& rd,*/ const bool jump_fro);
 
         //bool check_owi(const uint16_t fid, rec_hdr_t& h, bool& lowi, rcvdat& rd,
         //        std::streampos& read_pos);
 
-        void check_journal_alignment(const uint16_t fid, std::streampos& rec_offset, rcvdat& rd);
+        void check_journal_alignment(const uint16_t fid, std::streampos& rec_offset/*, rcvdat& rd*/);
     };
 
 }}
 
-#endif // ifndef QPID_LEGACYSTORE_JRNL_JCNTL_H
+#endif // ifndef QPID_LINEARSTORE_JRNL_JCNTL_H

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.cpp Mon Oct  7 18:39:24 2013
@@ -409,7 +409,7 @@ jdir::exists(const std::string& name)
 }
 
 void
-jdir::read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links) {
+jdir::read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links, const bool return_fqfn) {
     struct stat s;
     if (is_dir(name)) {
         DIR* dir = ::opendir(name.c_str());
@@ -425,8 +425,13 @@ jdir::read_dir(const std::string& name, 
                         oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno);
                         throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "delete_dir");
                     }
-                    if ((S_ISREG(s.st_mode) && incl_files) || (S_ISDIR(s.st_mode) && incl_dirs) || (S_ISLNK(s.st_mode) && incl_links))
-                        dir_list.push_back(entry->d_name);
+                    if ((S_ISREG(s.st_mode) && incl_files) || (S_ISDIR(s.st_mode) && incl_dirs) || (S_ISLNK(s.st_mode) && incl_links)) {
+                        if (return_fqfn) {
+                            dir_list.push_back(name + "/" + entry->d_name);
+                        } else {
+                            dir_list.push_back(entry->d_name);
+                        }
+                    }
                 }
             }
         }

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jdir.h Mon Oct  7 18:39:24 2013
@@ -336,7 +336,7 @@ namespace qls_jrnl
         */
         static bool exists(const std::string& name);
 
-        static void read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links);
+        static void read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links, const bool return_fqfn);
 
         /**
         * \brief Stream operator

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp Mon Oct  7 18:39:24 2013
@@ -42,6 +42,7 @@ const uint32_t jerrno::JERR__TIMEOUT    
 const uint32_t jerrno::JERR__UNEXPRESPONSE       = 0x0108;
 const uint32_t jerrno::JERR__RECNFOUND           = 0x0109;
 const uint32_t jerrno::JERR__NOTIMPL             = 0x010a;
+const uint32_t jerrno::JERR__NULL                = 0x010b;
 
 // class jcntl
 const uint32_t jerrno::JERR_JCNTL_STOPPED        = 0x0200;
@@ -49,8 +50,11 @@ const uint32_t jerrno::JERR_JCNTL_READON
 const uint32_t jerrno::JERR_JCNTL_AIOCMPLWAIT    = 0x0202;
 const uint32_t jerrno::JERR_JCNTL_UNKNOWNMAGIC   = 0x0203;
 const uint32_t jerrno::JERR_JCNTL_NOTRECOVERED   = 0x0204;
-const uint32_t jerrno::JERR_JCNTL_RECOVERJFULL   = 0x0205;
-const uint32_t jerrno::JERR_JCNTL_OWIMISMATCH    = 0x0206;
+const uint32_t jerrno::JERR_JCNTL_OPENRD         = 0x0205;
+const uint32_t jerrno::JERR_JCNTL_READ           = 0x0206;
+const uint32_t jerrno::JERR_JCNTL_ENQSTATE       = 0x0207;
+const uint32_t jerrno::JERR_JCNTL_INVALIDENQHDR  = 0x0208;
+
 
 // class jdir
 const uint32_t jerrno::JERR_JDIR_NOTDIR          = 0x0300;
@@ -65,21 +69,14 @@ const uint32_t jerrno::JERR_JDIR_STAT   
 const uint32_t jerrno::JERR_JDIR_UNLINK          = 0x0309;
 const uint32_t jerrno::JERR_JDIR_BADFTYPE        = 0x030a;
 
-// class fcntl
-//const uint32_t jerrno::JERR_FCNTL_OPENWR       = 0x0400;
-//const uint32_t jerrno::JERR_FCNTL_WRITE        = 0x0401;
-//const uint32_t jerrno::JERR_FCNTL_CLOSE        = 0x0402;
-//const uint32_t jerrno::JERR_FCNTL_FILEOFFSOVFL = 0x0403;
-//const uint32_t jerrno::JERR_FCNTL_CMPLOFFSOVFL = 0x0404;
-//const uint32_t jerrno::JERR_FCNTL_RDOFFSOVFL   = 0x0405;
-
-// class lfmgr
-//const uint32_t jerrno::JERR_LFMGR_BADAEFNUMLIM = 0x0500;
-//const uint32_t jerrno::JERR_LFMGR_AEFNUMLIMIT  = 0x0501;
-//const uint32_t jerrno::JERR_LFMGR_AEDISABLED   = 0x0502;
+// class JournalFile
+const uint32_t jerrno::JERR_JNLF_OPEN            = 0x0400;
+const uint32_t jerrno::JERR_JNLF_CLOSE           = 0x0401;
+const uint32_t jerrno::JERR_JNLF_FILEOFFSOVFL    = 0x0402;
+const uint32_t jerrno::JERR_JNLF_CMPLOFFSOVFL    = 0x0403;
 
-// class rrfc
-//const uint32_t jerrno::JERR_RRFC_OPENRD        = 0x0600;
+// class LinearFileController
+const uint32_t jerrno::JERR_LFCR_SEQNUMNOTFOUND  = 0x0500;
 
 // class jrec, enq_rec, deq_rec, txn_rec
 const uint32_t jerrno::JERR_JREC_BADRECHDR       = 0x0700;
@@ -91,13 +88,14 @@ const uint32_t jerrno::JERR_WMGR_BADDTOK
 const uint32_t jerrno::JERR_WMGR_ENQDISCONT      = 0x0803;
 const uint32_t jerrno::JERR_WMGR_DEQDISCONT      = 0x0804;
 const uint32_t jerrno::JERR_WMGR_DEQRIDNOTENQ    = 0x0805;
+const uint32_t jerrno::JERR_WMGR_BADFH           = 0x0806;
 
-// class rmgr
-const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC    = 0x0900;
-const uint32_t jerrno::JERR_RMGR_RIDMISMATCH     = 0x0901;
-//const uint32_t jerrno::JERR_RMGR_FIDMISMATCH   = 0x0902;
-const uint32_t jerrno::JERR_RMGR_ENQSTATE        = 0x0903;
-const uint32_t jerrno::JERR_RMGR_BADRECTYPE      = 0x0904;
+//// class rmgr
+//const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC    = 0x0900;
+//const uint32_t jerrno::JERR_RMGR_RIDMISMATCH     = 0x0901;
+////const uint32_t jerrno::JERR_RMGR_FIDMISMATCH   = 0x0902;
+//const uint32_t jerrno::JERR_RMGR_ENQSTATE        = 0x0903;
+//const uint32_t jerrno::JERR_RMGR_BADRECTYPE      = 0x0904;
 
 // class data_tok
 const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE    = 0x0a00;
@@ -108,19 +106,6 @@ const uint32_t jerrno::JERR_MAP_DUPLICAT
 const uint32_t jerrno::JERR_MAP_NOTFOUND         = 0x0b01;
 const uint32_t jerrno::JERR_MAP_LOCKED           = 0x0b02;
 
-// class jinf
-//const uint32_t jerrno::JERR_JINF_CVALIDFAIL    = 0x0c00;
-//const uint32_t jerrno::JERR_JINF_NOVALUESTR    = 0x0c01;
-//const uint32_t jerrno::JERR_JINF_BADVALUESTR   = 0x0c02;
-//const uint32_t jerrno::JERR_JINF_JDATEMPTY     = 0x0c03;
-//const uint32_t jerrno::JERR_JINF_TOOMANYFILES  = 0x0c04;
-//const uint32_t jerrno::JERR_JINF_INVALIDFHDR   = 0x0c05;
-//const uint32_t jerrno::JERR_JINF_STAT          = 0x0c06;
-//const uint32_t jerrno::JERR_JINF_NOTREGFILE    = 0x0c07;
-//const uint32_t jerrno::JERR_JINF_BADFILESIZE   = 0x0c08;
-//const uint32_t jerrno::JERR_JINF_OWIBAD        = 0x0c09;
-//const uint32_t jerrno::JERR_JINF_ZEROLENFILE   = 0x0c0a;
-
 // EFP errors
 const uint32_t jerrno::JERR_EFP_BADPARTITIONNAME = 0x0d01;
 const uint32_t jerrno::JERR_EFP_BADPARTITIONDIR  = 0x0d02;
@@ -150,6 +135,7 @@ jerrno::__init()
     _err_map[JERR__UNEXPRESPONSE] = "JERR__UNEXPRESPONSE: Unexpected response to call or event.";
     _err_map[JERR__RECNFOUND] = "JERR__RECNFOUND: Record not found.";
     _err_map[JERR__NOTIMPL] = "JERR__NOTIMPL: Not implemented";
+    _err_map[JERR__NULL] = "JERR__NULL: Operation on null pointer";
 
     // class jcntl
     _err_map[JERR_JCNTL_STOPPED] = "JERR_JCNTL_STOPPED: Operation on stopped journal.";
@@ -157,8 +143,10 @@ jerrno::__init()
     _err_map[JERR_JCNTL_AIOCMPLWAIT] = "JERR_JCNTL_AIOCMPLWAIT: Timeout waiting for AIOs to complete.";
     _err_map[JERR_JCNTL_UNKNOWNMAGIC] = "JERR_JCNTL_UNKNOWNMAGIC: Found record with unknown magic.";
     _err_map[JERR_JCNTL_NOTRECOVERED] = "JERR_JCNTL_NOTRECOVERED: Operation requires recover() to be run first.";
-    _err_map[JERR_JCNTL_RECOVERJFULL] = "JERR_JCNTL_RECOVERJFULL: Journal data files full, cannot write.";
-    _err_map[JERR_JCNTL_OWIMISMATCH] = "JERR_JCNTL_OWIMISMATCH: Overwrite Indicator (OWI) change found in unexpected location.";
+    _err_map[JERR_JCNTL_OPENRD] = "JERR_JCNTL_OPENRD: Unable to open file for read";
+    _err_map[JERR_JCNTL_READ] = "JERR_JCNTL_READ: Read error: no or insufficient data to read";
+    _err_map[JERR_JCNTL_ENQSTATE] = "JERR_JCNTL_ENQSTATE: Read error: Record not in ENQ state";
+    _err_map[JERR_JCNTL_INVALIDENQHDR] = "JERR_JCNTL_INVALIDENQHDR: Invalid ENQ header";
 
     // class jdir
     _err_map[JERR_JDIR_NOTDIR] = "JERR_JDIR_NOTDIR: Directory name exists but is not a directory.";
@@ -173,21 +161,14 @@ jerrno::__init()
     _err_map[JERR_JDIR_UNLINK] = "JERR_JDIR_UNLINK: File delete failed.";
     _err_map[JERR_JDIR_BADFTYPE] = "JERR_JDIR_BADFTYPE: Bad or unknown file type (stat mode).";
 
-    // class fcntl
-//    _err_map[JERR_FCNTL_OPENWR] = "JERR_FCNTL_OPENWR: Unable to open file for write.";
-//    _err_map[JERR_FCNTL_WRITE] = "JERR_FCNTL_WRITE: Unable to write to file.";
-//    _err_map[JERR_FCNTL_CLOSE] = "JERR_FCNTL_CLOSE: File close failed.";
-//    _err_map[JERR_FCNTL_FILEOFFSOVFL] = "JERR_FCNTL_FILEOFFSOVFL: Attempted increase file offset past file size.";
-//    _err_map[JERR_FCNTL_CMPLOFFSOVFL] = "JERR_FCNTL_CMPLOFFSOVFL: Attempted increase completed file offset past submitted offset.";
-//    _err_map[JERR_FCNTL_RDOFFSOVFL] = "JERR_FCNTL_RDOFFSOVFL: Attempted increase read offset past write offset.";
-
-    // class lfmgr
-//    _err_map[JERR_LFMGR_BADAEFNUMLIM] = "JERR_LFMGR_BADAEFNUMLIM: Auto-expand file number limit lower than initial number of journal files.";
-//    _err_map[JERR_LFMGR_AEFNUMLIMIT] = "JERR_LFMGR_AEFNUMLIMIT: Exceeded auto-expand file number limit.";
-//    _err_map[JERR_LFMGR_AEDISABLED] = "JERR_LFMGR_AEDISABLED: Attempted to expand with auto-expand disabled.";
+    // class JournalFile
+    _err_map[JERR_JNLF_OPEN] = "JERR_JNLF_OPEN: Unable to open file for write";
+    _err_map[JERR_JNLF_CLOSE] = "JERR_JNLF_CLOSE: Unable to close file";
+    _err_map[JERR_JNLF_FILEOFFSOVFL] = "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size.";
+    _err_map[JERR_JNLF_CMPLOFFSOVFL] = "JERR_JNLF_CMPLOFFSOVFL: Attempted to increase completed file offset past submitted offset.";
 
-    // class rrfc
-//    _err_map[JERR_RRFC_OPENRD] = "JERR_RRFC_OPENRD: Unable to open file for read.";
+    // class LinearFileController
+    _err_map[JERR_LFCR_SEQNUMNOTFOUND] = "JERR_LFCR_SEQNUMNOTFOUND: File sequence number not found";
 
     // class jrec, enq_rec, deq_rec, txn_rec
     _err_map[JERR_JREC_BADRECHDR] = "JERR_JREC_BADRECHDR: Invalid data record header.";
@@ -199,13 +180,14 @@ jerrno::__init()
     _err_map[JERR_WMGR_ENQDISCONT] = "JERR_WMGR_ENQDISCONT: Enqueued new dtok when previous enqueue returned partly completed (state ENQ_PART).";
     _err_map[JERR_WMGR_DEQDISCONT] = "JERR_WMGR_DEQDISCONT: Dequeued new dtok when previous dequeue returned partly completed (state DEQ_PART).";
     _err_map[JERR_WMGR_DEQRIDNOTENQ] = "JERR_WMGR_DEQRIDNOTENQ: Dequeue rid is not enqueued.";
+    _err_map[JERR_WMGR_BADFH] = "JERR_WMGR_BADFH: Bad file handle.";
 
-    // class rmgr
-    _err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic.";
-    _err_map[JERR_RMGR_RIDMISMATCH] = "JERR_RMGR_RIDMISMATCH: RID mismatch between current record and dtok RID";
-    //_err_map[JERR_RMGR_FIDMISMATCH] = "JERR_RMGR_FIDMISMATCH: FID mismatch between emap and rrfc";
-    _err_map[JERR_RMGR_ENQSTATE] = "JERR_RMGR_ENQSTATE: Attempted read when data token wstate was not ENQ";
-    _err_map[JERR_RMGR_BADRECTYPE] = "JERR_RMGR_BADRECTYPE: Attempted operation on inappropriate record type";
+//    // class rmgr
+//    _err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic.";
+//    _err_map[JERR_RMGR_RIDMISMATCH] = "JERR_RMGR_RIDMISMATCH: RID mismatch between current record and dtok RID";
+//    //_err_map[JERR_RMGR_FIDMISMATCH] = "JERR_RMGR_FIDMISMATCH: FID mismatch between emap and rrfc";
+//    _err_map[JERR_RMGR_ENQSTATE] = "JERR_RMGR_ENQSTATE: Attempted read when data token wstate was not ENQ";
+//    _err_map[JERR_RMGR_BADRECTYPE] = "JERR_RMGR_BADRECTYPE: Attempted operation on inappropriate record type";
 
     // class data_tok
     _err_map[JERR_DTOK_ILLEGALSTATE] = "JERR_MTOK_ILLEGALSTATE: Attempted to change to illegal state.";
@@ -216,19 +198,6 @@ jerrno::__init()
     _err_map[JERR_MAP_NOTFOUND] = "JERR_MAP_NOTFOUND: Key not found in map.";
     _err_map[JERR_MAP_LOCKED] = "JERR_MAP_LOCKED: Record ID locked by a pending transaction.";
 
-    // class jinf
-//    _err_map[JERR_JINF_CVALIDFAIL] = "JERR_JINF_CVALIDFAIL: Journal compatibility validation failure.";
-//    _err_map[JERR_JINF_NOVALUESTR] = "JERR_JINF_NOVALUESTR: No value attribute found in jinf file.";
-//    _err_map[JERR_JINF_BADVALUESTR] = "JERR_JINF_BADVALUESTR: Bad format for value attribute in jinf file";
-//    _err_map[JERR_JINF_JDATEMPTY] = "JERR_JINF_JDATEMPTY: Journal data files empty.";
-//    _err_map[JERR_JINF_TOOMANYFILES] = "JERR_JINF_TOOMANYFILES: Too many journal data files.";
-//    _err_map[JERR_JINF_INVALIDFHDR] = "JERR_JINF_INVALIDFHDR: Invalid journal data file header";
-//    _err_map[JERR_JINF_STAT] = "JERR_JINF_STAT: Error while trying to stat a journal data file";
-//    _err_map[JERR_JINF_NOTREGFILE] = "JERR_JINF_NOTREGFILE: Target journal data file is not a regular file";
-//    _err_map[JERR_JINF_BADFILESIZE] = "JERR_JINF_BADFILESIZE: Journal data file is of incorrect or unexpected size";
-//    _err_map[JERR_JINF_OWIBAD] = "JERR_JINF_OWIBAD: Journal data files have inconsistent OWI flags; >1 transition found in non-auto-expand or min-size journal";
-//    _err_map[JERR_JINF_ZEROLENFILE] = "JERR_JINF_ZEROLENFILE: Journal info file zero length";
-
     // EFP errors
     _err_map[JERR_EFP_BADPARTITIONNAME] = "JERR_EFP_BADPARTITIONNAME: Invalid partition name (must be \'pNNN\' where NNN is a non-zero number)";
     _err_map[JERR_EFP_BADEFPDIRNAME] = "JERR_EFP_BADEFPDIRNAME: Bad Empty File Pool directory name (must be \'NNNk\', where NNN is a number which is a multiple of 4)";

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h Mon Oct  7 18:39:24 2013
@@ -61,6 +61,7 @@ namespace qls_jrnl
         static const uint32_t JERR__UNEXPRESPONSE;     ///< Unexpected response to call or event
         static const uint32_t JERR__RECNFOUND;         ///< Record not found
         static const uint32_t JERR__NOTIMPL;           ///< Not implemented
+        static const uint32_t JERR__NULL;              ///< Operation on null pointer
 
         // class jcntl
         static const uint32_t JERR_JCNTL_STOPPED;      ///< Operation on stopped journal
@@ -68,8 +69,10 @@ namespace qls_jrnl
         static const uint32_t JERR_JCNTL_AIOCMPLWAIT;  ///< Timeout waiting for AIOs to complete
         static const uint32_t JERR_JCNTL_UNKNOWNMAGIC; ///< Found record with unknown magic
         static const uint32_t JERR_JCNTL_NOTRECOVERED; ///< Req' recover() to be called first
-        static const uint32_t JERR_JCNTL_RECOVERJFULL; ///< Journal data files full, cannot write
-        static const uint32_t JERR_JCNTL_OWIMISMATCH;  ///< OWI change found in unexpected location
+        static const uint32_t JERR_JCNTL_OPENRD;       ///< Unable to open file for read
+        static const uint32_t JERR_JCNTL_READ;         ///< Read error: no or insufficient data to read
+        static const uint32_t JERR_JCNTL_ENQSTATE;     ///< Read error: Record not in ENQ state
+        static const uint32_t JERR_JCNTL_INVALIDENQHDR;///< Invalid ENQ header
 
         // class jdir
         static const uint32_t JERR_JDIR_NOTDIR;        ///< Exists but is not a directory
@@ -84,21 +87,14 @@ namespace qls_jrnl
         static const uint32_t JERR_JDIR_UNLINK;        ///< File delete failed
         static const uint32_t JERR_JDIR_BADFTYPE;      ///< Bad or unknown file type (stat mode)
 
-        // class fcntl
-//        static const uint32_t JERR_FCNTL_OPENWR;       ///< Unable to open file for write
-//        static const uint32_t JERR_FCNTL_WRITE;        ///< Unable to write to file
-//        static const uint32_t JERR_FCNTL_CLOSE;        ///< File close failed
-//        static const uint32_t JERR_FCNTL_FILEOFFSOVFL; ///< Increased offset past file size
-//        static const uint32_t JERR_FCNTL_CMPLOFFSOVFL; ///< Increased cmpl offs past subm offs
-//        static const uint32_t JERR_FCNTL_RDOFFSOVFL;   ///< Increased read offs past write offs
-
-        // class lfmgr
-//        static const uint32_t JERR_LFMGR_BADAEFNUMLIM; ///< Bad auto-expand file number limit
-//        static const uint32_t JERR_LFMGR_AEFNUMLIMIT;  ///< Exceeded auto-expand file number limit
-//        static const uint32_t JERR_LFMGR_AEDISABLED;   ///< Attempted to expand with auto-expand disabled
+        // class JournalFile
+        static const uint32_t JERR_JNLF_OPEN;           ///< Unable to open file for write
+        static const uint32_t JERR_JNLF_CLOSE;          ///< Unable to close file
+        static const uint32_t JERR_JNLF_FILEOFFSOVFL;   ///< Increased offset past file size
+        static const uint32_t JERR_JNLF_CMPLOFFSOVFL;   ///< Increased cmpl offs past subm offs
 
-        // class rrfc
-//        static const uint32_t JERR_RRFC_OPENRD;        ///< Unable to open file for read
+        // class LinearFileController
+        static const uint32_t JERR_LFCR_SEQNUMNOTFOUND;///< File sequence number not found
 
         // class jrec, enq_rec, deq_rec, txn_rec
         static const uint32_t JERR_JREC_BADRECHDR;     ///< Invalid data record header
@@ -110,13 +106,14 @@ namespace qls_jrnl
         static const uint32_t JERR_WMGR_ENQDISCONT;    ///< Enq. new dtok when previous part compl.
         static const uint32_t JERR_WMGR_DEQDISCONT;    ///< Deq. new dtok when previous part compl.
         static const uint32_t JERR_WMGR_DEQRIDNOTENQ;  ///< Deq. rid not enqueued
+        static const uint32_t JERR_WMGR_BADFH;         ///< Bad file handle
 
-        // class rmgr
-        static const uint32_t JERR_RMGR_UNKNOWNMAGIC;  ///< Found record with unknown magic
-        static const uint32_t JERR_RMGR_RIDMISMATCH;   ///< RID mismatch between rec and dtok
-        //static const uint32_t JERR_RMGR_FIDMISMATCH;   ///< FID mismatch between emap and rrfc
-        static const uint32_t JERR_RMGR_ENQSTATE;      ///< Attempted read when wstate not ENQ
-        static const uint32_t JERR_RMGR_BADRECTYPE;    ///< Attempted op on incorrect rec type
+//        // class rmgr
+//        static const uint32_t JERR_RMGR_UNKNOWNMAGIC;  ///< Found record with unknown magic
+//        static const uint32_t JERR_RMGR_RIDMISMATCH;   ///< RID mismatch between rec and dtok
+//        //static const uint32_t JERR_RMGR_FIDMISMATCH;   ///< FID mismatch between emap and rrfc
+//        static const uint32_t JERR_RMGR_ENQSTATE;      ///< Attempted read when wstate not ENQ
+//        static const uint32_t JERR_RMGR_BADRECTYPE;    ///< Attempted op on incorrect rec type
 
         // class data_tok
         static const uint32_t JERR_DTOK_ILLEGALSTATE;  ///< Attempted to change to illegal state
@@ -127,19 +124,6 @@ namespace qls_jrnl
         static const uint32_t JERR_MAP_NOTFOUND;       ///< Key not found in map
         static const uint32_t JERR_MAP_LOCKED;         ///< rid locked by pending txn
 
-        // class jinf
-//        static const uint32_t JERR_JINF_CVALIDFAIL;    ///< Compatibility validation failure
-//        static const uint32_t JERR_JINF_NOVALUESTR;    ///< No value attr found in jinf file
-//        static const uint32_t JERR_JINF_BADVALUESTR;   ///< Bad format for value attr in jinf file
-//        static const uint32_t JERR_JINF_JDATEMPTY;     ///< Journal data files empty
-//        static const uint32_t JERR_JINF_TOOMANYFILES;  ///< Too many journal data files
-//        static const uint32_t JERR_JINF_INVALIDFHDR;   ///< Invalid file header
-//        static const uint32_t JERR_JINF_STAT;          ///< Error while trying to stat a file
-//        static const uint32_t JERR_JINF_NOTREGFILE;    ///< Target file is not a regular file
-//        static const uint32_t JERR_JINF_BADFILESIZE;   ///< File is of incorrect or unexpected size
-//        static const uint32_t JERR_JINF_OWIBAD;        ///< OWI inconsistent (>1 transition in non-ae journal)
-//        static const uint32_t JERR_JINF_ZEROLENFILE;   ///< Journal info file is zero length (empty).
-
         // EFP errors
         static const uint32_t JERR_EFP_BADPARTITIONNAME;  ///< Partition name invalid or of value 0
         static const uint32_t JERR_EFP_BADEFPDIRNAME;     ///< Empty File Pool directory name invalid

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h Mon Oct  7 18:39:24 2013
@@ -148,9 +148,9 @@ namespace qls_jrnl
         virtual std::size_t rec_size() const = 0;
         inline virtual uint32_t rec_size_dblks() const { return size_dblks(rec_size()); }
         static inline uint32_t size_dblks(const std::size_t size)
-                { return size_blks(size, JRNL_DBLK_SIZE); }
+                { return size_blks(size, JRNL_DBLK_SIZE_BYTES); }
         static inline uint32_t size_sblks(const std::size_t size)
-                { return size_blks(size, JRNL_SBLK_SIZE); }
+                { return size_blks(size, JRNL_SBLK_SIZE_BYTES); }
         static inline uint32_t size_blks(const std::size_t size, const std::size_t blksize)
                 { return (size + blksize - 1)/blksize; }
         virtual uint64_t rid() const = 0;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp Mon Oct  7 18:39:24 2013
@@ -29,7 +29,6 @@
 #include "qpid/linearstore/jrnl/jerrno.h"
 #include <sstream>
 
-
 namespace qpid
 {
 namespace qls_jrnl
@@ -44,6 +43,7 @@ pmgr::page_cb::page_cb(uint16_t index):
         _pdtokl(0),
 //        _wfh(0),
 //        _rfh(0),
+        _jfp(0),
         _pbuff(0)
 {}
 
@@ -64,7 +64,8 @@ pmgr::page_cb::state_str() const
     return "<unknown>";
 }
 
-const uint32_t pmgr::_sblksize = JRNL_SBLK_SIZE;
+// static
+const uint32_t pmgr::_sblkSizeBytes = JRNL_SBLK_SIZE_BYTES;
 
 pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap):
         _cache_pgsize_sblks(0),
@@ -107,32 +108,33 @@ pmgr::initialize(aio_callback* const cbp
     _cbp = cbp;
 
     // 1. Allocate page memory (as a single block)
-    std::size_t cache_pgsize = _cache_num_pages * _cache_pgsize_sblks * _sblksize;
-    if (::posix_memalign(&_page_base_ptr, _sblksize, cache_pgsize))
+    std::size_t cache_pgsize = _cache_num_pages * _cache_pgsize_sblks * _sblkSizeBytes;
+    if (::posix_memalign(&_page_base_ptr, QLS_AIO_ALIGN_BOUNDARY, cache_pgsize))
     {
         clean();
         std::ostringstream oss;
-        oss << "posix_memalign(): blksize=" << _sblksize << " size=" << cache_pgsize;
+        oss << "posix_memalign(): alignment=" << QLS_AIO_ALIGN_BOUNDARY << " size=" << cache_pgsize;
         oss << FORMAT_SYSERR(errno);
         throw jexception(jerrno::JERR__MALLOC, oss.str(), "pmgr", "initialize");
     }
+
     // 2. Allocate array of page pointers
     _page_ptr_arr = (void**)std::malloc(_cache_num_pages * sizeof(void*));
     MALLOC_CHK(_page_ptr_arr, "_page_ptr_arr", "pmgr", "initialize");
 
-    // 3. Allocate and initilaize page control block (page_cb) array
+    // 3. Allocate and initialize page control block (page_cb) array
     _page_cb_arr = (page_cb*)std::malloc(_cache_num_pages * sizeof(page_cb));
     MALLOC_CHK(_page_cb_arr, "_page_cb_arr", "pmgr", "initialize");
     std::memset(_page_cb_arr, 0, _cache_num_pages * sizeof(page_cb));
 
-    // 5. Allocate IO control block (iocb) array
+    // 4. Allocate IO control block (iocb) array
     _aio_cb_arr = (aio_cb*)std::malloc(_cache_num_pages * sizeof(aio_cb));
     MALLOC_CHK(_aio_cb_arr, "_aio_cb_arr", "pmgr", "initialize");
 
-    // 6. Set page pointers in _page_ptr_arr, _page_cb_arr and iocbs to pages within page block
+    // 5. Set page pointers in _page_ptr_arr, _page_cb_arr and iocbs to pages within page block
     for (uint16_t i=0; i<_cache_num_pages; i++)
     {
-        _page_ptr_arr[i] = (void*)((char*)_page_base_ptr + _cache_pgsize_sblks * _sblksize * i);
+        _page_ptr_arr[i] = (void*)((char*)_page_base_ptr + _cache_pgsize_sblks * _sblkSizeBytes * i);
         _page_cb_arr[i]._index = i;
         _page_cb_arr[i]._state = UNUSED;
         _page_cb_arr[i]._pbuff = _page_ptr_arr[i];
@@ -141,12 +143,12 @@ pmgr::initialize(aio_callback* const cbp
         _aio_cb_arr[i].data = (void*)&_page_cb_arr[i];
     }
 
-    // 7. Allocate io_event array, max one event per cache page plus one for each file
-    const uint16_t max_aio_evts = _cache_num_pages /*+ _jc->num_jfiles()*/; // TODO find replacement here for linear store
+    // 6. Allocate io_event array, max one event per cache page plus one for file header writes
+    const uint16_t max_aio_evts = _cache_num_pages + 1; // One additional event for file header writes
     _aio_event_arr = (aio_event*)std::malloc(max_aio_evts * sizeof(aio_event));
     MALLOC_CHK(_aio_event_arr, "_aio_event_arr", "pmgr", "initialize");
 
-    // 8. Initialize AIO context
+    // 7. Initialize AIO context
     if (int ret = aio::queue_init(max_aio_evts, &_ioctx))
     {
         std::ostringstream oss;
@@ -158,7 +160,7 @@ pmgr::initialize(aio_callback* const cbp
 void
 pmgr::clean()
 {
-    // clean up allocated memory here
+    // Clean up allocated memory here
 
     if (_ioctx)
         aio::queue_release(_ioctx);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.h Mon Oct  7 18:39:24 2013
@@ -45,6 +45,7 @@ namespace qpid
 {
 namespace qls_jrnl
 {
+class JournalFile;
 
     /**
     * \brief Abstract class for managing either read or write page cache of arbitrary size and
@@ -64,7 +65,6 @@ namespace qls_jrnl
             AIO_COMPLETE                ///< An AIO request is complete.
         };
 
-    protected:
         /**
         * \brief Page control block, carries control and state information for each page in the
         *     cache.
@@ -79,13 +79,15 @@ namespace qls_jrnl
             std::deque<data_tok*>* _pdtokl; ///< Page message tokens list
             //fcntl* _wfh;                ///< File handle for incrementing write compl counts
             //fcntl* _rfh;                ///< File handle for incrementing read compl counts
+            JournalFile* _jfp;          ///< Journal file for incrementing compl counts
             void* _pbuff;               ///< Page buffer
 
             page_cb(uint16_t index);   ///< Convenience constructor
             const char* state_str() const; ///< Return state as string for this pcb
         };
 
-        static const uint32_t _sblksize; ///< Disk softblock size
+    protected:
+        static const uint32_t _sblkSizeBytes; ///< Disk softblock size
         uint32_t _cache_pgsize_sblks;   ///< Size of page cache cache_num_pages
         uint16_t _cache_num_pages;      ///< Number of page cache cache_num_pages
         jcntl* _jc;                     ///< Pointer to journal controller

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rcvdat.h Mon Oct  7 18:39:24 2013
@@ -37,98 +37,42 @@ namespace qls_jrnl
 
         struct rcvdat
         {
-            uint16_t _njf;      ///< Number of journal files
-//            bool _ae;           ///< Auto-expand mode
-//            uint16_t _aemjf;    ///< Auto-expand mode max journal files
-//            bool _owi;          ///< Overwrite indicator
-//            bool _frot;         ///< First rotation flag
-            bool _jempty;       ///< Journal data files empty
-            uint16_t _ffid;     ///< First file id
-            std::size_t _fro;   ///< First record offset in ffid
-            uint16_t _lfid;     ///< Last file id
-            std::size_t _eo;    ///< End offset (first byte past last record)
-            uint64_t _h_rid;    ///< Highest rid found
-            bool _lffull;       ///< Last file is full
-            bool _jfull;        ///< Journal is full
-//            std::vector<uint16_t> _fid_list; ///< Fid-lid mapping - list of fids in order of lid
-            std::vector<uint32_t> _enq_cnt_list; ///< Number enqueued records found for each file
+            std::vector<std::string> _jfl;          ///< Journal file list
+            std::map<uint64_t, std::string> _fm;    ///< File number - name map
+            std::vector<uint32_t> _enq_cnt_list;    ///< Number enqueued records found for each file
+            bool _jempty;                           ///< Journal data files empty
+            std::size_t _fro;                       ///< First record offset in ffid
+            std::size_t _eo;                        ///< End offset (first byte past last record)
+            uint64_t _h_rid;                        ///< Highest rid found
+            bool _lffull;                           ///< Last file is full
 
             rcvdat():
-                    _njf(0),
-//                    _ae(false),
-//                    _aemjf(0),
-//                    _owi(false),
-//                    _frot(false),
-                    _jempty(true),
-                    _ffid(0),
+                    _jfl(),
+                    _fm(),
+                    _enq_cnt_list(),
+                    _jempty(false),
                     _fro(0),
-                    _lfid(0),
                     _eo(0),
                     _h_rid(0),
-                    _lffull(false),
-                    _jfull(false),
-//                    _fid_list(),
-                    _enq_cnt_list()
+                    _lffull(false)
             {}
 
-            void reset(const uint16_t num_jfiles/*, const bool auto_expand, const uint16_t ae_max_jfiles*/)
-            {
-                _njf = num_jfiles;
-//                _ae = auto_expand;
-//                _aemjf = ae_max_jfiles;
-//                _owi = false;
-//                _frot = false;
-                _jempty = true;
-                _ffid = 0;
-                _fro = 0;
-                _lfid = 0;
-                _eo = 0;
-                _h_rid = 0;
-                _lffull = false;
-                _jfull = false;
-//                _fid_list.clear();
-                _enq_cnt_list.clear();
-                _enq_cnt_list.resize(num_jfiles, 0);
-            }
-
-            // Find first fid with enqueued records
-            uint16_t ffid()
-            {
-                uint16_t index = _ffid;
-                while (index != _lfid && _enq_cnt_list[index] == 0)
-                {
-                    if (++index >= _njf)
-                        index = 0;
-                }
-                return index;
-            }
-
             std::string to_string(const std::string& jid)
             {
                 std::ostringstream oss;
                 oss << "Recover file analysis (jid=\"" << jid << "\"):" << std::endl;
-                oss << "  Number of journal files (_njf) = " << _njf << std::endl;
-//                oss << "  Auto-expand mode (_ae) = " << (_ae ? "TRUE" : "FALSE") << std::endl;
-//                if (_ae) oss << "  Auto-expand mode max journal files (_aemjf) = " << _aemjf << std::endl;
-//                oss << "  Overwrite indicator (_owi) = " << (_owi ? "TRUE" : "FALSE") << std::endl;
-//                oss << "  First rotation (_frot) = " << (_frot ? "TRUE" : "FALSE") << std::endl;
+                oss << "  Number of journal files = " << _fm.size() << std::endl;
+                oss << "  Journal File List (_jfl):";
+                for (std::map<uint64_t, std::string>::const_iterator i=_fm.begin(); i!=_fm.end(); ++i) {
+                    oss << "    " << i->first << ": " << i->second.substr(i->second.rfind('/')+1) << std::endl;
+                }
                 oss << "  Journal empty (_jempty) = " << (_jempty ? "TRUE" : "FALSE") << std::endl;
-                oss << "  First (earliest) fid (_ffid) = " << _ffid << std::endl;
                 oss << "  First record offset in first fid (_fro) = 0x" << std::hex << _fro <<
-                        std::dec << " (" << (_fro/JRNL_DBLK_SIZE) << " dblks)" << std::endl;
-                oss << "  Last (most recent) fid (_lfid) = " << _lfid << std::endl;
+                        std::dec << " (" << (_fro/JRNL_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
                 oss << "  End offset (_eo) = 0x" << std::hex << _eo << std::dec << " ("  <<
-                        (_eo/JRNL_DBLK_SIZE) << " dblks)" << std::endl;
+                        (_eo/JRNL_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
                 oss << "  Highest rid (_h_rid) = 0x" << std::hex << _h_rid << std::dec << std::endl;
                 oss << "  Last file full (_lffull) = " << (_lffull ? "TRUE" : "FALSE") << std::endl;
-                oss << "  Journal full (_jfull) = " << (_jfull ? "TRUE" : "FALSE") << std::endl;
-                oss << "  Normalized fid list (_fid_list) = [";
-//                for (std::vector<uint16_t>::const_iterator i = _fid_list.begin(); i < _fid_list.end(); i++)
-//                {
-//                    if (i != _fid_list.begin()) oss << ", ";
-//                    oss << *i;
-//                }
-                oss << "]" << std::endl;
                 oss << "  Enqueued records (txn & non-txn):" << std::endl;
                 for (unsigned i=0; i<_enq_cnt_list.size(); i++)
                    oss << "    File " << std::setw(2) << i << ": " << _enq_cnt_list[i] <<
@@ -140,28 +84,25 @@ namespace qls_jrnl
             {
                 std::ostringstream oss;
                 oss << "Recover file analysis (jid=\"" << jid << "\"):";
-                oss << " njf=" << _njf;
-//                oss << " ae=" << (_owi ? "T" : "F");
-//                oss << " aemjf=" << _aemjf;
-//                oss << " owi=" << (_ae ? "T" : "F");
-//                oss << " frot=" << (_frot ? "T" : "F");
+                oss << " jfl=[";
+                for (std::map<uint64_t, std::string>::const_iterator i=_fm.begin(); i!=_fm.end(); ++i) {
+                    if (i!=_fm.begin()) oss << " ";
+                    oss << i->first << ":" << i->second.substr(i->second.rfind('/')+1);
+                }
+                oss << "]";
+                oss << " _enq_cnt_list: [ ";
+                for (unsigned i=0; i<_enq_cnt_list.size(); i++) {
+                    if (i) oss << " ";
+                    oss << _enq_cnt_list[i];
+                }
+                oss << " ]";
                 oss << " jempty=" << (_jempty ? "T" : "F");
-                oss << " ffid=" << _ffid;
                 oss << " fro=0x" << std::hex << _fro << std::dec << " (" <<
-                        (_fro/JRNL_DBLK_SIZE) << " dblks)";
-                oss << " lfid=" << _lfid;
+                        (_fro/JRNL_DBLK_SIZE_BYTES) << " dblks)";
                 oss << " eo=0x" << std::hex << _eo << std::dec << " ("  <<
-                        (_eo/JRNL_DBLK_SIZE) << " dblks)";
+                        (_eo/JRNL_DBLK_SIZE_BYTES) << " dblks)";
                 oss << " h_rid=0x" << std::hex << _h_rid << std::dec;
                 oss << " lffull=" << (_lffull ? "T" : "F");
-                oss << " jfull=" << (_jfull ? "T" : "F");
-                oss << " Enqueued records (txn & non-txn): [ ";
-                for (unsigned i=0; i<_enq_cnt_list.size(); i++)
-                {
-                    if (i) oss << " ";
-                    oss << "fid_" << std::setw(2) << std::setfill('0') << i << "=" << _enq_cnt_list[i];
-                }
-                oss << " ]";
                 return oss.str();
             }
         };

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp Mon Oct  7 18:39:24 2013
@@ -53,10 +53,10 @@ rmgr::initialize(aio_callback* const cbp
     pmgr::initialize(cbp, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES);
     clean();
     // Allocate memory for reading file header
-    if (::posix_memalign(&_fhdr_buffer, _sblksize, _sblksize))
+    if (::posix_memalign(&_fhdr_buffer, _sblkSizeBytes, _sblkSizeBytes))
     {
         std::ostringstream oss;
-        oss << "posix_memalign(): blksize=" << _sblksize << " size=" << _sblksize;
+        oss << "posix_memalign(): blksize=" << _sblkSizeBytes << " size=" << _sblkSizeBytes;
         oss << FORMAT_SYSERR(errno);
         throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr", "initialize");
     }

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp Mon Oct  7 18:39:24 2013
@@ -50,23 +50,27 @@ txn_data_struct::txn_data_struct(const u
 {}
 
 txn_map::txn_map():
-        _map(),
-        _pfid_txn_cnt()
+        _map()/*,
+        _pfid_txn_cnt()*/
 {}
 
 txn_map::~txn_map() {}
 
+/*
 void
 txn_map::set_num_jfiles(const uint16_t num_jfiles)
 {
     _pfid_txn_cnt.resize(num_jfiles, 0);
 }
+*/
 
+/*
 uint32_t
 txn_map::get_txn_pfid_cnt(const uint16_t pfid) const
 {
     return _pfid_txn_cnt.at(pfid);
 }
+*/
 
 bool
 txn_map::insert_txn_data(const std::string& xid, const txn_data& td)
@@ -84,7 +88,7 @@ txn_map::insert_txn_data(const std::stri
     }
     else
         itr->second.push_back(td);
-    _pfid_txn_cnt.at(td._pfid)++;
+//    _pfid_txn_cnt.at(td._pfid)++;
     return ok;
 }
 
@@ -113,8 +117,8 @@ txn_map::get_remove_tdata_list(const std
         return _empty_data_list;
     txn_data_list list = itr->second;
     _map.erase(itr);
-    for (tdl_itr i=list.begin(); i!=list.end(); i++)
-        _pfid_txn_cnt.at(i->_pfid)--;
+//    for (tdl_itr i=list.begin(); i!=list.end(); i++)
+//        _pfid_txn_cnt.at(i->_pfid)--;
     return list;
 }
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h Mon Oct  7 18:39:24 2013
@@ -112,15 +112,15 @@ namespace qls_jrnl
 
         xmap _map;
         smutex _mutex;
-        std::vector<uint32_t> _pfid_txn_cnt;
+//        std::vector<uint32_t> _pfid_txn_cnt;
         const txn_data_list _empty_data_list;
 
     public:
         txn_map();
         virtual ~txn_map();
 
-        void set_num_jfiles(const uint16_t num_jfiles);
-        uint32_t get_txn_pfid_cnt(const uint16_t pfid) const;
+//        void set_num_jfiles(const uint16_t num_jfiles);
+//        uint32_t get_txn_pfid_cnt(const uint16_t pfid) const;
         bool insert_txn_data(const std::string& xid, const txn_data& td);
         const txn_data_list get_tdata_list(const std::string& xid);
         const txn_data_list get_remove_tdata_list(const std::string& xid);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/txn_rec.cpp Mon Oct  7 18:39:24 2013
@@ -94,8 +94,8 @@ txn_rec::encode(void* wptr, uint32_t rec
     assert(max_size_dblks > 0);
     assert(_xidp != 0 && _txn_hdr._xidsize > 0);
 
-    std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
-    std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE;
+    std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
+    std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE_BYTES;
     std::size_t wr_cnt = 0;
     if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages)
     {
@@ -146,8 +146,8 @@ txn_rec::encode(void* wptr, uint32_t rec
                 std::memcpy((char*)wptr + wr_cnt, (char*)&_txn_tail + rec_offs, wsize);
                 wr_cnt += wsize;
 #ifdef RHM_CLEAN
-                std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
-                std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE;
+                std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
+                std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE_BYTES;
                 std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
 #endif
             }
@@ -187,7 +187,7 @@ txn_rec::encode(void* wptr, uint32_t rec
             std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, sizeof(_txn_tail));
             wr_cnt += sizeof(_txn_tail);
 #ifdef RHM_CLEAN
-            std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE;
+            std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE_BYTES;
             std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
 #endif
         }
@@ -206,7 +206,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr
     {
         const uint32_t hdr_xid_dblks = size_dblks(sizeof(txn_hdr_t) + _txn_hdr._xidsize);
         const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(txn_hdr_t) +  _txn_hdr._xidsize + sizeof(rec_tail_t));
-        const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
+        const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
 
         if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
         {
@@ -239,7 +239,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr
             const std::size_t xid_rem = _txn_hdr._xidsize - xid_offs;
             std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
             rd_cnt += xid_rem;
-            const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+            const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
             if (tail_rem)
             {
                 std::memcpy((void*)&_txn_tail, ((char*)rptr + xid_rem), tail_rem);
@@ -249,7 +249,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr
         else
         {
             // Remainder of xid split
-            const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE);
+            const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES);
             std::memcpy((char*)_buff + rec_offs - sizeof(txn_hdr_t), rptr, xid_cp_size);
             rd_cnt += xid_cp_size;
         }
@@ -288,7 +288,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr
             // Entire header and xid fit within this page, tail split
             std::memcpy(_buff, (char*)rptr + rd_cnt, _txn_hdr._xidsize);
             rd_cnt += _txn_hdr._xidsize;
-            const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+            const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
             if (tail_rem)
             {
                 std::memcpy((void*)&_txn_tail, (char*)rptr + rd_cnt, tail_rem);
@@ -298,7 +298,7 @@ txn_rec::decode(rec_hdr_t& h, void* rptr
         else
         {
             // Header fits within this page, xid split
-            const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+            const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
             std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
             rd_cnt += xid_cp_size;
         }
@@ -357,7 +357,7 @@ txn_rec::rcv_decode(rec_hdr_t h, std::if
             return false;
         }
     }
-    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size());
+    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE_BYTES - rec_size());
     chk_tail(); // Throws if tail invalid or record incomplete
     assert(!ifsp->fail() && !ifsp->bad());
     return true;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.c Mon Oct  7 18:39:24 2013
@@ -54,3 +54,10 @@ void set_enq_external(enq_hdr_t *eh, con
     eh->_rhdr._uflag = external ? eh->_rhdr._uflag | ENQ_HDR_EXTERNAL_MASK :
                                   eh->_rhdr._uflag & (~ENQ_HDR_EXTERNAL_MASK);
 }
+
+bool validate_enq_hdr(enq_hdr_t *eh, const uint32_t magic, const uint16_t version, const uint64_t rid) {
+    /* True iff magic and version match and (rid == 0 or rid matches); parens needed as ?: binds looser than && */
+    return eh->_rhdr._magic == magic &&
+           eh->_rhdr._version == version &&
+           (rid == 0 || eh->_rhdr._rid == rid); /* If rid == 0, don't compare rids */
+}

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/enq_hdr.h Mon Oct  7 18:39:24 2013
@@ -70,6 +70,7 @@ bool is_enq_transient(const enq_hdr_t *e
 void set_enq_transient(enq_hdr_t *eh, const bool transient);
 bool is_enq_external(const enq_hdr_t *eh);
 void set_enq_external(enq_hdr_t *eh, const bool external);
+bool validate_enq_hdr(enq_hdr_t *eh, const uint32_t magic, const uint16_t version, const uint64_t rid);
 
 #pragma pack()
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c Mon Oct  7 18:39:24 2013
@@ -36,19 +36,21 @@ void file_hdr_create(file_hdr_t* dest, c
     dest->_queue_name_len = 0;
 }
 
-int file_hdr_init(file_hdr_t* dest, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
+int file_hdr_init(void* dest, const uint64_t dest_len, const uint16_t uflag, const uint64_t rid, const uint64_t fro,
                   const uint64_t file_number, const uint16_t queue_name_len, const char* queue_name) {
-    dest->_rhdr._uflag = uflag;
-    dest->_rhdr._rid = rid;
-    dest->_fro = fro;
-    dest->_file_number = file_number;
+    file_hdr_t* fhp = (file_hdr_t*)dest;
+    fhp->_rhdr._uflag = uflag;
+    fhp->_rhdr._rid = rid;
+    fhp->_fro = fro;
+    fhp->_file_number = file_number;
     if (sizeof(file_hdr_t) + queue_name_len < MAX_FILE_HDR_LEN) {
-        dest->_queue_name_len = queue_name_len;
+        fhp->_queue_name_len = queue_name_len;
     } else {
-        dest->_queue_name_len = MAX_FILE_HDR_LEN - sizeof(file_hdr_t);
+        fhp->_queue_name_len = MAX_FILE_HDR_LEN - sizeof(file_hdr_t);
     }
-    dest->_queue_name_len = queue_name_len;
-    memcpy(dest + sizeof(file_hdr_t), queue_name, queue_name_len);
+    /* Copy and zero-pad using the clamped length stored above so an over-long name cannot overrun the header */
+    memcpy((char*)dest + sizeof(file_hdr_t), queue_name, fhp->_queue_name_len);
+    memset((char*)dest + sizeof(file_hdr_t) + fhp->_queue_name_len, 0, dest_len - sizeof(file_hdr_t) - fhp->_queue_name_len);
     return set_time_now(dest);
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message