qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kp...@apache.org
Subject svn commit: r1538790 [2/2] - in /qpid/trunk/qpid/cpp/src/qpid/linearstore: ./ jrnl/ jrnl/utils/
Date Mon, 04 Nov 2013 22:15:15 GMT
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp Mon Nov  4 22:15:14 2013
@@ -31,6 +31,7 @@
 #include "qpid/linearstore/jrnl/enq_rec.h"
 #include "qpid/linearstore/jrnl/jcfg.h"
 #include "qpid/linearstore/jrnl/jdir.h"
+#include "qpid/linearstore/jrnl/JournalFile.h"
 #include "qpid/linearstore/jrnl/JournalLog.h"
 #include "qpid/linearstore/jrnl/jrec.h"
 #include "qpid/linearstore/jrnl/LinearFileController.h"
@@ -62,7 +63,7 @@ RecoveryManager::RecoveryManager(const s
                                                  highestRecordId_(0ULL),
                                                  highestFileNumber_(0ULL),
                                                  lastFileFullFlag_(false),
-                                                 fileSize_kib_(0)
+                                                 efpFileSize_kib_(0)
 {}
 
 RecoveryManager::~RecoveryManager() {}
@@ -74,18 +75,19 @@ void RecoveryManager::analyzeJournals(co
     efpIdentity_t efpIdentity;
     analyzeJournalFileHeaders(efpIdentity);
     *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity);
-    fileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib();
+    efpFileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib();
+
     // Check for file full condition
     lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024;
 
-    // Restore all read and write pointers and transactions
     if (!journalEmptyFlag_) {
-        while (getNextRecordHeader()) {
 
-        }
+        // Read all records, establish remaining enqueued records
+        while (getNextRecordHeader()) {}
         if (inFileStream_.is_open()) {
             inFileStream_.close();
         }
+
         // Remove leading files which have no enqueued records
         removeEmptyFiles(*emptyFilePoolPtrPtr);
 
@@ -100,14 +102,14 @@ void RecoveryManager::analyzeJournals(co
                     txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
                     // Unlock any affected enqueues in emap
                     for (tdl_itr i=tdl.begin(); i<tdl.end(); i++) {
-                        if (i->_enq_flag) { // enq op - decrement enqueue count
-                            enqueueCountList_[i->_pfid]--;
-                        } else if (enqueueMapRef_.is_enqueued(i->_drid, true)) { // deq op - unlock enq record
-                            int16_t ret = enqueueMapRef_.unlock(i->_drid);
+                        if (i->enq_flag_) { // enq op - decrement enqueue count
+                            fileNumberMap_[i->pfid_]->decrEnqueuedRecordCount();
+                        } else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record
+                            int16_t ret = enqueueMapRef_.unlock(i->drid_);
                             if (ret < enq_map::EMAP_OK) { // fail
                                 // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
                                 std::ostringstream oss;
-                                oss << std::hex << "_emap.unlock(): drid=0x\"" << i->_drid;
+                                oss << std::hex << "_emap.unlock(): drid=0x\"" << i->drid_;
                                 throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "analyzeJournals");
                             }
                         }
@@ -115,7 +117,10 @@ void RecoveryManager::analyzeJournals(co
                 }
             }
         }
+
+        // Set up recordIdList_ from enqueue map
         enqueueMapRef_.rid_list(recordIdList_);
+
         recordIdListConstItr_ = recordIdList_.begin();
     }
 }
@@ -144,19 +149,13 @@ bool RecoveryManager::readNextRemainingR
                                               bool& external,
                                               data_tok* const dtokp,
                                               bool /*ignore_pending_txns*/) {
-    if (!dtokp->is_readable()) {
-        std::ostringstream oss;
-        oss << std::hex << std::setfill('0') << "dtok_id=0x" << std::setw(8) << dtokp->id();
-        oss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid() << "; dtok_wstate=" << dtokp->wstate_str();
-        throw jexception(jerrno::JERR_JCNTL_ENQSTATE, oss.str(), "RecoveryManager", "readNextRemainingRecord");
-    }
     if (recordIdListConstItr_ == recordIdList_.end()) {
         return false;
     }
     enq_map::emap_data_struct_t eds;
     enqueueMapRef_.get_data(*recordIdListConstItr_, eds);
     uint64_t fileNumber = eds._pfid;
-    currentJournalFileConstItr_ = fileNumberNameMap_.find(fileNumber);
+    currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber);
     getNextFile(false);
 
     inFileStream_.seekg(eds._file_posn, std::ifstream::beg);
@@ -195,18 +194,26 @@ bool RecoveryManager::readNextRemainingR
         throw jexception(jerrno::JERR__MALLOC, oss.str(), "RecoveryManager", "readNextRemainingRecord");
     }
     readJournalData((char*)*dataPtrPtr, dataSize);
+
+    // Set data token
+    dtokp->set_wstate(data_tok::ENQ);
+    dtokp->set_rid(enqueueHeader._rhdr._rid);
+    dtokp->set_dsize(dataSize);
+    if (xidSize) {
+        dtokp->set_xid(*xidPtrPtr, xidSize);
+    }
+
+    ++recordIdListConstItr_;
     return true;
 }
 
 void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
                                                       LinearFileController* lfcPtr) {
-//std::cout << "****** RecoveryManager::setLinearFileControllerJournals():" << std::endl; // DEBUG
-    for (fileNumberNameMapConstItr_t i = fileNumberNameMap_.begin(); i != fileNumberNameMap_.end(); ++i) {
+    for (fileNumberMapConstItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) { // visit every entry of fileNumberMap_ in key (file number) order
         uint32_t fileDblkCount = i->first == highestFileNumber_ ?               // Is this this last file?
-                                 endOffset_ / QLS_DBLK_SIZE_BYTES :            // Last file uses _endOffset
-                                 fileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES;   // All others use file size to make them full
-        (lfcPtr->*fnPtr)(i->second, i->first, fileSize_kib_, fileDblkCount);
-//std::cout << "   ** f=" << i->second.substr(i->second.rfind('/')+1) << ",fn=" << i->first << ",s=" << _fileSize_kib << ",eo=" << fileDblkCount << "(" << (fileDblkCount * QLS_DBLK_SIZE_BYTES / 1024) << "kiB)" << std::endl; // DEBUG
+                                 endOffset_ / QLS_DBLK_SIZE_BYTES :             // Last file uses endOffset_
+                                 efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; // All others use file size to make them full
+        (lfcPtr->*fnPtr)(i->second, fileDblkCount); // invoke the supplied member function on *lfcPtr with the JournalFile* and its dblk count
     }
 }
 
@@ -216,14 +223,19 @@ std::string RecoveryManager::toString(co
     if (compact) {
         oss << "Recovery journal analysis (jid=\"" << jid << "\"):";
         oss << " jfl=[";
-        for (fileNumberNameMapConstItr_t i=fileNumberNameMap_.begin(); i!=fileNumberNameMap_.end(); ++i) {
-            if (i!=fileNumberNameMap_.begin()) oss << " ";
-            oss << i->first << ":" << i->second.substr(i->second.rfind('/')+1);
+        for (fileNumberMapConstItr_t i=fileNumberMap_.begin(); i!=fileNumberMap_.end(); ++i) {
+            if (i!=fileNumberMap_.begin()) {
+                oss << " ";
+            }
+            std::string fqFileName = i->second->getFqFileName();
+            oss << i->first << ":" << fqFileName.substr(fqFileName.rfind('/')+1);
         }
         oss << "] ecl=[ ";
-        for (enqueueCountListConstItr_t j = enqueueCountList_.begin(); j!=enqueueCountList_.end(); ++j) {
-            if (j != enqueueCountList_.begin()) oss << " ";
-            oss << *j;
+        for (fileNumberMapConstItr_t j=fileNumberMap_.begin(); j!=fileNumberMap_.end(); ++j) {
+            if (j!=fileNumberMap_.begin()) {
+                oss << " ";
+            }
+            oss << j->second->getEnqueuedRecordCount();
         }
         oss << " ] empty=" << (journalEmptyFlag_ ? "T" : "F");
         oss << " fro=0x" << std::hex << firstRecordOffset_ << std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)";
@@ -233,15 +245,18 @@ std::string RecoveryManager::toString(co
         oss << " lffull=" << (lastFileFullFlag_ ? "T" : "F");
     } else {
         oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
-        oss << "  Number of journal files = " << fileNumberNameMap_.size() << std::endl;
+        oss << "  Number of journal files = " << fileNumberMap_.size() << std::endl;
         oss << "  Journal File List:" << std::endl;
-        for (fileNumberNameMapConstItr_t i=fileNumberNameMap_.begin(); i!=fileNumberNameMap_.end(); ++i) {
-            oss << "    " << i->first << ": " << i->second.substr(i->second.rfind('/')+1) << std::endl;
+        for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
+            std::string fqFileName = k->second->getFqFileName();
+            oss << "    " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
         }
         oss << "  Enqueue Counts: [ " << std::endl;
-        for (enqueueCountListConstItr_t j = enqueueCountList_.begin(); j!=enqueueCountList_.end(); ++j) {
-            if (j != enqueueCountList_.begin()) oss << ", ";
-            oss << *j;
+        for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) {
+            if (l != fileNumberMap_.begin()) {
+                oss << ", ";
+            }
+            oss << l->second->getEnqueuedRecordCount();
         }
         oss << " ]" << std::endl;
         oss << "  Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
@@ -271,21 +286,26 @@ void RecoveryManager::analyzeJournalFile
             oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring";
             journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
         } else {
-            fileNumberNameMap_[fileHeader._file_number] = *i;
+            JournalFile* jfp = new JournalFile(*i, fileHeader);
+            fileNumberMap_[fileHeader._file_number] = jfp;
             if (fileHeader._file_number > highestFileNumber_) {
                 highestFileNumber_ = fileHeader._file_number;
             }
         }
     }
-    efpIdentity.first = fileHeader._efp_partition;
-    efpIdentity.second = fileHeader._file_size_kib;
-    enqueueCountList_.resize(fileNumberNameMap_.size(), 0);
-    currentJournalFileConstItr_ = fileNumberNameMap_.begin();
+    efpIdentity.pn_ = fileHeader._efp_partition;
+    efpIdentity.ds_ = fileHeader._data_size_kib;
+    currentJournalFileConstItr_ = fileNumberMap_.begin();
 }
 
 void RecoveryManager::checkFileStreamOk(bool checkEof) {
-    if (inFileStream_.fail() || inFileStream_.bad() || checkEof ? inFileStream_.eof() : false) {
-        throw jexception("read failure"); // TODO complete exception
+    if (inFileStream_.fail() || inFileStream_.bad() || (checkEof && inFileStream_.eof())) { // EOF test parenthesised: ?: binds looser than ||, which made fail/bad count only when eof was also set
+        std::ostringstream oss;
+        oss << "Stream status: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
+        if (checkEof) {
+            oss << " eof=" << (inFileStream_.eof()?"T":"F");
+        }
+        throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "checkFileStreamOk");
     }
 }
 
@@ -339,7 +359,6 @@ bool RecoveryManager::decodeRecord(jrec&
                               ::rec_hdr_t& headerRecord,
                               std::streampos& fileOffset)
 {
-//    uint16_t start_fid = getCurrentFileNumber();
     std::streampos start_file_offs = fileOffset;
 
     if (highestRecordId_ == 0) {
@@ -354,14 +373,7 @@ bool RecoveryManager::decodeRecord(jrec&
             done = record.rcv_decode(headerRecord, &inFileStream_, cumulativeSizeRead);
         }
         catch (const jexception& e) {
-// TODO - review this logic and tidy up how rd._lfid is assigned. See new jinf.get_end_file() fn.
-// Original
-//             if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL ||
-//                     fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw;
-// Tried this, but did not work
-//             if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL || h._magic != 0) throw;
             checkJournalAlignment(start_file_offs);
-//             rd._lfid = start_fid;
             return false;
         }
         if (!done && !getNextFile(false)) {
@@ -373,7 +385,7 @@ bool RecoveryManager::decodeRecord(jrec&
 }
 
 std::string RecoveryManager::getCurrentFileName() const {
-    return currentJournalFileConstItr_->second;
+    return currentJournalFileConstItr_->second->getFqFileName(); // name reported by the JournalFile at the current map position; iterator must reference a live entry
 }
 
 uint64_t RecoveryManager::getCurrentFileNumber() const {
@@ -382,19 +394,21 @@ uint64_t RecoveryManager::getCurrentFile
 
 bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) {
     if (inFileStream_.is_open()) {
-        if (inFileStream_.eof() || !inFileStream_.good())
-        {
+        if (inFileStream_.eof() || !inFileStream_.good()) {
             inFileStream_.clear();
             endOffset_ = inFileStream_.tellg(); // remember file offset before closing
-            if (endOffset_ == -1) { throw jexception("tellg() failure"); } // Check for error code -1 TODO: compelete exception
+            if (endOffset_ == -1) {
+                std::ostringstream oss;
+                oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
+                throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextFile");
+            }
             inFileStream_.close();
-            if (++currentJournalFileConstItr_ == fileNumberNameMap_.end()) {
+            if (++currentJournalFileConstItr_ == fileNumberMap_.end()) {
                 return false;
             }
         }
     }
-    if (!inFileStream_.is_open())
-    {
+    if (!inFileStream_.is_open())  {
         inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
         inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
         if (!inFileStream_.good()) {
@@ -402,7 +416,6 @@ bool RecoveryManager::getNextFile(bool j
         }
 
         // Read file header
-//std::cout << " F" << getCurrentFileNumber() << std::flush; // DEBUG
         file_hdr_t fhdr;
         inFileStream_.read((char*)&fhdr, sizeof(fhdr));
         checkFileStreamOk(true);
@@ -412,7 +425,7 @@ bool RecoveryManager::getNextFile(bool j
             inFileStream_.seekg(foffs);
         } else {
             inFileStream_.close();
-            if (currentJournalFileConstItr_ == fileNumberNameMap_.begin()) {
+            if (currentJournalFileConstItr_ == fileNumberMap_.begin()) {
                 journalEmptyFlag_ = true;
             }
             return false;
@@ -436,7 +449,11 @@ bool RecoveryManager::getNextRecordHeade
             }
         }
         file_pos = inFileStream_.tellg();
-//std::cout << " 0x" << std::hex << file_pos << std::dec; // DEBUG
+        if (file_pos == -1) {
+            std::ostringstream oss;
+            oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
+            throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextRecordHeader");
+        }
         inFileStream_.read((char*)&h, sizeof(rec_hdr_t));
         if (inFileStream_.gcount() == sizeof(rec_hdr_t)) {
             hdr_ok = true;
@@ -450,19 +467,21 @@ bool RecoveryManager::getNextRecordHeade
     switch(h._magic) {
         case QLS_ENQ_MAGIC:
             {
-//std::cout << ".e" << std::flush; // DEBUG
+//std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG
                 enq_rec er;
                 uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
                 if (!decodeRecord(er, cum_size_read, h, file_pos)) {
                     return false;
                 }
                 if (!er.is_transient()) { // Ignore transient msgs
-                    enqueueCountList_[start_fid]++;
+                    fileNumberMap_[start_fid]->incrEnqueuedRecordCount();
                     if (er.xid_size()) {
                         er.get_xid(&xidp);
-                        if (xidp != 0) { throw jexception("Null xid with non-null xid_size"); } // TODO complete exception
+                        if (xidp == 0) {
+                            throw jexception(jerrno::JERR_RCVM_NULLXID, "ENQ", "RecoveryManager", "getNextRecordHeader");
+                        }
                         std::string xid((char*)xidp, er.xid_size());
-                        transactionMapRef_.insert_txn_data(xid, txn_data(h._rid, 0, start_fid, true));
+                        transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true));
                         if (transactionMapRef_.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) { // fail - xid or rid not found
                             std::ostringstream oss;
                             oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
@@ -482,7 +501,7 @@ bool RecoveryManager::getNextRecordHeade
             break;
         case QLS_DEQ_MAGIC:
             {
-//std::cout << ".d" << std::flush; // DEBUG
+//std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG
                 deq_rec dr;
                 uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
                 if (!decodeRecord(dr, cum_size_read, h, file_pos)) {
@@ -492,10 +511,12 @@ bool RecoveryManager::getNextRecordHeade
                     // If the enqueue is part of a pending txn, it will not yet be in emap
                     enqueueMapRef_.lock(dr.deq_rid()); // ignore not found error
                     dr.get_xid(&xidp);
-                    if (xidp != 0) { throw jexception("Null xid with non-null xid_size"); } // TODO complete exception
+                    if (xidp == 0) {
+                        throw jexception(jerrno::JERR_RCVM_NULLXID, "DEQ", "RecoveryManager", "getNextRecordHeader");
+                    }
                     std::string xid((char*)xidp, dr.xid_size());
-                    transactionMapRef_.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), start_fid, false,
-                            dr.is_txn_coml_commit()));
+                    transactionMapRef_.insert_txn_data(xid, txn_data_t(dr.rid(), dr.deq_rid(), start_fid, file_pos,
+                                                       false, dr.is_txn_coml_commit()));
                     if (transactionMapRef_.set_aio_compl(xid, dr.rid()) < txn_map::TMAP_OK) { // fail - xid or rid not found
                         std::ostringstream oss;
                         oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid();
@@ -505,30 +526,30 @@ bool RecoveryManager::getNextRecordHeade
                 } else {
                     uint64_t enq_fid;
                     if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error
-                        enqueueCountList_[enq_fid]--;
+                        fileNumberMap_[enq_fid]->decrEnqueuedRecordCount();
                     }
                 }
             }
             break;
         case QLS_TXA_MAGIC:
             {
-//std::cout << ".a" << std::flush; // DEBUG
+//std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG
                 txn_rec ar;
                 if (!decodeRecord(ar, cum_size_read, h, file_pos)) {
                     return false;
                 }
                 // Delete this txn from tmap, unlock any locked records in emap
                 ar.get_xid(&xidp);
-                if (xidp != 0) {
-                    throw jexception("Null xid with non-null xid_size"); // TODO complete exception
+                if (xidp == 0) {
+                    throw jexception(jerrno::JERR_RCVM_NULLXID, "ABT", "RecoveryManager", "getNextRecordHeader");
                 }
                 std::string xid((char*)xidp, ar.xid_size());
                 txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
                 for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
-                    if (itr->_enq_flag) {
-                        enqueueCountList_[itr->_pfid]--;
+                    if (itr->enq_flag_) {
+                        fileNumberMap_[itr->pfid_]->decrEnqueuedRecordCount();
                     } else {
-                        enqueueMapRef_.unlock(itr->_drid); // ignore not found error
+                        enqueueMapRef_.unlock(itr->drid_); // ignore not found error
                     }
                 }
                 std::free(xidp);
@@ -536,30 +557,31 @@ bool RecoveryManager::getNextRecordHeade
             break;
         case QLS_TXC_MAGIC:
             {
-//std::cout << ".t" << std::flush; // DEBUG
+//std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG
                 txn_rec cr;
                 if (!decodeRecord(cr, cum_size_read, h, file_pos)) {
                     return false;
                 }
                 // Delete this txn from tmap, process records into emap
                 cr.get_xid(&xidp);
-                if (xidp != 0) {
-                    throw jexception("Null xid with non-null xid_size"); // TODO complete exception
+                if (xidp == 0) {
+                    throw jexception(jerrno::JERR_RCVM_NULLXID, "CMT", "RecoveryManager", "getNextRecordHeader");
                 }
                 std::string xid((char*)xidp, cr.xid_size());
                 txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
                 for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
-                    if (itr->_enq_flag) { // txn enqueue
-                        if (enqueueMapRef_.insert_pfid(itr->_rid, itr->_pfid, file_pos) < enq_map::EMAP_OK) { // fail
+                    if (itr->enq_flag_) { // txn enqueue
+//std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->pfid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG
+                        if (enqueueMapRef_.insert_pfid(itr->rid_, itr->pfid_, itr->foffs_) < enq_map::EMAP_OK) { // fail
                             // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
                             std::ostringstream oss;
-                            oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid;
+                            oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->pfid_;
                             throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader");
                         }
                     } else { // txn dequeue
                         uint64_t enq_fid;
-                        if (enqueueMapRef_.get_remove_pfid(itr->_drid, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
-                            enqueueCountList_[enq_fid]--;
+                        if (enqueueMapRef_.get_remove_pfid(itr->drid_, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
+                            fileNumberMap_[enq_fid]->decrEnqueuedRecordCount();
                     }
                 }
                 std::free(xidp);
@@ -577,11 +599,11 @@ bool RecoveryManager::getNextRecordHeade
             }
             break;
         case 0:
-//std::cout << ".0" << std::endl << std::flush; // DEBUG
+//std::cout << " 0x" << std::hex << file_pos << ".0" << std::dec << std::endl << std::flush; // DEBUG
             checkJournalAlignment(file_pos);
             return false;
         default:
-//std::cout << ".?" << std::endl << std::flush; // DEBUG
+//std::cout << " 0x" << std::hex << file_pos << ".?" << std::dec << std::endl << std::flush; // DEBUG
             // Stop as this is the overwrite boundary.
             checkJournalAlignment(file_pos);
             return false;
@@ -593,16 +615,24 @@ void RecoveryManager::readJournalData(ch
                                       const std::streamsize readSize) {
     std::streamoff bytesRead = 0;
     while (bytesRead < readSize) {
-        if (inFileStream_.eof()) {
-            getNextFile(false);
+        std::streampos file_pos = inFileStream_.tellg();
+        if (file_pos == -1) {
+            std::ostringstream oss;
+            oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
+            throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "readJournalData");
         }
-        bool readFitsInFile = inFileStream_.tellg() + readSize <= fileSize_kib_ * 1024;
-        std::streamoff actualReadSize = readFitsInFile ? readSize : (fileSize_kib_ * 1024) - inFileStream_.tellg();
-        inFileStream_.read(target + bytesRead, actualReadSize);
-        if (inFileStream_.gcount() != actualReadSize) {
-            throw jexception(); // TODO - proper exception
+        inFileStream_.read(target + bytesRead, readSize - bytesRead);
+        std::streamoff thisReadSize = inFileStream_.gcount();
+        if (thisReadSize < readSize) {
+            getNextFile(false);
+            file_pos = inFileStream_.tellg();
+            if (file_pos == -1) {
+                std::ostringstream oss;
+                oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
+                throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "readJournalData");
+            }
         }
-        bytesRead += actualReadSize;
+        bytesRead += thisReadSize;
     }
 }
 
@@ -633,12 +663,10 @@ void RecoveryManager::readJournalFileHea
 }
 
 void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) {
-    while (enqueueCountList_.front() == 0 && enqueueCountList_.size() > 1) {
-        fileNumberNameMapItr_t i = fileNumberNameMap_.begin();
-//std::cout << "*** File " << i->first << ": " << i->second << " is empty." << std::endl;
-        emptyFilePoolPtr->returnEmptyFile(i->second);
-        fileNumberNameMap_.erase(i);
-        enqueueCountList_.pop_front();
+    while (fileNumberMap_.size() > 1 && fileNumberMap_.begin()->second->getEnqueuedRecordCount() == 0) { // size tested first so begin() is never dereferenced on an empty map; one file is always kept
+        emptyFilePoolPtr->returnEmptyFile(fileNumberMap_.begin()->second->getFqFileName()); // hand the leading file with no enqueued records back to the pool
+        delete fileNumberMap_.begin()->second; // JournalFile was allocated with new when the map was populated; free it before dropping the entry
+        fileNumberMap_.erase(fileNumberMap_.begin()); // erase by iterator: no second key lookup
     }
 }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h Mon Nov  4 22:15:14 2013
@@ -49,11 +49,9 @@ protected:
     // Types
     typedef std::vector<std::string> directoryList_t;
     typedef directoryList_t::const_iterator directoryListConstItr_t;
-    typedef std::map<uint64_t, std::string> fileNumberNameMap_t;
-    typedef fileNumberNameMap_t::iterator fileNumberNameMapItr_t;
-    typedef fileNumberNameMap_t::const_iterator fileNumberNameMapConstItr_t;
-    typedef std::deque<uint32_t> enqueueCountList_t;
-    typedef enqueueCountList_t::const_iterator enqueueCountListConstItr_t;
+    typedef std::map<uint64_t, JournalFile*> fileNumberMap_t;
+    typedef fileNumberMap_t::iterator fileNumberMapItr_t;
+    typedef fileNumberMap_t::const_iterator fileNumberMapConstItr_t;
     typedef std::vector<uint64_t> recordIdList_t;
     typedef recordIdList_t::const_iterator recordIdListConstItr_t;
 
@@ -65,8 +63,7 @@ protected:
     JournalLog& journalLogRef_;
 
     // Initial journal analysis data
-    fileNumberNameMap_t fileNumberNameMap_;     ///< File number - name map
-    enqueueCountList_t enqueueCountList_;       ///< Number enqueued records found for each file
+    fileNumberMap_t fileNumberMap_;             ///< File number - JournalFilePtr map
     bool journalEmptyFlag_;                     ///< Journal data files empty
     std::streamoff firstRecordOffset_;          ///< First record offset in ffid
     std::streamoff endOffset_;                  ///< End offset (first byte past last record)
@@ -75,8 +72,8 @@ protected:
     bool lastFileFullFlag_;                     ///< Last file is full
 
     // State for recovery of individual enqueued records
-    uint32_t fileSize_kib_;
-    fileNumberNameMapConstItr_t currentJournalFileConstItr_;
+    uint32_t efpFileSize_kib_;
+    fileNumberMapConstItr_t currentJournalFileConstItr_;
     std::string currentFileName_;
     std::ifstream inFileStream_;
     recordIdList_t recordIdList_;

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp Mon Nov  4 22:15:14 2013
@@ -103,35 +103,10 @@ jcntl::initialize(EmptyFilePool* efpp,
     _tmap.clear();
 
     _linearFileController.finalize();
-
-//    _lpmgr.finalize();
-
-    // Set new file geometry parameters
-//    assert(num_jfiles >= JRNL_MIN_NUM_FILES);
-//    assert(num_jfiles <= JRNL_MAX_NUM_FILES);
-//    _emap.set_num_jfiles(num_jfiles);
-//    _tmap.set_num_jfiles(num_jfiles);
-
-//    assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE);
-//    assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE);
-//    _jfsize_sblks = jfsize_sblks;
-
-    // Clear any existing journal files
-    _jdir.clear_dir();
-//    _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl); // Creates new journal files
-
+    _jdir.clear_dir(); // Clear any existing journal files
     _linearFileController.initialize(_jdir.dirname(), efpp, 0ULL);
     _linearFileController.pullEmptyFileFromEfp();
-//    std::cout << _linearFileController.status(2);
-//    _wrfc.initialize(_jfsize_sblks);
-//    _rrfc.initialize();
-//    _rrfc.set_findex(0);
-//    _rmgr.initialize(cbp);
     _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS);
-
-    // Write info file (<basename>.jinf) to disk
-//    write_infofile();
-
     _init_flag = true;
 }
 
@@ -152,36 +127,14 @@ jcntl::recover(EmptyFilePoolManager* efp
 
     _linearFileController.finalize();
 
-//    _lpmgr.finalize();
-
-//    assert(num_jfiles >= JRNL_MIN_NUM_FILES);
-//    assert(num_jfiles <= JRNL_MAX_NUM_FILES);
-//    assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE);
-//    assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE);
-//    _jfsize_sblks = jfsize_sblks;
-
     // Verify journal dir and journal files
     _jdir.verify_dir();
-//    _rcvdat.reset(num_jfiles/*, ae, ae_max_jfiles*/);
-
-//    rcvr_janalyze(prep_txn_list_ptr, efpm);
-    efpIdentity_t efpIdentity;
     _recoveryManager.analyzeJournals(prep_txn_list_ptr, efpmp, &_emptyFilePoolPtr);
 
     highest_rid = _recoveryManager.getHighestRecordId();
-//    if (_rcvdat._jfull)
-//        throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
     _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, true));
-
-//    _lpmgr.recover(_rcvdat, this, &new_fcntl);
-
     _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
-//    _linearFileController.setFileNumberCounter(_recoveryManager.getHighestFileNumber());
     _recoveryManager.setLinearFileControllerJournals(&qpid::qls_jrnl::LinearFileController::addJournalFile, &_linearFileController);
-//    _wrfc.initialize(_jfsize_sblks, &_rcvdat);
-//    _rrfc.initialize();
-//    _rrfc.set_findex(_rcvdat.ffid());
-//    _rmgr.initialize(cbp);
     _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS,
             (_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset()));
 
@@ -194,12 +147,6 @@ jcntl::recover_complete()
 {
     if (!_readonly_flag)
         throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl", "recover_complete");
-//    for (uint16_t i=0; i<_lpmgr.num_jfiles(); i++)
-//        _lpmgr.get_fcntlp(i)->reset(&_rcvdat);
-//    _wrfc.initialize(_jfsize_sblks, &_rcvdat);
-//    _rrfc.initialize();
-//    _rrfc.set_findex(_rcvdat.ffid());
-//    _rmgr.recover_complete();
     _readonly_flag = false;
 }
 
@@ -207,7 +154,7 @@ void
 jcntl::delete_jrnl_files()
 {
     stop(true); // wait for AIO to complete
-    _linearFileController.purgeFilesToEfp();
+    _linearFileController.purgeEmptyFilesToEfp();
     _jdir.delete_dir();
 }
 
@@ -288,73 +235,10 @@ jcntl::read_data_record(void** const dat
                         bool ignore_pending_txns)
 {
     check_rstatus("read_data");
-    if (_recoveryManager.readNextRemainingRecord(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns))
+    if (_recoveryManager.readNextRemainingRecord(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns)) {
         return RHM_IORES_SUCCESS;
-    return RHM_IORES_EMPTY;
-/*
-    if (!dtokp->is_readable()) {
-        std::ostringstream oss;
-        oss << std::hex << std::setfill('0');
-        oss << "dtok_id=0x" << std::setw(8) << dtokp->id();
-        oss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid();
-        oss << "; dtok_wstate=" << dtokp->wstate_str();
-        throw jexception(jerrno::JERR_JCNTL_ENQSTATE, oss.str(), "jcntl", "read_data_record");
-    }
-    std::vector<uint64_t> ridl;
-    _emap.rid_list(ridl);
-    enq_map::emap_data_struct_t eds;
-    for (std::vector<uint64_t>::const_iterator i=ridl.begin(); i!=ridl.end(); ++i) {
-        short res = _emap.get_data(*i, eds);
-        if (res == enq_map::EMAP_OK) {
-            std::ifstream ifs(_recoveryManager._fm[eds._pfid].c_str(), std::ifstream::in | std::ifstream::binary);
-            if (!ifs.good()) {
-                std::ostringstream oss;
-                oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _recoveryManager._fm[eds._pfid] << " file_posn=" << eds._file_posn;
-                throw jexception(jerrno::JERR_RCVM_OPENRD, oss.str(), "jcntl", "read_data_record");
-            }
-            ifs.seekg(eds._file_posn, std::ifstream::beg);
-            ::enq_hdr_t eh;
-            ifs.read((char*)&eh, sizeof(::enq_hdr_t));
-            if (!::validate_enq_hdr(&eh, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, *i)) {
-                std::ostringstream oss;
-                oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _recoveryManager._fm[eds._pfid] << " file_posn=" << eds._file_posn;
-                throw jexception(jerrno::JERR_JCNTL_INVALIDENQHDR, oss.str(), "jcntl", "read_data_record");
-            }
-            dsize = eh._dsize;
-            xidsize = eh._xidsize;
-            transient = ::is_enq_transient(&eh);
-            external = ::is_enq_external(&eh);
-            if (xidsize) {
-                *xidpp = ::malloc(xidsize);
-                ifs.read((char*)(*xidpp), xidsize);
-            } else {
-                *xidpp = 0;
-            }
-            if (dsize) {
-                *datapp = ::malloc(dsize);
-                ifs.read((char*)(*datapp), dsize);
-            } else {
-                *datapp = 0;
-            }
-        }
     }
-*/
-/*
-    check_rstatus("read_data");
-    iores res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
-    if (res == RHM_IORES_RCINVALID)
-    {
-        get_wr_events(0); // check for outstanding write events
-        iores sres = _rmgr.synchronize(); // flushes all outstanding read events
-        if (sres != RHM_IORES_SUCCESS)
-            return sres;
-        // TODO: Does linear store need this?
-//        _rmgr.wait_for_validity(&_aio_cmpl_timeout, true); // throw if timeout occurs
-        res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
-    }
-    return res;
-*/
-    return RHM_IORES_SUCCESS;
+    return RHM_IORES_EMPTY;
 }
 
 iores

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp Mon Nov  4 22:15:14 2013
@@ -53,7 +53,6 @@ const uint32_t jerrno::JERR_JCNTL_NOTREC
 const uint32_t jerrno::JERR_JCNTL_ENQSTATE       = 0x0207;
 const uint32_t jerrno::JERR_JCNTL_INVALIDENQHDR  = 0x0208;
 
-
 // class jdir
 const uint32_t jerrno::JERR_JDIR_NOTDIR          = 0x0300;
 const uint32_t jerrno::JERR_JDIR_MKDIR           = 0x0301;
@@ -89,16 +88,11 @@ const uint32_t jerrno::JERR_WMGR_DEQRIDN
 const uint32_t jerrno::JERR_WMGR_BADFH           = 0x0806;
 
 // class RecoveryManager
-const uint32_t jerrno::JERR_RCVM_OPENRD          = 0x0900;
-const uint32_t jerrno::JERR_RCVM_READ            = 0x0901;
-const uint32_t jerrno::JERR_RCVM_WRITE           = 0x0902;
-
-//// class rmgr
-//const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC    = 0x0900;
-//const uint32_t jerrno::JERR_RMGR_RIDMISMATCH     = 0x0901;
-////const uint32_t jerrno::JERR_RMGR_FIDMISMATCH   = 0x0902;
-//const uint32_t jerrno::JERR_RMGR_ENQSTATE        = 0x0903;
-//const uint32_t jerrno::JERR_RMGR_BADRECTYPE      = 0x0904;
+const uint32_t jerrno::JERR_RCVM_OPENRD          = 0x0900;         ///< Unable to open file for read
+const uint32_t jerrno::JERR_RCVM_STREAMBAD       = 0x0901;      ///< Read/write stream error
+const uint32_t jerrno::JERR_RCVM_READ            = 0x0902;           ///< Read error: no or insufficient data to read
+const uint32_t jerrno::JERR_RCVM_WRITE           = 0x0903;          ///< Write error
+const uint32_t jerrno::JERR_RCVM_NULLXID         = 0x0904;        ///< Null XID when XID length is non-zero in header
 
 // class data_tok
 const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE    = 0x0a00;
@@ -184,15 +178,11 @@ jerrno::__init()
     _err_map[JERR_WMGR_BADFH] = "JERR_WMGR_BADFH: Bad file handle.";
 
     // class RecoveryManager
-    _err_map[JERR_RCVM_OPENRD] = "JERR_JCNTL_OPENRD: Unable to open file for write";
-    _err_map[JERR_RCVM_READ] = "JERR_JCNTL_READ: Read error: no or insufficient data to read";
+    _err_map[JERR_RCVM_OPENRD] = "JERR_RCVM_OPENRD: Unable to open file for read";
+    _err_map[JERR_RCVM_STREAMBAD] = "JERR_RCVM_STREAMBAD: Read/write stream error";
+    _err_map[JERR_RCVM_READ] = "JERR_RCVM_READ: Read error: no or insufficient data to read";
     _err_map[JERR_RCVM_WRITE] = "JERR_RCVM_WRITE: Write error";
-//    // class rmgr
-//    _err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic.";
-//    _err_map[JERR_RMGR_RIDMISMATCH] = "JERR_RMGR_RIDMISMATCH: RID mismatch between current record and dtok RID";
-//    //_err_map[JERR_RMGR_FIDMISMATCH] = "JERR_RMGR_FIDMISMATCH: FID mismatch between emap and rrfc";
-//    _err_map[JERR_RMGR_ENQSTATE] = "JERR_RMGR_ENQSTATE: Attempted read when data token wstate was not ENQ";
-//    _err_map[JERR_RMGR_BADRECTYPE] = "JERR_RMGR_BADRECTYPE: Attempted operation on inappropriate record type";
+    _err_map[JERR_RCVM_NULLXID] = "JERR_RCVM_NULLXID: Null XID when XID length non-null in header";
 
     // class data_tok
     _err_map[JERR_DTOK_ILLEGALSTATE] = "JERR_MTOK_ILLEGALSTATE: Attempted to change to illegal state.";

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h Mon Nov  4 22:15:14 2013
@@ -50,40 +50,40 @@ namespace qls_jrnl
 
     public:
         // generic errors
-        static const uint32_t JERR__MALLOC;            ///< Buffer memory allocation failed
-        static const uint32_t JERR__UNDERFLOW;         ///< Underflow error
-        static const uint32_t JERR__NINIT;             ///< Operation on uninitialized class
-        static const uint32_t JERR__AIO;               ///< AIO failure
-        static const uint32_t JERR__FILEIO;            ///< File read or write failure
-        static const uint32_t JERR__RTCLOCK;           ///< Reading real-time clock failed
-        static const uint32_t JERR__PTHREAD;           ///< pthread failure
-        static const uint32_t JERR__TIMEOUT;           ///< Timeout waiting for an event
-        static const uint32_t JERR__UNEXPRESPONSE;     ///< Unexpected response to call or event
-        static const uint32_t JERR__RECNFOUND;         ///< Record not found
-        static const uint32_t JERR__NOTIMPL;           ///< Not implemented
-        static const uint32_t JERR__NULL;              ///< Operation on null pointer
+        static const uint32_t JERR__MALLOC;             ///< Buffer memory allocation failed
+        static const uint32_t JERR__UNDERFLOW;          ///< Underflow error
+        static const uint32_t JERR__NINIT;              ///< Operation on uninitialized class
+        static const uint32_t JERR__AIO;                ///< AIO failure
+        static const uint32_t JERR__FILEIO;             ///< File read or write failure
+        static const uint32_t JERR__RTCLOCK;            ///< Reading real-time clock failed
+        static const uint32_t JERR__PTHREAD;            ///< pthread failure
+        static const uint32_t JERR__TIMEOUT;            ///< Timeout waiting for an event
+        static const uint32_t JERR__UNEXPRESPONSE;      ///< Unexpected response to call or event
+        static const uint32_t JERR__RECNFOUND;          ///< Record not found
+        static const uint32_t JERR__NOTIMPL;            ///< Not implemented
+        static const uint32_t JERR__NULL;               ///< Operation on null pointer
 
         // class jcntl
-        static const uint32_t JERR_JCNTL_STOPPED;      ///< Operation on stopped journal
-        static const uint32_t JERR_JCNTL_READONLY;     ///< Write operation on read-only journal
-        static const uint32_t JERR_JCNTL_AIOCMPLWAIT;  ///< Timeout waiting for AIOs to complete
-        static const uint32_t JERR_JCNTL_UNKNOWNMAGIC; ///< Found record with unknown magic
-        static const uint32_t JERR_JCNTL_NOTRECOVERED; ///< Req' recover() to be called first
-        static const uint32_t JERR_JCNTL_ENQSTATE;     ///< Read error: Record not in ENQ state
-        static const uint32_t JERR_JCNTL_INVALIDENQHDR;///< Invalid ENQ header
+        static const uint32_t JERR_JCNTL_STOPPED;       ///< Operation on stopped journal
+        static const uint32_t JERR_JCNTL_READONLY;      ///< Write operation on read-only journal
+        static const uint32_t JERR_JCNTL_AIOCMPLWAIT;   ///< Timeout waiting for AIOs to complete
+        static const uint32_t JERR_JCNTL_UNKNOWNMAGIC;  ///< Found record with unknown magic
+        static const uint32_t JERR_JCNTL_NOTRECOVERED;  ///< Req' recover() to be called first
+        static const uint32_t JERR_JCNTL_ENQSTATE;      ///< Read error: Record not in ENQ state
+        static const uint32_t JERR_JCNTL_INVALIDENQHDR; ///< Invalid ENQ header
 
         // class jdir
-        static const uint32_t JERR_JDIR_NOTDIR;        ///< Exists but is not a directory
-        static const uint32_t JERR_JDIR_MKDIR;         ///< Directory creation failed
-        static const uint32_t JERR_JDIR_OPENDIR;       ///< Directory open failed
-        static const uint32_t JERR_JDIR_READDIR;       ///< Directory read failed
-        static const uint32_t JERR_JDIR_CLOSEDIR;      ///< Directory close failed
-        static const uint32_t JERR_JDIR_RMDIR;         ///< Directory delete failed
-        static const uint32_t JERR_JDIR_NOSUCHFILE;    ///< File does not exist
-        static const uint32_t JERR_JDIR_FMOVE;         ///< File move failed
-        static const uint32_t JERR_JDIR_STAT;          ///< File stat failed
-        static const uint32_t JERR_JDIR_UNLINK;        ///< File delete failed
-        static const uint32_t JERR_JDIR_BADFTYPE;      ///< Bad or unknown file type (stat mode)
+        static const uint32_t JERR_JDIR_NOTDIR;         ///< Exists but is not a directory
+        static const uint32_t JERR_JDIR_MKDIR;          ///< Directory creation failed
+        static const uint32_t JERR_JDIR_OPENDIR;        ///< Directory open failed
+        static const uint32_t JERR_JDIR_READDIR;        ///< Directory read failed
+        static const uint32_t JERR_JDIR_CLOSEDIR;       ///< Directory close failed
+        static const uint32_t JERR_JDIR_RMDIR;          ///< Directory delete failed
+        static const uint32_t JERR_JDIR_NOSUCHFILE;     ///< File does not exist
+        static const uint32_t JERR_JDIR_FMOVE;          ///< File move failed
+        static const uint32_t JERR_JDIR_STAT;           ///< File stat failed
+        static const uint32_t JERR_JDIR_UNLINK;         ///< File delete failed
+        static const uint32_t JERR_JDIR_BADFTYPE;       ///< Bad or unknown file type (stat mode)
 
         // class JournalFile
         static const uint32_t JERR_JNLF_OPEN;           ///< Unable to open file for write
@@ -92,47 +92,42 @@ namespace qls_jrnl
         static const uint32_t JERR_JNLF_CMPLOFFSOVFL;   ///< Increased cmpl offs past subm offs
 
         // class LinearFileController
-        static const uint32_t JERR_LFCR_SEQNUMNOTFOUND;///< File sequence number not found
+        static const uint32_t JERR_LFCR_SEQNUMNOTFOUND; ///< File sequence number not found
 
         // class jrec, enq_rec, deq_rec, txn_rec
-        static const uint32_t JERR_JREC_BADRECHDR;     ///< Invalid data record header
-        static const uint32_t JERR_JREC_BADRECTAIL;    ///< Invalid data record tail
+        static const uint32_t JERR_JREC_BADRECHDR;      ///< Invalid data record header
+        static const uint32_t JERR_JREC_BADRECTAIL;     ///< Invalid data record tail
 
         // class wmgr
-        static const uint32_t JERR_WMGR_BADPGSTATE;    ///< Page buffer in illegal state.
-        static const uint32_t JERR_WMGR_BADDTOKSTATE;  ///< Data token in illegal state.
-        static const uint32_t JERR_WMGR_ENQDISCONT;    ///< Enq. new dtok when previous part compl.
-        static const uint32_t JERR_WMGR_DEQDISCONT;    ///< Deq. new dtok when previous part compl.
-        static const uint32_t JERR_WMGR_DEQRIDNOTENQ;  ///< Deq. rid not enqueued
-        static const uint32_t JERR_WMGR_BADFH;         ///< Bad file handle
+        static const uint32_t JERR_WMGR_BADPGSTATE;     ///< Page buffer in illegal state.
+        static const uint32_t JERR_WMGR_BADDTOKSTATE;   ///< Data token in illegal state.
+        static const uint32_t JERR_WMGR_ENQDISCONT;     ///< Enq. new dtok when previous part compl.
+        static const uint32_t JERR_WMGR_DEQDISCONT;     ///< Deq. new dtok when previous part compl.
+        static const uint32_t JERR_WMGR_DEQRIDNOTENQ;   ///< Deq. rid not enqueued
+        static const uint32_t JERR_WMGR_BADFH;          ///< Bad file handle
 
         // class RecoveryManager
-        static const uint32_t JERR_RCVM_OPENRD;       ///< Unable to open file for read
-        static const uint32_t JERR_RCVM_READ;         ///< Read error: no or insufficient data to read
+        static const uint32_t JERR_RCVM_OPENRD;         ///< Unable to open file for read
+        static const uint32_t JERR_RCVM_STREAMBAD;      ///< Read/write stream error
+        static const uint32_t JERR_RCVM_READ;           ///< Read error: no or insufficient data to read
         static const uint32_t JERR_RCVM_WRITE;          ///< Write error
-
-//        // class rmgr
-//        static const uint32_t JERR_RMGR_UNKNOWNMAGIC;  ///< Found record with unknown magic
-//        static const uint32_t JERR_RMGR_RIDMISMATCH;   ///< RID mismatch between rec and dtok
-//        //static const uint32_t JERR_RMGR_FIDMISMATCH;   ///< FID mismatch between emap and rrfc
-//        static const uint32_t JERR_RMGR_ENQSTATE;      ///< Attempted read when wstate not ENQ
-//        static const uint32_t JERR_RMGR_BADRECTYPE;    ///< Attempted op on incorrect rec type
+        static const uint32_t JERR_RCVM_NULLXID;        ///< Null XID when XID length is non-zero in header
 
         // class data_tok
-        static const uint32_t JERR_DTOK_ILLEGALSTATE;  ///< Attempted to change to illegal state
-//         static const uint32_t JERR_DTOK_RIDNOTSET;     ///< Record ID not set
+        static const uint32_t JERR_DTOK_ILLEGALSTATE;   ///< Attempted to change to illegal state
+//         static const uint32_t JERR_DTOK_RIDNOTSET;   ///< Record ID not set
 
         // class enq_map, txn_map
-        static const uint32_t JERR_MAP_DUPLICATE;      ///< Attempted to insert using duplicate key
-        static const uint32_t JERR_MAP_NOTFOUND;       ///< Key not found in map
-        static const uint32_t JERR_MAP_LOCKED;         ///< rid locked by pending txn
+        static const uint32_t JERR_MAP_DUPLICATE;       ///< Attempted to insert using duplicate key
+        static const uint32_t JERR_MAP_NOTFOUND;        ///< Key not found in map
+        static const uint32_t JERR_MAP_LOCKED;          ///< rid locked by pending txn
 
         // EFP errors
         static const uint32_t JERR_EFP_BADPARTITIONNAME;  ///< Partition name invalid or of value 0
-        static const uint32_t JERR_EFP_BADEFPDIRNAME;     ///< Empty File Pool directory name invalid
-        static const uint32_t JERR_EFP_BADPARTITIONDIR;   ///< Invalid partition directory
-        static const uint32_t JERR_EFP_NOEFP;             ///< No EFP found for given partition and file size
-        static const uint32_t JERR_EFP_EMPTY;             ///< EFP empty
+        static const uint32_t JERR_EFP_BADEFPDIRNAME;   ///< Empty File Pool directory name invalid
+        static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory
+        static const uint32_t JERR_EFP_NOEFP;           ///< No EFP found for given partition and file size
+        static const uint32_t JERR_EFP_EMPTY;           ///< EFP empty
 
         // Negative returns for some functions
         static const int32_t AIO_TIMEOUT;               ///< Timeout waiting for AIO return

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp Mon Nov  4 22:15:14 2013
@@ -39,14 +39,19 @@ int16_t txn_map::TMAP_OK = 0;
 int16_t txn_map::TMAP_NOT_SYNCED = 0;
 int16_t txn_map::TMAP_SYNCED = 1;
 
-txn_data_struct::txn_data_struct(const uint64_t rid, const uint64_t drid, const uint16_t pfid,
-		const bool enq_flag, const bool commit_flag):
-        _rid(rid),
-        _drid(drid),
-        _pfid(pfid),
-        _enq_flag(enq_flag),
-        _commit_flag(commit_flag),
-        _aio_compl(false)
+txn_data_t::txn_data_t(const uint64_t rid,
+                       const uint64_t drid,
+                       const uint16_t pfid,
+                       const uint64_t foffs,
+                       const bool enq_flag,
+                       const bool commit_flag):
+        rid_(rid),
+        drid_(drid),
+        pfid_(pfid),
+        foffs_(foffs),
+        enq_flag_(enq_flag),
+        commit_flag_(commit_flag),
+        aio_compl_(false)
 {}
 
 txn_map::txn_map():
@@ -56,24 +61,8 @@ txn_map::txn_map():
 
 txn_map::~txn_map() {}
 
-/*
-void
-txn_map::set_num_jfiles(const uint16_t num_jfiles)
-{
-    _pfid_txn_cnt.resize(num_jfiles, 0);
-}
-*/
-
-/*
-uint32_t
-txn_map::get_txn_pfid_cnt(const uint16_t pfid) const
-{
-    return _pfid_txn_cnt.at(pfid);
-}
-*/
-
 bool
-txn_map::insert_txn_data(const std::string& xid, const txn_data& td)
+txn_map::insert_txn_data(const std::string& xid, const txn_data_t& td)
 {
     bool ok = true;
     slock s(_mutex);
@@ -88,7 +77,6 @@ txn_map::insert_txn_data(const std::stri
     }
     else
         itr->second.push_back(td);
-//    _pfid_txn_cnt.at(td._pfid)++;
     return ok;
 }
 
@@ -117,8 +105,6 @@ txn_map::get_remove_tdata_list(const std
         return _empty_data_list;
     txn_data_list list = itr->second;
     _map.erase(itr);
-//    for (tdl_itr i=list.begin(); i!=list.end(); i++)
-//        _pfid_txn_cnt.at(i->_pfid)--;
     return list;
 }
 
@@ -151,7 +137,7 @@ txn_map::cnt(const bool enq_flag)
     {
         for (tdl_itr j = i->second.begin(); j < i->second.end(); j++)
         {
-            if (j->_enq_flag == enq_flag)
+            if (j->enq_flag_ == enq_flag)
                 c++;
         }
     }
@@ -168,7 +154,7 @@ txn_map::is_txn_synced(const std::string
     bool is_synced = true;
     for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
     {
-        if (!litr->_aio_compl)
+        if (!litr->aio_compl_)
         {
             is_synced = false;
             break;
@@ -186,9 +172,9 @@ txn_map::set_aio_compl(const std::string
         return TMAP_XID_NOT_FOUND;
     for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
     {
-        if (litr->_rid == rid)
+        if (litr->rid_ == rid)
         {
-            litr->_aio_compl = true;
+            litr->aio_compl_ = true;
             return TMAP_OK; // rid found
         }
     }
@@ -206,7 +192,7 @@ txn_map::data_exists(const std::string& 
         tdl_itr itr = tdl.begin();
         while (itr != tdl.end() && !found)
         {
-            found = itr->_rid == rid;
+            found = itr->rid_ == rid;
             itr++;
         }
     }
@@ -224,10 +210,10 @@ txn_map::is_enq(const uint64_t rid)
             txn_data_list list = i->second;
             for (tdl_itr j = list.begin(); j < list.end() && !found; j++)
             {
-                if (j->_enq_flag)
-                    found = j->_rid == rid;
+                if (j->enq_flag_)
+                    found = j->rid_ == rid;
                 else
-                    found = j->_drid == rid;
+                    found = j->drid_ == rid;
             }
         }
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h Mon Nov  4 22:15:14 2013
@@ -45,19 +45,23 @@ namespace qls_jrnl
     * \brief Struct encapsulating transaction data necessary for processing a transaction
     *     in the journal once it is closed with either a commit or abort.
     */
-    struct txn_data_struct
+    typedef struct txn_data_t
     {
-        uint64_t _rid;      ///< Record id for this operation
-        uint64_t _drid;     ///< Dequeue record id for this operation
-        uint16_t _pfid;     ///< Physical file id, to be used when transferring to emap on commit
-        bool _enq_flag;     ///< If true, enq op, otherwise deq op
-        bool _commit_flag;  ///< (2PC transactions) Records 2PC complete c/a mode
-        bool _aio_compl;    ///< Initially false, set to true when record AIO returns
-        txn_data_struct(const uint64_t rid, const uint64_t drid, const uint16_t pfid,
-                const bool enq_flag, const bool commit_flag = false);
-    };
-    typedef txn_data_struct txn_data;
-    typedef std::vector<txn_data> txn_data_list;
+        uint64_t rid_;      ///< Record id for this operation
+        uint64_t drid_;     ///< Dequeue record id for this operation
+        uint16_t pfid_;     ///< Physical file id, to be used when transferring to emap on commit
+        uint64_t foffs_;    ///< Offset in file for this record
+        bool enq_flag_;     ///< If true, enq op, otherwise deq op
+        bool commit_flag_;  ///< (2PC transactions) Records 2PC complete c/a mode
+        bool aio_compl_;    ///< Initially false, set to true when record AIO returns
+        txn_data_t(const uint64_t rid,
+                   const uint64_t drid,
+                   const uint16_t pfid,
+                   const uint64_t foffs,
+                   const bool enq_flag,
+                   const bool commit_flag = false);
+    } txn_data_t;
+    typedef std::vector<txn_data_t> txn_data_list;
     typedef txn_data_list::iterator tdl_itr;
 
     /**
@@ -112,16 +116,13 @@ namespace qls_jrnl
 
         xmap _map;
         smutex _mutex;
-//        std::vector<uint32_t> _pfid_txn_cnt;
         const txn_data_list _empty_data_list;
 
     public:
         txn_map();
         virtual ~txn_map();
 
-//        void set_num_jfiles(const uint16_t num_jfiles);
-//        uint32_t get_txn_pfid_cnt(const uint16_t pfid) const;
-        bool insert_txn_data(const std::string& xid, const txn_data& td);
+        bool insert_txn_data(const std::string& xid, const txn_data_t& td);
         const txn_data_list get_tdata_list(const std::string& xid);
         const txn_data_list get_remove_tdata_list(const std::string& xid);
         bool in_map(const std::string& xid);

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c Mon Nov  4 22:15:14 2013
@@ -28,7 +28,7 @@ void file_hdr_create(file_hdr_t* dest, c
     dest->_fhdr_size_sblks = fhdr_size_sblks;
     dest->_efp_partition = efp_partition;
     dest->_reserved = 0;
-    dest->_file_size_kib = file_size;
+    dest->_data_size_kib = file_size;
     dest->_fro = 0;
     dest->_ts_nsec = 0;
     dest->_ts_sec = 0;
@@ -58,7 +58,7 @@ void file_hdr_copy(file_hdr_t* dest, con
     rec_hdr_copy(&dest->_rhdr, &src->_rhdr);
     dest->_fhdr_size_sblks = src->_fhdr_size_sblks; // Should this be copied?
     dest->_efp_partition = src->_efp_partition;     // Should this be copied?
-    dest->_file_size_kib = src->_file_size_kib;
+    dest->_data_size_kib = src->_data_size_kib;
     dest->_fro = src->_fro;
     dest->_ts_sec = src->_ts_sec;
     dest->_ts_nsec = src->_ts_nsec;

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h Mon Nov  4 22:15:14 2013
@@ -79,7 +79,7 @@ typedef struct file_hdr_t {
     uint16_t  _fhdr_size_sblks; /**< File header size in sblks (defined by JRNL_SBLK_SIZE) */
     uint16_t  _efp_partition;   /**< EFP Partition number from which this file was obtained */
     uint32_t  _reserved;
-    uint64_t  _file_size_kib;   /**< Size of this file in KiB, excluding header sblk */
+    uint64_t  _data_size_kib;   /**< Size of the data part of this file in KiB (i.e. file size excluding the file header sblk) */
     uint64_t  _fro;			    /**< First Record Offset (FRO) */
     uint64_t  _ts_sec;		    /**< Time stamp (seconds part) */
     uint64_t  _ts_nsec;		    /**< Time stamp (nanoseconds part) */

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp Mon Nov  4 22:15:14 2013
@@ -33,7 +33,7 @@
 #include <sstream>
 #include <stdint.h>
 
-#include <iostream> // DEBUG
+//#include <iostream> // DEBUG
 
 namespace qpid
 {
@@ -99,7 +99,7 @@ wmgr::initialize(aio_callback* const cbp
     if (eo)
     {
         const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS;
-        uint32_t data_dblks = (eo / QLS_DBLK_SIZE_BYTES) - 4; // 4 dblks for file hdr
+        uint32_t data_dblks = (eo / QLS_DBLK_SIZE_BYTES) - (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS); // exclude file header
         _pg_cntr = data_dblks / wr_pg_size_dblks;
         _pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks);
     }
@@ -115,6 +115,7 @@ wmgr::enqueue(const void* const data_buf
               const bool transient,
               const bool external)
 {
+//std::cout << _lfc.status(10) << std::endl;
     if (xid_len)
         assert(xid_ptr != 0);
 
@@ -160,7 +161,7 @@ wmgr::enqueue(const void* const data_buf
             dtokp->clear_xid();
         _enq_busy = true;
     }
-//std::cout << "---+++ wmgr::enqueue() ENQ rid=0x" << std::hex << rid << " " << std::dec << std::flush; // DEBUG
+//std::cout << "---+++ wmgr::enqueue() ENQ rid=0x" << std::hex << rid << " po=0x" << _pg_offset_dblks << " cs=0x" << (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) << " " << std::dec << std::flush; // DEBUG
     bool done = false;
     while (!done)
     {
@@ -193,12 +194,12 @@ wmgr::enqueue(const void* const data_buf
             // message. AIO callbacks will then only process this token when entire message is
             // enqueued.
             _lfc.incrEnqueuedRecordCount(dtokp->fid());
-//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount() << std::dec << std::flush; // DEBUG
+//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount(dtokp->fid()) << std::dec << std::flush; // DEBUG
 
             if (xid_len) // If part of transaction, add to transaction map
             {
                 std::string xid((const char*)xid_ptr, xid_len);
-                _tmap.insert_txn_data(xid, txn_data(rid, 0, dtokp->fid(), true));
+                _tmap.insert_txn_data(xid, txn_data_t(rid, 0, dtokp->fid(), 0, true));
             }
             else
             {
@@ -213,7 +214,7 @@ wmgr::enqueue(const void* const data_buf
 
             done = true;
         } else {
-//std::cout << "$" << std::endl << std::flush; // DEBUG
+//std::cout << "$" << std::flush; // DEBUG
             dtokp->set_wstate(data_tok::ENQ_PART);
         }
 
@@ -313,7 +314,7 @@ wmgr::dequeue(data_tok* dtokp,
                 // If the enqueue is part of a pending txn, it will not yet be in emap
                 _emap.lock(dequeue_rid); // ignore rid not found error
                 std::string xid((const char*)xid_ptr, xid_len);
-                _tmap.insert_txn_data(xid, txn_data(rid, dequeue_rid, dtokp->fid(), false));
+                _tmap.insert_txn_data(xid, txn_data_t(rid, dequeue_rid, dtokp->fid(), 0, false));
             }
             else
             {
@@ -427,10 +428,10 @@ wmgr::abort(data_tok* dtokp,
             txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
             for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
             {
-				if (!itr->_enq_flag)
-				    _emap.unlock(itr->_drid); // ignore rid not found error
-                if (itr->_enq_flag)
-                    _lfc.decrEnqueuedRecordCount(itr->_pfid);
+				if (!itr->enq_flag_)
+				    _emap.unlock(itr->drid_); // ignore rid not found error
+                if (itr->enq_flag_)
+                    _lfc.decrEnqueuedRecordCount(itr->pfid_);
             }
             std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
             if (!res.second)
@@ -441,9 +442,9 @@ wmgr::abort(data_tok* dtokp,
             }
 
             done = true;
-        }
-        else
+        } else {
             dtokp->set_wstate(data_tok::ABORT_PART);
+        }
 
         file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks);
         flush_check(res, cont, done, rid);
@@ -525,20 +526,20 @@ wmgr::commit(data_tok* dtokp,
             txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
             for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
             {
-                if (itr->_enq_flag) // txn enqueue
+                if (itr->enq_flag_) // txn enqueue
                 {
-                    if (_emap.insert_pfid(itr->_rid, itr->_pfid, 0) < enq_map::EMAP_OK) // fail
+                    if (_emap.insert_pfid(itr->rid_, itr->pfid_, 0) < enq_map::EMAP_OK) // fail
                     {
                         // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
                         std::ostringstream oss;
-                        oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid;
+                        oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->pfid_;
                         throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "commit");
                     }
                 }
                 else // txn dequeue
                 {
                     uint64_t fid;
-                    short eres = _emap.get_remove_pfid(itr->_drid, fid, true);
+                    short eres = _emap.get_remove_pfid(itr->drid_, fid, true);
                     if (eres < enq_map::EMAP_OK) // fail
                     {
                         if (eres == enq_map::EMAP_RID_NOT_FOUND)
@@ -566,9 +567,9 @@ wmgr::commit(data_tok* dtokp,
             }
 
             done = true;
-        }
-        else
+        } else {
             dtokp->set_wstate(data_tok::COMMIT_PART);
+        }
 
         file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks);
         flush_check(res, cont, done, rid);
@@ -585,6 +586,7 @@ wmgr::file_header_check(const uint64_t r
 {
     if (_lfc.isEmpty()) // File never written (i.e. no header or data)
     {
+//std::cout << "e" << std::flush;
         std::size_t fro = 0;
         if (cont) {
             bool file_fit = rec_dblks_rem <= _lfc.dataSize_sblks() * QLS_SBLK_SIZE_DBLKS; // Will fit within this journal file
@@ -608,6 +610,7 @@ wmgr::flush_check(iores& res,
     // If page is full, flush
     if (_pg_offset_dblks >= _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS)
     {
+//std::cout << "^" << _pg_offset_dblks << ">=" << (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) << std::flush;
         res = write_flush();
         assert(res == RHM_IORES_SUCCESS);
 
@@ -621,6 +624,7 @@ wmgr::flush_check(iores& res,
         uint32_t fileSize_pgs = _lfc.fileSize_sblks() / _cache_pgsize_sblks;
         if (_pg_cntr >= fileSize_pgs)
         {
+//std::cout << _pg_cntr << ">=" << fileSize_pgs << std::flush;
             get_next_file();
             if (!done) {
                 cont = true;
@@ -649,7 +653,7 @@ wmgr::write_flush()
     if (_cached_offset_dblks)
     {
         if (_page_cb_arr[_pg_index]._state == AIO_PENDING) {
-//std::cout << "#"; // DEBUG
+//std::cout << "#" << std::flush; // DEBUG
             res = RHM_IORES_PAGE_AIOWAIT;
         } else {
             if (_page_cb_arr[_pg_index]._state != IN_USE)
@@ -688,7 +692,7 @@ void
 wmgr::get_next_file()
 {
     _pg_cntr = 0;
-//std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::endl; // DEBUG
+//std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::flush << std::endl; // DEBUG
     _lfc.pullEmptyFileFromEfp();
 }
 
@@ -972,6 +976,7 @@ wmgr::dblk_roundup()
     uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, QLS_SBLK_SIZE_DBLKS) * QLS_SBLK_SIZE_DBLKS;
     while (_cached_offset_dblks < wdblks)
     {
+//std::cout << "^0x" << std::hex << _cached_offset_dblks << "<0x" << wdblks << std::dec << std::flush;
         void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
         std::memcpy(wptr, (const void*)&xmagic, sizeof(xmagic));
 #ifdef QLS_CLEAN



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message