Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5434C1010F for ; Mon, 4 Nov 2013 22:15:41 +0000 (UTC) Received: (qmail 59470 invoked by uid 500); 4 Nov 2013 22:15:41 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 59426 invoked by uid 500); 4 Nov 2013 22:15:41 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 59419 invoked by uid 99); 4 Nov 2013 22:15:41 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Nov 2013 22:15:41 +0000 X-ASF-Spam-Status: No, hits=-1999.0 required=5.0 tests=ALL_TRUSTED,HK_RANDOM_FROM,T_FRT_PROFILE2 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Nov 2013 22:15:37 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id B60D72388999; Mon, 4 Nov 2013 22:15:16 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1538790 [2/2] - in /qpid/trunk/qpid/cpp/src/qpid/linearstore: ./ jrnl/ jrnl/utils/ Date: Mon, 04 Nov 2013 22:15:15 -0000 To: commits@qpid.apache.org From: kpvdr@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131104221516.B60D72388999@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp Mon Nov 4 22:15:14 2013 @@ -31,6 +31,7 @@ #include "qpid/linearstore/jrnl/enq_rec.h" #include "qpid/linearstore/jrnl/jcfg.h" #include "qpid/linearstore/jrnl/jdir.h" +#include "qpid/linearstore/jrnl/JournalFile.h" #include "qpid/linearstore/jrnl/JournalLog.h" #include "qpid/linearstore/jrnl/jrec.h" #include "qpid/linearstore/jrnl/LinearFileController.h" @@ -62,7 +63,7 @@ RecoveryManager::RecoveryManager(const s highestRecordId_(0ULL), highestFileNumber_(0ULL), lastFileFullFlag_(false), - fileSize_kib_(0) + efpFileSize_kib_(0) {} RecoveryManager::~RecoveryManager() {} @@ -74,18 +75,19 @@ void RecoveryManager::analyzeJournals(co efpIdentity_t efpIdentity; analyzeJournalFileHeaders(efpIdentity); *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity); - fileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib(); + efpFileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib(); + // Check for file full condition lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024; - // Restore all read and write pointers and transactions if (!journalEmptyFlag_) { - while (getNextRecordHeader()) { - } + // Read all records, establish remaining enqueued records + while (getNextRecordHeader()) {} if (inFileStream_.is_open()) { inFileStream_.close(); } + // Remove leading files which have no enqueued records removeEmptyFiles(*emptyFilePoolPtrPtr); @@ -100,14 +102,14 @@ void RecoveryManager::analyzeJournals(co txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found // Unlock any affected enqueues in emap for (tdl_itr i=tdl.begin(); i_enq_flag) { // enq op - decrement enqueue count - enqueueCountList_[i->_pfid]--; - } else if (enqueueMapRef_.is_enqueued(i->_drid, true)) { // deq op - unlock enq record - int16_t ret = enqueueMapRef_.unlock(i->_drid); + if (i->enq_flag_) { // enq op - decrement enqueue count + fileNumberMap_[i->pfid_]->decrEnqueuedRecordCount(); + } else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record + int16_t ret = enqueueMapRef_.unlock(i->drid_); if (ret < enq_map::EMAP_OK) { // fail // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND std::ostringstream oss; - oss << std::hex << "_emap.unlock(): drid=0x\"" << i->_drid; + oss << std::hex << "_emap.unlock(): drid=0x\"" << i->drid_; throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "analyzeJournals"); } } @@ -115,7 +117,10 @@ void RecoveryManager::analyzeJournals(co } } } + + // Set up recordIdList_ from enqueue map enqueueMapRef_.rid_list(recordIdList_); + recordIdListConstItr_ = recordIdList_.begin(); } } @@ -144,19 +149,13 @@ bool RecoveryManager::readNextRemainingR bool& external, data_tok* const dtokp, bool /*ignore_pending_txns*/) { - if (!dtokp->is_readable()) { - std::ostringstream oss; - oss << std::hex << std::setfill('0') << "dtok_id=0x" << std::setw(8) << dtokp->id(); - oss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid() << "; dtok_wstate=" << dtokp->wstate_str(); - throw jexception(jerrno::JERR_JCNTL_ENQSTATE, oss.str(), "RecoveryManager", "readNextRemainingRecord"); - } if (recordIdListConstItr_ == recordIdList_.end()) { return false; } enq_map::emap_data_struct_t eds; enqueueMapRef_.get_data(*recordIdListConstItr_, eds); uint64_t fileNumber = eds._pfid; - currentJournalFileConstItr_ = fileNumberNameMap_.find(fileNumber); + currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber); getNextFile(false); inFileStream_.seekg(eds._file_posn, std::ifstream::beg); @@ -195,18 +194,26 @@ bool RecoveryManager::readNextRemainingR throw jexception(jerrno::JERR__MALLOC, oss.str(), "RecoveryManager", "readNextRemainingRecord"); } readJournalData((char*)*dataPtrPtr, dataSize); + + // Set data token + dtokp->set_wstate(data_tok::ENQ); + dtokp->set_rid(enqueueHeader._rhdr._rid); + dtokp->set_dsize(dataSize); + if (xidSize) { + dtokp->set_xid(*xidPtrPtr, xidSize); + } + + ++recordIdListConstItr_; return true; } void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr, LinearFileController* lfcPtr) { -//std::cout << "****** RecoveryManager::setLinearFileControllerJournals():" << std::endl; // DEBUG - for (fileNumberNameMapConstItr_t i = fileNumberNameMap_.begin(); i != fileNumberNameMap_.end(); ++i) { + for (fileNumberMapConstItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) { uint32_t fileDblkCount = i->first == highestFileNumber_ ? // Is this this last file? - endOffset_ / QLS_DBLK_SIZE_BYTES : // Last file uses _endOffset - fileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; // All others use file size to make them full - (lfcPtr->*fnPtr)(i->second, i->first, fileSize_kib_, fileDblkCount); -//std::cout << " ** f=" << i->second.substr(i->second.rfind('/')+1) << ",fn=" << i->first << ",s=" << _fileSize_kib << ",eo=" << fileDblkCount << "(" << (fileDblkCount * QLS_DBLK_SIZE_BYTES / 1024) << "kiB)" << std::endl; // DEBUG + endOffset_ / QLS_DBLK_SIZE_BYTES : // Last file uses _endOffset + efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; // All others use file size to make them full + (lfcPtr->*fnPtr)(i->second, fileDblkCount); } } @@ -216,14 +223,19 @@ std::string RecoveryManager::toString(co if (compact) { oss << "Recovery journal analysis (jid=\"" << jid << "\"):"; oss << " jfl=["; - for (fileNumberNameMapConstItr_t i=fileNumberNameMap_.begin(); i!=fileNumberNameMap_.end(); ++i) { - if (i!=fileNumberNameMap_.begin()) oss << " "; - oss << i->first << ":" << i->second.substr(i->second.rfind('/')+1); + for (fileNumberMapConstItr_t i=fileNumberMap_.begin(); i!=fileNumberMap_.end(); ++i) { + if (i!=fileNumberMap_.begin()) { + oss << " "; + } + std::string fqFileName = i->second->getFqFileName(); + oss << i->first << ":" << fqFileName.substr(fqFileName.rfind('/')+1); } oss << "] ecl=[ "; - for (enqueueCountListConstItr_t j = enqueueCountList_.begin(); j!=enqueueCountList_.end(); ++j) { - if (j != enqueueCountList_.begin()) oss << " "; - oss << *j; + for (fileNumberMapConstItr_t j=fileNumberMap_.begin(); j!=fileNumberMap_.end(); ++j) { + if (j!=fileNumberMap_.begin()) { + oss << " "; + } + oss << j->second->getEnqueuedRecordCount(); } oss << " ] empty=" << (journalEmptyFlag_ ? "T" : "F"); oss << " fro=0x" << std::hex << firstRecordOffset_ << std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)"; @@ -233,15 +245,18 @@ std::string RecoveryManager::toString(co oss << " lffull=" << (lastFileFullFlag_ ? "T" : "F"); } else { oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl; - oss << " Number of journal files = " << fileNumberNameMap_.size() << std::endl; + oss << " Number of journal files = " << fileNumberMap_.size() << std::endl; oss << " Journal File List:" << std::endl; - for (fileNumberNameMapConstItr_t i=fileNumberNameMap_.begin(); i!=fileNumberNameMap_.end(); ++i) { - oss << " " << i->first << ": " << i->second.substr(i->second.rfind('/')+1) << std::endl; + for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) { + std::string fqFileName = k->second->getFqFileName(); + oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl; } oss << " Enqueue Counts: [ " << std::endl; - for (enqueueCountListConstItr_t j = enqueueCountList_.begin(); j!=enqueueCountList_.end(); ++j) { - if (j != enqueueCountList_.begin()) oss << ", "; - oss << *j; + for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) { + if (l != fileNumberMap_.begin()) { + oss << ", "; + } + oss << l->second->getEnqueuedRecordCount(); } oss << " ]" << std::endl; oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl; @@ -271,21 +286,26 @@ void RecoveryManager::analyzeJournalFile oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring"; journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str()); } else { - fileNumberNameMap_[fileHeader._file_number] = *i; + JournalFile* jfp = new JournalFile(*i, fileHeader); + fileNumberMap_[fileHeader._file_number] = jfp; if (fileHeader._file_number > highestFileNumber_) { highestFileNumber_ = fileHeader._file_number; } } } - efpIdentity.first = fileHeader._efp_partition; - efpIdentity.second = fileHeader._file_size_kib; - enqueueCountList_.resize(fileNumberNameMap_.size(), 0); - currentJournalFileConstItr_ = fileNumberNameMap_.begin(); + efpIdentity.pn_ = fileHeader._efp_partition; + efpIdentity.ds_ = fileHeader._data_size_kib; + currentJournalFileConstItr_ = fileNumberMap_.begin(); } void RecoveryManager::checkFileStreamOk(bool checkEof) { if (inFileStream_.fail() || inFileStream_.bad() || checkEof ? inFileStream_.eof() : false) { - throw jexception("read failure"); // TODO complete exception + std::ostringstream oss; + oss << "Stream status: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F"); + if (checkEof) { + oss << " eof=" << (inFileStream_.eof()?"T":"F"); + } + throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "checkFileStreamOk"); } } @@ -339,7 +359,6 @@ bool RecoveryManager::decodeRecord(jrec& ::rec_hdr_t& headerRecord, std::streampos& fileOffset) { -// uint16_t start_fid = getCurrentFileNumber(); std::streampos start_file_offs = fileOffset; if (highestRecordId_ == 0) { @@ -354,14 +373,7 @@ bool RecoveryManager::decodeRecord(jrec& done = record.rcv_decode(headerRecord, &inFileStream_, cumulativeSizeRead); } catch (const jexception& e) { -// TODO - review this logic and tidy up how rd._lfid is assigned. See new jinf.get_end_file() fn. -// Original -// if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL || -// fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw; -// Tried this, but did not work -// if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL || h._magic != 0) throw; checkJournalAlignment(start_file_offs); -// rd._lfid = start_fid; return false; } if (!done && !getNextFile(false)) { @@ -373,7 +385,7 @@ bool RecoveryManager::decodeRecord(jrec& } std::string RecoveryManager::getCurrentFileName() const { - return currentJournalFileConstItr_->second; + return currentJournalFileConstItr_->second->getFqFileName(); } uint64_t RecoveryManager::getCurrentFileNumber() const { @@ -382,19 +394,21 @@ uint64_t RecoveryManager::getCurrentFile bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) { if (inFileStream_.is_open()) { - if (inFileStream_.eof() || !inFileStream_.good()) - { + if (inFileStream_.eof() || !inFileStream_.good()) { inFileStream_.clear(); endOffset_ = inFileStream_.tellg(); // remember file offset before closing - if (endOffset_ == -1) { throw jexception("tellg() failure"); } // Check for error code -1 TODO: compelete exception + if (endOffset_ == -1) { + std::ostringstream oss; + oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F"); + throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextFile"); + } inFileStream_.close(); - if (++currentJournalFileConstItr_ == fileNumberNameMap_.end()) { + if (++currentJournalFileConstItr_ == fileNumberMap_.end()) { return false; } } } - if (!inFileStream_.is_open()) - { + if (!inFileStream_.is_open()) { inFileStream_.clear(); // clear eof flag, req'd for older versions of c++ inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary); if (!inFileStream_.good()) { @@ -402,7 +416,6 @@ bool RecoveryManager::getNextFile(bool j } // Read file header -//std::cout << " F" << getCurrentFileNumber() << std::flush; // DEBUG file_hdr_t fhdr; inFileStream_.read((char*)&fhdr, sizeof(fhdr)); checkFileStreamOk(true); @@ -412,7 +425,7 @@ bool RecoveryManager::getNextFile(bool j inFileStream_.seekg(foffs); } else { inFileStream_.close(); - if (currentJournalFileConstItr_ == fileNumberNameMap_.begin()) { + if (currentJournalFileConstItr_ == fileNumberMap_.begin()) { journalEmptyFlag_ = true; } return false; @@ -436,7 +449,11 @@ bool RecoveryManager::getNextRecordHeade } } file_pos = inFileStream_.tellg(); -//std::cout << " 0x" << std::hex << file_pos << std::dec; // DEBUG + if (file_pos == -1) { + std::ostringstream oss; + oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F"); + throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextRecordHeader"); + } inFileStream_.read((char*)&h, sizeof(rec_hdr_t)); if (inFileStream_.gcount() == sizeof(rec_hdr_t)) { hdr_ok = true; @@ -450,19 +467,21 @@ bool RecoveryManager::getNextRecordHeade switch(h._magic) { case QLS_ENQ_MAGIC: { -//std::cout << ".e" << std::flush; // DEBUG +//std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG enq_rec er; uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary if (!decodeRecord(er, cum_size_read, h, file_pos)) { return false; } if (!er.is_transient()) { // Ignore transient msgs - enqueueCountList_[start_fid]++; + fileNumberMap_[start_fid]->incrEnqueuedRecordCount(); if (er.xid_size()) { er.get_xid(&xidp); - if (xidp != 0) { throw jexception("Null xid with non-null xid_size"); } // TODO complete exception + if (xidp == 0) { + throw jexception(jerrno::JERR_RCVM_NULLXID, "ENQ", "RecoveryManager", "getNextRecordHeader"); + } std::string xid((char*)xidp, er.xid_size()); - transactionMapRef_.insert_txn_data(xid, txn_data(h._rid, 0, start_fid, true)); + transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true)); if (transactionMapRef_.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) { // fail - xid or rid not found std::ostringstream oss; oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid; @@ -482,7 +501,7 @@ bool RecoveryManager::getNextRecordHeade break; case QLS_DEQ_MAGIC: { -//std::cout << ".d" << std::flush; // DEBUG +//std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG deq_rec dr; uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary if (!decodeRecord(dr, cum_size_read, h, file_pos)) { @@ -492,10 +511,12 @@ bool RecoveryManager::getNextRecordHeade // If the enqueue is part of a pending txn, it will not yet be in emap enqueueMapRef_.lock(dr.deq_rid()); // ignore not found error dr.get_xid(&xidp); - if (xidp != 0) { throw jexception("Null xid with non-null xid_size"); } // TODO complete exception + if (xidp == 0) { + throw jexception(jerrno::JERR_RCVM_NULLXID, "DEQ", "RecoveryManager", "getNextRecordHeader"); + } std::string xid((char*)xidp, dr.xid_size()); - transactionMapRef_.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), start_fid, false, - dr.is_txn_coml_commit())); + transactionMapRef_.insert_txn_data(xid, txn_data_t(dr.rid(), dr.deq_rid(), start_fid, file_pos, + false, dr.is_txn_coml_commit())); if (transactionMapRef_.set_aio_compl(xid, dr.rid()) < txn_map::TMAP_OK) { // fail - xid or rid not found std::ostringstream oss; oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid(); @@ -505,30 +526,30 @@ bool RecoveryManager::getNextRecordHeade } else { uint64_t enq_fid; if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error - enqueueCountList_[enq_fid]--; + fileNumberMap_[enq_fid]->decrEnqueuedRecordCount(); } } } break; case QLS_TXA_MAGIC: { -//std::cout << ".a" << std::flush; // DEBUG +//std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG txn_rec ar; if (!decodeRecord(ar, cum_size_read, h, file_pos)) { return false; } // Delete this txn from tmap, unlock any locked records in emap ar.get_xid(&xidp); - if (xidp != 0) { - throw jexception("Null xid with non-null xid_size"); // TODO complete exception + if (xidp == 0) { + throw jexception(jerrno::JERR_RCVM_NULLXID, "ABT", "RecoveryManager", "getNextRecordHeader"); } std::string xid((char*)xidp, ar.xid_size()); txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) { - if (itr->_enq_flag) { - enqueueCountList_[itr->_pfid]--; + if (itr->enq_flag_) { + fileNumberMap_[itr->pfid_]->decrEnqueuedRecordCount(); } else { - enqueueMapRef_.unlock(itr->_drid); // ignore not found error + enqueueMapRef_.unlock(itr->drid_); // ignore not found error } } std::free(xidp); @@ -536,30 +557,31 @@ bool RecoveryManager::getNextRecordHeade break; case QLS_TXC_MAGIC: { -//std::cout << ".t" << std::flush; // DEBUG +//std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG txn_rec cr; if (!decodeRecord(cr, cum_size_read, h, file_pos)) { return false; } // Delete this txn from tmap, process records into emap cr.get_xid(&xidp); - if (xidp != 0) { - throw jexception("Null xid with non-null xid_size"); // TODO complete exception + if (xidp == 0) { + throw jexception(jerrno::JERR_RCVM_NULLXID, "CMT", "RecoveryManager", "getNextRecordHeader"); } std::string xid((char*)xidp, cr.xid_size()); txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) { - if (itr->_enq_flag) { // txn enqueue - if (enqueueMapRef_.insert_pfid(itr->_rid, itr->_pfid, file_pos) < enq_map::EMAP_OK) { // fail + if (itr->enq_flag_) { // txn enqueue +//std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->pfid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG + if (enqueueMapRef_.insert_pfid(itr->rid_, itr->pfid_, itr->foffs_) < enq_map::EMAP_OK) { // fail // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID. std::ostringstream oss; - oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid; + oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->pfid_; throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader"); } } else { // txn dequeue uint64_t enq_fid; - if (enqueueMapRef_.get_remove_pfid(itr->_drid, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error - enqueueCountList_[enq_fid]--; + if (enqueueMapRef_.get_remove_pfid(itr->drid_, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error + fileNumberMap_[enq_fid]->decrEnqueuedRecordCount(); } } std::free(xidp); @@ -577,11 +599,11 @@ bool RecoveryManager::getNextRecordHeade } break; case 0: -//std::cout << ".0" << std::endl << std::flush; // DEBUG +//std::cout << " 0x" << std::hex << file_pos << ".0" << std::dec << std::endl << std::flush; // DEBUG checkJournalAlignment(file_pos); return false; default: -//std::cout << ".?" << std::endl << std::flush; // DEBUG +//std::cout << " 0x" << std::hex << file_pos << ".?" << std::dec << std::endl << std::flush; // DEBUG // Stop as this is the overwrite boundary. checkJournalAlignment(file_pos); return false; @@ -593,16 +615,24 @@ void RecoveryManager::readJournalData(ch const std::streamsize readSize) { std::streamoff bytesRead = 0; while (bytesRead < readSize) { - if (inFileStream_.eof()) { - getNextFile(false); + std::streampos file_pos = inFileStream_.tellg(); + if (file_pos == -1) { + std::ostringstream oss; + oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F"); + throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "readJournalData"); } - bool readFitsInFile = inFileStream_.tellg() + readSize <= fileSize_kib_ * 1024; - std::streamoff actualReadSize = readFitsInFile ? readSize : (fileSize_kib_ * 1024) - inFileStream_.tellg(); - inFileStream_.read(target + bytesRead, actualReadSize); - if (inFileStream_.gcount() != actualReadSize) { - throw jexception(); // TODO - proper exception + inFileStream_.read(target + bytesRead, readSize - bytesRead); + std::streamoff thisReadSize = inFileStream_.gcount(); + if (thisReadSize < readSize) { + getNextFile(false); + file_pos = inFileStream_.tellg(); + if (file_pos == -1) { + std::ostringstream oss; + oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F"); + throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "readJournalData"); + } } - bytesRead += actualReadSize; + bytesRead += thisReadSize; } } @@ -633,12 +663,10 @@ void RecoveryManager::readJournalFileHea } void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) { - while (enqueueCountList_.front() == 0 && enqueueCountList_.size() > 1) { - fileNumberNameMapItr_t i = fileNumberNameMap_.begin(); -//std::cout << "*** File " << i->first << ": " << i->second << " is empty." << std::endl; - emptyFilePoolPtr->returnEmptyFile(i->second); - fileNumberNameMap_.erase(i); - enqueueCountList_.pop_front(); + while (fileNumberMap_.begin()->second->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) { +//std::cout << "*** File " << i->first << ": " << i->second << " is empty." << std::endl; // DEBUG + emptyFilePoolPtr->returnEmptyFile(fileNumberMap_.begin()->second->getFqFileName()); + fileNumberMap_.erase(fileNumberMap_.begin()->first); } } Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h?rev=1538790&r1=1538789&r2=1538790&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h Mon Nov 4 22:15:14 2013 @@ -49,11 +49,9 @@ protected: // Types typedef std::vector directoryList_t; typedef directoryList_t::const_iterator directoryListConstItr_t; - typedef std::map fileNumberNameMap_t; - typedef fileNumberNameMap_t::iterator fileNumberNameMapItr_t; - typedef fileNumberNameMap_t::const_iterator fileNumberNameMapConstItr_t; - typedef std::deque enqueueCountList_t; - typedef enqueueCountList_t::const_iterator enqueueCountListConstItr_t; + typedef std::map fileNumberMap_t; + typedef fileNumberMap_t::iterator fileNumberMapItr_t; + typedef fileNumberMap_t::const_iterator fileNumberMapConstItr_t; typedef std::vector recordIdList_t; typedef recordIdList_t::const_iterator recordIdListConstItr_t; @@ -65,8 +63,7 @@ protected: JournalLog& journalLogRef_; // Initial journal analysis data - fileNumberNameMap_t fileNumberNameMap_; ///< File number - name map - enqueueCountList_t enqueueCountList_; ///< Number enqueued records found for each file + fileNumberMap_t fileNumberMap_; ///< File number - JournalFilePtr map bool journalEmptyFlag_; ///< Journal data files empty std::streamoff firstRecordOffset_; ///< First record offset in ffid std::streamoff endOffset_; ///< End offset (first byte past last record) @@ -75,8 +72,8 @@ protected: bool lastFileFullFlag_; ///< Last file is full // State for recovery of individual enqueued records - uint32_t fileSize_kib_; - fileNumberNameMapConstItr_t currentJournalFileConstItr_; + uint32_t efpFileSize_kib_; + fileNumberMapConstItr_t currentJournalFileConstItr_; std::string currentFileName_; std::ifstream inFileStream_; recordIdList_t recordIdList_; Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp Mon Nov 4 22:15:14 2013 @@ -103,35 +103,10 @@ jcntl::initialize(EmptyFilePool* efpp, _tmap.clear(); _linearFileController.finalize(); - -// _lpmgr.finalize(); - - // Set new file geometry parameters -// assert(num_jfiles >= JRNL_MIN_NUM_FILES); -// assert(num_jfiles <= JRNL_MAX_NUM_FILES); -// _emap.set_num_jfiles(num_jfiles); -// _tmap.set_num_jfiles(num_jfiles); - -// assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE); -// assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE); -// _jfsize_sblks = jfsize_sblks; - - // Clear any existing journal files - _jdir.clear_dir(); -// _lpmgr.initialize(num_jfiles, ae, ae_max_jfiles, this, &new_fcntl); // Creates new journal files - + _jdir.clear_dir(); // Clear any existing journal files _linearFileController.initialize(_jdir.dirname(), efpp, 0ULL); _linearFileController.pullEmptyFileFromEfp(); -// std::cout << _linearFileController.status(2); -// _wrfc.initialize(_jfsize_sblks); -// _rrfc.initialize(); -// _rrfc.set_findex(0); -// _rmgr.initialize(cbp); _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS); - - // Write info file (.jinf) to disk -// write_infofile(); - _init_flag = true; } @@ -152,36 +127,14 @@ jcntl::recover(EmptyFilePoolManager* efp _linearFileController.finalize(); -// _lpmgr.finalize(); - -// assert(num_jfiles >= JRNL_MIN_NUM_FILES); -// assert(num_jfiles <= JRNL_MAX_NUM_FILES); -// assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE); -// assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE); -// _jfsize_sblks = jfsize_sblks; - // Verify journal dir and journal files _jdir.verify_dir(); -// _rcvdat.reset(num_jfiles/*, ae, ae_max_jfiles*/); - -// rcvr_janalyze(prep_txn_list_ptr, efpm); - efpIdentity_t efpIdentity; _recoveryManager.analyzeJournals(prep_txn_list_ptr, efpmp, &_emptyFilePoolPtr); highest_rid = _recoveryManager.getHighestRecordId(); -// if (_rcvdat._jfull) -// throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover"); _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, true)); - -// _lpmgr.recover(_rcvdat, this, &new_fcntl); - _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber()); -// _linearFileController.setFileNumberCounter(_recoveryManager.getHighestFileNumber()); _recoveryManager.setLinearFileControllerJournals(&qpid::qls_jrnl::LinearFileController::addJournalFile, &_linearFileController); -// _wrfc.initialize(_jfsize_sblks, &_rcvdat); -// _rrfc.initialize(); -// _rrfc.set_findex(_rcvdat.ffid()); -// _rmgr.initialize(cbp); _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS, (_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset())); @@ -194,12 +147,6 @@ jcntl::recover_complete() { if (!_readonly_flag) throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl", "recover_complete"); -// for (uint16_t i=0; i<_lpmgr.num_jfiles(); i++) -// _lpmgr.get_fcntlp(i)->reset(&_rcvdat); -// _wrfc.initialize(_jfsize_sblks, &_rcvdat); -// _rrfc.initialize(); -// _rrfc.set_findex(_rcvdat.ffid()); -// _rmgr.recover_complete(); _readonly_flag = false; } @@ -207,7 +154,7 @@ void jcntl::delete_jrnl_files() { stop(true); // wait for AIO to complete - _linearFileController.purgeFilesToEfp(); + _linearFileController.purgeEmptyFilesToEfp(); _jdir.delete_dir(); } @@ -288,73 +235,10 @@ jcntl::read_data_record(void** const dat bool ignore_pending_txns) { check_rstatus("read_data"); - if (_recoveryManager.readNextRemainingRecord(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns)) + if (_recoveryManager.readNextRemainingRecord(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns)) { return RHM_IORES_SUCCESS; - return RHM_IORES_EMPTY; -/* - if (!dtokp->is_readable()) { - std::ostringstream oss; - oss << std::hex << std::setfill('0'); - oss << "dtok_id=0x" << std::setw(8) << dtokp->id(); - oss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid(); - oss << "; dtok_wstate=" << dtokp->wstate_str(); - throw jexception(jerrno::JERR_JCNTL_ENQSTATE, oss.str(), "jcntl", "read_data_record"); - } - std::vector ridl; - _emap.rid_list(ridl); - enq_map::emap_data_struct_t eds; - for (std::vector::const_iterator i=ridl.begin(); i!=ridl.end(); ++i) { - short res = _emap.get_data(*i, eds); - if (res == enq_map::EMAP_OK) { - std::ifstream ifs(_recoveryManager._fm[eds._pfid].c_str(), std::ifstream::in | std::ifstream::binary); - if (!ifs.good()) { - std::ostringstream oss; - oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _recoveryManager._fm[eds._pfid] << " file_posn=" << eds._file_posn; - throw jexception(jerrno::JERR_RCVM_OPENRD, oss.str(), "jcntl", "read_data_record"); - } - ifs.seekg(eds._file_posn, std::ifstream::beg); - ::enq_hdr_t eh; - ifs.read((char*)&eh, sizeof(::enq_hdr_t)); - if (!::validate_enq_hdr(&eh, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, *i)) { - std::ostringstream oss; - oss << "rid=" << (*i) << " pfid=" << eds._pfid << " file=" << _recoveryManager._fm[eds._pfid] << " file_posn=" << eds._file_posn; - throw jexception(jerrno::JERR_JCNTL_INVALIDENQHDR, oss.str(), "jcntl", "read_data_record"); - } - dsize = eh._dsize; - xidsize = eh._xidsize; - transient = ::is_enq_transient(&eh); - external = ::is_enq_external(&eh); - if (xidsize) { - *xidpp = ::malloc(xidsize); - ifs.read((char*)(*xidpp), xidsize); - } else { - *xidpp = 0; - } - if (dsize) { - *datapp = ::malloc(dsize); - ifs.read((char*)(*datapp), dsize); - } else { - *datapp = 0; - } - } } -*/ -/* - check_rstatus("read_data"); - iores res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns); - if (res == RHM_IORES_RCINVALID) - { - get_wr_events(0); // check for outstanding write events - iores sres = _rmgr.synchronize(); // flushes all outstanding read events - if (sres != RHM_IORES_SUCCESS) - return sres; - // TODO: Does linear store need this? -// _rmgr.wait_for_validity(&_aio_cmpl_timeout, true); // throw if timeout occurs - res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns); - } - return res; -*/ - return RHM_IORES_SUCCESS; + return RHM_IORES_EMPTY; } iores Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp Mon Nov 4 22:15:14 2013 @@ -53,7 +53,6 @@ const uint32_t jerrno::JERR_JCNTL_NOTREC const uint32_t jerrno::JERR_JCNTL_ENQSTATE = 0x0207; const uint32_t jerrno::JERR_JCNTL_INVALIDENQHDR = 0x0208; - // class jdir const uint32_t jerrno::JERR_JDIR_NOTDIR = 0x0300; const uint32_t jerrno::JERR_JDIR_MKDIR = 0x0301; @@ -89,16 +88,11 @@ const uint32_t jerrno::JERR_WMGR_DEQRIDN const uint32_t jerrno::JERR_WMGR_BADFH = 0x0806; // class RecoveryManager -const uint32_t jerrno::JERR_RCVM_OPENRD = 0x0900; -const uint32_t jerrno::JERR_RCVM_READ = 0x0901; -const uint32_t jerrno::JERR_RCVM_WRITE = 0x0902; - -//// class rmgr -//const uint32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900; -//const uint32_t jerrno::JERR_RMGR_RIDMISMATCH = 0x0901; -////const uint32_t jerrno::JERR_RMGR_FIDMISMATCH = 0x0902; -//const uint32_t jerrno::JERR_RMGR_ENQSTATE = 0x0903; -//const uint32_t jerrno::JERR_RMGR_BADRECTYPE = 0x0904; +const uint32_t jerrno::JERR_RCVM_OPENRD = 0x0900; ///< Unable to open file for read +const uint32_t jerrno::JERR_RCVM_STREAMBAD = 0x0901; ///< Read/write stream error +const uint32_t jerrno::JERR_RCVM_READ = 0x0902; ///< Read error: no or insufficient data to read +const uint32_t jerrno::JERR_RCVM_WRITE = 0x0903; ///< Write error +const uint32_t jerrno::JERR_RCVM_NULLXID = 0x0904; ///< Null XID when XID length non-null in header // class data_tok const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE = 0x0a00; @@ -184,15 +178,11 @@ jerrno::__init() _err_map[JERR_WMGR_BADFH] = "JERR_WMGR_BADFH: Bad file handle."; // class RecoveryManager - _err_map[JERR_RCVM_OPENRD] = "JERR_JCNTL_OPENRD: Unable to open file for write"; - _err_map[JERR_RCVM_READ] = "JERR_JCNTL_READ: Read error: no or insufficient data to read"; + _err_map[JERR_RCVM_OPENRD] = "JERR_RCVM_OPENRD: Unable to open file for read"; + _err_map[JERR_RCVM_STREAMBAD] = "JERR_RCVM_STREAMBAD: Read/write stream error"; + _err_map[JERR_RCVM_READ] = "JERR_RCVM_READ: Read error: no or insufficient data to read"; _err_map[JERR_RCVM_WRITE] = "JERR_RCVM_WRITE: Write error"; -// // class rmgr -// _err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic."; -// _err_map[JERR_RMGR_RIDMISMATCH] = "JERR_RMGR_RIDMISMATCH: RID mismatch between current record and dtok RID"; -// //_err_map[JERR_RMGR_FIDMISMATCH] = "JERR_RMGR_FIDMISMATCH: FID mismatch between emap and rrfc"; -// _err_map[JERR_RMGR_ENQSTATE] = "JERR_RMGR_ENQSTATE: Attempted read when data token wstate was not ENQ"; -// _err_map[JERR_RMGR_BADRECTYPE] = "JERR_RMGR_BADRECTYPE: Attempted operation on inappropriate record type"; + _err_map[JERR_RCVM_NULLXID] = "JERR_RCVM_NULLXID: Null XID when XID length non-null in header"; // class data_tok _err_map[JERR_DTOK_ILLEGALSTATE] = "JERR_MTOK_ILLEGALSTATE: Attempted to change to illegal state."; Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h?rev=1538790&r1=1538789&r2=1538790&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h Mon Nov 4 22:15:14 2013 @@ -50,40 +50,40 @@ namespace qls_jrnl public: // generic errors - static const uint32_t JERR__MALLOC; ///< Buffer memory allocation failed - static const uint32_t JERR__UNDERFLOW; ///< Underflow error - static const uint32_t JERR__NINIT; ///< Operation on uninitialized class - static const uint32_t JERR__AIO; ///< AIO failure - static const uint32_t JERR__FILEIO; ///< File read or write failure - static const uint32_t JERR__RTCLOCK; ///< Reading real-time clock failed - static const uint32_t JERR__PTHREAD; ///< pthread failure - static const uint32_t JERR__TIMEOUT; ///< Timeout waiting for an event - static const uint32_t JERR__UNEXPRESPONSE; ///< Unexpected response to call or event - static const uint32_t JERR__RECNFOUND; ///< Record not found - static const uint32_t JERR__NOTIMPL; ///< Not implemented - static const uint32_t JERR__NULL; ///< Operation on null pointer + static const uint32_t JERR__MALLOC; ///< Buffer memory allocation failed + static const uint32_t JERR__UNDERFLOW; ///< Underflow error + static const uint32_t JERR__NINIT; ///< Operation on uninitialized class + static const uint32_t JERR__AIO; ///< AIO failure + static const uint32_t JERR__FILEIO; ///< File read or write failure + static const uint32_t JERR__RTCLOCK; ///< Reading real-time clock failed + static const uint32_t JERR__PTHREAD; ///< pthread failure + static const uint32_t JERR__TIMEOUT; ///< Timeout waiting for an event + static const uint32_t JERR__UNEXPRESPONSE; ///< Unexpected response to call or event + static const uint32_t JERR__RECNFOUND; ///< Record not found + static const uint32_t JERR__NOTIMPL; ///< Not implemented + static const uint32_t JERR__NULL; ///< Operation on null pointer // class jcntl - static const uint32_t JERR_JCNTL_STOPPED; ///< Operation on stopped journal - static const uint32_t JERR_JCNTL_READONLY; ///< Write operation on read-only journal - static const uint32_t JERR_JCNTL_AIOCMPLWAIT; ///< Timeout waiting for AIOs to complete - static const uint32_t JERR_JCNTL_UNKNOWNMAGIC; ///< Found record with unknown magic - static const uint32_t JERR_JCNTL_NOTRECOVERED; ///< Req' recover() to be called first - static const uint32_t JERR_JCNTL_ENQSTATE; ///< Read error: Record not in ENQ state - static const uint32_t JERR_JCNTL_INVALIDENQHDR;///< Invalid ENQ header + static const uint32_t JERR_JCNTL_STOPPED; ///< Operation on stopped journal + static const uint32_t JERR_JCNTL_READONLY; ///< Write operation on read-only journal + static const uint32_t JERR_JCNTL_AIOCMPLWAIT; ///< Timeout waiting for AIOs to complete + static const uint32_t JERR_JCNTL_UNKNOWNMAGIC; ///< Found record with unknown magic + static const uint32_t JERR_JCNTL_NOTRECOVERED; ///< Req' recover() to be called first + static const uint32_t JERR_JCNTL_ENQSTATE; ///< Read error: Record not in ENQ state + static const uint32_t JERR_JCNTL_INVALIDENQHDR; ///< Invalid ENQ header // class jdir - static const uint32_t JERR_JDIR_NOTDIR; ///< Exists but is not a directory - static const uint32_t JERR_JDIR_MKDIR; ///< Directory creation failed - static const uint32_t JERR_JDIR_OPENDIR; ///< Directory open failed - static const uint32_t JERR_JDIR_READDIR; ///< Directory read failed - static const uint32_t JERR_JDIR_CLOSEDIR; ///< Directory close failed - static const uint32_t JERR_JDIR_RMDIR; ///< Directory delete failed - static const uint32_t JERR_JDIR_NOSUCHFILE; ///< File does not exist - static const uint32_t JERR_JDIR_FMOVE; ///< File move failed - static const uint32_t JERR_JDIR_STAT; ///< File stat failed - static const uint32_t JERR_JDIR_UNLINK; ///< File delete failed - static const uint32_t JERR_JDIR_BADFTYPE; ///< Bad or unknown file type (stat mode) + static const uint32_t JERR_JDIR_NOTDIR; ///< Exists but is not a directory + static const uint32_t JERR_JDIR_MKDIR; ///< Directory creation failed + static const uint32_t JERR_JDIR_OPENDIR; ///< Directory open failed + static const uint32_t JERR_JDIR_READDIR; ///< Directory read failed + static const uint32_t JERR_JDIR_CLOSEDIR; ///< Directory close failed + static const uint32_t JERR_JDIR_RMDIR; ///< Directory delete failed + static const uint32_t JERR_JDIR_NOSUCHFILE; ///< File does not exist + static const uint32_t JERR_JDIR_FMOVE; ///< File move failed + static const uint32_t JERR_JDIR_STAT; ///< File stat failed + static const uint32_t JERR_JDIR_UNLINK; ///< File delete failed + static const uint32_t JERR_JDIR_BADFTYPE; ///< Bad or unknown file type (stat mode) // class JournalFile static const uint32_t JERR_JNLF_OPEN; ///< Unable to open file for write @@ -92,47 +92,42 @@ namespace qls_jrnl static const uint32_t JERR_JNLF_CMPLOFFSOVFL; ///< Increased cmpl offs past subm offs // class LinearFileController - static const uint32_t JERR_LFCR_SEQNUMNOTFOUND;///< File sequence number not found + static const uint32_t JERR_LFCR_SEQNUMNOTFOUND; ///< File sequence number not found // class jrec, enq_rec, deq_rec, txn_rec - static const uint32_t JERR_JREC_BADRECHDR; ///< Invalid data record header - static const uint32_t JERR_JREC_BADRECTAIL; ///< Invalid data record tail + static const uint32_t JERR_JREC_BADRECHDR; ///< Invalid data record header + static const uint32_t JERR_JREC_BADRECTAIL; ///< Invalid data record tail // class wmgr - static const uint32_t JERR_WMGR_BADPGSTATE; ///< Page buffer in illegal state. - static const uint32_t JERR_WMGR_BADDTOKSTATE; ///< Data token in illegal state. - static const uint32_t JERR_WMGR_ENQDISCONT; ///< Enq. new dtok when previous part compl. - static const uint32_t JERR_WMGR_DEQDISCONT; ///< Deq. new dtok when previous part compl. - static const uint32_t JERR_WMGR_DEQRIDNOTENQ; ///< Deq. rid not enqueued - static const uint32_t JERR_WMGR_BADFH; ///< Bad file handle + static const uint32_t JERR_WMGR_BADPGSTATE; ///< Page buffer in illegal state. + static const uint32_t JERR_WMGR_BADDTOKSTATE; ///< Data token in illegal state. + static const uint32_t JERR_WMGR_ENQDISCONT; ///< Enq. new dtok when previous part compl. + static const uint32_t JERR_WMGR_DEQDISCONT; ///< Deq. new dtok when previous part compl. + static const uint32_t JERR_WMGR_DEQRIDNOTENQ; ///< Deq. rid not enqueued + static const uint32_t JERR_WMGR_BADFH; ///< Bad file handle // class RecoveryManager - static const uint32_t JERR_RCVM_OPENRD; ///< Unable to open file for read - static const uint32_t JERR_RCVM_READ; ///< Read error: no or insufficient data to read + static const uint32_t JERR_RCVM_OPENRD; ///< Unable to open file for read + static const uint32_t JERR_RCVM_STREAMBAD; ///< Read/write stream error + static const uint32_t JERR_RCVM_READ; ///< Read error: no or insufficient data to read static const uint32_t JERR_RCVM_WRITE; ///< Write error - -// // class rmgr -// static const uint32_t JERR_RMGR_UNKNOWNMAGIC; ///< Found record with unknown magic -// static const uint32_t JERR_RMGR_RIDMISMATCH; ///< RID mismatch between rec and dtok -// //static const uint32_t JERR_RMGR_FIDMISMATCH; ///< FID mismatch between emap and rrfc -// static const uint32_t JERR_RMGR_ENQSTATE; ///< Attempted read when wstate not ENQ -// static const uint32_t JERR_RMGR_BADRECTYPE; ///< Attempted op on incorrect rec type + static const uint32_t JERR_RCVM_NULLXID; ///< Null XID when XID length non-null in header // class data_tok - static const uint32_t JERR_DTOK_ILLEGALSTATE; ///< Attempted to change to illegal state -// static const uint32_t JERR_DTOK_RIDNOTSET; ///< Record ID not set + static const uint32_t JERR_DTOK_ILLEGALSTATE; ///< Attempted to change to illegal state +// static const uint32_t JERR_DTOK_RIDNOTSET; ///< Record ID not set // class enq_map, txn_map - static const uint32_t JERR_MAP_DUPLICATE; ///< Attempted to insert using duplicate key - static const uint32_t JERR_MAP_NOTFOUND; ///< Key not found in map - static const uint32_t JERR_MAP_LOCKED; ///< rid locked by pending txn + static const uint32_t JERR_MAP_DUPLICATE; ///< Attempted to insert using duplicate key + static const uint32_t JERR_MAP_NOTFOUND; ///< Key not found in map + static const uint32_t JERR_MAP_LOCKED; ///< rid locked by pending txn // EFP errors static const uint32_t JERR_EFP_BADPARTITIONNAME; ///< Partition name invalid or of value 0 - static const uint32_t JERR_EFP_BADEFPDIRNAME; ///< Empty File Pool directory name invalid - static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory - static const uint32_t JERR_EFP_NOEFP; ///< No EFP found for given partition and file size - static const uint32_t JERR_EFP_EMPTY; ///< EFP empty + static const uint32_t JERR_EFP_BADEFPDIRNAME; ///< Empty File Pool directory name invalid + static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory + static const uint32_t JERR_EFP_NOEFP; ///< No EFP found for given partition and file size + static const uint32_t JERR_EFP_EMPTY; ///< EFP empty // Negative returns for some functions static const int32_t AIO_TIMEOUT; ///< Timeout waiting for AIO return Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp Mon Nov 4 22:15:14 2013 @@ -39,14 +39,19 @@ int16_t txn_map::TMAP_OK = 0; int16_t txn_map::TMAP_NOT_SYNCED = 0; int16_t txn_map::TMAP_SYNCED = 1; -txn_data_struct::txn_data_struct(const uint64_t rid, const uint64_t drid, const uint16_t pfid, - const bool enq_flag, const bool commit_flag): - _rid(rid), - _drid(drid), - _pfid(pfid), - _enq_flag(enq_flag), - _commit_flag(commit_flag), - _aio_compl(false) +txn_data_t::txn_data_t(const uint64_t rid, + const uint64_t drid, + const uint16_t pfid, + const uint64_t foffs, + const bool enq_flag, + const bool commit_flag): + rid_(rid), + drid_(drid), + pfid_(pfid), + foffs_(foffs), + enq_flag_(enq_flag), + commit_flag_(commit_flag), + aio_compl_(false) {} txn_map::txn_map(): @@ -56,24 +61,8 @@ txn_map::txn_map(): txn_map::~txn_map() {} -/* -void -txn_map::set_num_jfiles(const uint16_t num_jfiles) -{ - _pfid_txn_cnt.resize(num_jfiles, 0); -} -*/ - -/* -uint32_t -txn_map::get_txn_pfid_cnt(const uint16_t pfid) const -{ - return _pfid_txn_cnt.at(pfid); -} -*/ - bool -txn_map::insert_txn_data(const std::string& xid, const txn_data& td) +txn_map::insert_txn_data(const std::string& xid, const txn_data_t& td) { bool ok = true; slock s(_mutex); @@ -88,7 +77,6 @@ txn_map::insert_txn_data(const std::stri } else itr->second.push_back(td); -// _pfid_txn_cnt.at(td._pfid)++; return ok; } @@ -117,8 +105,6 @@ txn_map::get_remove_tdata_list(const std return _empty_data_list; txn_data_list list = itr->second; _map.erase(itr); -// for (tdl_itr i=list.begin(); i!=list.end(); i++) -// _pfid_txn_cnt.at(i->_pfid)--; return list; } @@ -151,7 +137,7 @@ txn_map::cnt(const bool enq_flag) { for (tdl_itr j = i->second.begin(); j < i->second.end(); j++) { - if (j->_enq_flag == enq_flag) + if (j->enq_flag_ == enq_flag) c++; } } @@ -168,7 +154,7 @@ txn_map::is_txn_synced(const std::string bool is_synced = true; for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++) { - if (!litr->_aio_compl) + if (!litr->aio_compl_) { is_synced = false; break; @@ -186,9 +172,9 @@ txn_map::set_aio_compl(const std::string return TMAP_XID_NOT_FOUND; for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++) { - if (litr->_rid == rid) + if (litr->rid_ == rid) { - litr->_aio_compl = true; + litr->aio_compl_ = true; return TMAP_OK; // rid found } } @@ -206,7 +192,7 @@ txn_map::data_exists(const std::string& tdl_itr itr = tdl.begin(); while (itr != tdl.end() && !found) { - found = itr->_rid == rid; + found = itr->rid_ == rid; itr++; } } @@ -224,10 +210,10 @@ txn_map::is_enq(const uint64_t rid) txn_data_list list = i->second; for (tdl_itr j = list.begin(); j < list.end() && !found; j++) { - if (j->_enq_flag) - found = j->_rid == rid; + if (j->enq_flag_) + found = j->rid_ == rid; else - found = j->_drid == rid; + found = j->drid_ == rid; } } } Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h?rev=1538790&r1=1538789&r2=1538790&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h Mon Nov 4 22:15:14 2013 @@ -45,19 +45,23 @@ namespace qls_jrnl * \brief Struct encapsulating transaction data necessary for processing a transaction * in the journal once it is closed with either a commit or abort. */ - struct txn_data_struct + typedef struct txn_data_t { - uint64_t _rid; ///< Record id for this operation - uint64_t _drid; ///< Dequeue record id for this operation - uint16_t _pfid; ///< Physical file id, to be used when transferring to emap on commit - bool _enq_flag; ///< If true, enq op, otherwise deq op - bool _commit_flag; ///< (2PC transactions) Records 2PC complete c/a mode - bool _aio_compl; ///< Initially false, set to true when record AIO returns - txn_data_struct(const uint64_t rid, const uint64_t drid, const uint16_t pfid, - const bool enq_flag, const bool commit_flag = false); - }; - typedef txn_data_struct txn_data; - typedef std::vector txn_data_list; + uint64_t rid_; ///< Record id for this operation + uint64_t drid_; ///< Dequeue record id for this operation + uint16_t pfid_; ///< Physical file id, to be used when transferring to emap on commit + uint64_t foffs_; ///< Offset in file for this record + bool enq_flag_; ///< If true, enq op, otherwise deq op + bool commit_flag_; ///< (2PC transactions) Records 2PC complete c/a mode + bool aio_compl_; ///< Initially false, set to true when record AIO returns + txn_data_t(const uint64_t rid, + const uint64_t drid, + const uint16_t pfid, + const uint64_t foffs, + const bool enq_flag, + const bool commit_flag = false); + } txn_data_t; + typedef std::vector txn_data_list; typedef txn_data_list::iterator tdl_itr; /** @@ -112,16 +116,13 @@ namespace qls_jrnl xmap _map; smutex _mutex; -// std::vector _pfid_txn_cnt; const txn_data_list _empty_data_list; public: txn_map(); virtual ~txn_map(); -// void set_num_jfiles(const uint16_t num_jfiles); -// uint32_t get_txn_pfid_cnt(const uint16_t pfid) const; - bool insert_txn_data(const std::string& xid, const txn_data& td); + bool insert_txn_data(const std::string& xid, const txn_data_t& td); const txn_data_list get_tdata_list(const std::string& xid); const txn_data_list get_remove_tdata_list(const std::string& xid); bool in_map(const std::string& xid); Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c?rev=1538790&r1=1538789&r2=1538790&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c (original) +++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c Mon Nov 4 22:15:14 2013 @@ -28,7 +28,7 @@ void file_hdr_create(file_hdr_t* dest, c dest->_fhdr_size_sblks = fhdr_size_sblks; dest->_efp_partition = efp_partition; dest->_reserved = 0; - dest->_file_size_kib = file_size; + dest->_data_size_kib = file_size; dest->_fro = 0; dest->_ts_nsec = 0; dest->_ts_sec = 0; @@ -58,7 +58,7 @@ void file_hdr_copy(file_hdr_t* dest, con rec_hdr_copy(&dest->_rhdr, &src->_rhdr); dest->_fhdr_size_sblks = src->_fhdr_size_sblks; // Should this be copied? dest->_efp_partition = src->_efp_partition; // Should this be copied? - dest->_file_size_kib = src->_file_size_kib; + dest->_data_size_kib = src->_data_size_kib; dest->_fro = src->_fro; dest->_ts_sec = src->_ts_sec; dest->_ts_nsec = src->_ts_nsec; Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h?rev=1538790&r1=1538789&r2=1538790&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h Mon Nov 4 22:15:14 2013 @@ -79,7 +79,7 @@ typedef struct file_hdr_t { uint16_t _fhdr_size_sblks; /**< File header size in sblks (defined by JRNL_SBLK_SIZE) */ uint16_t _efp_partition; /**< EFP Partition number from which this file was obtained */ uint32_t _reserved; - uint64_t _file_size_kib; /**< Size of this file in KiB, excluding header sblk */ + uint64_t _data_size_kib; /**< Size of the data part of this file in KiB. (ie file size excluding file header sblk) */ uint64_t _fro; /**< First Record Offset (FRO) */ uint64_t _ts_sec; /**< Time stamp (seconds part) */ uint64_t _ts_nsec; /**< Time stamp (nanoseconds part) */ Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp Mon Nov 4 22:15:14 2013 @@ -33,7 +33,7 @@ #include #include -#include // DEBUG +//#include // DEBUG namespace qpid { @@ -99,7 +99,7 @@ wmgr::initialize(aio_callback* const cbp if (eo) { const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS; - uint32_t data_dblks = (eo / QLS_DBLK_SIZE_BYTES) - 4; // 4 dblks for file hdr + uint32_t data_dblks = (eo / QLS_DBLK_SIZE_BYTES) - (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS); // exclude file header _pg_cntr = data_dblks / wr_pg_size_dblks; _pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks); } @@ -115,6 +115,7 @@ wmgr::enqueue(const void* const data_buf const bool transient, const bool external) { +//std::cout << _lfc.status(10) << std::endl; if (xid_len) assert(xid_ptr != 0); @@ -160,7 +161,7 @@ wmgr::enqueue(const void* const data_buf dtokp->clear_xid(); _enq_busy = true; } -//std::cout << "---+++ wmgr::enqueue() ENQ rid=0x" << std::hex << rid << " " << std::dec << std::flush; // DEBUG +//std::cout << "---+++ wmgr::enqueue() ENQ rid=0x" << std::hex << rid << " po=0x" << _pg_offset_dblks << " cs=0x" << (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) << " " << std::dec << std::flush; // DEBUG bool done = false; while (!done) { @@ -193,12 +194,12 @@ wmgr::enqueue(const void* const data_buf // message. AIO callbacks will then only process this token when entire message is // enqueued. _lfc.incrEnqueuedRecordCount(dtokp->fid()); -//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount() << std::dec << std::flush; // DEBUG +//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount(dtokp->fid()) << std::dec << std::flush; // DEBUG if (xid_len) // If part of transaction, add to transaction map { std::string xid((const char*)xid_ptr, xid_len); - _tmap.insert_txn_data(xid, txn_data(rid, 0, dtokp->fid(), true)); + _tmap.insert_txn_data(xid, txn_data_t(rid, 0, dtokp->fid(), 0, true)); } else { @@ -213,7 +214,7 @@ wmgr::enqueue(const void* const data_buf done = true; } else { -//std::cout << "$" << std::endl << std::flush; // DEBUG +//std::cout << "$" << std::flush; // DEBUG dtokp->set_wstate(data_tok::ENQ_PART); } @@ -313,7 +314,7 @@ wmgr::dequeue(data_tok* dtokp, // If the enqueue is part of a pending txn, it will not yet be in emap _emap.lock(dequeue_rid); // ignore rid not found error std::string xid((const char*)xid_ptr, xid_len); - _tmap.insert_txn_data(xid, txn_data(rid, dequeue_rid, dtokp->fid(), false)); + _tmap.insert_txn_data(xid, txn_data_t(rid, dequeue_rid, dtokp->fid(), 0, false)); } else { @@ -427,10 +428,10 @@ wmgr::abort(data_tok* dtokp, txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) { - if (!itr->_enq_flag) - _emap.unlock(itr->_drid); // ignore rid not found error - if (itr->_enq_flag) - _lfc.decrEnqueuedRecordCount(itr->_pfid); + if (!itr->enq_flag_) + _emap.unlock(itr->drid_); // ignore rid not found error + if (itr->enq_flag_) + _lfc.decrEnqueuedRecordCount(itr->pfid_); } std::pair::iterator, bool> res = _txn_pending_set.insert(xid); if (!res.second) @@ -441,9 +442,9 @@ wmgr::abort(data_tok* dtokp, } done = true; - } - else + } else { dtokp->set_wstate(data_tok::ABORT_PART); + } file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks); flush_check(res, cont, done, rid); @@ -525,20 +526,20 @@ wmgr::commit(data_tok* dtokp, txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) { - if (itr->_enq_flag) // txn enqueue + if (itr->enq_flag_) // txn enqueue { - if (_emap.insert_pfid(itr->_rid, itr->_pfid, 0) < enq_map::EMAP_OK) // fail + if (_emap.insert_pfid(itr->rid_, itr->pfid_, 0) < enq_map::EMAP_OK) // fail { // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID. std::ostringstream oss; - oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid; + oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->pfid_; throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "commit"); } } else // txn dequeue { uint64_t fid; - short eres = _emap.get_remove_pfid(itr->_drid, fid, true); + short eres = _emap.get_remove_pfid(itr->drid_, fid, true); if (eres < enq_map::EMAP_OK) // fail { if (eres == enq_map::EMAP_RID_NOT_FOUND) @@ -566,9 +567,9 @@ wmgr::commit(data_tok* dtokp, } done = true; - } - else + } else { dtokp->set_wstate(data_tok::COMMIT_PART); + } file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks); flush_check(res, cont, done, rid); @@ -585,6 +586,7 @@ wmgr::file_header_check(const uint64_t r { if (_lfc.isEmpty()) // File never written (i.e. no header or data) { +//std::cout << "e" << std::flush; std::size_t fro = 0; if (cont) { bool file_fit = rec_dblks_rem <= _lfc.dataSize_sblks() * QLS_SBLK_SIZE_DBLKS; // Will fit within this journal file @@ -608,6 +610,7 @@ wmgr::flush_check(iores& res, // Is page is full, flush if (_pg_offset_dblks >= _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) { +//std::cout << "^" << _pg_offset_dblks << ">=" << (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) << std::flush; res = write_flush(); assert(res == RHM_IORES_SUCCESS); @@ -621,6 +624,7 @@ wmgr::flush_check(iores& res, uint32_t fileSize_pgs = _lfc.fileSize_sblks() / _cache_pgsize_sblks; if (_pg_cntr >= fileSize_pgs) { +//std::cout << _pg_cntr << ">=" << fileSize_pgs << std::flush; get_next_file(); if (!done) { cont = true; @@ -649,7 +653,7 @@ wmgr::write_flush() if (_cached_offset_dblks) { if (_page_cb_arr[_pg_index]._state == AIO_PENDING) { -//std::cout << "#"; // DEBUG +//std::cout << "#" << std::flush; // DEBUG res = RHM_IORES_PAGE_AIOWAIT; } else { if (_page_cb_arr[_pg_index]._state != IN_USE) @@ -688,7 +692,7 @@ void wmgr::get_next_file() { _pg_cntr = 0; -//std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::endl; // DEBUG +//std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::flush << std::endl; // DEBUG _lfc.pullEmptyFileFromEfp(); } @@ -972,6 +976,7 @@ wmgr::dblk_roundup() uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, QLS_SBLK_SIZE_DBLKS) * QLS_SBLK_SIZE_DBLKS; while (_cached_offset_dblks < wdblks) { +//std::cout << "^0x" << std::hex << _cached_offset_dblks << "<0x" << wdblks << std::dec << std::flush; void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); std::memcpy(wptr, (const void*)&xmagic, sizeof(xmagic)); #ifdef QLS_CLEAN --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org