Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CB44110B75 for ; Wed, 10 Jul 2013 18:21:26 +0000 (UTC) Received: (qmail 88670 invoked by uid 500); 10 Jul 2013 18:21:26 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 88645 invoked by uid 500); 10 Jul 2013 18:21:25 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 88636 invoked by uid 99); 10 Jul 2013 18:21:25 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Jul 2013 18:21:25 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Jul 2013 18:21:22 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 4A6A32388BEC; Wed, 10 Jul 2013 18:20:25 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1501895 [10/10] - in /qpid/branches/linearstore/qpid/cpp/src: ./ qpid/linearstore/ qpid/linearstore/jrnl/ Date: Wed, 10 Jul 2013 18:20:20 -0000 To: commits@qpid.apache.org From: tross@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20130710182025.4A6A32388BEC@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp?rev=1501895&view=auto 
============================================================================== --- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp (added) +++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp Wed Jul 10 18:20:19 2013 @@ -0,0 +1,1051 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * \file wmgr.cpp + * + * Qpid asynchronous store plugin library + * + * File containing code for class mrg::journal::wmgr (write manager). See + * comments in file wmgr.h for details. 
+ * + * \author Kim van der Riet + */ + +#include "qpid/legacystore/jrnl/wmgr.h" + +#include +#include +#include +#include +#include "qpid/legacystore/jrnl/file_hdr.h" +#include "qpid/legacystore/jrnl/jcntl.h" +#include "qpid/legacystore/jrnl/jerrno.h" +#include + +namespace mrg +{ +namespace journal +{ + +wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc): + pmgr(jc, emap, tmap), + _wrfc(wrfc), + _max_dtokpp(0), + _max_io_wait_us(0), + _fhdr_base_ptr(0), + _fhdr_ptr_arr(0), + _fhdr_aio_cb_arr(0), + _cached_offset_dblks(0), + _jfsize_dblks(0), + _jfsize_pgs(0), + _num_jfiles(0), + _enq_busy(false), + _deq_busy(false), + _abort_busy(false), + _commit_busy(false), + _txn_pending_set() +{} + +wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc, + const u_int32_t max_dtokpp, const u_int32_t max_iowait_us): + pmgr(jc, emap, tmap /* , dtoklp */), + _wrfc(wrfc), + _max_dtokpp(max_dtokpp), + _max_io_wait_us(max_iowait_us), + _fhdr_base_ptr(0), + _fhdr_ptr_arr(0), + _fhdr_aio_cb_arr(0), + _cached_offset_dblks(0), + _jfsize_dblks(0), + _jfsize_pgs(0), + _num_jfiles(0), + _enq_busy(false), + _deq_busy(false), + _abort_busy(false), + _commit_busy(false), + _txn_pending_set() +{} + +wmgr::~wmgr() +{ + wmgr::clean(); +} + +void +wmgr::initialize(aio_callback* const cbp, const u_int32_t wcache_pgsize_sblks, + const u_int16_t wcache_num_pages, const u_int32_t max_dtokpp, const u_int32_t max_iowait_us, + std::size_t eo) +{ + _enq_busy = false; + _deq_busy = false; + _abort_busy = false; + _commit_busy = false; + _max_dtokpp = max_dtokpp; + _max_io_wait_us = max_iowait_us; + + initialize(cbp, wcache_pgsize_sblks, wcache_num_pages); + + _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE; + _jfsize_pgs = _jc->jfsize_sblks() / _cache_pgsize_sblks; + assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0); + + if (eo) + { + const u_int32_t wr_pg_size_dblks = _cache_pgsize_sblks * JRNL_SBLK_SIZE; + u_int32_t data_dblks = (eo / JRNL_DBLK_SIZE) - 4; // 4 dblks for 
file hdr + _pg_cntr = data_dblks / wr_pg_size_dblks; + _pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks); + } +} + +iores +wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len, + const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr, + const std::size_t xid_len, const bool transient, const bool external) +{ + if (xid_len) + assert(xid_ptr != 0); + + if (_deq_busy || _abort_busy || _commit_busy) + return RHM_IORES_BUSY; + + if (this_data_len != tot_data_len && !external) + return RHM_IORES_NOTIMPL; + + iores res = pre_write_check(WMGR_ENQUEUE, dtokp, xid_len, tot_data_len, external); + if (res != RHM_IORES_SUCCESS) + return res; + + bool cont = false; + if (_enq_busy) // If enqueue() exited last time with RHM_IORES_FULL or RHM_IORES_PAGE_AIOWAIT + { + if (dtokp->wstate() == data_tok::ENQ_PART) + cont = true; + else + { + std::ostringstream oss; + oss << "This data_tok: id=" << dtokp->id() << " state=" << dtokp->wstate_str(); + throw jexception(jerrno::JERR_WMGR_ENQDISCONT, oss.str(), "wmgr", "enqueue"); + } + } + + u_int64_t rid = (dtokp->external_rid() | cont) ? 
dtokp->rid() : _wrfc.get_incr_rid(); + _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, _wrfc.owi(), transient, + external); + if (!cont) + { + dtokp->set_rid(rid); + dtokp->set_dequeue_rid(0); + if (xid_len) + dtokp->set_xid(xid_ptr, xid_len); + else + dtokp->clear_xid(); + _enq_busy = true; + } + bool done = false; + while (!done) + { + assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); + u_int32_t data_offs_dblks = dtokp->dblocks_written(); + u_int32_t ret = _enq_rec.encode(wptr, data_offs_dblks, + (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks); + + // Remember fid which contains the record header in case record is split over several files + if (data_offs_dblks == 0) + dtokp->set_fid(_wrfc.index()); + _pg_offset_dblks += ret; + _cached_offset_dblks += ret; + dtokp->incr_dblocks_written(ret); + dtokp->incr_pg_cnt(); + _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp); + + // Is the encoding of this record complete? + if (dtokp->dblocks_written() >= _enq_rec.rec_size_dblks()) + { + // TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns. + dtokp->set_wstate(data_tok::ENQ_SUBM); + dtokp->set_dsize(tot_data_len); + // Only add this data token to page token list when submit is complete, this way + // long multi-page messages have their token on the page containing the END of the + // message. AIO callbacks will then only process this token when entire message is + // enqueued. + _wrfc.incr_enqcnt(dtokp->fid()); + + if (xid_len) // If part of transaction, add to transaction map + { + std::string xid((const char*)xid_ptr, xid_len); + _tmap.insert_txn_data(xid, txn_data(rid, 0, dtokp->fid(), true)); + } + else + { + if (_emap.insert_pfid(rid, dtokp->fid()) < enq_map::EMAP_OK) // fail + { + // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID. 
+ std::ostringstream oss; + oss << std::hex << "rid=0x" << rid << " _pfid=0x" << dtokp->fid(); + throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "enqueue"); + } + } + + done = true; + } + else + dtokp->set_wstate(data_tok::ENQ_PART); + + file_header_check(rid, cont, _enq_rec.rec_size_dblks() - data_offs_dblks); + flush_check(res, cont, done); + } + if (dtokp->wstate() >= data_tok::ENQ_SUBM) + _enq_busy = false; + return res; +} + +iores +wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len, const bool txn_coml_commit) +{ + if (xid_len) + assert(xid_ptr != 0); + + if (_enq_busy || _abort_busy || _commit_busy) + return RHM_IORES_BUSY; + + iores res = pre_write_check(WMGR_DEQUEUE, dtokp); + if (res != RHM_IORES_SUCCESS) + return res; + + bool cont = false; + if (_deq_busy) // If dequeue() exited last time with RHM_IORES_FULL or RHM_IORES_PAGE_AIOWAIT + { + if (dtokp->wstate() == data_tok::DEQ_PART) + cont = true; + else + { + std::ostringstream oss; + oss << "This data_tok: id=" << dtokp->id() << " state=" << dtokp->wstate_str(); + throw jexception(jerrno::JERR_WMGR_DEQDISCONT, oss.str(), "wmgr", "dequeue"); + } + } + + const bool ext_rid = dtokp->external_rid(); + u_int64_t rid = (ext_rid | cont) ? dtokp->rid() : _wrfc.get_incr_rid(); + u_int64_t dequeue_rid = (ext_rid | cont) ? 
+ dtokp->dequeue_rid() : dtokp->rid(); + _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len, _wrfc.owi(), txn_coml_commit); + if (!cont) + { + if (!ext_rid) + { + dtokp->set_rid(rid); + dtokp->set_dequeue_rid(dequeue_rid); + } + if (xid_len) + dtokp->set_xid(xid_ptr, xid_len); + else + dtokp->clear_xid(); + dequeue_check(dtokp->xid(), dequeue_rid); + dtokp->set_dblocks_written(0); // Reset dblks_written from previous op + _deq_busy = true; + } + bool done = false; + while (!done) + { + assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); + u_int32_t data_offs_dblks = dtokp->dblocks_written(); + u_int32_t ret = _deq_rec.encode(wptr, data_offs_dblks, + (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks); + + // Remember fid which contains the record header in case record is split over several files + if (data_offs_dblks == 0) + dtokp->set_fid(_wrfc.index()); + _pg_offset_dblks += ret; + _cached_offset_dblks += ret; + dtokp->incr_dblocks_written(ret); + dtokp->incr_pg_cnt(); + _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp); + + // Is the encoding of this record complete? + if (dtokp->dblocks_written() >= _deq_rec.rec_size_dblks()) + { + // TODO: Incorrect - must set a cached state here (the DEQ equivalent of ENQ_CACHED); DEQ_SUBM should only be set when the AIO returns.
+ dtokp->set_wstate(data_tok::DEQ_SUBM); + + if (xid_len) // If part of transaction, add to transaction map + { + // If the enqueue is part of a pending txn, it will not yet be in emap + _emap.lock(dequeue_rid); // ignore rid not found error + std::string xid((const char*)xid_ptr, xid_len); + _tmap.insert_txn_data(xid, txn_data(rid, dequeue_rid, dtokp->fid(), false)); + } + else + { + int16_t fid = _emap.get_remove_pfid(dtokp->dequeue_rid()); + if (fid < enq_map::EMAP_OK) // fail + { + if (fid == enq_map::EMAP_RID_NOT_FOUND) + { + std::ostringstream oss; + oss << std::hex << "rid=0x" << rid; + throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue"); + } + if (fid == enq_map::EMAP_LOCKED) + { + std::ostringstream oss; + oss << std::hex << "rid=0x" << rid; + throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue"); + } + } + _wrfc.decr_enqcnt(fid); + } + + done = true; + } + else + dtokp->set_wstate(data_tok::DEQ_PART); + + file_header_check(rid, cont, _deq_rec.rec_size_dblks() - data_offs_dblks); + flush_check(res, cont, done); + } + if (dtokp->wstate() >= data_tok::DEQ_SUBM) + _deq_busy = false; + return res; +} + +iores +wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len) +{ + // commit and abort MUST have a valid xid + assert(xid_ptr != 0 && xid_len > 0); + + if (_enq_busy || _deq_busy || _commit_busy) + return RHM_IORES_BUSY; + + iores res = pre_write_check(WMGR_ABORT, dtokp); + if (res != RHM_IORES_SUCCESS) + return res; + + bool cont = false; + if (_abort_busy) // If abort() exited last time with RHM_IORES_FULL or RHM_IORES_PAGE_AIOWAIT + { + if (dtokp->wstate() == data_tok::ABORT_PART) + cont = true; + else + { + std::ostringstream oss; + oss << "This data_tok: id=" << dtokp->id() << " state=" << dtokp->wstate_str(); + throw jexception(jerrno::JERR_WMGR_DEQDISCONT, oss.str(), "wmgr", "abort"); + } + } + + u_int64_t rid = (dtokp->external_rid() | cont) ? 
dtokp->rid() : _wrfc.get_incr_rid(); + _txn_rec.reset(RHM_JDAT_TXA_MAGIC, rid, xid_ptr, xid_len, _wrfc.owi()); + if (!cont) + { + dtokp->set_rid(rid); + dtokp->set_dequeue_rid(0); + dtokp->set_xid(xid_ptr, xid_len); + dtokp->set_dblocks_written(0); // Reset dblks_written from previous op + _abort_busy = true; + } + bool done = false; + while (!done) + { + assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); + u_int32_t data_offs_dblks = dtokp->dblocks_written(); + u_int32_t ret = _txn_rec.encode(wptr, data_offs_dblks, + (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks); + + // Remember fid which contains the record header in case record is split over several files + if (data_offs_dblks == 0) + dtokp->set_fid(_wrfc.index()); + _pg_offset_dblks += ret; + _cached_offset_dblks += ret; + dtokp->incr_dblocks_written(ret); + dtokp->incr_pg_cnt(); + _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp); + + // Is the encoding of this record complete? 
+ if (dtokp->dblocks_written() >= _txn_rec.rec_size_dblks()) + { + dtokp->set_wstate(data_tok::ABORT_SUBM); + + // Delete this txn from tmap, unlock any locked records in emap + std::string xid((const char*)xid_ptr, xid_len); + txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found + for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) + { + if (!itr->_enq_flag) + _emap.unlock(itr->_drid); // ignore rid not found error + if (itr->_enq_flag) + _wrfc.decr_enqcnt(itr->_pfid); + } + std::pair::iterator, bool> res = _txn_pending_set.insert(xid); + if (!res.second) + { + std::ostringstream oss; + oss << std::hex << "_txn_pending_set: xid=\"" << xid << "\""; + throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "abort"); + } + + done = true; + } + else + dtokp->set_wstate(data_tok::ABORT_PART); + + file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks); + flush_check(res, cont, done); + } + if (dtokp->wstate() >= data_tok::ABORT_SUBM) + _abort_busy = false; + return res; +} + +iores +wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len) +{ + // commit and abort MUST have a valid xid + assert(xid_ptr != 0 && xid_len > 0); + + if (_enq_busy || _deq_busy || _abort_busy) + return RHM_IORES_BUSY; + + iores res = pre_write_check(WMGR_COMMIT, dtokp); + if (res != RHM_IORES_SUCCESS) + return res; + + bool cont = false; + if (_commit_busy) // If commit() exited last time with RHM_IORES_FULL or RHM_IORES_PAGE_AIOWAIT + { + if (dtokp->wstate() == data_tok::COMMIT_PART) + cont = true; + else + { + std::ostringstream oss; + oss << "This data_tok: id=" << dtokp->id() << " state=" << dtokp->wstate_str(); + throw jexception(jerrno::JERR_WMGR_DEQDISCONT, oss.str(), "wmgr", "commit"); + } + } + + u_int64_t rid = (dtokp->external_rid() | cont) ? 
dtokp->rid() : _wrfc.get_incr_rid(); + _txn_rec.reset(RHM_JDAT_TXC_MAGIC, rid, xid_ptr, xid_len, _wrfc.owi()); + if (!cont) + { + dtokp->set_rid(rid); + dtokp->set_dequeue_rid(0); + dtokp->set_xid(xid_ptr, xid_len); + dtokp->set_dblocks_written(0); // Reset dblks_written from previous op + _commit_busy = true; + } + bool done = false; + while (!done) + { + assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); + u_int32_t data_offs_dblks = dtokp->dblocks_written(); + u_int32_t ret = _txn_rec.encode(wptr, data_offs_dblks, + (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks); + + // Remember fid which contains the record header in case record is split over several files + if (data_offs_dblks == 0) + dtokp->set_fid(_wrfc.index()); + _pg_offset_dblks += ret; + _cached_offset_dblks += ret; + dtokp->incr_dblocks_written(ret); + dtokp->incr_pg_cnt(); + _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp); + + // Is the encoding of this record complete? + if (dtokp->dblocks_written() >= _txn_rec.rec_size_dblks()) + { + dtokp->set_wstate(data_tok::COMMIT_SUBM); + + // Delete this txn from tmap, process records into emap + std::string xid((const char*)xid_ptr, xid_len); + txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found + for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) + { + if (itr->_enq_flag) // txn enqueue + { + if (_emap.insert_pfid(itr->_rid, itr->_pfid) < enq_map::EMAP_OK) // fail + { + // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID. 
+ std::ostringstream oss; + oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid; + throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "commit"); + } + } + else // txn dequeue + { + int16_t fid = _emap.get_remove_pfid(itr->_drid, true); + if (fid < enq_map::EMAP_OK) // fail + { + if (fid == enq_map::EMAP_RID_NOT_FOUND) + { + std::ostringstream oss; + oss << std::hex << "rid=0x" << rid; + throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue"); + } + if (fid == enq_map::EMAP_LOCKED) + { + std::ostringstream oss; + oss << std::hex << "rid=0x" << rid; + throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue"); + } + } + _wrfc.decr_enqcnt(fid); + } + } + std::pair::iterator, bool> res = _txn_pending_set.insert(xid); + if (!res.second) + { + std::ostringstream oss; + oss << std::hex << "_txn_pending_set: xid=\"" << xid << "\""; + throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "commit"); + } + + done = true; + } + else + dtokp->set_wstate(data_tok::COMMIT_PART); + + file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks); + flush_check(res, cont, done); + } + if (dtokp->wstate() >= data_tok::COMMIT_SUBM) + _commit_busy = false; + return res; +} + +void +wmgr::file_header_check(const u_int64_t rid, const bool cont, const u_int32_t rec_dblks_rem) +{ + // Has the file header been written (i.e. write pointers still at 0)? 
+ if (_wrfc.is_void()) + { + bool file_fit = rec_dblks_rem <= _jfsize_dblks; + bool file_full = rec_dblks_rem == _jfsize_dblks; + std::size_t fro = 0; + if (cont) + { + if (file_fit && !file_full) + fro = (rec_dblks_rem + JRNL_SBLK_SIZE) * JRNL_DBLK_SIZE; + } + else + fro = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE; + write_fhdr(rid, _wrfc.index(), _wrfc.index(), fro); + } +} + +void +wmgr::flush_check(iores& res, bool& cont, bool& done) +{ + // If the page is full, flush it + if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE) + { + res = write_flush(); + assert(res == RHM_IORES_SUCCESS); + + if (_page_cb_arr[_pg_index]._state == AIO_PENDING && !done) + { + res = RHM_IORES_PAGE_AIOWAIT; + done = true; + } + + // If file is full, rotate to next file + if (_pg_cntr >= _jfsize_pgs) + { + iores rfres = rotate_file(); + if (rfres != RHM_IORES_SUCCESS) + res = rfres; + if (!done) + { + if (rfres == RHM_IORES_SUCCESS) + cont = true; + else + done = true; + } + } + } +} + +iores +wmgr::flush() +{ + iores res = write_flush(); + if (_pg_cntr >= _jfsize_pgs) + { + iores rfres = rotate_file(); + if (rfres != RHM_IORES_SUCCESS) + res = rfres; + } + return res; +} + +iores +wmgr::write_flush() +{ + iores res = RHM_IORES_SUCCESS; + // Don't bother flushing an empty page or one that is still in state AIO_PENDING + if (_cached_offset_dblks) + { + if (_page_cb_arr[_pg_index]._state == AIO_PENDING) + res = RHM_IORES_PAGE_AIOWAIT; + else + { + if (_page_cb_arr[_pg_index]._state != IN_USE) + { + std::ostringstream oss; + oss << "pg_index=" << _pg_index << " state=" << _page_cb_arr[_pg_index].state_str(); + throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr", + "write_flush"); + } + + // Send current page using AIO + + // In manual flushes, dblks may not coincide with sblks, add filler records ("RHMx") + // if necessary.
+ dblk_roundup(); + + std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * JRNL_DBLK_SIZE; + aio_cb* aiocbp = &_aio_cb_arr[_pg_index]; + aio::prep_pwrite_2(aiocbp, _wrfc.fh(), + (char*)_page_ptr_arr[_pg_index] + pg_offs, _cached_offset_dblks * JRNL_DBLK_SIZE, + _wrfc.subm_offs()); + page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb) + pcbp->_wdblks = _cached_offset_dblks; + pcbp->_wfh = _wrfc.file_controller(); + if (aio::submit(_ioctx, 1, &aiocbp) < 0) + throw jexception(jerrno::JERR__AIO, "wmgr", "write_flush"); + _wrfc.add_subm_cnt_dblks(_cached_offset_dblks); + _wrfc.incr_aio_cnt(); + _aio_evt_rem++; + _cached_offset_dblks = 0; + _jc->instr_incr_outstanding_aio_cnt(); + + rotate_page(); // increments _pg_index, resets _pg_offset_dblks if req'd + if (_page_cb_arr[_pg_index]._state == UNUSED) + _page_cb_arr[_pg_index]._state = IN_USE; + } + } + get_events(UNUSED, 0); + if (_page_cb_arr[_pg_index]._state == UNUSED) + _page_cb_arr[_pg_index]._state = IN_USE; + return res; +} + +iores +wmgr::rotate_file() +{ + _pg_cntr = 0; + iores res = _wrfc.rotate(); + _jc->chk_wr_frot(); + return res; +} + +int32_t +wmgr::get_events(page_state state, timespec* const timeout, bool flush) +{ + if (_aio_evt_rem == 0) // no events to get + return 0; + + int ret = 0; + if ((ret = aio::getevents(_ioctx, flush ? 
_aio_evt_rem : 1, _aio_evt_rem/*_cache_num_pages + _jc->num_jfiles()*/, _aio_event_arr, timeout)) < 0) + { + if (ret == -EINTR) // Interrupted by signal + return 0; + std::ostringstream oss; + oss << "io_getevents() failed: " << std::strerror(-ret) << " (" << ret << ")"; + throw jexception(jerrno::JERR__AIO, oss.str(), "wmgr", "get_events"); + } + + if (ret == 0 && timeout) + return jerrno::AIO_TIMEOUT; + + int32_t tot_data_toks = 0; + for (int i=0; idata); // This page control block (pcb) + long aioret = (long)_aio_event_arr[i].res; + if (aioret < 0) + { + std::ostringstream oss; + oss << "AIO write operation failed: " << std::strerror(-aioret) << " (" << aioret << ") ["; + if (pcbp) + oss << "pg=" << pcbp->_index; + else + { + file_hdr* fhp = (file_hdr*)aiocbp->u.c.buf; + oss << "fid=" << fhp->_pfid; + } + oss << " size=" << aiocbp->u.c.nbytes; + oss << " offset=" << aiocbp->u.c.offset << " fh=" << aiocbp->aio_fildes << "]"; + throw jexception(jerrno::JERR__AIO, oss.str(), "wmgr", "get_events"); + } + if (pcbp) // Page writes have pcb + { + u_int32_t s = pcbp->_pdtokl->size(); + std::vector dtokl; + dtokl.reserve(s); + for (u_int32_t k=0; k_pdtokl->at(k); + if (dtokp->decr_pg_cnt() == 0) + { + std::set::iterator it; + switch (dtokp->wstate()) + { + case data_tok::ENQ_SUBM: + dtokl.push_back(dtokp); + tot_data_toks++; + dtokp->set_wstate(data_tok::ENQ); + if (dtokp->has_xid()) + // Ignoring return value here. A non-zero return can signify that the transaction + // has committed or aborted, and which was completed prior to the aio returning. + _tmap.set_aio_compl(dtokp->xid(), dtokp->rid()); + break; + case data_tok::DEQ_SUBM: + dtokl.push_back(dtokp); + tot_data_toks++; + dtokp->set_wstate(data_tok::DEQ); + if (dtokp->has_xid()) + // Ignoring return value - see note above. 
+ _tmap.set_aio_compl(dtokp->xid(), dtokp->rid()); + break; + case data_tok::ABORT_SUBM: + dtokl.push_back(dtokp); + tot_data_toks++; + dtokp->set_wstate(data_tok::ABORTED); + it = _txn_pending_set.find(dtokp->xid()); + if (it == _txn_pending_set.end()) + { + std::ostringstream oss; + oss << std::hex << "_txn_pending_set: abort xid=\""; + oss << dtokp->xid() << "\""; + throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", + "get_events"); + } + _txn_pending_set.erase(it); + break; + case data_tok::COMMIT_SUBM: + dtokl.push_back(dtokp); + tot_data_toks++; + dtokp->set_wstate(data_tok::COMMITTED); + it = _txn_pending_set.find(dtokp->xid()); + if (it == _txn_pending_set.end()) + { + std::ostringstream oss; + oss << std::hex << "_txn_pending_set: commit xid=\""; + oss << dtokp->xid() << "\""; + throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", + "get_events"); + } + _txn_pending_set.erase(it); + break; + case data_tok::ENQ_PART: + case data_tok::DEQ_PART: + case data_tok::ABORT_PART: + case data_tok::COMMIT_PART: + // ignore these + break; + default: + // throw for anything else + std::ostringstream oss; + oss << "dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str(); + throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr", + "get_events"); + } // switch + } // if + } // for + + // Increment the completed write offset + // NOTE: We cannot use _wrfc here, as it may have rotated since submitting count. + // Use stored pointer to fcntl in the pcb instead. 
+ pcbp->_wfh->add_wr_cmpl_cnt_dblks(pcbp->_wdblks); + pcbp->_wfh->decr_aio_cnt(); + _jc->instr_decr_outstanding_aio_cnt(); + + // Clean up this pcb's data_tok list + pcbp->_pdtokl->clear(); + pcbp->_state = state; + + // Perform AIO return callback + if (_cbp && tot_data_toks) + _cbp->wr_aio_cb(dtokl); + } + else // File header writes have no pcb + { + // get lfid from original file header record, update info for that lfid + file_hdr* fhp = (file_hdr*)aiocbp->u.c.buf; + u_int32_t lfid = fhp->_lfid; + fcntl* fcntlp = _jc->get_fcntlp(lfid); + fcntlp->add_wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE); + fcntlp->decr_aio_cnt(); + fcntlp->set_wr_fhdr_aio_outstanding(false); + } + } + + return tot_data_toks; +} + +bool +wmgr::is_txn_synced(const std::string& xid) +{ + // Ignore xid not found error here + if (_tmap.is_txn_synced(xid) == txn_map::TMAP_NOT_SYNCED) + return false; + // Check for outstanding commit/aborts + std::set::iterator it = _txn_pending_set.find(xid); + return it == _txn_pending_set.end(); +} + +void +wmgr::initialize(aio_callback* const cbp, const u_int32_t wcache_pgsize_sblks, const u_int16_t wcache_num_pages) +{ + pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages); + wmgr::clean(); + _num_jfiles = _jc->num_jfiles(); + if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize * _num_jfiles)) + { + wmgr::clean(); + std::ostringstream oss; + oss << "posix_memalign(): blksize=" << _sblksize << " size=" << _sblksize; + oss << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR__MALLOC, oss.str(), "wmgr", "initialize"); + } + _fhdr_ptr_arr = (void**)std::malloc(_num_jfiles * sizeof(void*)); + MALLOC_CHK(_fhdr_ptr_arr, "_fhdr_ptr_arr", "wmgr", "initialize"); + _fhdr_aio_cb_arr = (aio_cb**)std::malloc(sizeof(aio_cb*) * _num_jfiles); + MALLOC_CHK(_fhdr_aio_cb_arr, "_fhdr_aio_cb_arr", "wmgr", "initialize"); + std::memset(_fhdr_aio_cb_arr, 0, sizeof(aio_cb*) * _num_jfiles); + for (u_int16_t i=0; i<_num_jfiles; i++) + { + _fhdr_ptr_arr[i] = 
(void*)((char*)_fhdr_base_ptr + _sblksize * i); + _fhdr_aio_cb_arr[i] = new aio_cb; + } + _page_cb_arr[0]._state = IN_USE; + _ddtokl.clear(); + _cached_offset_dblks = 0; + _enq_busy = false; +} + +iores +wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp, + const std::size_t xidsize, const std::size_t dsize, const bool external + ) const +{ + // Check status of current file + if (!_wrfc.is_wr_reset()) + { + if (!_wrfc.wr_reset()) + return RHM_IORES_FULL; + } + + // Check status of current page is ok for writing + if (_page_cb_arr[_pg_index]._state != IN_USE) + { + if (_page_cb_arr[_pg_index]._state == UNUSED) + _page_cb_arr[_pg_index]._state = IN_USE; + else if (_page_cb_arr[_pg_index]._state == AIO_PENDING) + return RHM_IORES_PAGE_AIOWAIT; + else + { + std::ostringstream oss; + oss << "jrnl=" << _jc->id() << " op=" << _op_str[op]; + oss << " index=" << _pg_index << " pg_state=" << _page_cb_arr[_pg_index].state_str(); + throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr", "pre_write_check"); + } + } + + // operation-specific checks + switch (op) + { + case WMGR_ENQUEUE: + { + // Check for enqueue reaching cutoff threshold + u_int32_t size_dblks = jrec::size_dblks(enq_rec::rec_size(xidsize, dsize, + external)); + if (!_enq_busy && _wrfc.enq_threshold(_cached_offset_dblks + size_dblks)) + return RHM_IORES_ENQCAPTHRESH; + if (!dtokp->is_writable()) + { + std::ostringstream oss; + oss << "jrnl=" << _jc->id() << " op=" << _op_str[op]; + oss << " dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str(); + throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr", + "pre_write_check"); + } + } + break; + case WMGR_DEQUEUE: + if (!dtokp->is_dequeueable()) + { + std::ostringstream oss; + oss << "jrnl=" << _jc->id() << " op=" << _op_str[op]; + oss << " dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str(); + throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr", + "pre_write_check"); + } + break; + 
case WMGR_ABORT: + break; + case WMGR_COMMIT: + break; + } + + return RHM_IORES_SUCCESS; +} + +void +wmgr::dequeue_check(const std::string& xid, const u_int64_t drid) +{ + // First check emap + bool found = false; + int16_t fid = _emap.get_pfid(drid); + if (fid < enq_map::EMAP_OK) // fail + { + if (fid == enq_map::EMAP_RID_NOT_FOUND) + { + if (xid.size()) + found = _tmap.data_exists(xid, drid); + } + else if (fid == enq_map::EMAP_LOCKED) + { + std::ostringstream oss; + oss << std::hex << "drid=0x" << drid; + throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue_check"); + } + } + else + found = true; + if (!found) + { + std::ostringstream oss; + oss << "jrnl=" << _jc->id() << " drid=0x" << std::hex << drid; + throw jexception(jerrno::JERR_WMGR_DEQRIDNOTENQ, oss.str(), "wmgr", "dequeue_check"); + } +} + +void +wmgr::dblk_roundup() +{ + const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC; + u_int32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE) * JRNL_SBLK_SIZE; + while (_cached_offset_dblks < wdblks) + { + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); + std::memcpy(wptr, (const void*)&xmagic, sizeof(xmagic)); +#ifdef RHM_CLEAN + std::memset((char*)wptr + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic)); +#endif + _pg_offset_dblks++; + _cached_offset_dblks++; + } +} + +void +wmgr::write_fhdr(u_int64_t rid, u_int16_t fid, u_int16_t lid, std::size_t fro) +{ + file_hdr fhdr(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, rid, fid, lid, fro, _wrfc.owi(), true); + std::memcpy(_fhdr_ptr_arr[fid], &fhdr, sizeof(fhdr)); +#ifdef RHM_CLEAN + std::memset((char*)_fhdr_ptr_arr[fid] + sizeof(fhdr), RHM_CLEAN_CHAR, _sblksize - sizeof(fhdr)); +#endif + aio_cb* aiocbp = _fhdr_aio_cb_arr[fid]; + aio::prep_pwrite(aiocbp, _wrfc.fh(), _fhdr_ptr_arr[fid], _sblksize, 0); + if (aio::submit(_ioctx, 1, &aiocbp) < 0) + throw jexception(jerrno::JERR__AIO, "wmgr", "write_fhdr"); + _aio_evt_rem++; + 
_wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE); + _wrfc.incr_aio_cnt(); + _wrfc.file_controller()->set_wr_fhdr_aio_outstanding(true); +} + +void +wmgr::rotate_page() +{ + _page_cb_arr[_pg_index]._state = AIO_PENDING; + if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE) + { + _pg_offset_dblks = 0; + _pg_cntr++; + } + if (++_pg_index >= _cache_num_pages) + _pg_index = 0; +} + +void +wmgr::clean() +{ + std::free(_fhdr_base_ptr); + _fhdr_base_ptr = 0; + + std::free(_fhdr_ptr_arr); + _fhdr_ptr_arr = 0; + + if (_fhdr_aio_cb_arr) + { + for (u_int32_t i=0; i<_num_jfiles; i++) + delete _fhdr_aio_cb_arr[i]; + std::free(_fhdr_aio_cb_arr); + _fhdr_aio_cb_arr = 0; + } +} + +const std::string +wmgr::status_str() const +{ + std::ostringstream oss; + oss << "wmgr: pi=" << _pg_index << " pc=" << _pg_cntr; + oss << " po=" << _pg_offset_dblks << " aer=" << _aio_evt_rem; + oss << " edac:" << (_enq_busy?"T":"F") << (_deq_busy?"T":"F"); + oss << (_abort_busy?"T":"F") << (_commit_busy?"T":"F"); + oss << " ps=["; + for (int i=0; i<_cache_num_pages; i++) + { + switch (_page_cb_arr[i]._state) + { + case UNUSED: oss << "-"; break; + case IN_USE: oss << "U"; break; + case AIO_PENDING: oss << "A"; break; + case AIO_COMPLETE: oss << "*"; break; + default: oss << _page_cb_arr[i]._state; + } + } + oss << "] " << _wrfc.status_str(); + return oss.str(); +} + +// static + +const char* wmgr::_op_str[] = {"enqueue", "dequeue", "abort", "commit"}; + +} // namespace journal +} // namespace mrg Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp ------------------------------------------------------------------------------ svn:eol-style = native Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h?rev=1501895&view=auto ============================================================================== --- 
qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h (added) +++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h Wed Jul 10 18:20:19 2013 @@ -0,0 +1,147 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * \file wmgr.h + * + * Qpid asynchronous store plugin library + * + * File containing code for class mrg::journal::wmgr (write manager). See + * class documentation for details. + * + * \author Kim van der Riet + */ + +#ifndef QPID_LEGACYSTORE_JRNL_WMGR_H +#define QPID_LEGACYSTORE_JRNL_WMGR_H + +namespace mrg +{ +namespace journal +{ +class wmgr; +} +} + +#include +#include "qpid/legacystore/jrnl/enums.h" +#include "qpid/legacystore/jrnl/pmgr.h" +#include "qpid/legacystore/jrnl/wrfc.h" +#include + +namespace mrg +{ +namespace journal +{ + + /** + * \brief Class for managing a write page cache of arbitrary size and number of pages. + * + * The write page cache works on the principle of caching the write data within a page until + * that page is either full or flushed; this initiates a single AIO write operation to store + * the data on disk. + * + * The maximum disk throughput is achieved by keeping the write operations of uniform size. 
+ * Waiting for a page cache to fill achieves this; and in high data volume/throughput situations + * achieves the optimal disk throughput. Calling flush() forces a write of the current page cache + * no matter how full it is, and disrupts the uniformity of the write operations. This should + * normally only be done if throughput drops and there is a danger of a page of unwritten data + * waiting around for excessive time. + * + * The usual tradeoff between data storage latency and throughput performance applies. + */ + class wmgr : public pmgr + { + private: + wrfc& _wrfc; ///< Ref to write rotating file controller + u_int32_t _max_dtokpp; ///< Max data writes per page + u_int32_t _max_io_wait_us; ///< Max wait in microseconds till submit + void* _fhdr_base_ptr; ///< Base pointer to file header memory + void** _fhdr_ptr_arr; ///< Array of pointers to file headers memory + aio_cb** _fhdr_aio_cb_arr; ///< Array of iocb pointers for file header writes + u_int32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks) + std::deque _ddtokl; ///< Deferred dequeue data_tok list + u_int32_t _jfsize_dblks; ///< Journal file size in dblks (NOT sblks!) + u_int32_t _jfsize_pgs; ///< Journal file size in cache pages + u_int16_t _num_jfiles; ///< Number of files used in iocb mallocs + + // TODO: Convert _enq_busy etc into a proper threadsafe lock + // TODO: Convert to enum? Are these encodes mutually exclusive? 
+ bool _enq_busy; ///< Flag true if enqueue is in progress + bool _deq_busy; ///< Flag true if dequeue is in progress + bool _abort_busy; ///< Flag true if abort is in progress + bool _commit_busy; ///< Flag true if commit is in progress + + enum _op_type { WMGR_ENQUEUE = 0, WMGR_DEQUEUE, WMGR_ABORT, WMGR_COMMIT }; + static const char* _op_str[]; + + enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding + deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding + txn_rec _txn_rec; ///< Transaction record used for encoding/decoding + std::set _txn_pending_set; ///< Set containing xids of pending commits/aborts + + public: + wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc); + wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc, const u_int32_t max_dtokpp, + const u_int32_t max_iowait_us); + virtual ~wmgr(); + + void initialize(aio_callback* const cbp, const u_int32_t wcache_pgsize_sblks, + const u_int16_t wcache_num_pages, const u_int32_t max_dtokpp, + const u_int32_t max_iowait_us, std::size_t eo = 0); + iores enqueue(const void* const data_buff, const std::size_t tot_data_len, + const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr, + const std::size_t xid_len, const bool transient, const bool external); + iores dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len, + const bool txn_coml_commit); + iores abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len); + iores commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len); + iores flush(); + int32_t get_events(page_state state, timespec* const timeout, bool flush = false); + bool is_txn_synced(const std::string& xid); + inline bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state != UNUSED; } + inline bool curr_file_blocked() const { return _wrfc.aio_cnt() > 0; } + inline u_int32_t unflushed_dblks() { return _cached_offset_dblks; } + + // Debug aid + const std::string 
status_str() const; + + private: + void initialize(aio_callback* const cbp, const u_int32_t wcache_pgsize_sblks, + const u_int16_t wcache_num_pages); + iores pre_write_check(const _op_type op, const data_tok* const dtokp, + const std::size_t xidsize = 0, const std::size_t dsize = 0, const bool external = false) + const; + void dequeue_check(const std::string& xid, const u_int64_t drid); + void file_header_check(const u_int64_t rid, const bool cont, const u_int32_t rec_dblks_rem); + void flush_check(iores& res, bool& cont, bool& done); + iores write_flush(); + iores rotate_file(); + void dblk_roundup(); + void write_fhdr(u_int64_t rid, u_int16_t fid, u_int16_t lid, std::size_t fro); + void rotate_page(); + void clean(); + }; + +} // namespace journal +} // namespace mrg + +#endif // ifndef QPID_LEGACYSTORE_JRNL_WMGR_H Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.h ------------------------------------------------------------------------------ svn:keywords = Author Date Id Rev URL Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wrfc.cpp URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wrfc.cpp?rev=1501895&view=auto ============================================================================== --- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wrfc.cpp (added) +++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wrfc.cpp Wed Jul 10 18:20:19 2013 @@ -0,0 +1,162 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * \file wrfc.cpp + * + * Qpid asynchronous store plugin library + * + * File containing code for class mrg::journal::wrfc (rotating + * file controller). See comments in file wrfc.h for details. + * + * \author Kim van der Riet + */ + +#include "qpid/legacystore/jrnl/wrfc.h" + +#include +#include "qpid/legacystore/jrnl/jerrno.h" +#include "qpid/legacystore/jrnl/jexception.h" + +namespace mrg +{ +namespace journal +{ + +wrfc::wrfc(const lpmgr* lpmp): + rfc(lpmp), + _fsize_sblks(0), + _fsize_dblks(0), + _enq_cap_offs_dblks(0), + _rid(0), + _reset_ok(false), + _owi(false), + _frot(true) +{} + +wrfc::~wrfc() +{} + +void +wrfc::initialize(const u_int32_t fsize_sblks, rcvdat* rdp) +{ + if (rdp) + { + _fc_index = rdp->_lfid; + _curr_fc = _lpmp->get_fcntlp(_fc_index); + _curr_fc->wr_reset(rdp); + _rid = rdp->_h_rid + 1; + _reset_ok = true; + _owi = rdp->_owi; + _frot = rdp->_frot; + if (rdp->_lffull) + rotate(); + } + else + { + rfc::initialize(); + rfc::set_findex(0); + _rid = 0ULL; + _reset_ok = false; + } + _fsize_sblks = fsize_sblks; + _fsize_dblks = fsize_sblks * JRNL_SBLK_SIZE; + _enq_cap_offs_dblks = (u_int32_t)std::ceil(_fsize_dblks * _lpmp->num_jfiles() * (100.0 - JRNL_ENQ_THRESHOLD) / 100); + // Check the offset is at least one file; if not, make it so + if (_enq_cap_offs_dblks < _fsize_dblks) + _enq_cap_offs_dblks = _fsize_dblks; +} + +iores wrfc::rotate() +{ + if 
(!_lpmp->num_jfiles()) + throw jexception(jerrno::JERR__NINIT, "wrfc", "rotate"); + _fc_index++; + if (_fc_index == _lpmp->num_jfiles()) + { + _fc_index = 0; + _owi = !_owi; + _frot = false; + } + _curr_fc = _lpmp->get_fcntlp(_fc_index); + if (_curr_fc->aio_cnt()) + return RHM_IORES_FILE_AIOWAIT; + if (!wr_reset()) //Checks if file is still in use (ie not fully dequeued yet) + return RHM_IORES_FULL; + return RHM_IORES_SUCCESS; +} + +u_int16_t wrfc::earliest_index() const +{ + if (_frot) + return 0; + u_int16_t next_index = _fc_index + 1; + if (next_index >= _lpmp->num_jfiles()) + next_index = 0; + return next_index; +} + +bool +wrfc::enq_threshold(const u_int32_t enq_dsize_dblks) const +{ + u_int32_t subm_dblks = subm_cnt_dblks(); // includes file hdr if > 0 + // This compensates for new files which don't have their file headers written yet, + // as file header space cannot be included in this calculation. + if (subm_dblks != 0) + subm_dblks -= 4; + u_int32_t fwd_dblks = subm_dblks + enq_dsize_dblks + _enq_cap_offs_dblks; + u_int16_t findex = _fc_index; + fcntl* fcp = _curr_fc; + bool in_use = false; + while (fwd_dblks && !(findex != _fc_index && fcp->enqcnt())) + { + fwd_dblks -= fwd_dblks > _fsize_dblks ? 
_fsize_dblks : fwd_dblks; + if (fwd_dblks) + { + if (++findex == _lpmp->num_jfiles()) + findex = 0; + fcp = _lpmp->get_fcntlp(findex); + } + in_use |= fcp->enqcnt() > 0; + } + // Return true if threshold exceeded + return findex != _fc_index && in_use; +} + +bool wrfc::wr_reset() +{ + _reset_ok = _curr_fc->reset(); // returns false if full (ie file still contains enqueued recs) + return _reset_ok; +} + +// TODO: update this to reflect all status data +std::string +wrfc::status_str() const +{ + std::ostringstream oss; + oss << "wrfc: " << rfc::status_str(); + if (is_active()) + oss << " fcntl[" << _fc_index << "]: " << _curr_fc->status_str(); + return oss.str(); +} + +} // namespace journal +} // namespace mrg Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wrfc.cpp ------------------------------------------------------------------------------ svn:eol-style = native Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wrfc.h URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wrfc.h?rev=1501895&view=auto ============================================================================== --- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wrfc.h (added) +++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wrfc.h Wed Jul 10 18:20:19 2013 @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * \file wrfc.h + * + * Qpid asynchronous store plugin library + * + * File containing code for class mrg::journal::wrfc (write rotating + * file controller). See class documentation for details. + * + * \author Kim van der Riet + */ + +#ifndef QPID_LEGACYSTORE_JRNL_WRFC_H +#define QPID_LEGACYSTORE_JRNL_WRFC_H + +namespace mrg +{ +namespace journal +{ +class wrfc; +} +} + +#include +#include "qpid/legacystore/jrnl/enums.h" +#include "qpid/legacystore/jrnl/rrfc.h" + +namespace mrg +{ +namespace journal +{ + + /** + * \class wrfc + * \brief Class to handle write management of a journal rotating file controller. + */ + class wrfc : public rfc + { + private: + u_int32_t _fsize_sblks; ///< Size of journal files in sblks + u_int32_t _fsize_dblks; ///< Size of journal files in dblks + u_int32_t _enq_cap_offs_dblks; ///< Enqueue capacity offset + u_int64_t _rid; ///< Master counter for record ID (rid) + bool _reset_ok; ///< Flag set when reset succeeds + bool _owi; ///< Overwrite indicator + bool _frot; ///< Flag is true for first rotation, false otherwise + + public: + wrfc(const lpmgr* lpmp); + virtual ~wrfc(); + + /** + * \brief Initialize the controller. + * \param fsize_sblks Size of each journal file in sblks. + * \param rdp Struct carrying restore information. Optional for non-restore use, defaults to 0 (NULL). + */ + using rfc::initialize; + void initialize(const u_int32_t fsize_sblks, rcvdat* rdp = 0); + + /** + * \brief Rotate active file controller to next file in rotating file group. 
+ * \exception jerrno::JERR__NINIT if called before calling initialize(). + */ + iores rotate(); + + /** + * \brief Returns the index of the earliest complete file within the rotating + * file group. Unwritten files are excluded. The currently active file is + * excluded unless it is the only written file. + */ + u_int16_t earliest_index() const; + + /** + * \brief Determines if a proposed write would cause the enqueue threshold to be exceeded. + * + * The following routine finds whether the next write will take the write pointer to beyond the + * enqueue limit threshold. The following illustrates how this is achieved. + *
+        * Current file index: 4                         +---+----------+
+        * X's mark still-enqueued records               |msg| 1-thresh |
+        * msg = current msg size + unwritten cache      +---+----------+
+        * thresh = JRNL_ENQ_THRESHOLD as a fraction     ^              V
+        *            +-------+-------+-------+-------+--+----+-------+-+-----+-------+
+        * file num ->|   0   |   1   |   2   |   3   |   4   |   5   |   6   |   7   |
+        * enq recs ->| X  XX |XX XXX |XX XXXX|XXXXXXX|XX     |       |       |     X |
+        *            +-------+-------+-------+-------+--+----+-------+-+-----+-------+
+        *                                               ^        ^       ^
+        *                                  subm_dblks --+        |       |
+        *                                                      These files must be free of enqueues
+        *                                                      If not, return true.
+        * 
+ * \param enq_dsize_dblks Proposed size of write in dblocks + */ + bool enq_threshold(const u_int32_t enq_dsize_dblks) const; + + inline u_int64_t rid() const { return _rid; } + inline u_int64_t get_incr_rid() { return _rid++; } + bool wr_reset(); + inline bool is_wr_reset() const { return _reset_ok; } + inline bool owi() const { return _owi; } + inline bool frot() const { return _frot; } + + // Convenience access methods to current file controller + + inline int fh() const { return _curr_fc->wr_fh(); } + + inline u_int32_t subm_cnt_dblks() const { return _curr_fc->wr_subm_cnt_dblks(); } + inline std::size_t subm_offs() const { return _curr_fc->wr_subm_offs(); } + inline u_int32_t add_subm_cnt_dblks(u_int32_t a) { return _curr_fc->add_wr_subm_cnt_dblks(a); } + + inline u_int32_t cmpl_cnt_dblks() const { return _curr_fc->wr_cmpl_cnt_dblks(); } + inline std::size_t cmpl_offs() const { return _curr_fc->wr_cmpl_offs(); } + inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a) { return _curr_fc->add_wr_cmpl_cnt_dblks(a); } + + inline u_int16_t aio_cnt() const { return _curr_fc->aio_cnt(); } + inline u_int16_t incr_aio_cnt() { return _curr_fc->incr_aio_cnt(); } + inline u_int16_t decr_aio_cnt() { return _curr_fc->decr_aio_cnt(); } + + inline bool is_void() const { return _curr_fc->wr_void(); } + inline bool is_empty() const { return _curr_fc->wr_empty(); } + inline u_int32_t remaining_dblks() const { return _curr_fc->wr_remaining_dblks(); } + inline bool is_full() const { return _curr_fc->is_wr_full(); }; + inline bool is_compl() const { return _curr_fc->is_wr_compl(); }; + inline u_int32_t aio_outstanding_dblks() const { return _curr_fc->wr_aio_outstanding_dblks(); } + inline bool file_rotate() const { return _curr_fc->wr_file_rotate(); } + + // Debug aid + std::string status_str() const; + }; + +} // namespace journal +} // namespace mrg + +#endif // ifndef QPID_LEGACYSTORE_JRNL_WRFC_H Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wrfc.h 
------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wrfc.h ------------------------------------------------------------------------------ svn:keywords = Author Date Id Rev URL Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/management-schema.xml URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/management-schema.xml?rev=1501895&view=auto ============================================================================== --- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/management-schema.xml (added) +++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/management-schema.xml Wed Jul 10 18:20:19 2013 @@ -0,0 +1,99 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/management-schema.xml ------------------------------------------------------------------------------ svn:eol-style = native --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org