qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kp...@apache.org
Subject svn commit: r1538790 [1/2] - in /qpid/trunk/qpid/cpp/src/qpid/linearstore: ./ jrnl/ jrnl/utils/
Date Mon, 04 Nov 2013 22:15:15 GMT
Author: kpvdr
Date: Mon Nov  4 22:15:14 2013
New Revision: 1538790

URL: http://svn.apache.org/r1538790
Log:
QPID-4984: WIP. Basic enqueue/dequeue/txns work, still no EFP recycling.

Added:
    qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES
Modified:
    qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jcntl.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/jerrno.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/txn_map.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.c
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/utils/file_hdr.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp

Added: qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES?rev=1538790&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES (added)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES Mon Nov  4 22:15:14 2013
@@ -0,0 +1,36 @@
+LinearStore issues:
+
+Store:
+------
+
+1. Overwrite identity: When recovering a previously used file, if the write boundary coincides with old record start,
+   no way of discriminating old from new at boundary (used to use OWI).
+
+2. Recycling files while in use not working, however, files are recovered to EFP during recovery. Must solve #1 first.
+
+3. Checksum not implemented in record tail, not checked during read.
+
+4. Rework qpid management parameters and controls.
+
+Tests
+-----
+
+* No existing tests for linearstore:
+** Basic broker-level tests for txn and non-txn recovery
+** Store-level tests which check write boundary conditions
+** Unit tests
+** Basic performance tests
+
+Tools
+-----
+
+* Store analysis and status
+* Recovery/reading of message content
+
+Code tidy-up
+------------
+
+* Remove old comments
+* Use c++ cast templates instead of (xxx)y
+* Member names: xxx_
+* Add docs to classes

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp Mon Nov  4 22:15:14 2013
@@ -63,14 +63,8 @@ JournalImpl::JournalImpl(qpid::sys::Time
                          timer(timer_),
                          _journalLogRef(journalLogRef),
                          getEventsTimerSetFlag(false),
-//                         lastReadRid(0),
                          writeActivityFlag(false),
                          flushTriggeredFlag(true),
-//                         _xidp(0),
-//                         _datap(0),
-//                         _dlen(0),
-//                         _dtok(),
-//                         _external(false),
                          deleteCallback(onDelete)
 {
     getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
@@ -97,7 +91,6 @@ JournalImpl::~JournalImpl()
 	}
     getEventsFireEventsPtr->cancel();
     inactivityFireEventPtr->cancel();
-//    free_read_buffers();
 
     if (_mgmtObject.get() != 0) {
         _mgmtObject->resourceDestroy();
@@ -149,7 +142,7 @@ JournalImpl::initialize(qpid::qls_jrnl::
 //    oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
 //    oss << " wcache_num_pages=" << wcache_num_pages;
 //    QLS_LOG2(debug, _jid, oss.str());
-    jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efpp_, wcache_num_pages, wcache_pgsize_sblks, cbp);
+    jcntl::initialize(efpp_, wcache_num_pages, wcache_pgsize_sblks, cbp);
 //    QLS_LOG2(debug, _jid, "Initialization complete");
     // TODO: replace for linearstore: _lpmgr
 /*
@@ -183,7 +176,6 @@ JournalImpl::recover(/*const uint16_t nu
                      uint64_t queue_id)
 {
     std::ostringstream oss1;
-//    oss1 << "Recover; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
     oss1 << "Recover;";
     oss1 << " queue_id = 0x" << std::hex << queue_id << std::dec;
     oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
@@ -210,11 +202,9 @@ JournalImpl::recover(/*const uint16_t nu
             prep_xid_list.push_back(i->xid);
         }
 
-        jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/efpm.get(), wcache_num_pages, wcache_pgsize_sblks,
-                cbp, &prep_xid_list, highest_rid);
+        jcntl::recover(efpm.get(), wcache_num_pages, wcache_pgsize_sblks, cbp, &prep_xid_list, highest_rid);
     } else {
-        jcntl::recover(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/efpm.get(), wcache_num_pages, wcache_pgsize_sblks,
-                cbp, 0, highest_rid);
+        jcntl::recover(efpm.get(), wcache_num_pages, wcache_pgsize_sblks, cbp, 0, highest_rid);
     }
 
     // Populate PreparedTransaction lists from _tmap
@@ -223,10 +213,10 @@ JournalImpl::recover(/*const uint16_t nu
         for (PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
             txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
             for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
-                if (tdl_itr->_enq_flag) { // enqueue op
-                    i->enqueues->add(queue_id, tdl_itr->_rid);
+                if (tdl_itr->enq_flag_) { // enqueue op
+                    i->enqueues->add(queue_id, tdl_itr->rid_);
                 } else { // dequeue op
-                    i->dequeues->add(queue_id, tdl_itr->_drid);
+                    i->dequeues->add(queue_id, tdl_itr->drid_);
                 }
             }
         }
@@ -260,102 +250,6 @@ JournalImpl::recover_complete()
 */
 }
 
-//#define MAX_AIO_SLEEPS 1000000 // tot: ~10 sec
-//#define AIO_SLEEP_TIME_US   10 // 0.01 ms
-// Return true if content is recovered from store; false if content is external and must be recovered from an external store.
-// Throw exception for all errors.
-/*
-bool
-JournalImpl::loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset)
-{
-    qpid::sys::Mutex::ScopedLock sl(_read_lock);
-    if (_dtok.rid() != rid)
-    {
-        // Free any previous msg
-        free_read_buffers();
-
-        // Last read encountered out-of-order rids, check if this rid is in that list
-        bool oooFlag = false;
-        for (std::vector<uint64_t>::const_iterator i=oooRidList.begin(); i!=oooRidList.end() && !oooFlag; i++) {
-            if (*i == rid) {
-                oooFlag = true;
-            }
-        }
-
-        // TODO: This is a brutal approach - very inefficient and slow. Rather introduce a system of remembering
-        // jumpover points and allow the read to jump back to the first known jumpover point - but this needs
-        // a mechanism in rrfc to accomplish it. Also helpful is a struct containing a journal address - a
-        // combination of lid/offset.
-        // NOTE: The second part of the if stmt (rid < lastReadRid) is required to handle browsing.
-        if (oooFlag || rid < lastReadRid) {
-            _rmgr.invalidate();
-            oooRidList.clear();
-        }
-        _dlen = 0;
-        _dtok.reset();
-        _dtok.set_wstate(DataTokenImpl::ENQ);
-        _dtok.set_rid(0);
-        _external = false;
-        size_t xlen = 0;
-        bool transient = false;
-        bool done = false;
-        bool rid_found = false;
-        while (!done) {
-            iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok);
-            switch (res) {
-                case qpid::qls_jrnl::RHM_IORES_SUCCESS:
-                    if (_dtok.rid() != rid) {
-                        // Check if this is an out-of-order rid that may impact next read
-                        if (_dtok.rid() > rid)
-                            oooRidList.push_back(_dtok.rid());
-                        free_read_buffers();
-                        // Reset data token for next read
-                        _dlen = 0;
-                        _dtok.reset();
-                        _dtok.set_wstate(DataTokenImpl::ENQ);
-                        _dtok.set_rid(0);
-                    } else {
-                        rid_found = _dtok.rid() == rid;
-                        lastReadRid = rid;
-                        done = true;
-                    }
-                    break;
-                case qpid::qls_jrnl::RHM_IORES_PAGE_AIOWAIT:
-                    if (get_wr_events(&_aio_cmpl_timeout) == qpid::qls_jrnl::jerrno::AIO_TIMEOUT) {
-                        std::stringstream ss;
-                        ss << "read_data_record() returned " << qpid::qls_jrnl::iores_str(res);
-                        ss << "; timed out waiting for page to be processed.";
-                        throw jexception(qpid::qls_jrnl::jerrno::JERR__TIMEOUT, ss.str().c_str(), "JournalImpl",
-                            "loadMsgContent");
-                    }
-                    break;
-                default:
-                    std::stringstream ss;
-                    ss << "read_data_record() returned " << qpid::qls_jrnl::iores_str(res);
-                    throw jexception(qpid::qls_jrnl::jerrno::JERR__UNEXPRESPONSE, ss.str().c_str(), "JournalImpl",
-                        "loadMsgContent");
-            }
-        }
-        if (!rid_found) {
-            std::stringstream ss;
-            ss << "read_data_record() was unable to find rid 0x" << std::hex << rid << std::dec;
-            ss << " (" << rid << "); last rid found was 0x" << std::hex << _dtok.rid() << std::dec;
-            ss << " (" << _dtok.rid() << ")";
-            throw jexception(qpid::qls_jrnl::jerrno::JERR__RECNFOUND, ss.str().c_str(), "JournalImpl", "loadMsgContent");
-        }
-    }
-
-    if (_external) return false;
-
-    uint32_t hdr_offs = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(uint32_t)).getLong() + sizeof(uint32_t);
-    if (hdr_offs + offset + length > _dlen) {
-        data.append((const char*)_datap + hdr_offs + offset, _dlen - hdr_offs - offset);
-    } else {
-        data.append((const char*)_datap + hdr_offs + offset, length);
-    }
-    return true;
-}
-*/
 
 void
 JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
@@ -498,29 +392,6 @@ JournalImpl::flush(const bool block_till
     return res;
 }
 
-/*
-void
-JournalImpl::log(qpid::qls_jrnl::log_level ll, const std::string& log_stmt) const
-{
-    log(ll, log_stmt.c_str());
-}
-
-void
-JournalImpl::log(qpid::qls_jrnl::log_level ll, const char* const log_stmt) const
-{
-    switch (ll)
-    {
-        case LOG_TRACE:  QPID_LOG(trace, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
-        case LOG_DEBUG:  QPID_LOG(debug, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
-        case LOG_INFO:  QPID_LOG(info, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
-        case LOG_NOTICE:  QPID_LOG(notice, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
-        case LOG_WARN:  QPID_LOG(warning, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
-        case LOG_ERROR: QPID_LOG(error, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
-        case LOG_CRITICAL: QPID_LOG(critical, "QLS Journal \"" << _jid << "\": " << log_stmt); break;
-    }
-}
-*/
-
 void
 JournalImpl::getEventsFire()
 {
@@ -581,21 +452,6 @@ void
 JournalImpl::rd_aio_cb(std::vector<uint16_t>& /*pil*/)
 {}
 
-/*
-void
-JournalImpl::free_read_buffers()
-{
-    if (_xidp) {
-        ::free(_xidp);
-        _xidp = 0;
-        _datap = 0;
-    } else if (_datap) {
-        ::free(_datap);
-        _datap = 0;
-    }
-}
-*/
-
 void
 JournalImpl::createStore() {
 
@@ -609,17 +465,6 @@ JournalImpl::handleIoResult(const iores 
     {
         case qpid::qls_jrnl::RHM_IORES_SUCCESS:
             return;
-/*
-        case qpid::qls_jrnl::RHM_IORES_FULL:
-            {
-                std::ostringstream oss;
-                oss << "Journal full on queue \"" << _jid << "\".";
-                QLS_LOG2(critical, _jid, "Journal full.");
-                if (_agent != 0)
-                    _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR);
-                THROW_STORE_FULL_EXCEPTION(oss.str());
-            }
-*/
         default:
             {
                 std::ostringstream oss;

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.h?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalImpl.h Mon Nov  4 22:15:14 2013
@@ -140,11 +140,6 @@ class JournalImpl : public qpid::broker:
 
     void recover_complete();
 
-    // Temporary fn to read and save last msg read from journal so it can be assigned
-    // in chunks. To be replaced when coding to do this direct from the journal is ready.
-    // Returns true if the record is extern, false if local.
-//    bool loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset = 0);
-
     // Overrides for write inactivity timer
     void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
                              const size_t this_data_len, qpid::qls_jrnl::data_tok* dtokp,
@@ -191,7 +186,6 @@ class JournalImpl : public qpid::broker:
     void resetDeleteCallback() { deleteCallback = DeleteCallback(); }
 
   protected:
-//    void free_read_buffers();
     void createStore();
 
     inline void setGetEventTimer()
@@ -227,15 +221,12 @@ class TplJournalImpl : public JournalImp
 
     virtual ~TplJournalImpl() {}
 
-/*
     // Special version of read_data_record that ignores transactions - needed when reading the TPL
     inline qpid::qls_jrnl::iores read_data_record(void** const datapp, std::size_t& dsize,
                                                 void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
                                                 qpid::qls_jrnl::data_tok* const dtokp) {
         return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
     }
-    inline void read_reset() { _rmgr.invalidate(); }
-*/
 }; // class TplJournalImpl
 
 } // namespace msgstore

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/JournalLogImpl.h Mon Nov  4 22:15:14 2013
@@ -26,7 +26,7 @@
 #include "qpid/log/Statement.h"
 
 #define QLS_LOG(level, msg) QPID_LOG(level, "Linear Store: " << msg)
-#define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \'" << queue << "\":" << msg)
+#define QLS_LOG2(level, queue, msg) QPID_LOG(level, "Linear Store: Journal \"" << queue << "\":" << msg)
 
 namespace qpid {
 namespace linearstore {

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp Mon Nov  4 22:15:14 2013
@@ -291,7 +291,10 @@ void MessageStoreImpl::init()
         }
     } while (!isInit);
 
-    efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(), jrnlLog));
+    efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(),
+                                                          defaultEfpPartitionNumber,
+                                                          defaultEfpFileSize_kib,
+                                                          jrnlLog));
     efpMgr->findEfpPartitions();
 }
 
@@ -331,6 +334,9 @@ void MessageStoreImpl::truncateInit()
         dbenv->close(0);
         isInit = false;
     }
+
+    // TODO: Linearstore: harvest all discareded journal files into the empy file pool(s).
+
     qpid::qls_jrnl::jdir::delete_dir(getBdbBaseDir());
     qpid::qls_jrnl::jdir::delete_dir(getJrnlBaseDir());
     qpid::qls_jrnl::jdir::delete_dir(getTplBaseDir());
@@ -730,6 +736,7 @@ void MessageStoreImpl::recoverQueues(Txn
             // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting
             // from recovery of a store that has had its size changed externally by the resize utility.
             // If so, update the queue store settings so that QMF queries will reflect the new values.
+            // TODO: Update this for new settings, as qpid.file_count and qpid.file_size no longer apply
 /*
             const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings;
             qpid::framing::FieldTable::ValuePtr value;
@@ -866,8 +873,6 @@ void MessageStoreImpl::recoverMessages(T
     size_t preambleLength = sizeof(uint32_t)/*header size*/;
 
     JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
-    DataTokenImpl dtok;
-    size_t readSize = 0;
     unsigned msg_count = 0;
 
     // TODO: This optimization to skip reading if there are no enqueued messages to read
@@ -876,19 +881,20 @@ void MessageStoreImpl::recoverMessages(T
     //bool read = jc->get_enq_cnt() > 0;
     bool read = true;
 
-    void* dbuff = NULL; size_t dbuffSize = 0;
-    void* xidbuff = NULL; size_t xidbuffSize = 0;
+    void* dbuff = NULL;
+    size_t dbuffSize = 0;
+    void* xidbuff = NULL;
+    size_t xidbuffSize = 0;
     bool transientFlag = false;
     bool externalFlag = false;
-
-    dtok.set_wstate(DataTokenImpl::ENQ);
+    DataTokenImpl dtok;
+    dtok.set_wstate(DataTokenImpl::NONE);
 
     // Read the message from the Journal.
     try {
         unsigned aio_sleep_cnt = 0;
         while (read) {
             qpid::qls_jrnl::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
-            readSize = dtok.dsize();
 
             switch (res)
             {
@@ -909,11 +915,11 @@ void MessageStoreImpl::recoverMessages(T
                 // At some future point if delivery attempts are stored, then this call would
                 // become optional depending on that information.
                 msg->setRedelivered();
-		// Reset the TTL for the recovered message
-		msg->computeExpiration(broker->getExpiryPolicy());
+                // Reset the TTL for the recovered message
+                msg->computeExpiration(broker->getExpiryPolicy());
 
                 uint32_t contentOffset = headerSize + preambleLength;
-                uint64_t contentSize = readSize - contentOffset;
+                uint64_t contentSize = dbuffSize - contentOffset;
                 if (msg->loadContent(contentSize) && !externalFlag) {
                     //now read the content
                     qpid::framing::Buffer contentBuff(data + contentOffset, contentSize);
@@ -947,8 +953,8 @@ void MessageStoreImpl::recoverMessages(T
                             bool enq = false;
                             bool deq = false;
                             for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
-                                if (j->_enq_flag && j->_rid == rid) enq = true;
-                                else if (!j->_enq_flag && j->_drid == rid) deq = true;
+                                if (j->enq_flag_ && j->rid_ == rid) enq = true;
+                                else if (!j->enq_flag_ && j->drid_ == rid) deq = true;
                             }
                             if (enq && !deq && citr->second.commit_flag) {
                                 rcnt++;
@@ -962,7 +968,7 @@ void MessageStoreImpl::recoverMessages(T
                 }
 
                 dtok.reset();
-                dtok.set_wstate(DataTokenImpl::ENQ);
+                dtok.set_wstate(DataTokenImpl::NONE);
 
                 if (xidbuff)
                     ::free(xidbuff);
@@ -1065,11 +1071,11 @@ void MessageStoreImpl::readTplStore()
                     bool commitFlag = true;
 
                     for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
-                        if (j->_enq_flag) {
-                            rid = j->_rid;
+                        if (j->enq_flag_) {
+                            rid = j->rid_;
                             enqCnt++;
                         } else {
-                            commitFlag = j->_commit_flag;
+                            commitFlag = j->commit_flag_;
                             deqCnt++;
                         }
                     }
@@ -1104,10 +1110,9 @@ void MessageStoreImpl::readTplStore()
 void MessageStoreImpl::recoverTplStore()
 {
     QLS_LOG(info,   "*** MessageStoreImpl::recoverTplStore()");
-/*
-    if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
+    if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir())) {
         uint64_t thisHighestRid = 0ULL;
-        tplStorePtr->recover(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
+        tplStorePtr->recover(boost::dynamic_pointer_cast<qpid::qls_jrnl::EmptyFilePoolManager>(efpMgr), tplWCacheNumPages, tplWCachePgSizeSblks, 0, thisHighestRid, 0);
         if (highestRid == 0ULL)
             highestRid = thisHighestRid;
         else if (thisHighestRid - highestRid  < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
@@ -1118,7 +1123,6 @@ void MessageStoreImpl::recoverTplStore()
 
         tplStorePtr->recover_complete(); // start journal.
     }
-*/
 }
 
 void MessageStoreImpl::recoverLockedMappings(txn_list& txns)
@@ -1137,12 +1141,10 @@ void MessageStoreImpl::recoverLockedMapp
     }
 }
 
-void MessageStoreImpl::collectPreparedXids(std::set<std::string>& /*xids*/)
+void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids)
 {
     QLS_LOG(info,   "*** MessageStoreImpl::collectPreparedXids()");
-/*
     if (tplStorePtr->is_ready()) {
-        tplStorePtr->read_reset();
         readTplStore();
     } else {
         recoverTplStore();
@@ -1152,7 +1154,6 @@ void MessageStoreImpl::collectPreparedXi
         if (!i->second.deq_flag && i->second.tpc_flag)
             xids.insert(i->first);
     }
-*/
 }
 
 void MessageStoreImpl::stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/)
@@ -1178,31 +1179,6 @@ void MessageStoreImpl::loadContent(const
                                    uint32_t /*length*/)
 {
     throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "loadContent");
-/*
-    checkInit();
-    uint64_t messageId (msg->getPersistenceId());
-
-    if (messageId != 0) {
-        try {
-            JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
-            if (jc && jc->is_enqueued(messageId) ) {
-                if (!jc->loadMsgContent(messageId, data, length, offset)) {
-                    std::ostringstream oss;
-                    oss << "Queue " << queue.getName() << ": loadContent() failed: Message " << messageId << " is extern";
-                    THROW_STORE_EXCEPTION(oss.str());
-                }
-            } else {
-                std::ostringstream oss;
-                oss << "Queue " << queue.getName() << ": loadContent() failed: Message " << messageId << " not enqueued";
-                THROW_STORE_EXCEPTION(oss.str());
-            }
-        } catch (const qpid::qls_jrnl::jexception& e) {
-            THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": loadContent() failed: " + e.what());
-        }
-    } else {
-        THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
-    }
-*/
 }
 
 void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_)
@@ -1358,12 +1334,6 @@ void MessageStoreImpl::async_dequeue(qpi
     }
 }
 
-uint32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue_*/)
-{
-/*    checkInit();*/
-    return 0;
-}
-
 void MessageStoreImpl::completed(TxnCtxt& txn_,
                                  bool commit_)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h Mon Nov  4 22:15:14 2013
@@ -331,7 +331,7 @@ class MessageStoreImpl : public qpid::br
 
     void flush(const qpid::broker::PersistableQueue& queue);
 
-    uint32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
+    inline uint32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/) { return 0; }; // TODO: Deprecate this call
 
     void collectPreparedXids(std::set<std::string>& xids);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/AtomicCounter.h Mon Nov  4 22:15:14 2013
@@ -23,6 +23,7 @@
 #define QPID_LINEARSTORE_ATOMICCOUNTER_H_
 
 #include "qpid/linearstore/jrnl/slock.h"
+#include <string>
 
 namespace qpid {
 namespace qls_jrnl {
@@ -31,11 +32,12 @@ template <class T>
 class AtomicCounter
 {
 private:
+    std::string id_;
     T count_;
     mutable smutex countMutex;
 
 public:
-    AtomicCounter(const T& initValue = T(0)) : count_(initValue) {}
+    AtomicCounter(const std::string& id, const T& initValue) : id_(id), count_(initValue) {}
 
     virtual ~AtomicCounter() {}
 
@@ -44,6 +46,11 @@ public:
         return count_;
     }
 
+    void set(const T v) {
+        slock l(countMutex);
+        count_ = v;
+    }
+
     T increment() {
         slock l(countMutex);
         return ++count_;
@@ -57,7 +64,7 @@ public:
 
     T addLimit(const T& a, const T& limit, const uint32_t jerr) {
         slock l(countMutex);
-        if (count_ + a > limit) throw jexception(jerr, "AtomicCounter", "addLimit");
+        if (count_ + a > limit) throw jexception(jerr, id_, "AtomicCounter", "addLimit");
         count_ += a;
         return count_;
     }
@@ -70,7 +77,7 @@ public:
     T decrementLimit(const T& limit = T(0), const uint32_t jerr = jerrno::JERR__UNDERFLOW) {
         slock l(countMutex);
         if (count_ < limit + 1) {
-            throw jexception(jerr, "AtomicCounter", "decrementLimit");
+            throw jexception(jerr, id_, "AtomicCounter", "decrementLimit");
         }
         return --count_;
     }
@@ -83,7 +90,7 @@ public:
 
     T subtractLimit(const T& s, const T& limit = T(0), const uint32_t jerr = jerrno::JERR__UNDERFLOW) {
         slock l(countMutex);
-        if (count_ < limit + s) throw jexception(jerr, "AtomicCounter", "subtractLimit");
+        if (count_ < limit + s) throw jexception(jerr, id_, "AtomicCounter", "subtractLimit");
         count_ -= s;
         return count_;
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp Mon Nov  4 22:15:14 2013
@@ -35,6 +35,8 @@
 #include <uuid/uuid.h>
 #include <vector>
 
+//#include <iostream> // DEBUG
+
 namespace qpid {
 namespace qls_jrnl {
 
@@ -42,7 +44,7 @@ EmptyFilePool::EmptyFilePool(const std::
                              const EmptyFilePoolPartition* partitionPtr,
                              JournalLog& journalLogRef) :
                 efpDirectory_(efpDirectory),
-                efpDataSize_kib_(fileSizeKbFromDirName(efpDirectory, partitionPtr->getPartitionNumber())),
+                efpDataSize_kib_(dataSizeFromDirName_kib(efpDirectory, partitionPtr->getPartitionNumber())),
                 partitionPtr_(partitionPtr),
                 journalLogRef_(journalLogRef)
 {}
@@ -134,6 +136,39 @@ void EmptyFilePool::returnEmptyFile(cons
     pushEmptyFile(emptyFileName);
 }
 
+//static
+std::string EmptyFilePool::dirNameFromDataSize(const efpDataSize_kib_t efpDataSize_kib) {
+    std::ostringstream oss;
+    oss << efpDataSize_kib << "k";
+    return oss.str();
+}
+
+
+// static
+efpDataSize_kib_t EmptyFilePool::dataSizeFromDirName_kib(const std::string& dirName,
+                                                        const efpPartitionNumber_t partitionNumber) {
+    // Check for dirName format 'NNNk', where NNN is a number, convert NNN into an integer. NNN cannot be 0.
+    std::string n(dirName.substr(dirName.rfind('/')+1));
+    bool valid = true;
+    for (uint16_t charNum = 0; charNum < n.length(); ++charNum) {
+        if (charNum < n.length()-1) {
+            if (!::isdigit((int)n[charNum])) {
+                valid = false;
+                break;
+            }
+        } else {
+            valid = n[charNum] == 'k';
+        }
+    }
+    efpDataSize_kib_t s = ::atol(n.c_str());
+    if (!valid || s == 0 || s % QLS_SBLK_SIZE_KIB != 0) {
+        std::ostringstream oss;
+        oss << "Partition: " << partitionNumber << "; EFP directory: \'" << n << "\'";
+        throw jexception(jerrno::JERR_EFP_BADEFPDIRNAME, oss.str(), "EmptyFilePool", "fileSizeKbFromDirName");
+    }
+    return s;
+}
+
 // --- protected functions ---
 
 void EmptyFilePool::createEmptyFile() {
@@ -148,7 +183,7 @@ void EmptyFilePool::createEmptyFile() {
             ofs.put('\0');
         ofs.close();
         pushEmptyFile(efpfn);
-//std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " << efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl;
+//std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " << efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG
     } else {
 //std::cerr << "ERROR: Unable to open file \"" << efpfn << "\"" << std::endl; // DEBUG
     }
@@ -196,7 +231,8 @@ void EmptyFilePool::resetEmptyFileHeader
         std::streampos bytesRead = fs.tellg();
         if (std::streamoff(bytesRead) == buffsize) {
             ::file_hdr_reset((::file_hdr_t*)buff);
-            ::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t)); // set rest of buffer to 0
+            // set rest of buffer to 0
+            ::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t));
             fs.seekp(0, std::fstream::beg);
             fs.write(buff, buffsize);
             std::streampos bytesWritten = fs.tellp();
@@ -224,7 +260,8 @@ bool EmptyFilePool::validateEmptyFile(co
     // Size matches pool
     efpDataSize_kib_t expectedSize = (QLS_SBLK_SIZE_KIB + efpDataSize_kib_) * 1024;
     if ((efpDataSize_kib_t)s.st_size != expectedSize) {
-        oss << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize << "; actual=" << s.st_size;
+        oss << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize
+            << "; actual=" << s.st_size;
         journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
         return false;
     }
@@ -241,18 +278,19 @@ bool EmptyFilePool::validateEmptyFile(co
     fs.read((char*)buff, buffsize);
     std::streampos bytesRead = fs.tellg();
     if (std::streamoff(bytesRead) != buffsize) {
-        oss << "ERROR: Unable to read file header of file \"" << emptyFileName << "\": tried to read " << buffsize << " bytes; read " << bytesRead << " bytes";
+        oss << "ERROR: Unable to read file header of file \"" << emptyFileName << "\": tried to read "
+            << buffsize << " bytes; read " << bytesRead << " bytes";
         journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
         fs.close();
         return false;
     }
 
     // Check file header
-    ::file_hdr_t* header(reinterpret_cast< ::file_hdr_t* >(buff));
-    const bool jrnlMagicError = header->_rhdr._magic != QLS_FILE_MAGIC;
-    const bool jrnlVersionError = header->_rhdr._version != QLS_JRNL_VERSION;
-    const bool jrnlPartitionError = header->_efp_partition != partitionPtr_->getPartitionNumber();
-    const bool jrnlFileSizeError = header->_file_size_kib != efpDataSize_kib_;
+    ::file_hdr_t* fhp = (::file_hdr_t*)buff;
+    const bool jrnlMagicError = fhp->_rhdr._magic != QLS_FILE_MAGIC;
+    const bool jrnlVersionError = fhp->_rhdr._version != QLS_JRNL_VERSION;
+    const bool jrnlPartitionError = fhp->_efp_partition != partitionPtr_->getPartitionNumber();
+    const bool jrnlFileSizeError = fhp->_data_size_kib != efpDataSize_kib_;
     if (jrnlMagicError || jrnlVersionError || jrnlPartitionError || jrnlFileSizeError)
     {
         oss << "ERROR: File " << emptyFileName << ": Invalid file header - mismatched header fields: " <<
@@ -266,14 +304,15 @@ bool EmptyFilePool::validateEmptyFile(co
     }
 
     // Check file header is reset
-    if (!::is_file_hdr_reset(header)) {
-        ::file_hdr_reset(header);
+    if (!::is_file_hdr_reset(fhp)) {
+        ::file_hdr_reset(fhp);
         ::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t)); // set rest of buffer to 0
         fs.seekp(0, std::fstream::beg);
         fs.write(buff, buffsize);
         std::streampos bytesWritten = fs.tellp();
         if (std::streamoff(bytesWritten) != buffsize) {
-            oss << "ERROR: Unable to write file header of file \"" << emptyFileName << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes";
+            oss << "ERROR: Unable to write file header of file \"" << emptyFileName << "\": tried to write "
+                << buffsize << " bytes; wrote " << bytesWritten << " bytes";
             journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
             fs.close();
             return false;
@@ -288,31 +327,6 @@ bool EmptyFilePool::validateEmptyFile(co
 }
 
 // static
-efpDataSize_kib_t EmptyFilePool::fileSizeKbFromDirName(const std::string& dirName,
-                                                       const efpPartitionNumber_t partitionNumber) {
-    // Check for dirName format 'NNNk', where NNN is a number, convert NNN into an integer. NNN cannot be 0.
-    std::string n(dirName.substr(dirName.rfind('/')+1));
-    bool valid = true;
-    for (uint16_t charNum = 0; charNum < n.length(); ++charNum) {
-        if (charNum < n.length()-1) {
-            if (!::isdigit((int)n[charNum])) {
-                valid = false;
-                break;
-            }
-        } else {
-            valid = n[charNum] == 'k';
-        }
-    }
-    efpDataSize_kib_t s = ::atol(n.c_str());
-    if (!valid || s == 0 || s % QLS_SBLK_SIZE_KIB != 0) {
-        std::ostringstream oss;
-        oss << "Partition: " << partitionNumber << "; EFP directory: \'" << n << "\'";
-        throw jexception(jerrno::JERR_EFP_BADEFPDIRNAME, oss.str(), "EmptyFilePool", "fileSizeKbFromDirName");
-    }
-    return s;
-}
-
-// static
 int EmptyFilePool::moveEmptyFile(const std::string& from,
                                  const std::string& to) {
     if (::rename(from.c_str(), to.c_str())) {

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.h Mon Nov  4 22:15:14 2013
@@ -76,6 +76,10 @@ public:
     std::string takeEmptyFile(const std::string& destDirectory);
     void returnEmptyFile(const std::string& srcFile);
 
+    static std::string dirNameFromDataSize(const efpDataSize_kib_t efpDataSize_kib);
+    static efpDataSize_kib_t dataSizeFromDirName_kib(const std::string& dirName,
+                                                     const efpPartitionNumber_t partitionNumber);
+
 protected:
     void createEmptyFile();
     std::string getEfpFileName();
@@ -84,8 +88,6 @@ protected:
     void resetEmptyFileHeader(const std::string& fqFileName);
     bool validateEmptyFile(const std::string& emptyFileName) const;
 
-    static efpDataSize_kib_t fileSizeKbFromDirName(const std::string& dirName,
-                                                   const efpPartitionNumber_t partitionNumber);
     static int moveEmptyFile(const std::string& fromFqPath,
                              const std::string& toFqPath);
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.cpp Mon Nov  4 22:15:14 2013
@@ -24,15 +24,22 @@
 #include <dirent.h>
 #include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h"
 #include "qpid/linearstore/jrnl/jdir.h"
+#include "qpid/linearstore/jrnl/JournalLog.h"
 #include "qpid/linearstore/jrnl/slock.h"
 #include <vector>
 
+//#include <iostream> // DEBUG
+
 namespace qpid {
 namespace qls_jrnl {
 
 EmptyFilePoolManager::EmptyFilePoolManager(const std::string& qlsStorePath,
+                                           const efpPartitionNumber_t defaultPartitionNumber,
+                                           const efpDataSize_kib_t defaultEfpDataSize_kib,
                                            JournalLog& journalLogRef) :
                 qlsStorePath_(qlsStorePath),
+                defaultPartitionNumber_(defaultPartitionNumber),
+                defaultEfpDataSize_kib_(defaultEfpDataSize_kib),
                 journalLogRef_(journalLogRef)
 {}
 
@@ -45,53 +52,70 @@ EmptyFilePoolManager::~EmptyFilePoolMana
 }
 
 void EmptyFilePoolManager::findEfpPartitions() {
-    //std::cout << "*** Reading " << qlsStorePath << std::endl; // DEBUG
+//std::cout << "*** Reading " << qlsStorePath_ << std::endl; // DEBUG
+    bool foundPartition = false;
     std::vector<std::string> dirList;
-    jdir::read_dir(qlsStorePath_, dirList, true, false, true, false);
-    for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
-        if ((*i)[0] == 'p' && i->length() == 4) { // Filter: look only at names pNNN
-            efpPartitionNumber_t pn = ::atoi(i->c_str() + 1);
-            std::string fullDirPath(qlsStorePath_ + "/" + (*i));
-            EmptyFilePoolPartition* efppp = 0;
-            try {
-                efppp = new EmptyFilePoolPartition(pn, fullDirPath, journalLogRef_);
-                {
-                    slock l(partitionMapMutex_);
-                    partitionMap_[pn] = efppp;
-                }
-            } catch (const std::exception& e) {
-                if (efppp != 0) {
-                    delete efppp;
-                    efppp = 0;
+    while (!foundPartition) {
+        jdir::read_dir(qlsStorePath_, dirList, true, false, true, false);
+        for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
+            efpPartitionNumber_t pn = EmptyFilePoolPartition::getPartitionNumber(*i);
+            if (pn > 0) { // valid partition name found
+                std::string fullDirPath(qlsStorePath_ + "/" + (*i));
+                EmptyFilePoolPartition* efppp = 0;
+                try {
+                    efppp = new EmptyFilePoolPartition(pn, fullDirPath, journalLogRef_);
+                    {
+                        slock l(partitionMapMutex_);
+                        partitionMap_[pn] = efppp;
+                    }
+                } catch (const std::exception& e) {
+                    if (efppp != 0) {
+                        delete efppp;
+                        efppp = 0;
+                    }
+//std::cerr << "Unable to initialize partition " << pn << " (\'" << fullDirPath << "\'): " << e.what() << std::endl; // DEBUG
                 }
-                //std::cerr << "Unable to initialize partition " << pn << " (\'" << fullDirPath << "\'): " << e.what() << std::endl;
+                if (efppp != 0)
+                    efppp->findEmptyFilePools();
+                foundPartition = true;
             }
-            if (efppp != 0)
-                efppp->findEmptyFilePools();
+        }
+
+        // If no partition was found, create an empty default partition with a warning.
+        if (!foundPartition) {
+            journalLogRef_.log(JournalLog::LOG_WARN, "No EFP partition found, creating an empty partition.");
+            std::ostringstream oss;
+            oss << qlsStorePath_ << "/" << EmptyFilePoolPartition::getPartionDirectoryName(defaultPartitionNumber_)
+                << "/" << EmptyFilePoolPartition::s_efpTopLevelDir_ << "/" << EmptyFilePool::dirNameFromDataSize(defaultEfpDataSize_kib_);
+            jdir::create_dir(oss.str());
         }
     }
-    // TODO: Log results
-/*
-    QLS_LOG(info, "EFP Manager initialization complete");
+
+    journalLogRef_.log(JournalLog::LOG_NOTICE, "EFP Manager initialization complete");
     std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*> partitionList;
     std::vector<qpid::qls_jrnl::EmptyFilePool*> filePoolList;
     getEfpPartitions(partitionList);
     if (partitionList.size() == 0) {
-        QLS_LOG(error, "NO EFP PARTITIONS FOUND! No queue creation is possible.")
+        journalLogRef_.log(JournalLog::LOG_WARN, "NO EFP PARTITIONS FOUND! No queue creation is possible.");
     } else {
-        QLS_LOG(info, "> EFP Partitions found: " << partitionList.size());
+        std::stringstream oss;
+        oss << "> EFP Partitions found: " << partitionList.size();
+        journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
         for (std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*>::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) {
             filePoolList.clear();
             (*i)->getEmptyFilePools(filePoolList);
-            QLS_LOG(info, "  * Partition " << (*i)->partitionNumber() << " containing " << filePoolList.size() << " pool" <<
-                          (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->partitionDirectory() << "\'");
+            std::stringstream oss;
+            oss << "  * Partition " << (*i)->getPartitionNumber() << " containing " << filePoolList.size()
+                << " pool" << (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->getPartitionDirectory() << "\'";
+            journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
             for (std::vector<qpid::qls_jrnl::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) {
-                QLS_LOG(info, "    - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() <<
-                              " files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB");
+                std::ostringstream oss;
+                oss << "    - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() <<
+                              " files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB";
+            journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
             }
         }
     }
-*/
 }
 
 void EmptyFilePoolManager::getEfpFileSizes(std::vector<efpDataSize_kib_t>& efpFileSizeList,
@@ -155,7 +179,7 @@ void EmptyFilePoolManager::getEfpPartiti
 }
 
 EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpIdentity_t efpIdentity) {
-    return getEmptyFilePool(efpIdentity.first, efpIdentity.second);
+    return getEmptyFilePool(efpIdentity.pn_, efpIdentity.ds_);
 }
 
 EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber,

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolManager.h Mon Nov  4 22:15:14 2013
@@ -36,13 +36,17 @@ protected:
     typedef partitionMap_t::iterator partitionMapItr_t;
     typedef partitionMap_t::const_iterator partitionMapConstItr_t;
 
-    std::string qlsStorePath_;
+    const std::string qlsStorePath_;
+    const efpPartitionNumber_t defaultPartitionNumber_;
+    const efpDataSize_kib_t defaultEfpDataSize_kib_;
     JournalLog& journalLogRef_;
     partitionMap_t partitionMap_;
     smutex partitionMapMutex_;
 
 public:
     EmptyFilePoolManager(const std::string& qlsStorePath_,
+                         const efpPartitionNumber_t defaultPartitionNumber,
+                         const efpDataSize_kib_t defaultEfpDataSize_kib,
                          JournalLog& journalLogRef_);
     virtual ~EmptyFilePoolManager();
 

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp Mon Nov  4 22:15:14 2013
@@ -22,11 +22,14 @@
 #include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h"
 
 #include <dirent.h>
+#include <iomanip>
 #include "qpid/linearstore/jrnl/jdir.h"
 #include "qpid/linearstore/jrnl/jerrno.h"
 #include "qpid/linearstore/jrnl/jexception.h"
 #include "qpid/linearstore/jrnl/slock.h"
 
+//#include <iostream> // DEBUG
+
 namespace qpid {
 namespace qls_jrnl {
 
@@ -53,7 +56,7 @@ EmptyFilePoolPartition::~EmptyFilePoolPa
 
 void
 EmptyFilePoolPartition::findEmptyFilePools() {
-    //std::cout << "Reading " << partitionDir << std::endl; // DEBUG
+//std::cout << "Reading " << partitionDir << std::endl; // DEBUG
     std::vector<std::string> dirList;
     jdir::read_dir(partitionDir_, dirList, true, false, false, false);
     bool foundEfpDir = false;
@@ -65,7 +68,7 @@ EmptyFilePoolPartition::findEmptyFilePoo
     }
     if (foundEfpDir) {
         std::string efpDir(partitionDir_ + "/" + s_efpTopLevelDir_);
-        //std::cout << "Reading " << efpDir << std::endl; // DEBUG
+//std::cout << "Reading " << efpDir << std::endl; // DEBUG
         dirList.clear();
         jdir::read_dir(efpDir, dirList, true, false, false, true);
         for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
@@ -117,6 +120,26 @@ efpPartitionNumber_t EmptyFilePoolPartit
     return partitionNum_;
 }
 
+// static
+std::string EmptyFilePoolPartition::getPartionDirectoryName(const efpPartitionNumber_t partitionNumber) {
+    std::ostringstream oss;
+    oss << "p" << std::setfill('0') << std::setw(3) << partitionNumber;
+    return oss.str();
+}
+
+//static
+efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber(const std::string& name) {
+    if (name.length() == 4 && name[0] == 'p' && ::isdigit(name[1]) && ::isdigit(name[2]) && ::isdigit(name[3])) {
+        long pn = ::strtol(name.c_str() + 1, 0, 0);
+        if (pn == 0 && errno) {
+            return 0;
+        } else {
+            return (efpPartitionNumber_t)pn;
+        }
+    }
+    return 0;
+}
+
 // --- protected functions ---
 
 void EmptyFilePoolPartition::validatePartitionDir() {

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolPartition.h Mon Nov  4 22:15:14 2013
@@ -68,6 +68,9 @@ public:
     std::string getPartitionDirectory() const;
     efpPartitionNumber_t getPartitionNumber() const;
 
+    static std::string getPartionDirectoryName(const efpPartitionNumber_t partitionNumber);
+    static efpPartitionNumber_t getPartitionNumber(const std::string& name);
+
 protected:
     void validatePartitionDir();
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/EmptyFilePoolTypes.h Mon Nov  4 22:15:14 2013
@@ -22,19 +22,27 @@
 #ifndef QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_
 #define QPID_QLS_JRNL_EMPTYFILEPOOLTYPES_H_
 
+#include <iostream>
 #include <stdint.h>
 #include <utility> // std::pair
 
 namespace qpid {
 namespace qls_jrnl {
 
-    typedef uint64_t efpDataSize_kib_t;     ///< Size of data part of file (excluding file header) in kib
-    typedef uint64_t efpFileSize_kib_t;     ///< Size of file (header + data) in kib
-    typedef uint32_t efpDataSize_sblks_t;   ///< Size of data part of file (excluding file header) in sblks
-    typedef uint32_t efpFileSize_sblks_t;   ///< Size of file (header + data) in sblks
-    typedef uint32_t efpFileCount_t;        ///< Number of files in a partition or pool
-    typedef uint16_t efpPartitionNumber_t;  ///< Number assigned to a partition
-    typedef std::pair<efpPartitionNumber_t, efpDataSize_kib_t> efpIdentity_t; ///< Unique identity of a pool, consisting of the partition number and data size
+typedef uint64_t efpDataSize_kib_t;     ///< Size of data part of file (excluding file header) in kib
+typedef uint64_t efpFileSize_kib_t;     ///< Size of file (header + data) in kib
+typedef uint32_t efpDataSize_sblks_t;   ///< Size of data part of file (excluding file header) in sblks
+typedef uint32_t efpFileSize_sblks_t;   ///< Size of file (header + data) in sblks
+typedef uint32_t efpFileCount_t;        ///< Number of files in a partition or pool
+typedef uint16_t efpPartitionNumber_t;  ///< Number assigned to a partition
+
+typedef struct efpIdentity_t {
+    efpPartitionNumber_t pn_;
+    efpDataSize_kib_t ds_;
+    efpIdentity_t() : pn_(0), ds_(0) {}
+    efpIdentity_t(efpPartitionNumber_t pn, efpDataSize_kib_t ds) : pn_(pn), ds_(ds) {}
+    friend std::ostream& operator<<(std::ostream& os, efpIdentity_t& id) { os << "[" << id.pn_ << "," << id.ds_ << "]"; return os; }
+} efpIdentity_t;
 
 }} // namespace qpid::qls_jrnl
 

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp Mon Nov  4 22:15:14 2013
@@ -28,10 +28,28 @@
 #include "qpid/linearstore/jrnl/utils/file_hdr.h"
 #include <unistd.h>
 
+//#include <iostream> // DEBUG
+
 namespace qpid {
 namespace qls_jrnl {
 
 JournalFile::JournalFile(const std::string& fqFileName,
+                         const ::file_hdr_t& fileHeader) :
+            fqFileName_(fqFileName),
+            fileSeqNum_(fileHeader._file_number),
+            fileHandle_(-1),
+            fileCloseFlag_(false),
+            fileHeaderBasePtr_ (0),
+            fileHeaderPtr_(0),
+            aioControlBlockPtr_(0),
+            fileSize_dblks_(((fileHeader._data_size_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
+            enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0),
+            submittedDblkCount_("JournalFile::submittedDblkCount", 0),
+            completedDblkCount_("JournalFile::completedDblkCount", 0),
+            outstandingAioOpsCount_("JournalFile::outstandingAioOpsCount", 0)
+{}
+
+JournalFile::JournalFile(const std::string& fqFileName,
                          const uint64_t fileSeqNum,
                          const efpDataSize_kib_t efpDataSize_kib) :
             fqFileName_(fqFileName),
@@ -42,10 +60,10 @@ JournalFile::JournalFile(const std::stri
             fileHeaderPtr_(0),
             aioControlBlockPtr_(0),
             fileSize_dblks_(((efpDataSize_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
-            enqueuedRecordCount_(0),
-            submittedDblkCount_(0),
-            completedDblkCount_(0),
-            outstandingAioOpsCount_(0)
+            enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0),
+            submittedDblkCount_("JournalFile::submittedDblkCount", 0),
+            completedDblkCount_("JournalFile::completedDblkCount", 0),
+            outstandingAioOpsCount_("JournalFile::outstandingAioOpsCount", 0)
 {}
 
 JournalFile::~JournalFile() {
@@ -82,14 +100,6 @@ JournalFile::finalize() {
     }
 }
 
-const std::string JournalFile::getDirectory() const {
-    return fqFileName_.substr(0, fqFileName_.rfind('/'));
-}
-
-const std::string JournalFile::getFileName() const {
-    return fqFileName_.substr(fqFileName_.rfind('/')+1);
-}
-
 const std::string JournalFile::getFqFileName() const {
     return fqFileName_;
 }
@@ -98,10 +108,6 @@ uint64_t JournalFile::getFileSeqNum() co
     return fileSeqNum_;
 }
 
-bool JournalFile::isOpen() const {
-    return fileHandle_ >= 0;
-}
-
 int JournalFile::open() {
     fileHandle_ = ::open(fqFileName_.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
     if (fileHandle_ < 0) {
@@ -182,30 +188,10 @@ uint32_t JournalFile::incrEnqueuedRecord
     return enqueuedRecordCount_.increment();
 }
 
-uint32_t JournalFile::addEnqueuedRecordCount(const uint32_t a) {
-    return enqueuedRecordCount_.add(a);
-}
-
 uint32_t JournalFile::decrEnqueuedRecordCount() {
     return enqueuedRecordCount_.decrementLimit();
 }
 
-uint32_t JournalFile::subtrEnqueuedRecordCount(const uint32_t s) {
-    return enqueuedRecordCount_.subtractLimit(s);
-}
-
-uint32_t JournalFile::getSubmittedDblkCount() const {
-    return submittedDblkCount_.get();
-}
-
-uint32_t JournalFile::addSubmittedDblkCount(const uint32_t a) {
-    return submittedDblkCount_.addLimit(a, fileSize_dblks_, jerrno::JERR_JNLF_FILEOFFSOVFL);
-}
-
-uint32_t JournalFile::getCompletedDblkCount() const {
-    return completedDblkCount_.get();
-}
-
 uint32_t JournalFile::addCompletedDblkCount(const uint32_t a) {
     return completedDblkCount_.addLimit(a, submittedDblkCount_.get(), jerrno::JERR_JNLF_CMPLOFFSOVFL);
 }
@@ -214,10 +200,6 @@ uint16_t JournalFile::getOutstandingAioO
     return outstandingAioOpsCount_.get();
 }
 
-uint16_t JournalFile::incrOutstandingAioOperationCount() {
-    return outstandingAioOpsCount_.increment();
-}
-
 uint16_t JournalFile::decrOutstandingAioOperationCount() {
     uint16_t r = outstandingAioOpsCount_.decrementLimit();
     if (fileCloseFlag_ && outstandingAioOpsCount_ == 0) { // Delayed close
@@ -232,33 +214,10 @@ bool JournalFile::isEmpty() const {
     return submittedDblkCount_ == 0;
 }
 
-bool JournalFile::isDataEmpty() const {
-    return submittedDblkCount_ <= QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS;
-}
-
-u_int32_t JournalFile::dblksRemaining() const {
-    return fileSize_dblks_ - submittedDblkCount_;
-}
-
-bool JournalFile::isFull() const {
-    return submittedDblkCount_ == fileSize_dblks_;
-}
-
-bool JournalFile::isFullAndComplete() const {
-    return completedDblkCount_ == fileSize_dblks_;
-}
-
-u_int32_t JournalFile::getOutstandingAioDblks() const {
-    return submittedDblkCount_ - completedDblkCount_;
-}
-
-bool JournalFile::getNextFile() const {
-    return isFull();
-}
-
 bool JournalFile::isNoEnqueuedRecordsRemaining() const {
-    return !isDataEmpty() &&          // Must be written to, not empty
-           enqueuedRecordCount_ == 0;  // No remaining enqueued records
+    return /*!enqueueStarted_ &&*/          // Not part-way through encoding an enqueue
+           isFullAndComplete() &&       // Full with all AIO returned
+           enqueuedRecordCount_ == 0;   // No remaining enqueued records
 }
 
 // debug aid
@@ -284,4 +243,59 @@ const std::string JournalFile::status_st
     return oss.str();
 }
 
+// --- protected functions ---
+
+const std::string JournalFile::getDirectory() const {
+    return fqFileName_.substr(0, fqFileName_.rfind('/'));
+}
+
+const std::string JournalFile::getFileName() const {
+    return fqFileName_.substr(fqFileName_.rfind('/')+1);
+}
+
+bool JournalFile::isOpen() const {
+    return fileHandle_ >= 0;
+}
+
+uint32_t JournalFile::getSubmittedDblkCount() const {
+    return submittedDblkCount_.get();
+}
+
+uint32_t JournalFile::addSubmittedDblkCount(const uint32_t a) {
+    return submittedDblkCount_.addLimit(a, fileSize_dblks_, jerrno::JERR_JNLF_FILEOFFSOVFL);
+}
+
+uint32_t JournalFile::getCompletedDblkCount() const {
+    return completedDblkCount_.get();
+}
+
+uint16_t JournalFile::incrOutstandingAioOperationCount() {
+    return outstandingAioOpsCount_.increment();
+}
+
+u_int32_t JournalFile::dblksRemaining() const {
+    return fileSize_dblks_ - submittedDblkCount_;
+}
+
+bool JournalFile::getNextFile() const {
+    return isFull();
+}
+
+u_int32_t JournalFile::getOutstandingAioDblks() const {
+    return submittedDblkCount_ - completedDblkCount_;
+}
+
+bool JournalFile::isDataEmpty() const {
+    return submittedDblkCount_ <= QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS;
+}
+
+bool JournalFile::isFull() const {
+    return submittedDblkCount_ == fileSize_dblks_;
+}
+
+bool JournalFile::isFullAndComplete() const {
+    return completedDblkCount_ == fileSize_dblks_;
+}
+
+
 }} // namespace qpid::qls_jrnl

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h Mon Nov  4 22:15:14 2013
@@ -52,6 +52,8 @@ protected:
 
 public:
     JournalFile(const std::string& fqFileName,
+                const ::file_hdr_t& fileHeader);
+    JournalFile(const std::string& fqFileName,
                 const uint64_t fileSeqNum,
                 const efpDataSize_kib_t efpDataSize_kib);
     virtual ~JournalFile();
@@ -59,12 +61,9 @@ public:
     void initialize(const uint32_t completedDblkCount);
     void finalize();
 
-    const std::string getDirectory() const;
-    const std::string getFileName() const;
     const std::string getFqFileName() const;
     uint64_t getFileSeqNum() const;
 
-    bool isOpen() const;
     int open();
     void close();
     void asyncFileHeaderWrite(io_context_t ioContextPtr,
@@ -81,32 +80,38 @@ public:
 
     uint32_t getEnqueuedRecordCount() const;
     uint32_t incrEnqueuedRecordCount();
-    uint32_t addEnqueuedRecordCount(const uint32_t a);
     uint32_t decrEnqueuedRecordCount();
-    uint32_t subtrEnqueuedRecordCount(const uint32_t s);
 
-    uint32_t getSubmittedDblkCount() const;
-    uint32_t addSubmittedDblkCount(const uint32_t a);
-
-    uint32_t getCompletedDblkCount() const;
     uint32_t addCompletedDblkCount(const uint32_t a);
 
     uint16_t getOutstandingAioOperationCount() const;
-    uint16_t incrOutstandingAioOperationCount();
     uint16_t decrOutstandingAioOperationCount();
 
     // Status helper functions
     bool isEmpty() const;                      ///< True if no writes of any kind have occurred
-    bool isDataEmpty() const;                  ///< True if only file header written, data is still empty
-    u_int32_t dblksRemaining() const;          ///< Dblks remaining until full
-    bool isFull() const;                       ///< True if all possible dblks have been submitted (but may not yet have returned from AIO)
-    bool isFullAndComplete() const;            ///< True if all submitted dblks have returned from AIO
-    u_int32_t getOutstandingAioDblks() const;  ///< Dblks still to be written
-    bool getNextFile() const;                  ///< True when next file is needed
     bool isNoEnqueuedRecordsRemaining() const; ///< True when all enqueued records (or parts) have been dequeued
 
     // debug aid
     const std::string status_str(const uint8_t indentDepth) const;
+
+protected:
+    const std::string getDirectory() const;
+    const std::string getFileName() const;
+    bool isOpen() const;
+
+    uint32_t getSubmittedDblkCount() const;
+    uint32_t addSubmittedDblkCount(const uint32_t a);
+
+    uint32_t getCompletedDblkCount() const;
+
+    uint16_t incrOutstandingAioOperationCount();
+
+    u_int32_t dblksRemaining() const;          ///< Dblks remaining until full
+    bool getNextFile() const;                  ///< True when next file is needed
+    u_int32_t getOutstandingAioDblks() const;  ///< Dblks still to be written
+    bool isDataEmpty() const;                  ///< True if only file header written, data is still empty
+    bool isFull() const;                       ///< True if all possible dblks have been submitted (but may not yet have returned from AIO)
+    bool isFullAndComplete() const;            ///< True if all submitted dblks have returned from AIO
 };
 
 }} // namespace qpid::qls_jrnl

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp Mon Nov  4 22:15:14 2013
@@ -29,6 +29,8 @@
 #include "qpid/linearstore/jrnl/slock.h"
 #include "qpid/linearstore/jrnl/utils/file_hdr.h"
 
+//#include <iostream> // DEBUG
+
 namespace qpid {
 namespace qls_jrnl {
 
@@ -36,8 +38,8 @@ LinearFileController::LinearFileControll
             jcntlRef_(jcntlRef),
             emptyFilePoolPtr_(0),
             currentJournalFilePtr_(0),
-            fileSeqCounter_(0),
-            recordIdCounter_(0)
+            fileSeqCounter_("LinearFileController::fileSeqCounter", 0),
+            recordIdCounter_("LinearFileController::recordIdCounter", 0)
 {}
 
 LinearFileController::~LinearFileController() {}
@@ -47,7 +49,7 @@ void LinearFileController::initialize(co
                                       uint64_t initialFileNumberVal) {
     journalDirectory_.assign(journalDirectory);
     emptyFilePoolPtr_ = emptyFilePoolPtr;
-    fileSeqCounter_ = initialFileNumberVal;
+    fileSeqCounter_.set(initialFileNumberVal);
 }
 
 void LinearFileController::finalize() {
@@ -57,14 +59,13 @@ void LinearFileController::finalize() {
     }
 }
 
-void LinearFileController::addJournalFile(const std::string& fileName,
-                                          const uint64_t fileNumber,
-                                          const uint32_t fileSize_kib,
+void LinearFileController::addJournalFile(JournalFile* journalFilePtr,
                                           const uint32_t completedDblkCount) {
-    if (currentJournalFilePtr_)
+    if (currentJournalFilePtr_) {
         currentJournalFilePtr_->close();
-    currentJournalFilePtr_ = new JournalFile(fileName, fileNumber, fileSize_kib);
-    currentJournalFilePtr_->initialize(completedDblkCount);
+    }
+    journalFilePtr->initialize(completedDblkCount);
+    currentJournalFilePtr_ = journalFilePtr;
     {
         slock l(journalFileListMutex_);
         journalFileList_.push_back(currentJournalFilePtr_);
@@ -72,18 +73,10 @@ void LinearFileController::addJournalFil
     currentJournalFilePtr_->open();
 }
 
-efpDataSize_kib_t LinearFileController::dataSize_kib() const {
-    return emptyFilePoolPtr_->dataSize_kib();
-}
-
 efpDataSize_sblks_t LinearFileController::dataSize_sblks() const {
     return emptyFilePoolPtr_->dataSize_sblks();
 }
 
-efpFileSize_kib_t LinearFileController::fileSize_kib() const {
-    return emptyFilePoolPtr_->fileSize_kib();
-}
-
 efpFileSize_sblks_t LinearFileController::fileSize_sblks() const {
     return emptyFilePoolPtr_->fileSize_sblks();
 }
@@ -100,28 +93,24 @@ void LinearFileController::pullEmptyFile
     addJournalFile(ef, getNextFileSeqNum(), emptyFilePoolPtr_->dataSize_kib(), 0);
 }
 
-void LinearFileController::purgeFilesToEfp() {
+void LinearFileController::purgeEmptyFilesToEfp() {
     slock l(journalFileListMutex_);
-    while (journalFileList_.front()->isNoEnqueuedRecordsRemaining()) {
-        emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
-        delete journalFileList_.front();
-        journalFileList_.pop_front();
-    }
+    purgeEmptyFilesToEfpNoLock();
 }
 
 uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
-    slock l(journalFileListMutex_);
     return find(fileSeqNumber)->getEnqueuedRecordCount();
 }
 
 uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
-    assertCurrentJournalFileValid("incrEnqueuedRecordCount");
     return find(fileSeqNumber)->incrEnqueuedRecordCount();
 }
 
 uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
     slock l(journalFileListMutex_);
-    return find(fileSeqNumber)->decrEnqueuedRecordCount();
+    uint32_t r = find(fileSeqNumber)->decrEnqueuedRecordCount();
+//    purgeEmptyFilesToEfpNoLock();
+    return r;
 }
 
 uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) {
@@ -160,101 +149,11 @@ uint64_t LinearFileController::getCurren
     return currentJournalFilePtr_->getFileSeqNum();
 }
 
-uint32_t LinearFileController::getEnqueuedRecordCount() const {
-    assertCurrentJournalFileValid("getEnqueuedRecordCount");
-    return currentJournalFilePtr_->getEnqueuedRecordCount();
-}
-
-uint32_t LinearFileController::incrEnqueuedRecordCount() {
-    assertCurrentJournalFileValid("incrEnqueuedRecordCount");
-    return currentJournalFilePtr_->incrEnqueuedRecordCount();
-}
-
-uint32_t LinearFileController::addEnqueuedRecordCount(const uint32_t a) {
-    assertCurrentJournalFileValid("addEnqueuedRecordCount");
-    return currentJournalFilePtr_->addEnqueuedRecordCount(a);
-}
-
-uint32_t LinearFileController::decrEnqueuedRecordCount() {
-    assertCurrentJournalFileValid("decrEnqueuedRecordCount");
-    return currentJournalFilePtr_->decrEnqueuedRecordCount();
-}
-
-uint32_t LinearFileController::subtrEnqueuedRecordCount(const uint32_t s) {
-    assertCurrentJournalFileValid("subtrEnqueuedRecordCount");
-    return currentJournalFilePtr_->subtrEnqueuedRecordCount(s);
-}
-
-uint32_t LinearFileController::getWriteSubmittedDblkCount() const {
-    assertCurrentJournalFileValid("getWriteSubmittedDblkCount");
-    return currentJournalFilePtr_->getSubmittedDblkCount();
-}
-
-uint32_t LinearFileController::addWriteSubmittedDblkCount(const uint32_t a) {
-    assertCurrentJournalFileValid("addWriteSubmittedDblkCount");
-    return currentJournalFilePtr_->addSubmittedDblkCount(a);
-}
-
-uint32_t LinearFileController::getWriteCompletedDblkCount() const {
-    assertCurrentJournalFileValid("getWriteCompletedDblkCount");
-    return currentJournalFilePtr_->getCompletedDblkCount();
-}
-
-uint32_t LinearFileController::addWriteCompletedDblkCount(const uint32_t a) {
-    assertCurrentJournalFileValid("addWriteCompletedDblkCount");
-    return currentJournalFilePtr_->addCompletedDblkCount(a);
-}
-
-uint16_t LinearFileController::getOutstandingAioOperationCount() const {
-    assertCurrentJournalFileValid("getOutstandingAioOperationCount");
-    return currentJournalFilePtr_->getOutstandingAioOperationCount();
-}
-
-uint16_t LinearFileController::incrOutstandingAioOperationCount() {
-    assertCurrentJournalFileValid("incrOutstandingAioOperationCount");
-    return currentJournalFilePtr_->incrOutstandingAioOperationCount();
-}
-
-uint16_t LinearFileController::decrOutstandingAioOperationCount() {
-    assertCurrentJournalFileValid("decrOutstandingAioOperationCount");
-    return currentJournalFilePtr_->decrOutstandingAioOperationCount();
-}
-
 bool LinearFileController::isEmpty() const {
     assertCurrentJournalFileValid("isEmpty");
     return currentJournalFilePtr_->isEmpty();
 }
 
-bool LinearFileController::isDataEmpty() const {
-    assertCurrentJournalFileValid("isDataEmpty");
-    return currentJournalFilePtr_->isDataEmpty();
-}
-
-u_int32_t LinearFileController::dblksRemaining() const {
-    assertCurrentJournalFileValid("dblksRemaining");
-    return currentJournalFilePtr_->dblksRemaining();
-}
-
-bool LinearFileController::isFull() const {
-    assertCurrentJournalFileValid("isFull");
-    return currentJournalFilePtr_->isFull();
-}
-
-bool LinearFileController::isFullAndComplete() const {
-    assertCurrentJournalFileValid("isFullAndComplete");
-    return currentJournalFilePtr_->isFullAndComplete();
-}
-
-u_int32_t LinearFileController::getOutstandingAioDblks() const {
-    assertCurrentJournalFileValid("getOutstandingAioDblks");
-    return currentJournalFilePtr_->getOutstandingAioDblks();
-}
-
-bool LinearFileController::needNextFile() const {
-    assertCurrentJournalFileValid("getNextFile");
-    return currentJournalFilePtr_->getNextFile();
-}
-
 const std::string LinearFileController::status(const uint8_t indentDepth) const {
     std::string indent((size_t)indentDepth, '.');
     std::ostringstream oss;
@@ -273,8 +172,12 @@ const std::string LinearFileController::
 
 // --- protected functions ---
 
-bool LinearFileController::checkCurrentJournalFileValid() const {
-    return currentJournalFilePtr_ != 0;
+void LinearFileController::addJournalFile(const std::string& fileName,
+                                          const uint64_t fileNumber,
+                                          const uint32_t fileSize_kib,
+                                          const uint32_t completedDblkCount) {
+    JournalFile* jfp = new JournalFile(fileName, fileNumber, fileSize_kib);
+    addJournalFile(jfp, completedDblkCount);
 }
 
 void LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const {
@@ -283,6 +186,10 @@ void LinearFileController::assertCurrent
     }
 }
 
+bool LinearFileController::checkCurrentJournalFileValid() const {
+    return currentJournalFilePtr_ != 0;
+}
+
 // NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock
 JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) {
     if (currentJournalFilePtr_ != 0 && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber)
@@ -301,4 +208,15 @@ uint64_t LinearFileController::getNextFi
     return fileSeqCounter_.increment();
 }
 
+void LinearFileController::purgeEmptyFilesToEfpNoLock() {
+//std::cout << " >P n=" << journalFileList_.size() << " e=" << (journalFileList_.front()->isNoEnqueuedRecordsRemaining()?"T":"F") << std::flush; // DEBUG
+    while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() &&
+           journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records
+//std::cout << " *f=" << journalFileList_.front()->getFqFileName() << std::flush; // DEBUG
+        emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
+        delete journalFileList_.front();
+        journalFileList_.pop_front();
+    }
+}
+
 }} // namespace qpid::qls_jrnl

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h?rev=1538790&r1=1538789&r2=1538790&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h Mon Nov  4 22:15:14 2013
@@ -62,18 +62,14 @@ public:
                     uint64_t initialFileNumberVal);
     void finalize();
 
-    void addJournalFile(const std::string& fileName,
-                        const uint64_t fileNumber,
-                        const uint32_t fileSize_kib,
+    void addJournalFile(JournalFile* journalFilePtr,
                         const uint32_t completedDblkCount);
 
-    efpDataSize_kib_t dataSize_kib() const;
     efpDataSize_sblks_t dataSize_sblks() const;
-    efpFileSize_kib_t fileSize_kib() const;
     efpFileSize_sblks_t fileSize_sblks() const;
     uint64_t getNextRecordId();
     void pullEmptyFileFromEfp();
-    void purgeFilesToEfp();
+    void purgeEmptyFilesToEfp();
 
     // Functions for manipulating counts of non-current JournalFile instances in journalFileList_
     uint32_t getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
@@ -83,7 +79,7 @@ public:
                                         const uint32_t a);
     uint16_t decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber);
 
-    // Pass-through functions for JournalFile class
+    // Pass-through functions for current JournalFile class
     void asyncFileHeaderWrite(io_context_t ioContextPtr,
                               const uint16_t userFlags,
                               const uint64_t recordId,
@@ -94,42 +90,25 @@ public:
                         uint32_t dataSize_dblks);
 
     uint64_t getCurrentFileSeqNum() const;
-
-    uint32_t getEnqueuedRecordCount() const;
-    uint32_t incrEnqueuedRecordCount();
-    uint32_t addEnqueuedRecordCount(const uint32_t a);
-    uint32_t decrEnqueuedRecordCount();
-    uint32_t subtrEnqueuedRecordCount(const uint32_t s);
-
-    uint32_t getWriteSubmittedDblkCount() const;
-    uint32_t addWriteSubmittedDblkCount(const uint32_t a);
-
-    uint32_t getWriteCompletedDblkCount() const;
-    uint32_t addWriteCompletedDblkCount(const uint32_t a);
-
-    uint16_t getOutstandingAioOperationCount() const;
-    uint16_t incrOutstandingAioOperationCount();
-    uint16_t decrOutstandingAioOperationCount();
-
-    bool isEmpty() const;                      // True if no writes of any kind have occurred
-    bool isDataEmpty() const;                  // True if only file header written, data is still empty
-    u_int32_t dblksRemaining() const;          // Dblks remaining until full
-    bool isFull() const;                       // True if all possible dblks have been submitted (but may not yet have returned from AIO)
-    bool isFullAndComplete() const;            // True if all submitted dblks have returned from AIO
-    u_int32_t getOutstandingAioDblks() const;  // Dblks still to be written
-    bool needNextFile() const;                 // True when next file is needed
+    bool isEmpty() const;
 
     // Debug aid
     const std::string status(const uint8_t indentDepth) const;
 
 protected:
+    void addJournalFile(const std::string& fileName,
+                        const uint64_t fileNumber,
+                        const uint32_t fileSize_kib,
+                        const uint32_t completedDblkCount);
     void assertCurrentJournalFileValid(const char* const functionName) const;
     bool checkCurrentJournalFileValid() const;
     JournalFile* find(const efpFileCount_t fileSeqNumber);
     uint64_t getNextFileSeqNum();
+    void purgeEmptyFilesToEfpNoLock();
 };
 
-typedef void (LinearFileController::*lfcAddJournalFileFn)(const std::string&, const uint64_t, const uint32_t, const uint32_t);
+typedef void (LinearFileController::*lfcAddJournalFileFn)(JournalFile* journalFilePtr,
+                                                          const uint32_t completedDblkCount);
 
 }} // namespace qpid::qls_jrnl
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message