qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kp...@apache.org
Subject svn commit: r1530024 [2/4] - in /qpid/branches/linearstore/qpid/cpp/src: ./ qpid/linearstore/ qpid/linearstore/jrnl/ qpid/linearstore/jrnl/utils/ tests/linearstore/
Date Mon, 07 Oct 2013 18:39:25 GMT
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.cpp Mon Oct  7 18:39:24 2013
@@ -21,34 +21,271 @@
 
 #include "qpid/linearstore/jrnl/JournalFile.h"
 
+#include <fcntl.h>
+#include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/pmgr.h"
+#include "qpid/linearstore/jrnl/utils/file_hdr.h"
+#include <unistd.h>
+
 namespace qpid {
 namespace qls_jrnl {
 
-JournalFile::JournalFile(const std::string& fqFileName_) :
-            fqfn(fqFileName_)
+JournalFile::JournalFile(const std::string& fqFileName_, // Stores identity and zeroes state only; no allocation or file I/O happens here
+                         const uint64_t fileSeqNum_,
+                         const uint32_t fileSize_kib_) :
+            fqFileName(fqFileName_),
+            fileSeqNum(fileSeqNum_),
+            fileHandle(-1), // -1 means "not open" (see isOpen())
+            fileCloseFlag(false),
+            fileHeaderBasePtr (0), // allocated by initialize()
+            fileHeaderPtr(0),
+            aioControlBlockPtr(0), // allocated by initialize()
+            fileSizeDblks(((fileSize_kib_ * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_BYTES)) / JRNL_DBLK_SIZE_BYTES), // given size plus reserved header area, in dblks. NOTE(review): fileSize_kib_ * 1024 may be evaluated in 32 bits - confirm sizes >= 4 GiB are never passed
+            enqueuedRecordCount(0),
+            submittedDblkCount(0),
+            completedDblkCount(0),
+            outstandingAioOpsCount(0)
 {}
 
-JournalFile::~JournalFile() {}
+JournalFile::~JournalFile() {
+    finalize(); // Frees the header buffer and AIO control block; note that finalize() does not close fileHandle
+}
+
+void
+JournalFile::initialize() { // Allocates the aligned file-header buffer and the AIO control block; throws jexception(JERR__MALLOC) on failure
+    const int res = ::posix_memalign(&fileHeaderBasePtr, QLS_AIO_ALIGN_BOUNDARY, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024);
+    if (res != 0) { // posix_memalign() reports failure through its return value and does not set errno
+        std::ostringstream oss;
+        oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024);
+        oss << FORMAT_SYSERR(res); // format the returned error code, not whatever errno happens to hold
+        throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize");
+    }
+    fileHeaderPtr = (::file_hdr_t*)fileHeaderBasePtr; // typed view of the same buffer
+    aioControlBlockPtr = new aio_cb; // released in finalize()
+}
+
+void
+JournalFile::finalize() { // Releases what initialize() allocated; pointers are zeroed so repeated calls are harmless
+    if (fileHeaderBasePtr != 0) {
+        std::free(fileHeaderBasePtr); // obtained from posix_memalign(), so free() is the matching release
+        fileHeaderBasePtr = 0;
+        fileHeaderPtr = 0; // alias of fileHeaderBasePtr, not separately owned
+    }
+    if (aioControlBlockPtr != 0) {
+        delete aioControlBlockPtr; // created with new in initialize(); releasing it with free() was a new/free mismatch
+        aioControlBlockPtr = 0;
+    }
+}
 
 const std::string
-JournalFile::directory() const {
-    return fqfn.substr(0, fqfn.rfind('/'));
+JournalFile::getDirectory() const {
+    return fqFileName.substr(0, fqFileName.rfind('/')); // Text before the last '/'; with no '/' rfind() yields npos and the whole name is returned
 }
 
 const std::string
-JournalFile::fileName() const {
-    return fqfn.substr(fqfn.rfind('/'));
+JournalFile::getFileName() const {
+    return fqFileName.substr(fqFileName.rfind('/')+1); // Text after the last '/'; with no '/' npos+1 wraps to 0 and the whole name is returned
 }
 
 const std::string
-JournalFile::fqFileName() const {
-    return fqfn;
+JournalFile::getFqFileName() const {
+    return fqFileName; // Name exactly as passed to the constructor
+}
+
+uint64_t
+JournalFile::getFileSeqNum() const {
+    return fileSeqNum; // Sequence number passed to the constructor
+}
+
+int
+JournalFile::open() { // Opens fqFileName with O_WRONLY | O_DIRECT; returns the descriptor or throws jexception(JERR_JNLF_OPEN)
+    fileHandle = ::open(fqFileName.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r-- (open(2) only consults the mode when creating a file; O_CREAT is not set here)
+    if (fileHandle < 0) {
+        std::ostringstream oss;
+        oss << "file=\"" << fqFileName << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_JNLF_OPEN, oss.str(), "JournalFile", "open");
+    }
+    return fileHandle;
 }
 
 bool
-JournalFile::empty() const {
-    // TODO: return true if no still-enqueued records (or parts of records) exist in this file
-    return true;
+JournalFile::isOpen() const {
+    return fileHandle >= 0; // fileHandle is -1 before open() and after a completed close()
+}
+
+void
+JournalFile::close() { // Closes the file now, or flags a deferred close while submitted dblks are still outstanding
+    if (fileHandle >= 0) {
+        if (getOutstandingAioDblks()) {
+            fileCloseFlag = true; // Close later when all outstanding AIOs have returned
+        } else {
+            int res = ::close(fileHandle);
+            fileHandle = -1; // Marked closed even when ::close() reports an error
+            if (res != 0) {
+                std::ostringstream oss;
+                oss << "file=\"" << fqFileName << "\"" << FORMAT_SYSERR(errno);
+                throw jexception(jerrno::JERR_JNLF_CLOSE, oss.str(), "JournalFile", "close"); // was mislabelled "open"
+            }
+        }
+    }
+}
+
+void
+JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr_, // Fills the header buffer and submits it as one AIO write at file offset 0
+                                  const efpPartitionNumber_t efpPartitionNumber_,
+                                  const efpDataSize_kib_t efpDataSize_kib_,
+                                  const uint16_t userFlags_,
+                                  const uint64_t recordId_,
+                                  const uint64_t firstRecordOffset_,
+                                  const std::string queueName_) {
+    ::file_hdr_create(fileHeaderPtr, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber_, efpDataSize_kib_); // requires initialize() to have allocated the buffer
+    ::file_hdr_init(fileHeaderBasePtr, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024, userFlags_, recordId_, firstRecordOffset_, fileSeqNum, queueName_.size(), queueName_.data());
+    aio::prep_pwrite(aioControlBlockPtr, fileHandle, (void*)fileHeaderBasePtr, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_KIB * 1024, 0UL);
+    if (aio::submit(ioContextPtr_, 1, &aioControlBlockPtr) < 0)
+        throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncFileHeaderWrite"); // was mislabelled "asyncPageWrite"
+    addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS); // counters advance only after a successful submit
+    incrOutstandingAioOperationCount();
+}
+
+void
+JournalFile::asyncPageWrite(io_context_t ioContextPtr_, // Submits one AIO write of dataSize_dblks_ dblks at the current submitted offset
+                            aio_cb* aioControlBlockPtr_,
+                            void* data_,
+                            uint32_t dataSize_dblks_) {
+    aio::prep_pwrite_2(aioControlBlockPtr_, fileHandle, data_, dataSize_dblks_ * JRNL_DBLK_SIZE_BYTES, submittedDblkCount.get() * JRNL_DBLK_SIZE_BYTES); // file offset = dblks submitted so far, in bytes
+    pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr_->data); // This page's control block (pcb)
+    pcbp->_wdblks = dataSize_dblks_;
+    pcbp->_jfp = this; // records this file on the pcb - presumably read by the AIO completion path; confirm in pmgr
+    if (aio::submit(ioContextPtr_, 1, &aioControlBlockPtr_) < 0)
+        throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite");
+    addSubmittedDblkCount(dataSize_dblks_); // counters advance only after a successful submit
+    incrOutstandingAioOperationCount();
+}
+
+uint32_t
+JournalFile::getEnqueuedRecordCount() const {
+    return enqueuedRecordCount.get(); // current value of the enqueued-record counter
+}
+
+uint32_t
+JournalFile::incrEnqueuedRecordCount() {
+    return enqueuedRecordCount.increment(); // returns whatever AtomicCounter::increment() yields - presumably the new value; confirm
+}
+
+uint32_t
+JournalFile::addEnqueuedRecordCount(const uint32_t a) {
+    return enqueuedRecordCount.add(a); // adds a to the enqueued-record counter
+}
+
+uint32_t
+JournalFile::decrEnqueuedRecordCount() {
+    return enqueuedRecordCount.decrementLimit(); // limited decrement - presumably guards against going below zero; confirm in AtomicCounter
+}
+
+uint32_t
+JournalFile::subtrEnqueuedRecordCount(const uint32_t s) {
+    return enqueuedRecordCount.subtractLimit(s); // limited subtraction of s - presumably guards against underflow; confirm in AtomicCounter
+}
+
+uint32_t
+JournalFile::getSubmittedDblkCount() const {
+    return submittedDblkCount.get(); // dblks handed to AIO so far (header included)
+}
+
+uint32_t
+JournalFile::addSubmittedDblkCount(const uint32_t a) {
+    return submittedDblkCount.addLimit(a, fileSizeDblks, jerrno::JERR_JNLF_FILEOFFSOVFL); // bounded by the file size; presumably raises JERR_JNLF_FILEOFFSOVFL when exceeded - confirm in AtomicCounter
+}
+
+uint32_t
+JournalFile::getCompletedDblkCount() const {
+    return completedDblkCount.get(); // current value of the completed-dblk counter
+}
+
+uint32_t
+JournalFile::addCompletedDblkCount(const uint32_t a) {
+    return completedDblkCount.addLimit(a, submittedDblkCount.get(), jerrno::JERR_JNLF_CMPLOFFSOVFL); // bounded by the submitted count read at call time
+}
+
+uint16_t JournalFile::getOutstandingAioOperationCount() const {
+    return outstandingAioOpsCount.get(); // counts AIO operations, not dblks (compare getOutstandingAioDblks())
+}
+
+uint16_t JournalFile::incrOutstandingAioOperationCount() {
+    return outstandingAioOpsCount.increment(); // called by both async write methods after a successful submit
+}
+
+uint16_t JournalFile::decrOutstandingAioOperationCount() {
+    uint16_t r = outstandingAioOpsCount.decrementLimit();
+    if (fileCloseFlag && outstandingAioOpsCount == 0) { // Delayed close
+        close(); // NOTE(review): close() re-checks getOutstandingAioDblks() and defers again if completed < submitted - confirm callers add the completed dblks first
+    }
+    return r; // value produced by the decrement, taken before any delayed close
+}
+
+bool
+JournalFile::isEmpty() const {
+    return submittedDblkCount == 0; // nothing submitted yet, not even the file header
+}
+
+bool
+JournalFile::isDataEmpty() const {
+    return submittedDblkCount <= QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS; // no more than the reserved header area has been submitted
+}
+
+u_int32_t
+JournalFile::dblksRemaining() const {
+    return fileSizeDblks - submittedDblkCount; // dblks that can still be submitted before the file is full
+}
+
+bool
+JournalFile::isFull() const {
+    return submittedDblkCount == fileSizeDblks; // based on submitted dblks; the writes may not have completed yet
+}
+
+bool
+JournalFile::isFullAndComplete() const {
+    return completedDblkCount == fileSizeDblks; // the completed count has reached the full file size
+}
+
+u_int32_t
+JournalFile::getOutstandingAioDblks() const {
+    return submittedDblkCount - completedDblkCount; // submitted but not yet completed; close() defers while this is non-zero
+}
+
+bool
+JournalFile::getNextFile() const {
+    return isFull(); // currently a plain alias for isFull()
+}
+
+bool
+JournalFile::isNoEnqueuedRecordsRemaining() const { // Used by LinearFileController::purgeFilesToEfp() to decide whether a file can go back to the EFP
+    return !isDataEmpty() &&          // Must be written to, not empty
+           enqueuedRecordCount == 0;  // No remaining enqueued records
+}
+
+const std::string
+JournalFile::status_str(const uint8_t indentDepth_) const { // Debug aid: multi-line dump of this file's state, one "name=value" per line
+    std::string indent((size_t)indentDepth_, '.'); // every line is prefixed with indentDepth_ dots
+    std::ostringstream oss;
+    oss << indent << "JournalFile: fileName=" << getFileName() << std::endl;
+    oss << indent << "  directory=" << getDirectory() << std::endl;
+    oss << indent << "  fileSizeDblks=" << fileSizeDblks << std::endl;
+    oss << indent << "  open=" << (isOpen() ? "T" : "F") << std::endl;
+    oss << indent << "  fileHandle=" << fileHandle << std::endl;
+    oss << indent << "  enqueuedRecordCount=" << getEnqueuedRecordCount() << std::endl;
+    oss << indent << "  submittedDblkCount=" << getSubmittedDblkCount() << std::endl;
+    oss << indent << "  completedDblkCount=" << getCompletedDblkCount() << std::endl;
+    oss << indent << "  outstandingAioOpsCount=" << getOutstandingAioOperationCount() << std::endl;
+    oss << indent << "  isEmpty()=" << (isEmpty() ? "T" : "F") << std::endl; // derived predicates follow the raw counters
+    oss << indent << "  isDataEmpty()=" << (isDataEmpty() ? "T" : "F") << std::endl;
+    oss << indent << "  dblksRemaining()=" << dblksRemaining() << std::endl;
+    oss << indent << "  isFull()=" << (isFull() ? "T" : "F") << std::endl;
+    oss << indent << "  isFullAndComplete()=" << (isFullAndComplete() ? "T" : "F") << std::endl;
+    oss << indent << "  getOutstandingAioDblks()=" << getOutstandingAioDblks() << std::endl;
+    oss << indent << "  getNextFile()=" << (getNextFile() ? "T" : "F") << std::endl;
+    return oss.str();
 }
 
 }} // namespace qpid::qls_jrnl

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFile.h Mon Oct  7 18:39:24 2013
@@ -22,23 +22,90 @@
 #ifndef QPID_LINEARSTORE_JOURNALFILE_H_
 #define QPID_LINEARSTORE_JOURNALFILE_H_
 
+#include "qpid/linearstore/jrnl/aio.h"
+#include "qpid/linearstore/jrnl/AtomicCounter.h"
+#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
+#include <stdint.h>
 #include <string>
 
+struct file_hdr_t; // forward declaration; uses the struct key to match the "struct file_hdr_t;" declaration in LinearFileController.h
+
 namespace qpid {
 namespace qls_jrnl {
 
 class JournalFile
 {
 protected:
-    const std::string fqfn;
+    const std::string fqFileName;                      ///< Fully qualified file name, as passed to the constructor
+    const uint64_t fileSeqNum;                         ///< File sequence number, as passed to the constructor
+    int fileHandle;                                    ///< File descriptor, -1 when not open
+    bool fileCloseFlag;                                ///< Set by close() when the close is deferred until outstanding AIO returns
+    void* fileHeaderBasePtr;                           ///< Aligned file header buffer, allocated in initialize()
+    ::file_hdr_t* fileHeaderPtr;                       ///< Typed view of fileHeaderBasePtr
+    aio_cb* aioControlBlockPtr;                        ///< AIO control block used for the file header write
+    uint32_t fileSizeDblks;                            ///< File size in data blocks, including file header
+    AtomicCounter<uint32_t> enqueuedRecordCount;       ///< Count of enqueued records
+    AtomicCounter<uint32_t> submittedDblkCount;        ///< Write file count (data blocks) for submitted AIO
+    AtomicCounter<uint32_t> completedDblkCount;        ///< Write file count (data blocks) for completed AIO
+    AtomicCounter<uint16_t> outstandingAioOpsCount;    ///< Outstanding AIO operations on this file
+
 public:
-    JournalFile(const std::string& fqFileName_);
+    JournalFile(const std::string& fqFileName,
+                const uint64_t fileSeqNum,
+                const uint32_t fileSize_kib);
     virtual ~JournalFile();
 
-    const std::string directory() const;
-    const std::string fileName() const;
-    const std::string fqFileName() const;
-    bool empty() const;
+    void initialize(); ///< Allocates the header buffer and AIO control block
+    void finalize();   ///< Releases what initialize() allocated; also called by the destructor
+
+    const std::string getDirectory() const;  ///< Part of the name before the last '/'
+    const std::string getFileName() const;   ///< Part of the name after the last '/'
+    const std::string getFqFileName() const; ///< Fully qualified name
+    uint64_t getFileSeqNum() const;
+
+    int open();          ///< Opens the file for writing; throws jexception on failure
+    bool isOpen() const; ///< True while fileHandle is valid
+    void close();        ///< Closes now, or defers until outstanding AIO has returned
+    void asyncFileHeaderWrite(io_context_t ioContextPtr,
+                              const efpPartitionNumber_t efpPartitionNumber,
+                              const efpDataSize_kib_t efpDataSize_kib,
+                              const uint16_t userFlags,
+                              const uint64_t recordId,
+                              const uint64_t firstRecordOffset,
+                              const std::string queueName);
+    void asyncPageWrite(io_context_t ioContextPtr,
+                        aio_cb* aioControlBlockPtr,
+                        void* data,
+                        uint32_t dataSize_dblks);
+
+    uint32_t getEnqueuedRecordCount() const;
+    uint32_t incrEnqueuedRecordCount();
+    uint32_t addEnqueuedRecordCount(const uint32_t a);
+    uint32_t decrEnqueuedRecordCount();
+    uint32_t subtrEnqueuedRecordCount(const uint32_t s);
+
+    uint32_t getSubmittedDblkCount() const;
+    uint32_t addSubmittedDblkCount(const uint32_t a);
+
+    uint32_t getCompletedDblkCount() const;
+    uint32_t addCompletedDblkCount(const uint32_t a);
+
+    uint16_t getOutstandingAioOperationCount() const;
+    uint16_t incrOutstandingAioOperationCount();
+    uint16_t decrOutstandingAioOperationCount(); ///< Also performs a deferred close once the count reaches zero
+
+    // Status helper functions
+    bool isEmpty() const;                      ///< True if no writes of any kind have occurred
+    bool isDataEmpty() const;                  ///< True if at most the file header has been submitted, data is still empty
+    u_int32_t dblksRemaining() const;          ///< Dblks remaining until full
+    bool isFull() const;                       ///< True if all possible dblks have been submitted (but may not yet have returned from AIO)
+    bool isFullAndComplete() const;            ///< True if the completed dblk count has reached the full file size
+    u_int32_t getOutstandingAioDblks() const;  ///< Dblks submitted but not yet completed
+    bool getNextFile() const;                  ///< True when next file is needed
+    bool isNoEnqueuedRecordsRemaining() const; ///< True when data has been written and the enqueued record count is zero
+
+    // Debug aid
+    const std::string status_str(const uint8_t indentDepth) const;
 };
 
 }} // namespace qpid::qls_jrnl

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalLog.cpp Mon Oct  7 18:39:24 2013
@@ -38,7 +38,7 @@ void
 JournalLog::log(log_level_t ll, const char* jid, const char* const log_stmt) const {
     if (ll > LOG_ERROR) {
         std::cerr << log_level_str(ll) << ": Journal \"" << jid << "\": " << log_stmt << std::endl;
-    } else if (ll > LOG_INFO) {
+    } else if (ll >= LOG_INFO) {
         std::cout << log_level_str(ll) << ": Journal \"" << jid << "\": " << log_stmt << std::endl;
     }
 

Copied: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp (from r1525056, qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp)
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp?p2=qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp&p1=qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp&r1=1525056&r2=1530024&rev=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.cpp Mon Oct  7 18:39:24 2013
@@ -19,11 +19,12 @@
  *
  */
 
-#include "qpid/linearstore/jrnl/JournalFileController.h"
+#include "qpid/linearstore/jrnl/LinearFileController.h"
 
 #include <fstream>
 #include "qpid/linearstore/jrnl/EmptyFilePool.h"
 #include "qpid/linearstore/jrnl/jcfg.h"
+#include "qpid/linearstore/jrnl/jcntl.h"
 #include "qpid/linearstore/jrnl/JournalFile.h"
 #include "qpid/linearstore/jrnl/slock.h"
 #include "qpid/linearstore/jrnl/utils/file_hdr.h"
@@ -33,110 +34,291 @@
 namespace qpid {
 namespace qls_jrnl {
 
-JournalFileController::JournalFileController(const std::string& dir_,
-                                             EmptyFilePool* efpp_) :
-            dir(dir_),
-            efpp(efpp_),
-            fileSeqCounter(0)
-{
-    //std::cout << "*** JournalFileController::JournalFileController() dir=" << dir << std::endl;
-}
-
-JournalFileController::~JournalFileController() {}
-
-void
-JournalFileController::pullEmptyFileFromEfp(const uint64_t recId_,
-                                            const uint64_t firstRecOffs_,
-                                            const std::string& queueName_) {
-    std::string ef = efpp->takeEmptyFile(dir);
-    //std::cout << "*** JournalFileController::pullEmptyFileFromEfp() qn=" << queueName_ << " ef=" << ef << std::endl;
-    const JournalFile* jfp = new JournalFile(ef/*efpp->takeEmptyFile(dir)*/);
-    initialzeFileHeader(jfp->fqFileName(), recId_, firstRecOffs_, getNextFileSeqNum(), queueName_);
+LinearFileController::LinearFileController(jcntl& jcntlRef_) : // Starts with no EFP and no current file; initialize() supplies directory and EFP
+            jcntlRef(jcntlRef_),
+            emptyFilePoolPtr(0),
+            currentJournalFilePtr(0), // set by pullEmptyFileFromEfp()
+            fileSeqCounter(0),
+            recordIdCounter(0)
+{}
+
+LinearFileController::~LinearFileController() {} // NOTE(review): does not call finalize(); files still held in journalFileList are not deleted here - confirm the owner always calls finalize()
+
+void
+LinearFileController::initialize(const std::string& journalDirectory_, // Records the journal directory and the EFP to draw files from
+                                 EmptyFilePool* emptyFilePoolPtr_) {
+    journalDirectory.assign(journalDirectory_);
+    emptyFilePoolPtr = emptyFilePoolPtr_; // stored as-is; this class never deletes it in the code shown
+}
+
+void LinearFileController::finalize() { // Destroys every JournalFile owned by journalFileList
+    currentJournalFilePtr = 0; // it points at an element of journalFileList (see pullEmptyFileFromEfp()); clear it so it cannot dangle
+    while (!journalFileList.empty()) {
+        delete journalFileList.front();
+        journalFileList.pop_front();
+    }
+}
+
+void
+LinearFileController::pullEmptyFileFromEfp() { // Closes the current file (if any) and makes a file taken from the EFP the new current file
+    if (currentJournalFilePtr)
+        currentJournalFilePtr->close(); // may only flag a deferred close if AIO is still outstanding
+    std::string ef = emptyFilePoolPtr->takeEmptyFile(journalDirectory); // Moves file from EFP only, returns new file name
+    std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG
+    currentJournalFilePtr = new JournalFile(ef, getNextFileSeqNum(), emptyFilePoolPtr->dataSize_kib());
+    currentJournalFilePtr->initialize(); // allocates header buffer and AIO control block
     {
         slock l(journalFileListMutex);
-        journalFileList.push_back(jfp);
+        journalFileList.push_back(currentJournalFilePtr); // the list now owns the file (deleted in finalize() / purgeFilesToEfp())
     }
+    currentJournalFilePtr->open(); // throws jexception if the file cannot be opened; the file is already in the list at that point
 }
 
 void
-JournalFileController::purgeFilesToEfp() {
+LinearFileController::purgeFilesToEfp() { // Returns the oldest files with no enqueued records to the EFP; never touches the current file
     slock l(journalFileListMutex);
-    while (journalFileList.front()->empty()) {
-
-        efpp->returnEmptyFile(journalFileList.front());
+    while (!journalFileList.empty() && journalFileList.front() != currentJournalFilePtr && journalFileList.front()->isNoEnqueuedRecordsRemaining()) { // front() on an empty deque is undefined; deleting the current file would leave currentJournalFilePtr dangling
+        emptyFilePoolPtr->returnEmptyFile(journalFileList.front());
         delete journalFileList.front();
         journalFileList.pop_front();
     }
 }
 
+efpDataSize_kib_t
+LinearFileController::dataSize_kib() const {
+    return emptyFilePoolPtr->dataSize_kib(); // forwarded from the EFP; requires initialize() to have set emptyFilePoolPtr
+}
+
+efpFileSize_kib_t
+LinearFileController::fileSize_kib() const {
+    return emptyFilePoolPtr->fileSize_kib(); // forwarded from the EFP; requires initialize() to have set emptyFilePoolPtr
+}
+
+efpDataSize_sblks_t
+LinearFileController::dataSize_sblks() const {
+    return emptyFilePoolPtr->dataSize_sblks(); // forwarded from the EFP; requires initialize() to have set emptyFilePoolPtr
+}
+
+efpFileSize_sblks_t
+LinearFileController::fileSize_sblks() const {
+    return emptyFilePoolPtr->fileSize_sblks(); // forwarded from the EFP; requires initialize() to have set emptyFilePoolPtr
+}
+
+uint64_t
+LinearFileController::getNextRecordId() {
+    return recordIdCounter.increment(); // advances the record id counter (constructed at 0) and returns the result of the increment
+}
+
+uint32_t
+LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+    slock l(journalFileListMutex); // find() must be called with the list lock held
+    return find(fileSeqNumber)->decrEnqueuedRecordCount(); // find() throws JERR_LFCR_SEQNUMNOTFOUND for an unknown sequence number
+}
+
+uint32_t
+LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) {
+    slock l(journalFileListMutex); // find() must be called with the list lock held
+    return find(fileSeqNumber)->addCompletedDblkCount(a); // find() throws JERR_LFCR_SEQNUMNOTFOUND for an unknown sequence number
+}
+
+uint16_t
+LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) {
+    slock l(journalFileListMutex); // find() must be called with the list lock held
+    return find(fileSeqNumber)->decrOutstandingAioOperationCount(); // may trigger that file's deferred close
+}
+
 void
-JournalFileController::finalize() {}
+LinearFileController::asyncFileHeaderWrite(io_context_t ioContextPtr, // Writes the file header of the current journal file, using EFP and journal identity
+                                           const uint16_t userFlags,
+                                           const uint64_t recordId,
+                                           const uint64_t firstRecordOffset) {
+    assertCurrentJournalFileValid("asyncFileHeaderWrite"); // same guard as asyncPageWrite(): throw JERR__NULL instead of dereferencing a null current file
+    currentJournalFilePtr->asyncFileHeaderWrite(ioContextPtr,
+                                                emptyFilePoolPtr->getPartitionNumber(),
+                                                emptyFilePoolPtr->dataSize_kib(),
+                                                userFlags,
+                                                recordId,
+                                                firstRecordOffset, jcntlRef.id()); // the journal id is written as the queue name
+}
 
 void
-JournalFileController::setFileSeqNum(const uint64_t fileSeqNum) {
-    fileSeqCounter = fileSeqNum;
+LinearFileController::asyncPageWrite(io_context_t ioContextPtr, // Forwards a page write to the current journal file
+                                     aio_cb* aioControlBlockPtr,
+                                     void* data,
+                                     uint32_t dataSize_dblks) {
+    assertCurrentJournalFileValid("asyncPageWrite"); // throws jexception(JERR__NULL) when no current file is set
+    currentJournalFilePtr->asyncPageWrite(ioContextPtr, aioControlBlockPtr, data, dataSize_dblks);
 }
 
-// protected
+uint64_t
+LinearFileController::getCurrentFileSeqNum() const {
+    assertCurrentJournalFileValid("getCurrentFileSeqNum"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->getFileSeqNum();
+}
 
-std::string
-JournalFileController::readFileHeader(file_hdr_t* fhdr_,
-                                      const std::string& fileName_) {
-    //std::cout << "*** JournalFileController::readFileHeader() fn=" << fileName_ << std::endl;
-    char buff[JRNL_SBLK_SIZE];
-    std::ifstream ifs(fileName_.c_str(), std::ifstream::in | std::ifstream::binary);
-    if (ifs.good()) {
-        ifs.read(buff, JRNL_SBLK_SIZE);
-        ifs.close();
-        std::memcpy(fhdr_, buff, sizeof(file_hdr_t));
-        return std::string(buff + sizeof(file_hdr_t), fhdr_->_queue_name_len);
-    } else {
-        std::cerr << "ERROR: Could not open file \"" << fileName_ << "\" for reading." << std::endl;
-    }
-    return std::string("");
+uint32_t
+LinearFileController::getEnqueuedRecordCount() const {
+    assertCurrentJournalFileValid("getEnqueuedRecordCount"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->getEnqueuedRecordCount(); // value for the current file only
 }
 
-void
-JournalFileController::writeFileHeader(const file_hdr_t* fhdr_,
-                                       const std::string& queueName_,
-                                       const std::string& fileName_) {
-    //std::cout << "*** JournalFileController::writeFileHeader() qn=" << queueName_ << " fn=" << fileName_ << std::endl;
-    std::fstream fs(fileName_.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary);
-    if (fs.good()) {
-        fs.seekp(0);
-        fs.write((const char*)fhdr_, sizeof(file_hdr_t));
-        fs.write(queueName_.data(), fhdr_->_queue_name_len);
-        fs.close();
+uint32_t
+LinearFileController::incrEnqueuedRecordCount() {
+    assertCurrentJournalFileValid("incrEnqueuedRecordCount"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->incrEnqueuedRecordCount(); // acts on the current file only
+}
+
+uint32_t
+LinearFileController::addEnqueuedRecordCount(const uint32_t a) {
+    assertCurrentJournalFileValid("addEnqueuedRecordCount"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->addEnqueuedRecordCount(a); // acts on the current file only
+}
+
+uint32_t
+LinearFileController::decrEnqueuedRecordCount() {
+    assertCurrentJournalFileValid("decrEnqueuedRecordCount"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->decrEnqueuedRecordCount(); // current file only; the overload taking a sequence number targets any listed file
+}
+
+uint32_t
+LinearFileController::subtrEnqueuedRecordCount(const uint32_t s) {
+    assertCurrentJournalFileValid("subtrEnqueuedRecordCount"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->subtrEnqueuedRecordCount(s); // acts on the current file only
+}
+
+uint32_t
+LinearFileController::getWriteSubmittedDblkCount() const {
+    assertCurrentJournalFileValid("getWriteSubmittedDblkCount"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->getSubmittedDblkCount(); // value for the current file only
+}
+
+uint32_t
+LinearFileController::addWriteSubmittedDblkCount(const uint32_t a) {
+    assertCurrentJournalFileValid("addWriteSubmittedDblkCount"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->addSubmittedDblkCount(a); // acts on the current file only
+}
+
+uint32_t
+LinearFileController::getWriteCompletedDblkCount() const {
+    assertCurrentJournalFileValid("getWriteCompletedDblkCount"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->getCompletedDblkCount(); // value for the current file only
+}
+
+uint32_t
+LinearFileController::addWriteCompletedDblkCount(const uint32_t a) {
+    assertCurrentJournalFileValid("addWriteCompletedDblkCount"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->addCompletedDblkCount(a); // current file only; the overload taking a sequence number targets any listed file
+}
+
+uint16_t
+LinearFileController::getOutstandingAioOperationCount() const {
+    assertCurrentJournalFileValid("getOutstandingAioOperationCount"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->getOutstandingAioOperationCount(); // value for the current file only
+}
+
+uint16_t
+LinearFileController::incrOutstandingAioOperationCount() {
+    assertCurrentJournalFileValid("incrOutstandingAioOperationCount"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->incrOutstandingAioOperationCount(); // acts on the current file only
+}
+
+uint16_t
+LinearFileController::decrOutstandingAioOperationCount() {
+    assertCurrentJournalFileValid("decrOutstandingAioOperationCount"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->decrOutstandingAioOperationCount(); // current file only; may trigger its deferred close
+}
+
+bool
+LinearFileController::isEmpty() const {
+    assertCurrentJournalFileValid("isEmpty"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->isEmpty(); // state of the current file only
+}
+
+bool
+LinearFileController::isDataEmpty() const {
+    assertCurrentJournalFileValid("isDataEmpty"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->isDataEmpty(); // state of the current file only
+}
+
+u_int32_t
+LinearFileController::dblksRemaining() const {
+    assertCurrentJournalFileValid("dblksRemaining"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->dblksRemaining(); // space left in the current file only
+}
+
+bool
+LinearFileController::isFull() const {
+    assertCurrentJournalFileValid("isFull"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->isFull(); // state of the current file only
+}
+
+bool
+LinearFileController::isFullAndComplete() const {
+    assertCurrentJournalFileValid("isFullAndComplete"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->isFullAndComplete(); // state of the current file only
+}
+
+u_int32_t
+LinearFileController::getOutstandingAioDblks() const {
+    assertCurrentJournalFileValid("getOutstandingAioDblks"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->getOutstandingAioDblks(); // value for the current file only
+}
+
+bool
+LinearFileController::getNextFile() const {
+    assertCurrentJournalFileValid("getNextFile"); // throws jexception(JERR__NULL) when no current file is set
+    return currentJournalFilePtr->getNextFile(); // true once the current file is full
+}
+
+const std::string
+LinearFileController::status(const uint8_t indentDepth) const { // Debug aid: dumps controller state followed by the current file's status_str()
+    std::string indent((size_t)indentDepth, '.'); // every line is prefixed with indentDepth dots
+    std::ostringstream oss;
+    oss << indent << "LinearFileController: queue=" << jcntlRef.id() << std::endl;
+    oss << indent << "  journalDirectory=" << journalDirectory << std::endl;
+    oss << indent << "  fileSeqCounter=" << fileSeqCounter.get() << std::endl;
+    oss << indent << "  recordIdCounter=" << recordIdCounter.get() << std::endl;
+    oss << indent << "  journalFileList.size=" << journalFileList.size() << std::endl; // read without taking journalFileListMutex
+    if (checkCurrentJournalFileValid()) {
+        oss << currentJournalFilePtr->status_str(indentDepth+2); // nested two levels deeper
     } else {
-        std::cerr << "ERROR: Could not open file \"" << fileName_ << "\" for writing." << std::endl;
+        oss << indent << "  <No current journal file>" << std::endl;
     }
+    return oss.str();
 }
 
-void
-JournalFileController::resetFileHeader(const std::string& fileName_) {
-    //std::cout << "*** JournalFileController::resetFileHeader() fn=" << fileName_ << std::endl;
-    file_hdr_t fhdr;
-    readFileHeader(&fhdr, fileName_);
-    ::file_hdr_reset(&fhdr);
-    writeFileHeader(&fhdr, std::string(""), fileName_);
+// protected
+
+bool
+LinearFileController::checkCurrentJournalFileValid() const {
+    return currentJournalFilePtr != 0; // only a null check; cannot detect a pointer to an already deleted file
 }
 
 void
-JournalFileController::initialzeFileHeader(const std::string& fileName_,
-                                           const uint64_t recId_,
-                                           const uint64_t firstRecOffs_,
-                                           const uint64_t fileSeqNum_,
-                                           const std::string& queueName_) {
-    //std::cout << "*** JournalFileController::initialzeFileHeader() fn=" << fileName_ << " sn=" << fileSeqNum_ << " qn=" << queueName_ << std::endl;
-    file_hdr_t fhdr;
-    readFileHeader(&fhdr, fileName_);
-    ::file_hdr_init(&fhdr, 0, recId_, firstRecOffs_, fileSeqNum_, queueName_.length(), queueName_.data());
-    writeFileHeader(&fhdr, queueName_, fileName_);
+LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const { // Guard used by the current-file delegators
+    if (!checkCurrentJournalFileValid()) {
+        throw jexception(jerrno::JERR__NULL, "LinearFileController", functionName); // functionName identifies the calling method in the exception
+    }
+}
+
+// NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock
+JournalFile*
+LinearFileController::find(const efpFileCount_t fileSeqNumber) { // Returns the file with the given sequence number, or throws if there is none
+    if (currentJournalFilePtr != 0 && currentJournalFilePtr->getFileSeqNum() == fileSeqNumber)
+        return currentJournalFilePtr; // fast path: the current file is checked before scanning the list
+    for (JournalFileListItr_t i=journalFileList.begin(); i!=journalFileList.end(); ++i) { // linear scan from the oldest file
+        if ((*i)->getFileSeqNum() == fileSeqNumber) {
+            return *i;
+        }
+    }
+    std::ostringstream oss;
+    oss << "fileSeqNumber=" << fileSeqNumber;
+    throw jexception(jerrno::JERR_LFCR_SEQNUMNOTFOUND, oss.str(), "LinearFileController", "find");
 }
 
 uint64_t
-JournalFileController::getNextFileSeqNum() {
-    return __sync_add_and_fetch(&fileSeqCounter, 1); // GCC atomic increment, not portable
+LinearFileController::getNextFileSeqNum() {
+    return fileSeqCounter.increment(); // AtomicCounter replaces the GCC-specific builtin; presumably returns the incremented value as before - confirm
 }
 
 }} // namespace qpid::qls_jrnl

Copied: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h (from r1525056, qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h)
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h?p2=qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h&p1=qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h&r1=1525056&r2=1530024&rev=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/JournalFileController.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/LinearFileController.h Mon Oct  7 18:39:24 2013
@@ -19,50 +19,105 @@
  *
  */
 
-#ifndef QPID_LINEARSTORE_JOURNALFILECONTROLLER_H_
-#define QPID_LINEARSTORE_JOURNALFILECONTROLLER_H_
+#ifndef QPID_LINEARSTORE_LINEARFILECONTROLLER_H_
+#define QPID_LINEARSTORE_LINEARFILECONTROLLER_H_
 
 #include <deque>
+#include "qpid/linearstore/jrnl/aio.h"
+#include "qpid/linearstore/jrnl/AtomicCounter.h"
+#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
 #include "qpid/linearstore/jrnl/smutex.h"
 
 struct file_hdr_t;
 namespace qpid {
 namespace qls_jrnl {
 class EmptyFilePool;
+class jcntl;
 class JournalFile;
-//typedef struct file_hdr_t file_hdr_t;
 
-class JournalFileController
+class LinearFileController // Tracks JournalFile instances (journalFileList plus a current file); see the pass-through section below
 {
 protected:
-    typedef std::deque<const JournalFile*> JournalFileList_t;
+    typedef std::deque<JournalFile*> JournalFileList_t;
     typedef JournalFileList_t::iterator JournalFileListItr_t;
 
-    const std::string dir;
-    EmptyFilePool* efpp;
-    uint64_t fileSeqCounter;
+    jcntl& jcntlRef;
+    std::string journalDirectory;
+    EmptyFilePool* emptyFilePoolPtr;
+    JournalFile* currentJournalFilePtr; // May be 0; find() null-checks it before use
+    AtomicCounter<uint64_t> fileSeqCounter; // Incremented by getNextFileSeqNum()
+    AtomicCounter<uint64_t> recordIdCounter; // Presumably the source for getNextRecordId() - confirm in the .cpp
+
     JournalFileList_t journalFileList;
     smutex journalFileListMutex;
 
 public:
-    JournalFileController(const std::string& dir,
-                          EmptyFilePool* efpp);
-    virtual ~JournalFileController();
+    LinearFileController(jcntl& jcntlRef_);
+    virtual ~LinearFileController();
 
-    void pullEmptyFileFromEfp(const uint64_t recId, const uint64_t firstRecOffs, const std::string& queueName);
-    void purgeFilesToEfp();
+    void initialize(const std::string& journalDirectory_, EmptyFilePool* emptyFilePoolPtr_);
     void finalize();
-    void setFileSeqNum(const uint64_t fileSeqNum);
+
+    void pullEmptyFileFromEfp();
+    void purgeFilesToEfp();
+    efpDataSize_kib_t dataSize_kib() const;
+    efpFileSize_kib_t fileSize_kib() const;
+    efpDataSize_sblks_t dataSize_sblks() const;
+    efpFileSize_sblks_t fileSize_sblks() const;
+
+    uint64_t getNextRecordId();
+
+    // Functions for manipulating counts of non-current JournalFile instances in journalFileList
+    uint32_t decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
+    uint32_t addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a);
+    uint16_t decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber);
+
+    // Pass-through functions for JournalFile class
+    void asyncFileHeaderWrite(io_context_t ioContextPtr,
+                              const uint16_t userFlags,
+                              const uint64_t recordId,
+                              const uint64_t firstRecordOffset);
+    void asyncPageWrite(io_context_t ioContextPtr,
+                        aio_cb* aioControlBlockPtr,
+                        void* data,
+                        uint32_t dataSize_dblks);
+
+    uint64_t getCurrentFileSeqNum() const;
+
+    uint32_t getEnqueuedRecordCount() const;
+    uint32_t incrEnqueuedRecordCount();
+    uint32_t addEnqueuedRecordCount(const uint32_t a);
+    uint32_t decrEnqueuedRecordCount();
+    uint32_t subtrEnqueuedRecordCount(const uint32_t s);
+
+    uint32_t getWriteSubmittedDblkCount() const;
+    uint32_t addWriteSubmittedDblkCount(const uint32_t a);
+
+    uint32_t getWriteCompletedDblkCount() const;
+    uint32_t addWriteCompletedDblkCount(const uint32_t a);
+
+    uint16_t getOutstandingAioOperationCount() const;
+    uint16_t incrOutstandingAioOperationCount();
+    uint16_t decrOutstandingAioOperationCount();
+
+    bool isEmpty() const;                      // True if no writes of any kind have occurred
+    bool isDataEmpty() const;                  // True if only file header written, data is still empty
+    uint32_t dblksRemaining() const;           // Dblks remaining until full
+    bool isFull() const;                       // True if all possible dblks have been submitted (but may not yet have returned from AIO)
+    bool isFullAndComplete() const;            // True if all submitted dblks have returned from AIO
+    uint32_t getOutstandingAioDblks() const;   // Dblks still to be written
+    bool getNextFile() const;                  // True when next file is needed
+
+    // Debug aid
+    const std::string status(const uint8_t indentDepth) const;
 
 protected:
-    std::string readFileHeader(file_hdr_t* fhdr, const std::string& fileName);
-    void writeFileHeader(const file_hdr_t* fhdr, const std::string& queueName, const std::string& fileName);
-    void resetFileHeader(const std::string& fileName);
-    void initialzeFileHeader(const std::string& fileName, const uint64_t recId, const uint64_t firstRecOffs,
-                             const uint64_t fileSeqNum, const std::string& queueName);
+    bool checkCurrentJournalFileValid() const;
+    void assertCurrentJournalFileValid(const char* const functionName) const; // Throws jexception(JERR__NULL) when the check fails
+    JournalFile* find(const efpFileCount_t fileSeqNumber); // NOT THREAD SAFE - use under external lock
     uint64_t getNextFileSeqNum();
 };
 
 }} // namespace qpid::qls_jrnl
 
-#endif // QPID_LINEARSTORE_JOURNALFILECONTROLLER_H_
+#endif // QPID_LINEARSTORE_LINEARFILECONTROLLER_H_

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp?rev=1530024&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.cpp Mon Oct  7 18:39:24 2013
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/jrnl/RecoveryManager.h"
+
+#include <iomanip>
+#include "qpid/linearstore/jrnl/jcfg.h"
+#include <sstream>
+
+namespace qpid {
+namespace qls_jrnl
+{
+
+// Construct with empty analysis containers, zeroed offsets/ids and all flags cleared.
+RecoveryManager::RecoveryManager() :
+        // Initial journal analysis data
+        _journalFileList(), _fileNumberNameMap(), _enqueueCountList(),
+        _journalEmptyFlag(false),
+        _firstRecordOffset(0),
+        _endOffset(0),
+        _highestRecordId(0),
+        _lastFileFullFlag(false),
+        // State for recovery of individual enqueued records
+        _currentRid(0),
+        _currentFileNumber(0),
+        _currentFileName(),
+        _fileSize(0), _recordStart(0),
+        _inFileStream(),
+        _readComplete(false) {}
+
+RecoveryManager::~RecoveryManager() {} // Empty body: no explicit cleanup is performed here
+
+// Summarise the journal analysis fields as text: one line when compact, otherwise a multi-line report.
+std::string
+RecoveryManager::toString(const std::string& jid,
+                          bool compact) {
+    typedef std::map<uint64_t, std::string>::const_iterator NameMapCitr;
+    typedef std::vector<uint32_t>::const_iterator CountCitr;
+    std::ostringstream oss;
+    if (!compact) {
+        oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
+        oss << "  Number of journal files = " << _fileNumberNameMap.size() << std::endl;
+        oss << "  Journal File List:" << std::endl;
+        for (NameMapCitr f = _fileNumberNameMap.begin(); f != _fileNumberNameMap.end(); ++f) {
+            oss << "    " << f->first << ": " << f->second.substr(f->second.rfind('/')+1) << std::endl; // name after last '/'
+        }
+        oss << "  Enqueue Counts: [ " << std::endl; // NOTE(review): this endl splits the bracketed list over two lines - confirm intended
+        for (CountCitr c = _enqueueCountList.begin(); c != _enqueueCountList.end(); ++c) {
+            if (c != _enqueueCountList.begin()) oss << ", ";
+            oss << *c;
+        }
+        oss << " ]" << std::endl;
+        for (unsigned n = 0; n < _enqueueCountList.size(); n++)
+            oss << "    File " << std::setw(2) << n << ": " << _enqueueCountList[n] << std::endl;
+        oss << "  Journal empty (_jempty) = " << (_journalEmptyFlag ? "TRUE" : "FALSE") << std::endl;
+        oss << "  First record offset in first fid (_fro) = 0x" << std::hex << _firstRecordOffset << std::dec << " (" << (_firstRecordOffset/JRNL_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+        oss << "  End offset (_eo) = 0x" << std::hex << _endOffset << std::dec << " ("  << (_endOffset/JRNL_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
+        oss << "  Highest rid (_h_rid) = 0x" << std::hex << _highestRecordId << std::dec << std::endl;
+        oss << "  Last file full (_lffull) = " << (_lastFileFullFlag ? "TRUE" : "FALSE") << std::endl;
+        oss << "  Enqueued records (txn & non-txn):" << std::endl;
+        return oss.str();
+    }
+    oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << " jfl=[";
+    for (NameMapCitr f = _fileNumberNameMap.begin(); f != _fileNumberNameMap.end(); ++f) {
+        if (f != _fileNumberNameMap.begin()) oss << " ";
+        oss << f->first << ":" << f->second.substr(f->second.rfind('/')+1);
+    }
+    oss << "] ecl=[ ";
+    for (CountCitr c = _enqueueCountList.begin(); c != _enqueueCountList.end(); ++c) {
+        if (c != _enqueueCountList.begin()) oss << " ";
+        oss << *c;
+    }
+    oss << " ] empty=" << (_journalEmptyFlag ? "T" : "F")
+        << " fro=0x" << std::hex << _firstRecordOffset << std::dec << " (" << (_firstRecordOffset/JRNL_DBLK_SIZE_BYTES) << " dblks)"
+        << " eo=0x" << std::hex << _endOffset << std::dec << " ("  << (_endOffset/JRNL_DBLK_SIZE_BYTES) << " dblks)"
+        << " hrid=0x" << std::hex << _highestRecordId << std::dec
+        << " lffull=" << (_lastFileFullFlag ? "T" : "F");
+    return oss.str();
+}
+
+}} // namespace qpid::qls_jrnl

Added: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h?rev=1530024&view=auto
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h (added)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/RecoveryManager.h Mon Oct  7 18:39:24 2013
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_RECOVERYSTATE_H_
+#define QPID_LINEARSTORE_RECOVERYSTATE_H_
+
+#include <fstream>
+#include <map>
+#include <stdint.h>
+#include <vector>
+
+namespace qpid {
+namespace qls_jrnl {
+
+class RecoveryManager // Holds journal analysis results plus per-record recovery state
+{
+private:
+    // Initial journal analysis data
+    std::vector<std::string> _journalFileList;          ///< Journal file list
+    std::map<uint64_t, std::string> _fileNumberNameMap; ///< File number - name map
+    std::vector<uint32_t> _enqueueCountList;            ///< Number enqueued records found for each file
+    bool _journalEmptyFlag;                             ///< Journal data files empty
+    std::streamoff _firstRecordOffset;                  ///< First record offset in ffid
+    std::streamoff _endOffset;                          ///< End offset (first byte past last record)
+    uint64_t _highestRecordId;                          ///< Highest rid found
+    bool _lastFileFullFlag;                             ///< Last file is full
+
+    // State for recovery of individual enqueued records
+    uint64_t _currentRid;                               ///< Presumably the rid of the record being recovered - TODO confirm
+    uint64_t _currentFileNumber;                        ///< Presumably the number of the file being read - TODO confirm
+    std::string _currentFileName;                       ///< Presumably the name of the file being read - TODO confirm
+    std::streamoff _fileSize;                           ///< Presumably the size of the current file - TODO confirm
+    std::streamoff _recordStart;                        ///< Presumably the start offset of the current record - TODO confirm
+    std::ifstream _inFileStream;                        ///< Input stream; default-constructed (not opened) by the constructor
+    bool _readComplete;                                 ///< Initialised to false by the constructor
+
+public:
+    RecoveryManager();
+    virtual ~RecoveryManager();
+
+    std::string toString(const std::string& jid,
+                         bool compact = true); ///< Text summary of the analysis data; compact selects the one-line form
+};
+
+}} // namespace qpid::qls_jrnl
+
+#endif // QPID_LINEARSTORE_RECOVERYSTATE_H_

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.cpp Mon Oct  7 18:39:24 2013
@@ -39,10 +39,10 @@ smutex data_tok::_mutex;
 
 data_tok::data_tok():
     _wstate(NONE),
-    _rstate(UNREAD),
+//    _rstate(UNREAD),
     _dsize(0),
     _dblks_written(0),
-    _dblks_read(0),
+//    _dblks_read(0),
     _pg_cnt(0),
     _fid(0),
     _rid(0),
@@ -106,12 +106,15 @@ data_tok::wstate_str(write_state wstate)
     return "<wstate unknown>";
 }
 
+/*
 const char*
 data_tok::rstate_str() const
 {
     return rstate_str(_rstate);
 }
+*/
 
+/*
 const char*
 data_tok::rstate_str(read_state rstate)
 {
@@ -129,7 +132,9 @@ data_tok::rstate_str(read_state rstate)
     }
     return "<rstate unknown>";
 }
+*/
 
+/*
 void
 data_tok::set_rstate(const read_state rstate)
 {
@@ -143,15 +148,16 @@ data_tok::set_rstate(const read_state rs
     }
     _rstate = rstate;
 }
+*/
 
 void
 data_tok::reset()
 {
     _wstate = NONE;
-    _rstate = UNREAD;
+//    _rstate = UNREAD;
     _dsize = 0;
     _dblks_written = 0;
-    _dblks_read = 0;
+//    _dblks_read = 0;
     _pg_cnt = 0;
     _fid = 0;
     _rid = 0;
@@ -164,7 +170,7 @@ data_tok::status_str() const
 {
     std::ostringstream oss;
     oss << std::hex << std::setfill('0');
-    oss << "dtok id=0x" << _icnt << "; ws=" << wstate_str() << "; rs=" << rstate_str();
+    oss << "dtok id=0x" << _icnt << "; ws=" << wstate_str()/* << "; rs=" << rstate_str()*/;
     oss << "; fid=0x" << _fid << "; rid=0x" << _rid << "; xid=";
     for (unsigned i=0; i<_xid.size(); i++)
     {
@@ -174,8 +180,8 @@ data_tok::status_str() const
             oss << "/" << std::setw(2) << (int)((char)_xid[i]);
     }
     oss << "; drid=0x" << _dequeue_rid << " extrid=" << (_external_rid?"T":"F");
-    oss << "; ds=0x" << _dsize << "; dw=0x" << _dblks_written << "; dr=0x" << _dblks_read;
-    oss << " pc=0x" << _pg_cnt;
+    oss << "; ds=0x" << _dsize << "; dw=0x" << _dblks_written/* << "; dr=0x" << _dblks_read*/;
+    oss << "; pc=0x" << _pg_cnt;
     return oss.str();
 }
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/data_tok.h Mon Oct  7 18:39:24 2013
@@ -72,6 +72,7 @@ namespace qls_jrnl
             COMMITTED
         };
 
+/*
         enum read_state
         {
             UNREAD,     ///< Data block not read
@@ -79,18 +80,19 @@ namespace qls_jrnl
             SKIP_PART,  ///< Prev. dequeued dblock is part-skipped; waiting for page buffer to fill
             READ        ///< Data block is fully read
         };
+*/
 
     protected:
         static smutex _mutex;
         static uint64_t _cnt;
         uint64_t    _icnt;
         write_state _wstate;        ///< Enqueued / dequeued state of data
-        read_state  _rstate;        ///< Read state of data
+//        read_state  _rstate;        ///< Read state of data
         std::size_t _dsize;         ///< Data size in bytes
         uint32_t    _dblks_written; ///< Data blocks read/written
-        uint32_t    _dblks_read;    ///< Data blocks read/written
+//        uint32_t    _dblks_read;    ///< Data blocks read/written
         uint32_t    _pg_cnt;        ///< Page counter - incr for each page containing part of data
-        uint16_t    _fid;           ///< FID containing header of enqueue record
+        uint64_t    _fid;           ///< FID containing header of enqueue record
         uint64_t    _rid;           ///< RID of data set by enqueue operation
         std::string _xid;           ///< XID set by enqueue operation
         uint64_t    _dequeue_rid;   ///< RID of data set by dequeue operation
@@ -104,16 +106,16 @@ namespace qls_jrnl
         inline write_state wstate() const { return _wstate; }
         const char* wstate_str() const;
         static const char* wstate_str(write_state wstate);
-        inline read_state rstate() const { return _rstate; }
-        const char* rstate_str() const;
-        static const char* rstate_str(read_state rstate);
+//        inline read_state rstate() const { return _rstate; }
+//        const char* rstate_str() const;
+//        static const char* rstate_str(read_state rstate);
         inline bool is_writable() const { return _wstate == NONE || _wstate == ENQ_PART; }
         inline bool is_enqueued() const { return _wstate == ENQ; }
         inline bool is_readable() const { return _wstate == ENQ; }
-        inline bool is_read() const { return _rstate == READ; }
+//        inline bool is_read() const { return _rstate == READ; }
         inline bool is_dequeueable() const { return _wstate == ENQ || _wstate == DEQ_PART; }
         inline void set_wstate(const write_state wstate) { _wstate = wstate; }
-        void set_rstate(const read_state rstate);
+//        void set_rstate(const read_state rstate);
         inline std::size_t dsize() const { return _dsize; }
         inline void set_dsize(std::size_t dsize) { _dsize = dsize; }
 
@@ -122,16 +124,16 @@ namespace qls_jrnl
                 { _dblks_written += dblks_written; }
         inline void set_dblocks_written(uint32_t dblks_written) { _dblks_written = dblks_written; }
 
-        inline uint32_t dblocks_read() const { return _dblks_read; }
-        inline void incr_dblocks_read(uint32_t dblks_read) { _dblks_read += dblks_read; }
-        inline void set_dblocks_read(uint32_t dblks_read) { _dblks_read = dblks_read; }
+//        inline uint32_t dblocks_read() const { return _dblks_read; }
+//        inline void incr_dblocks_read(uint32_t dblks_read) { _dblks_read += dblks_read; }
+//        inline void set_dblocks_read(uint32_t dblks_read) { _dblks_read = dblks_read; }
 
         inline uint32_t pg_cnt() const { return _pg_cnt; }
         inline uint32_t incr_pg_cnt() { return ++_pg_cnt; }
         inline uint32_t decr_pg_cnt() { assert(_pg_cnt != 0); return --_pg_cnt; }
 
-        inline uint16_t fid() const { return _fid; }
-        inline void set_fid(const uint16_t fid) { _fid = fid; }
+        inline uint64_t fid() const { return _fid; }
+        inline void set_fid(const uint64_t fid) { _fid = fid; }
         inline uint64_t rid() const { return _rid; }
         inline void set_rid(const uint64_t rid) { _rid = rid; }
         inline uint64_t dequeue_rid() const {return _dequeue_rid; }

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp Mon Oct  7 18:39:24 2013
@@ -110,8 +110,8 @@ deq_rec::encode(void* wptr, uint32_t rec
     if (_xidp == 0)
         assert(_deq_hdr._xidsize == 0);
 
-    std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
-    std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE;
+    std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
+    std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE_BYTES;
     std::size_t wr_cnt = 0;
     if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages)
     {
@@ -162,8 +162,8 @@ deq_rec::encode(void* wptr, uint32_t rec
                 std::memcpy((char*)wptr + wr_cnt, (char*)&_deq_tail + rec_offs, wsize);
                 wr_cnt += wsize;
 #ifdef RHM_CLEAN
-                std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
-                std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE;
+                std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
+                std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE_BYTES;
                 std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
 #endif
             }
@@ -206,7 +206,7 @@ deq_rec::encode(void* wptr, uint32_t rec
                 wr_cnt += sizeof(_deq_tail);
             }
 #ifdef RHM_CLEAN
-            std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE;
+            std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE_BYTES;
             std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
 #endif
         }
@@ -226,7 +226,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr
         const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize);
         const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize +
                 sizeof(rec_tail_t));
-        const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
+        const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
 
         if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
         {
@@ -259,7 +259,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr
             const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
             std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
             rd_cnt += xid_rem;
-            const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+            const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
             if (tail_rem)
             {
                 std::memcpy((void*)&_deq_tail, ((char*)rptr + xid_rem), tail_rem);
@@ -269,7 +269,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr
         else
         {
             // Remainder of xid split
-            const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE);
+            const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES);
             std::memcpy((char*)_buff + rec_offs - sizeof(deq_hdr_t), rptr, xid_cp_size);
             rd_cnt += xid_cp_size;
         }
@@ -309,7 +309,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr
                 // Entire header and xid fit within this page, tail split
                 std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize);
                 rd_cnt += _deq_hdr._xidsize;
-                const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+                const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
                 if (tail_rem)
                 {
                     std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, tail_rem);
@@ -319,7 +319,7 @@ deq_rec::decode(rec_hdr_t& h, void* rptr
             else
             {
                 // Header fits within this page, xid split
-                const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+                const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
                 std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
                 rd_cnt += xid_cp_size;
             }
@@ -381,7 +381,7 @@ deq_rec::rcv_decode(rec_hdr_t h, std::if
             return false;
         }
     }
-    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size());
+    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE_BYTES - rec_size());
     if (_deq_hdr._xidsize)
         chk_tail(); // Throws if tail invalid or record incomplete
     assert(!ifsp->fail() && !ifsp->bad());

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.cpp Mon Oct  7 18:39:24 2013
@@ -41,42 +41,33 @@ int16_t enq_map::EMAP_FALSE = 0;
 int16_t enq_map::EMAP_TRUE = 1;
 
 enq_map::enq_map():
-        _map(),
-        _pfid_enq_cnt()
-{}
+        _map(){} // Starts with an empty map; the per-file enqueue count vector is no longer initialised here
 
 enq_map::~enq_map() {}
 
-void
-enq_map::set_num_jfiles(const uint16_t num_jfiles)
-{
-    _pfid_enq_cnt.resize(num_jfiles, 0);
-}
 
-
-int16_t
-enq_map::insert_pfid(const uint64_t rid, const uint16_t pfid)
+short
+enq_map::insert_pfid(const uint64_t rid, const uint16_t pfid, const std::streampos file_posn)
 {
-    return insert_pfid(rid, pfid, false);
+    return insert_pfid(rid, pfid, file_posn, false); // Convenience overload: inserts the record with locked=false
 }
 
-int16_t
-enq_map::insert_pfid(const uint64_t rid, const uint16_t pfid, const bool locked)
+short
+enq_map::insert_pfid(const uint64_t rid, const uint16_t pfid, const std::streampos file_posn, const bool locked) // Returns EMAP_OK, or EMAP_DUP_RID if rid is already in the map
 {
     std::pair<emap_itr, bool> ret;
-    emap_data_struct rec(pfid, locked);
+    emap_data_struct_t rec(pfid, file_posn, locked); // Map value for this rid: file id, file position and lock flag
     {
         slock s(_mutex);
         ret = _map.insert(emap_param(rid, rec));
     }
     if (ret.second == false)
         return EMAP_DUP_RID;
-    _pfid_enq_cnt.at(pfid)++;
     return EMAP_OK;
 }
 
-int16_t
-enq_map::get_pfid(const uint64_t rid)
+short
+enq_map::get_pfid(const uint64_t rid, int16_t& pfid)
 {
     slock s(_mutex);
     emap_itr itr = _map.find(rid);
@@ -84,11 +75,12 @@ enq_map::get_pfid(const uint64_t rid)
         return EMAP_RID_NOT_FOUND;
     if (itr->second._lock)
         return EMAP_LOCKED;
-    return itr->second._pfid;
+    pfid = itr->second._pfid;
+    return EMAP_OK;
 }
 
-int16_t
-enq_map::get_remove_pfid(const uint64_t rid, const bool txn_flag)
+short
+enq_map::get_remove_pfid(const uint64_t rid, int16_t& pfid, const bool txn_flag)
 {
     slock s(_mutex);
     emap_itr itr = _map.find(rid);
@@ -96,10 +88,33 @@ enq_map::get_remove_pfid(const uint64_t 
         return EMAP_RID_NOT_FOUND;
     if (itr->second._lock && !txn_flag) // locked, but not a commit/abort
         return EMAP_LOCKED;
-    uint16_t pfid = itr->second._pfid;
+    pfid = itr->second._pfid;
     _map.erase(itr);
-    _pfid_enq_cnt.at(pfid)--;
-    return pfid;
+    return EMAP_OK;
+}
+
+// Copy the stored file position for rid into file_posn; the out-param is untouched unless EMAP_OK is returned.
+short
+enq_map::get_file_posn(const uint64_t rid, std::streampos& file_posn) {
+    slock s(_mutex);
+    const emap_itr pos = _map.find(rid);
+    if (pos == _map.end()) { return EMAP_RID_NOT_FOUND; } // rid is not in the map
+    const emap_data_struct_t& entry = pos->second;
+    if (entry._lock) { return EMAP_LOCKED; } // locked records are not reported
+    file_posn = entry._file_posn;
+    return EMAP_OK;
+}
+
+// Copy the whole map entry for rid into eds. Unlike get_file_posn(), a set lock flag does not block the read.
+short
+enq_map::get_data(const uint64_t rid, emap_data_struct_t& eds) {
+    slock s(_mutex);
+    const emap_itr pos = _map.find(rid);
+    const bool found = (pos != _map.end());
+    if (found) {
+        eds = pos->second; // memberwise copy of _pfid, _file_posn and _lock
+    }
+    return found ? EMAP_OK : EMAP_RID_NOT_FOUND;
 }
 
 bool
@@ -114,7 +129,7 @@ enq_map::is_enqueued(const uint64_t rid,
     return true;
 }
 
-int16_t
+short
 enq_map::lock(const uint64_t rid)
 {
     slock s(_mutex);
@@ -125,7 +140,7 @@ enq_map::lock(const uint64_t rid)
     return EMAP_OK;
 }
 
-int16_t
+short
 enq_map::unlock(const uint64_t rid)
 {
     slock s(_mutex);
@@ -136,7 +151,7 @@ enq_map::unlock(const uint64_t rid)
     return EMAP_OK;
 }
 
-int16_t
+short
 enq_map::is_locked(const uint64_t rid)
 {
     slock s(_mutex);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_map.h Mon Oct  7 18:39:24 2013
@@ -64,44 +64,46 @@ namespace qls_jrnl
     {
     public:
         // return/error codes
-        static int16_t EMAP_DUP_RID;
-        static int16_t EMAP_LOCKED;
-        static int16_t EMAP_RID_NOT_FOUND;
-        static int16_t EMAP_OK;
-        static int16_t EMAP_FALSE;
-        static int16_t EMAP_TRUE;
-
-    private:
-
-        struct emap_data_struct
-        {
-            uint16_t    _pfid;
-            bool        _lock;
-            emap_data_struct(const uint16_t pfid, const bool lock) : _pfid(pfid), _lock(lock) {}
-        };
-        typedef std::pair<uint64_t, emap_data_struct> emap_param;
-        typedef std::map<uint64_t, emap_data_struct> emap;
+        static short EMAP_DUP_RID;
+        static short EMAP_LOCKED;
+        static short EMAP_RID_NOT_FOUND;
+        static short EMAP_OK;
+        static short EMAP_FALSE;
+        static short EMAP_TRUE;
+
+        typedef struct emap_data_struct_t { // Value stored in the map for each enqueued rid
+            uint16_t        _pfid; // File id passed to insert_pfid()
+            std::streampos  _file_posn; // File position passed to insert_pfid()
+            bool            _lock; // Lock flag; tested by get_pfid(), get_remove_pfid() and get_file_posn()
+            emap_data_struct_t() : _pfid(0), _file_posn(0), _lock(false) {}
+            emap_data_struct_t(const uint16_t pfid, const std::streampos file_posn, const bool lock) : _pfid(pfid), _file_posn(file_posn), _lock(lock) {}
+        } emqp_data_struct_t; // NOTE(review): alias spelling "emqp" looks like a typo for "emap" - confirm before relying on it
+        typedef std::pair<uint64_t, emap_data_struct_t> emap_param;
+        typedef std::map<uint64_t, emap_data_struct_t> emap;
         typedef emap::iterator emap_itr;
 
+    private:
         emap _map;
         smutex _mutex;
-        std::vector<uint32_t> _pfid_enq_cnt;
+//        std::vector<uint32_t> _pfid_enq_cnt;
 
     public:
         enq_map();
         virtual ~enq_map();
 
-        void set_num_jfiles(const uint16_t num_jfiles);
-        inline uint32_t get_enq_cnt(const uint16_t pfid) const { return _pfid_enq_cnt.at(pfid); };
+//        void set_num_jfiles(const uint16_t num_jfiles);
+//        inline uint32_t get_enq_cnt(const uint16_t pfid) const { return _pfid_enq_cnt.at(pfid); };
 
-        int16_t insert_pfid(const uint64_t rid, const uint16_t pfid); // 0=ok; -3=duplicate rid;
-        int16_t insert_pfid(const uint64_t rid, const uint16_t pfid, const bool locked); // 0=ok; -3=duplicate rid;
-        int16_t get_pfid(const uint64_t rid); // >=0=pfid; -1=rid not found; -2=locked
-        int16_t get_remove_pfid(const uint64_t rid, const bool txn_flag = false); // >=0=pfid; -1=rid not found; -2=locked
+        short insert_pfid(const uint64_t rid, const uint16_t pfid, const std::streampos file_posn); // 0=ok; -3=duplicate rid;
+        short insert_pfid(const uint64_t rid, const uint16_t pfid, const std::streampos file_posn, const bool locked); // 0=ok; -3=duplicate rid;
+        short get_pfid(const uint64_t rid, int16_t& pfid); // 0=ok (pfid written to out-param); -1=rid not found; -2=locked
+        short get_remove_pfid(const uint64_t rid, int16_t& pfid, const bool txn_flag = false); // 0=ok (pfid written to out-param, entry erased); -1=rid not found; -2=locked and txn_flag not set
+        short get_file_posn(const uint64_t rid, std::streampos& file_posn); // -1=rid not found; -2=locked
+        short get_data(const uint64_t rid, emap_data_struct_t& eds);
         bool is_enqueued(const uint64_t rid, bool ignore_lock = false);
-        int16_t lock(const uint64_t rid); // 0=ok; -1=rid not found
-        int16_t unlock(const uint64_t rid); // 0=ok; -1=rid not found
-        int16_t is_locked(const uint64_t rid); // 1=true; 0=false; -1=rid not found
+        short lock(const uint64_t rid); // 0=ok; -1=rid not found
+        short unlock(const uint64_t rid); // 0=ok; -1=rid not found
+        short is_locked(const uint64_t rid); // 1=true; 0=false; -1=rid not found
         inline void clear() { _map.clear(); }
         inline bool empty() const { return _map.empty(); }
         inline uint32_t size() const { return uint32_t(_map.size()); }

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enq_rec.cpp Mon Oct  7 18:39:24 2013
@@ -109,8 +109,8 @@ enq_rec::encode(void* wptr, uint32_t rec
     if (_xidp == 0)
         assert(_enq_hdr._xidsize == 0);
 
-    std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
-    std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE;
+    std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
+    std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE_BYTES;
     std::size_t wr_cnt = 0;
     if (rec_offs_dblks) // Continuation of split data record (over 2 or more pages)
     {
@@ -182,8 +182,8 @@ enq_rec::encode(void* wptr, uint32_t rec
                 std::memcpy((char*)wptr + wr_cnt, (char*)&_enq_tail + rec_offs, wsize);
                 wr_cnt += wsize;
 #ifdef RHM_CLEAN
-                std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
-                std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE;
+                std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
+                std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE_BYTES;
                 std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
 #endif
             }
@@ -238,7 +238,7 @@ enq_rec::encode(void* wptr, uint32_t rec
             std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, sizeof(_enq_tail));
             wr_cnt += sizeof(_enq_tail);
 #ifdef RHM_CLEAN
-            std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE;
+            std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE_BYTES;
             std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
 #endif
         }
@@ -260,7 +260,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
         const uint32_t hdr_xid_data_tail_size = hdr_xid_data_size + sizeof(rec_tail_t);
         const uint32_t hdr_data_dblks = size_dblks(hdr_xid_data_size);
         const uint32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size);
-        const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
+        const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE_BYTES;
         const std::size_t offs = rec_offs - sizeof(enq_hdr_t);
 
         if (hdr_tail_dblks - rec_offs_dblks <= max_size_dblks)
@@ -331,7 +331,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
                 std::memcpy((char*)_buff + offs, rptr, data_rem);
                 rd_cnt += data_rem;
             }
-            const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+            const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
             if (tail_rem)
             {
                 std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), tail_rem);
@@ -341,7 +341,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
         else
         {
             // Since xid and data are contiguous, both fit within current page - copy whole page
-            const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE);
+            const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES);
             std::memcpy((char*)_buff + offs, rptr, data_cp_size);
             rd_cnt += data_cp_size;
         }
@@ -405,7 +405,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
                             _enq_hdr._dsize);
                     rd_cnt += _enq_hdr._dsize;
                 }
-                const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+                const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
                 if (tail_rem)
                 {
                     std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, tail_rem);
@@ -422,7 +422,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
                 }
                 if (_enq_hdr._dsize && !::is_enq_external(&_enq_hdr))
                 {
-                    const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+                    const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
                     std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt, data_cp_size);
                     rd_cnt += data_cp_size;
                 }
@@ -430,7 +430,7 @@ enq_rec::decode(rec_hdr_t& h, void* rptr
             else
             {
                 // Header fits within this page, xid split or separated
-                const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+                const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE_BYTES) - rd_cnt;
                 std::memcpy(_buff, (char*)rptr + rd_cnt, data_cp_size);
                 rd_cnt += data_cp_size;
             }
@@ -516,7 +516,7 @@ enq_rec::rcv_decode(rec_hdr_t h, std::if
             return false;
         }
     }
-    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size());
+    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE_BYTES - rec_size());
     chk_tail(); // Throws if tail invalid or record incomplete
     assert(!ifsp->fail() && !ifsp->bad());
     return true;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/enums.h Mon Oct  7 18:39:24 2013
@@ -29,7 +29,7 @@ namespace qls_jrnl
 
     // TODO: Change this to flags, as multiple of these conditions may exist simultaneously
     /**
-    * \brief Enumeration of possilbe return states from journal read and write operations.
+    * \brief Enumeration of possible return states from journal read and write operations.
     */
     enum _iores
     {
@@ -37,12 +37,12 @@ namespace qls_jrnl
         RHM_IORES_PAGE_AIOWAIT, ///< IO operation suspended - next page is waiting for AIO.
         RHM_IORES_FILE_AIOWAIT, ///< IO operation suspended - next file is waiting for AIO.
         RHM_IORES_EMPTY,        ///< During read operations, nothing further is available to read.
-        RHM_IORES_RCINVALID,    ///< Read page cache is invalid (ie obsolete or uninitialized)
-        RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached.
-        RHM_IORES_FULL,         ///< During write operations, the journal files are full.
+//        RHM_IORES_RCINVALID,    ///< Read page cache is invalid (ie obsolete or uninitialized)
+//        RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached.
+//        RHM_IORES_FULL,         ///< During write operations, the journal files are full.
         RHM_IORES_BUSY,         ///< Another blocking operation is in progress.
         RHM_IORES_TXPENDING,    ///< Operation blocked by pending transaction.
-        RHM_IORES_NOTIMPL       ///< Function is not yet implemented.
+        RHM_IORES_NOTIMPL       ///< Function is not implemented.
     };
     typedef _iores iores;
 
@@ -54,9 +54,9 @@ namespace qls_jrnl
             case RHM_IORES_PAGE_AIOWAIT: return "RHM_IORES_PAGE_AIOWAIT";
             case RHM_IORES_FILE_AIOWAIT: return "RHM_IORES_FILE_AIOWAIT";
             case RHM_IORES_EMPTY: return "RHM_IORES_EMPTY";
-            case RHM_IORES_RCINVALID: return "RHM_IORES_RCINVALID";
-            case RHM_IORES_ENQCAPTHRESH: return "RHM_IORES_ENQCAPTHRESH";
-            case RHM_IORES_FULL: return "RHM_IORES_FULL";
+//            case RHM_IORES_RCINVALID: return "RHM_IORES_RCINVALID";
+//            case RHM_IORES_ENQCAPTHRESH: return "RHM_IORES_ENQCAPTHRESH";
+//            case RHM_IORES_FULL: return "RHM_IORES_FULL";
             case RHM_IORES_BUSY: return "RHM_IORES_BUSY";
             case RHM_IORES_TXPENDING: return "RHM_IORES_TXPENDING";
             case RHM_IORES_NOTIMPL: return "RHM_IORES_NOTIMPL";

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h?rev=1530024&r1=1530023&r2=1530024&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jcfg.h Mon Oct  7 18:39:24 2013
@@ -22,70 +22,46 @@
 #ifndef QPID_LEGACYSTORE_JRNL_JCFG_H
 #define QPID_LEGACYSTORE_JRNL_JCFG_H
 
-/*
-#if defined(__i386__)  little endian, 32 bits
-#define JRNL_LITTLE_ENDIAN
-#define JRNL_32_BIT
-#elif defined(__PPC__) || defined(__s390__)   big endian, 32 bits
-#define JRNL_BIG_ENDIAN
-#define JRNL_32_BIT
-#elif defined(__ia64__) || defined(__x86_64__) || defined(__alpha__)  little endian, 64 bits
-#define JRNL_LITTLE_ENDIAN
-#define JRNL_64_BIT
-#elif defined(__powerpc64__) || defined(__s390x__)  big endian, 64 bits
-#define JRNL_BIG_ENDIAN
-#define JRNL_64_BIT
-#else
-#error endian?
-#endif
-*/
-
 /**
-* <b>Rule:</b> Data block size (JRNL_DBLK_SIZE) MUST be a power of 2 such that
+* <b>Rule:</b> Data block size (JRNL_DBLK_SIZE_BYTES) MUST be a power of 2 AND
+* a power of 2 factor of the disk softblock size (JRNL_SBLK_SIZE_BYTES):
 * <pre>
-* JRNL_DBLK_SIZE * JRNL_SBLK_SIZE == n * 512 (n = 1,2,3...)
+* n * JRNL_DBLK_SIZE_BYTES == JRNL_SBLK_SIZE_BYTES (n = 1,2,4,8...)
 * </pre>
-* (The disk softblock size is 512 for Linux kernels >= 2.6)
 */
-#define JRNL_DBLK_SIZE          128         /**< Data block size in bytes (CANNOT BE LESS THAN 32!) */
-#define JRNL_SBLK_SIZE_DBLKS    32          /**< Disk softblock size in multiples of JRNL_DBLK_SIZE */
-#define JRNL_SBLK_SIZE          JRNL_SBLK_SIZE_DBLKS * JRNL_DBLK_SIZE        /**< Disk softblock size in bytes */
-#define JRNL_SBLK_SIZE_KIB      JRNL_SBLK_SIZE / 1024 /**< Disk softblock size in KiB */
+#define JRNL_SBLK_SIZE_BYTES           4096        /**< Disk softblock size in bytes */
+#define QLS_AIO_ALIGN_BOUNDARY         JRNL_SBLK_SIZE_BYTES
+#define JRNL_DBLK_SIZE_BYTES           128         /**< Data block size in bytes (CANNOT BE LESS THAN 32!) */
+#define JRNL_SBLK_SIZE_DBLKS           (JRNL_SBLK_SIZE_BYTES / JRNL_DBLK_SIZE_BYTES)          /**< Disk softblock size in multiples of JRNL_DBLK_SIZE_BYTES */
+#define JRNL_SBLK_SIZE_KIB             (JRNL_SBLK_SIZE_BYTES / 1024) /**< Disk softblock size in KiB */
 //#define JRNL_MIN_FILE_SIZE      128         ///< Min. jrnl file size in sblks (excl. file_hdr)
 //#define JRNL_MAX_FILE_SIZE      4194176     ///< Max. jrnl file size in sblks (excl. file_hdr)
 //#define JRNL_MIN_NUM_FILES      4           ///< Min. number of journal files
 //#define JRNL_MAX_NUM_FILES      64          ///< Max. number of journal files
 //#define JRNL_ENQ_THRESHOLD      80          ///< Percent full when enqueue connection will be closed
 //
-#define JRNL_RMGR_PAGE_SIZE     128         ///< Journal page size in softblocks
-#define JRNL_RMGR_PAGES         16          ///< Number of pages to use in wmgr
+//#define JRNL_RMGR_PAGE_SIZE     128         ///< Journal page size in softblocks
+//#define JRNL_RMGR_PAGES         16          ///< Number of pages to use in rmgr
 //
-#define JRNL_WMGR_DEF_PAGE_SIZE 64          ///< Journal write page size in softblocks (default)
-#define JRNL_WMGR_DEF_PAGES     32          ///< Number of pages to use in wmgr (default)
+#define JRNL_WMGR_DEF_PAGE_SIZE_KIB    32
+#define JRNL_WMGR_DEF_PAGE_SIZE_SBLKS  (JRNL_WMGR_DEF_PAGE_SIZE_KIB / JRNL_SBLK_SIZE_KIB)          ///< Journal write page size in softblocks (default)
+#define JRNL_WMGR_DEF_PAGES            32          ///< Number of pages to use in wmgr (default)
 //
-#define JRNL_WMGR_MAXDTOKPP     1024        ///< Max. dtoks (data blocks) per page in wmgr
-#define JRNL_WMGR_MAXWAITUS     100         ///< Max. wait time (us) before submitting AIO
+#define JRNL_WMGR_MAXDTOKPP            1024        ///< Max. dtoks (data blocks) per page in wmgr
+#define JRNL_WMGR_MAXWAITUS            100         ///< Max. wait time (us) before submitting AIO
 //
 //#define JRNL_INFO_EXTENSION     "jinf"      ///< Extension for journal info files
 //#define JRNL_DATA_EXTENSION     "jdat"      ///< Extension for journal data files
-#define QLS_JRNL_FILE_EXTENSION ".jrnl"     /**< Extension for journal data files */
-//#define RHM_JDAT_TXA_MAGIC      0x614d4852  ///< ("RHMa" in little endian) Magic for dtx abort hdrs
-#define QLS_TXA_MAGIC           0x61534c51  /**< ("RHMa" in little endian) Magic for dtx abort hdrs */
-//#define RHM_JDAT_TXC_MAGIC      0x634d4852  ///< ("RHMc" in little endian) Magic for dtx commit hdrs
-#define QLS_TXC_MAGIC           0x63534c51  ///< ("RHMc" in little endian) Magic for dtx commit hdrs
-//#define RHM_JDAT_DEQ_MAGIC      0x644d4852  ///< ("RHMd" in little endian) Magic for deq rec hdrs
-#define QLS_DEQ_MAGIC           0x64534c51  /**< ("QLSd" in little endian) Magic for deq rec hdrs */
-//#define RHM_JDAT_ENQ_MAGIC      0x654d4852  ///< ("RHMe" in little endian) Magic for enq rec hdrs
-#define QLS_ENQ_MAGIC           0x65534c51  /**< ("QLSe" in little endian) Magic for enq rec hdrs */
-//#define RHM_JDAT_FILE_MAGIC     0x664d4852  ///< ("RHMf" in little endian) Magic for file hdrs
-#define QLS_FILE_MAGIC          0x66534c51  /**< ("QLSf" in little endian) Magic for file hdrs */
-//#define RHM_JDAT_EMPTY_MAGIC    0x784d4852  ///< ("RHMx" in little endian) Magic for empty dblk
-#define QLS_EMPTY_MAGIC         0x78534c51  /**< ("QLSx" in little endian) Magic for empty dblk */
-//#define RHM_JDAT_VERSION        0x01        ///< Version (of file layout)
-#define QLS_JRNL_VERSION        0x0002      /**< Version (of file layout) */
-#define QLS_JRNL_FHDRSIZESBLKS  0x0001      /**< Journal file header size in sblks (as defined by JRNL_SBLK_SIZE) */
-//#define RHM_CLEAN_CHAR          0xff        ///< Char used to clear empty space on disk
-#define QLS_CLEAN_CHAR          0xff        ///< Char used to clear empty space on disk
+#define QLS_JRNL_FILE_EXTENSION        ".jrnl"     /**< Extension for journal data files */
+#define QLS_TXA_MAGIC                  0x61534c51  /**< ("QLSa" in little endian) Magic for dtx abort hdrs */
+#define QLS_TXC_MAGIC                  0x63534c51  /**< ("QLSc" in little endian) Magic for dtx commit hdrs */
+#define QLS_DEQ_MAGIC                  0x64534c51  /**< ("QLSd" in little endian) Magic for deq rec hdrs */
+#define QLS_ENQ_MAGIC                  0x65534c51  /**< ("QLSe" in little endian) Magic for enq rec hdrs */
+#define QLS_FILE_MAGIC                 0x66534c51  /**< ("QLSf" in little endian) Magic for file hdrs */
+#define QLS_EMPTY_MAGIC                0x78534c51  /**< ("QLSx" in little endian) Magic for empty dblk */
+#define QLS_JRNL_VERSION               2           /**< Version (of file layout) */
+#define QLS_JRNL_FHDR_RES_SIZE_SBLKS   1           /**< Journal file header reserved size in sblks (as defined by JRNL_SBLK_SIZE_BYTES) */
+#define QLS_CLEAN_CHAR                 0xff        /**< Char used to clear empty space on disk */
 //
 //#define RHM_LENDIAN_FLAG 0      ///< Value of little endian flag on disk
 //#define RHM_BENDIAN_FLAG 1      ///< Value of big endian flag on disk



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message