qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1560618 [4/5] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/ qpid/bin/ qpid/cpp/ qpid/cpp/bindings/qpid/dotnet/examples/msvc10/csharp.direct.receiver/ qpid/cpp/bindings/qpid/dotnet/examples/msvc10/csharp.direct.sender/ qpid/cpp/bindings/...
Date Thu, 23 Jan 2014 10:15:49 GMT
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp Thu Jan 23 10:15:46 2014
@@ -49,6 +49,18 @@ namespace qpid {
 namespace linearstore {
 namespace journal {
 
+RecoveredRecordData_t::RecoveredRecordData_t(const uint64_t rid, const uint64_t fid, const std::streampos foffs, bool ptxn) :
+                    recordId_(rid),
+                    fileId_(fid),
+                    fileOffset_(foffs),
+                    pendingTransaction_(ptxn)
+{}
+
+
+bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b) {
+    return a.recordId_ < b.recordId_;
+}
+
 RecoveryManager::RecoveryManager(const std::string& journalDirectory,
                                  const std::string& queuename,
                                  enq_map& enqueueMapRef,
@@ -86,6 +98,9 @@ void RecoveryManager::analyzeJournals(co
     if (!journalEmptyFlag_) {
 
         // Read all records, establish remaining enqueued records
+        if (inFileStream_.is_open()) {
+            inFileStream_.close();
+        }
         while (getNextRecordHeader()) {}
         if (inFileStream_.is_open()) {
             inFileStream_.close();
@@ -102,14 +117,13 @@ void RecoveryManager::analyzeJournals(co
                 std::vector<std::string>::const_iterator pitr =
                         std::find(preparedTransactionListPtr->begin(), preparedTransactionListPtr->end(), *itr);
                 if (pitr == preparedTransactionListPtr->end()) { // not found in prepared list
-                    txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
+                    txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
                     // Unlock any affected enqueues in emap
-                    for (tdl_itr i=tdl.begin(); i<tdl.end(); i++) {
+                    for (tdl_itr_t i=tdl.begin(); i<tdl.end(); i++) {
                         if (i->enq_flag_) { // enq op - decrement enqueue count
                             fileNumberMap_[i->pfid_]->decrEnqueuedRecordCount();
                         } else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record
-                            int16_t ret = enqueueMapRef_.unlock(i->drid_);
-                            if (ret < enq_map::EMAP_OK) { // fail
+                            if (enqueueMapRef_.unlock(i->drid_) < enq_map::EMAP_OK) { // fail
                                 // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
                                 std::ostringstream oss;
                                 oss << std::hex << "_emap.unlock(): drid=0x\"" << i->drid_;
@@ -120,11 +134,7 @@ void RecoveryManager::analyzeJournals(co
                 }
             }
         }
-
-        // Set up recordIdList_ from enqueue map
-        enqueueMapRef_.rid_list(recordIdList_);
-
-        recordIdListConstItr_ = recordIdList_.begin();
+        prepareRecordList();
     }
 }
 
@@ -151,37 +161,44 @@ bool RecoveryManager::readNextRemainingR
                                               bool& transient,
                                               bool& external,
                                               data_tok* const dtokp,
-                                              bool /*ignore_pending_txns*/) {
-    if (recordIdListConstItr_ == recordIdList_.end()) {
-        return false;
-    }
-    enq_map::emap_data_struct_t eds;
-    enqueueMapRef_.get_data(*recordIdListConstItr_, eds);
-    if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != eds._pfid) {
-        getFile(eds._pfid, false);
-    }
-//std::cout << " " << eds._pfid << std::hex << ",0x" << eds._file_posn << std::flush; // DEBUG
+                                              bool ignore_pending_txns) {
+    bool foundRecord = false;
+    do {
+        if (recordIdListConstItr_ == recordIdList_.end()) {
+            return false;
+        }
+        if (recordIdListConstItr_->pendingTransaction_ && ignore_pending_txns) { // Pending transaction
+            ++recordIdListConstItr_; // ignore, go to next record
+        } else {
+            foundRecord = true;
+        }
+    } while (!foundRecord);
 
-    inFileStream_.seekg(eds._file_posn, std::ifstream::beg);
+    if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != recordIdListConstItr_->fileId_) {
+        if (!getFile(recordIdListConstItr_->fileId_, false)) {
+            std::ostringstream oss;
+            oss << "Failed to open file with file-id=" << recordIdListConstItr_->fileId_;
+            throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+        }
+    }
+    inFileStream_.seekg(recordIdListConstItr_->fileOffset_, std::ifstream::beg);
     if (!inFileStream_.good()) {
         std::ostringstream oss;
-        oss << "Could not find offset 0x" << std::hex << eds._file_posn << " in file " << getCurrentFileName();
+        oss << "Could not find offset 0x" << std::hex << recordIdListConstItr_->fileOffset_ << " in file " << getCurrentFileName();
         throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
     }
+
     ::enq_hdr_t enqueueHeader;
     inFileStream_.read((char*)&enqueueHeader, sizeof(::enq_hdr_t));
     if (inFileStream_.gcount() != sizeof(::enq_hdr_t)) {
         std::ostringstream oss;
-        oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << eds._file_posn;
+        oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << recordIdListConstItr_->fileOffset_;
         throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
     }
     // check flags
     transient = ::is_enq_transient(&enqueueHeader);
     external = ::is_enq_external(&enqueueHeader);
-//char magicBuff[5]; // DEBUG
-//::memcpy(magicBuff, &enqueueHeader, 4); // DEBUG
-//magicBuff[4] = 0; // DEBUG
-//std::cout << std::hex << ":" << (char*)magicBuff << ",rid=0x" << enqueueHeader._rhdr._rid << ",xs=0x" << enqueueHeader._xidsize << ",ds=0x" << enqueueHeader._dsize << std::dec << std::flush; // DEBUG
+
     // read xid
     xidSize = enqueueHeader._xidsize;
     *xidPtrPtr = ::malloc(xidSize);
@@ -386,6 +403,12 @@ void RecoveryManager::checkFileStreamOk(
 }
 
 void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) {
+    if (recordPosition % QLS_DBLK_SIZE_BYTES != 0) {
+        std::ostringstream oss;
+        oss << "Current read pointer not dblk aligned: recordPosition=0x" << std::hex << recordPosition;
+        oss << " (dblk alignment offset = 0x" << (recordPosition % QLS_DBLK_SIZE_BYTES);
+        throw jexception(jerrno::JERR_RCVM_NOTDBLKALIGNED, oss.str(), "RecoveryManager", "checkJournalAlignment");
+    }
     std::streampos currentPosn = recordPosition;
     unsigned sblkOffset = currentPosn % QLS_SBLK_SIZE_BYTES;
     if (sblkOffset)
@@ -433,7 +456,7 @@ void RecoveryManager::checkJournalAlignm
 bool RecoveryManager::decodeRecord(jrec& record,
                                    std::size_t& cumulativeSizeRead,
                                    ::rec_hdr_t& headerRecord,
-                                    std::streampos& fileOffset)
+                                   std::streampos& fileOffset)
 {
     std::streampos start_file_offs = fileOffset;
 
@@ -455,7 +478,6 @@ bool RecoveryManager::decodeRecord(jrec&
         }
         if (!done && needNextFile()) {
             if (!getNextFile(false)) {
-                checkJournalAlignment(start_file_offs);
                 return false;
             }
         }
@@ -574,7 +596,7 @@ bool RecoveryManager::getNextRecordHeade
                             throw jexception(jerrno::JERR_RCVM_NULLXID, "ENQ", "RecoveryManager", "getNextRecordHeader");
                         }
                         std::string xid((char*)xidp, er.xid_size());
-                        transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true));
+                        transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true, false, false));
                         if (transactionMapRef_.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) { // fail - xid or rid not found
                             std::ostringstream oss;
                             oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
@@ -613,7 +635,7 @@ bool RecoveryManager::getNextRecordHeade
                     }
                     std::string xid((char*)xidp, dr.xid_size());
                     transactionMapRef_.insert_txn_data(xid, txn_data_t(dr.rid(), dr.deq_rid(), start_fid, file_pos,
-                                                       false, dr.is_txn_coml_commit()));
+                                                       false, false, dr.is_txn_coml_commit()));
                     if (transactionMapRef_.set_aio_compl(xid, dr.rid()) < txn_map::TMAP_OK) { // fail - xid or rid not found
                         std::ostringstream oss;
                         oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid();
@@ -645,8 +667,8 @@ bool RecoveryManager::getNextRecordHeade
                     throw jexception(jerrno::JERR_RCVM_NULLXID, "ABT", "RecoveryManager", "getNextRecordHeader");
                 }
                 std::string xid((char*)xidp, ar.xid_size());
-                txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
-                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
+                txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+                for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
                     if (itr->enq_flag_) {
                         fileNumberMap_[itr->pfid_]->decrEnqueuedRecordCount();
                     } else {
@@ -673,8 +695,8 @@ bool RecoveryManager::getNextRecordHeade
                     throw jexception(jerrno::JERR_RCVM_NULLXID, "CMT", "RecoveryManager", "getNextRecordHeader");
                 }
                 std::string xid((char*)xidp, cr.xid_size());
-                txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
-                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
+                txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+                for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
                     if (itr->enq_flag_) { // txn enqueue
 //std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->pfid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG
                         if (enqueueMapRef_.insert_pfid(itr->rid_, itr->pfid_, itr->foffs_) < enq_map::EMAP_OK) { // fail
@@ -725,6 +747,35 @@ bool RecoveryManager::needNextFile() {
     return true;
 }
 
+void RecoveryManager::prepareRecordList() {
+    // Set up recordIdList_ from enqueue map and transaction map
+    recordIdList_.clear();
+
+    // Extract records from enqueue list
+    std::vector<uint64_t> ridList;
+    enqueueMapRef_.rid_list(ridList);
+    qpid::linearstore::journal::enq_map::emap_data_struct_t eds;
+    for (std::vector<uint64_t>::const_iterator i=ridList.begin(); i!=ridList.end(); ++i) {
+        enqueueMapRef_.get_data(*i, eds);
+        recordIdList_.push_back(RecoveredRecordData_t(*i, eds._pfid, eds._file_posn, false));
+    }
+
+    // Extract records from pending transaction enqueues
+    std::vector<std::string> xidList;
+    transactionMapRef_.xid_list(xidList);
+    for (std::vector<std::string>::const_iterator j=xidList.begin(); j!=xidList.end(); ++j) {
+        qpid::linearstore::journal::txn_data_list_t tdsl = transactionMapRef_.get_tdata_list(*j);
+        for (qpid::linearstore::journal::tdl_itr_t k=tdsl.begin(); k!=tdsl.end(); ++k) {
+            if (k->enq_flag_) {
+                recordIdList_.push_back(RecoveredRecordData_t(k->rid_, k->pfid_, k->foffs_, true));
+            }
+        }
+    }
+
+    std::sort(recordIdList_.begin(), recordIdList_.end(), recordIdListCompare);
+    recordIdListConstItr_ = recordIdList_.begin();
+}
+
 void RecoveryManager::readJournalData(char* target,
                                       const std::streamsize readSize) {
     std::streamoff bytesRead = 0;

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h Thu Jan 23 10:15:46 2014
@@ -22,7 +22,6 @@
 #ifndef QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_
 #define QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_
 
-#include <deque>
 #include <fstream>
 #include <map>
 #include "qpid/linearstore/journal/LinearFileController.h"
@@ -44,6 +43,16 @@ class JournalLog;
 class jrec;
 class txn_map;
 
+struct RecoveredRecordData_t {
+    uint64_t recordId_;
+    uint64_t fileId_;
+    std::streampos fileOffset_;
+    bool pendingTransaction_;
+    RecoveredRecordData_t(const uint64_t rid, const uint64_t fid, const std::streampos foffs, bool ptxn);
+};
+
+bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b);
+
 class RecoveryManager
 {
 protected:
@@ -53,7 +62,7 @@ protected:
     typedef std::map<uint64_t, JournalFile*> fileNumberMap_t;
     typedef fileNumberMap_t::iterator fileNumberMapItr_t;
     typedef fileNumberMap_t::const_iterator fileNumberMapConstItr_t;
-    typedef std::vector<uint64_t> recordIdList_t;
+    typedef std::vector<RecoveredRecordData_t> recordIdList_t;
     typedef recordIdList_t::const_iterator recordIdListConstItr_t;
 
     // Location and identity
@@ -123,6 +132,7 @@ protected:
     bool getNextFile(bool jumpToFirstRecordOffsetFlag);
     bool getNextRecordHeader();
     bool needNextFile();
+    void prepareRecordList();
     bool readFileHeader();
     void readJournalData(char* target, const std::streamsize size);
     void removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr);

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcfg.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcfg.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcfg.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcfg.h Thu Jan 23 10:15:46 2014
@@ -43,8 +43,8 @@
 #define QLS_WMGR_MAXWAITUS              100         /**< Max. wait time (us) before submitting AIO */
 
 #define QLS_JRNL_FILE_EXTENSION         ".jrnl"     /**< Extension for journal data files */
-#define QLS_TXA_MAGIC                   0x61534c51  /**< ("RHMa" in little endian) Magic for dtx abort hdrs */
-#define QLS_TXC_MAGIC                   0x63534c51  /**< ("RHMc" in little endian) Magic for dtx commit hdrs */
+#define QLS_TXA_MAGIC                   0x61534c51  /**< ("QLSa" in little endian) Magic for dtx abort hdrs */
+#define QLS_TXC_MAGIC                   0x63534c51  /**< ("QLSc" in little endian) Magic for dtx commit hdrs */
 #define QLS_DEQ_MAGIC                   0x64534c51  /**< ("QLSd" in little endian) Magic for deq rec hdrs */
 #define QLS_ENQ_MAGIC                   0x65534c51  /**< ("QLSe" in little endian) Magic for enq rec hdrs */
 #define QLS_FILE_MAGIC                  0x66534c51  /**< ("QLSf" in little endian) Magic for file hdrs */

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp Thu Jan 23 10:15:46 2014
@@ -155,7 +155,7 @@ jcntl::enqueue_data_record(const void* c
     check_wstatus("enqueue_data_record");
     {
         slock s(_wr_mutex);
-        while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0, 0, transient, false), r,
+        while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0, 0, false, transient, false), r,
                         dtokp)) ;
     }
     return r;
@@ -170,7 +170,7 @@ jcntl::enqueue_extern_data_record(const 
     check_wstatus("enqueue_extern_data_record");
     {
         slock s(_wr_mutex);
-        while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, transient, true), r, dtokp)) ;
+        while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, false, transient, true), r, dtokp)) ;
     }
     return r;
 }
@@ -181,6 +181,7 @@ jcntl::enqueue_txn_data_record(const voi
                                const std::size_t this_data_len,
                                data_tok* dtokp,
                                const std::string& xid,
+                               const bool tpc_flag,
                                const bool transient)
 {
     iores r;
@@ -188,7 +189,7 @@ jcntl::enqueue_txn_data_record(const voi
     {
         slock s(_wr_mutex);
         while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
-                        transient, false), r, dtokp)) ;
+                        tpc_flag, transient, false), r, dtokp)) ;
     }
     return r;
 }
@@ -197,14 +198,15 @@ iores
 jcntl::enqueue_extern_txn_data_record(const std::size_t tot_data_len,
                                       data_tok* dtokp,
                                       const std::string& xid,
+                                      const bool tpc_flag,
                                       const bool transient)
 {
     iores r;
     check_wstatus("enqueue_extern_txn_data_record");
     {
         slock s(_wr_mutex);
-        while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(), xid.size(), transient, true), r,
-                        dtokp)) ;
+        while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(), xid.size(), tpc_flag, transient,
+                        true), r, dtokp)) ;
     }
     return r;
 }
@@ -234,7 +236,7 @@ jcntl::dequeue_data_record(data_tok* con
     check_wstatus("dequeue_data");
     {
         slock s(_wr_mutex);
-        while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, txn_coml_commit), r, dtokp)) ;
+        while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, false, txn_coml_commit), r, dtokp)) ;
     }
     return r;
 }
@@ -242,13 +244,14 @@ jcntl::dequeue_data_record(data_tok* con
 iores
 jcntl::dequeue_txn_data_record(data_tok* const dtokp,
                                const std::string& xid,
+                               const bool tpc_flag,
                                const bool txn_coml_commit)
 {
     iores r;
     check_wstatus("dequeue_data");
     {
         slock s(_wr_mutex);
-        while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), txn_coml_commit), r, dtokp)) ;
+        while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), tpc_flag, txn_coml_commit), r, dtokp)) ;
     }
     return r;
 }
@@ -380,7 +383,7 @@ jcntl::handle_aio_wait(const iores res, 
         while (_wmgr.curr_pg_blocked())
         {
             if (_wmgr.get_aio_evt_rem() == 0) {
-std::cout << "&&&&&& jcntl::handle_aio_wait() " << _wmgr.status_str() << std::endl; // DEBUG
+//std::cout << "&&&&&& jcntl::handle_aio_wait() " << _wmgr.status_str() << std::endl; // DEBUG
                 throw jexception("_wmgr.curr_pg_blocked() with no events remaining"); // TODO - complete exception
             }
             if (_wmgr.get_events(&_aio_cmpl_timeout, false) == jerrno::AIO_TIMEOUT)

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.h Thu Jan 23 10:15:46 2014
@@ -270,11 +270,11 @@ public:
                               const std::size_t tot_data_len,
                               const std::size_t this_data_len,
                               data_tok* dtokp,
-                              const bool transient = false);
+                              const bool transient);
 
     iores enqueue_extern_data_record(const std::size_t tot_data_len,
                                      data_tok* dtokp,
-                                     const bool transient = false);
+                                     const bool transient);
 
     /**
     * \brief Enqueue data.
@@ -294,12 +294,14 @@ public:
                                   const std::size_t this_data_len,
                                   data_tok* dtokp,
                                   const std::string& xid,
-                                  const bool transient = false);
+                                  const bool tpc_flag,
+                                  const bool transient);
 
     iores enqueue_extern_txn_data_record(const std::size_t tot_data_len,
                                          data_tok* dtokp,
                                          const std::string& xid,
-                                         const bool transient = false);
+                                         const bool tpc_flag,
+                                         const bool transient);
 
     /**
     * \brief Reads data from the journal. It is the responsibility of the reader to free
@@ -350,7 +352,7 @@ public:
                            bool& transient,
                            bool& external,
                            data_tok* const dtokp,
-                           bool ignore_pending_txns = false);
+                           bool ignore_pending_txns);
 
     /**
     * \brief Dequeues (marks as no longer needed) data record in journal.
@@ -370,7 +372,7 @@ public:
     * \exception TODO
     */
     iores dequeue_data_record(data_tok* const dtokp,
-                              const bool txn_coml_commit = false);
+                              const bool txn_coml_commit);
 
     /**
     * \brief Dequeues (marks as no longer needed) data record in journal.
@@ -393,7 +395,8 @@ public:
     */
     iores dequeue_txn_data_record(data_tok* const dtokp,
                                   const std::string& xid,
-                                  const bool txn_coml_commit = false);
+                                  const bool tpc_flag,
+                                  const bool txn_coml_commit);
 
     /**
     * \brief Abort the transaction for all records enqueued or dequeued with the matching xid.
@@ -458,12 +461,12 @@ public:
     * \param block_till_aio_cmpl If true, will block the thread while waiting for all
     *     outstanding AIO operations to complete.
     */
-    void stop(const bool block_till_aio_cmpl = false);
+    void stop(const bool block_till_aio_cmpl);
 
     /**
     * \brief Force a flush of the write page cache, creating a single AIO write operation.
     */
-    iores flush(const bool block_till_aio_cmpl = false);
+    iores flush(const bool block_till_aio_cmpl);
 
     inline uint32_t get_enq_cnt() const { return _emap.size(); } // TODO: _emap: Thread safe?
 
@@ -480,7 +483,7 @@ public:
     *     false if the rid is transactionally enqueued and is not committed, or if it is
     *     locked (i.e. transactionally dequeued, but the dequeue has not been committed).
     */
-    inline bool is_enqueued(const uint64_t rid, bool ignore_lock = false) { return _emap.is_enqueued(rid, ignore_lock); }
+    inline bool is_enqueued(const uint64_t rid, bool ignore_lock) { return _emap.is_enqueued(rid, ignore_lock); }
 
     inline bool is_locked(const uint64_t rid) {
         if (_emap.is_enqueued(rid, true) < enq_map::EMAP_OK)

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp Thu Jan 23 10:15:46 2014
@@ -92,6 +92,8 @@ const uint32_t jerrno::JERR_RCVM_STREAMB
 const uint32_t jerrno::JERR_RCVM_READ            = 0x0902;           ///< Read error: no or insufficient data to read
 const uint32_t jerrno::JERR_RCVM_WRITE           = 0x0903;          ///< Write error
 const uint32_t jerrno::JERR_RCVM_NULLXID         = 0x0904;        ///< Null XID when XID length non-null in header
+const uint32_t jerrno::JERR_RCVM_NOTDBLKALIGNED  = 0x0905; ///< Offset is not data block (dblk)-aligned
+
 
 // class data_tok
 const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE    = 0x0a00;
@@ -182,6 +184,7 @@ jerrno::__init()
     _err_map[JERR_RCVM_READ] = "JERR_RCVM_READ: Read error: no or insufficient data to read";
     _err_map[JERR_RCVM_WRITE] = "JERR_RCVM_WRITE: Write error";
     _err_map[JERR_RCVM_NULLXID] = "JERR_RCVM_NULLXID: Null XID when XID length non-null in header";
+    _err_map[JERR_RCVM_NOTDBLKALIGNED] = "JERR_RCVM_NOTDBLKALIGNED: Offset is not data block (dblk)-aligned";
 
     // class data_tok
     _err_map[JERR_DTOK_ILLEGALSTATE] = "JERR_MTOK_ILLEGALSTATE: Attempted to change to illegal state.";

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.h Thu Jan 23 10:15:46 2014
@@ -110,6 +110,7 @@ namespace journal {
         static const uint32_t JERR_RCVM_READ;           ///< Read error: no or insufficient data to read
         static const uint32_t JERR_RCVM_WRITE;          ///< Write error
         static const uint32_t JERR_RCVM_NULLXID;        ///< Null XID when XID length non-null in header
+        static const uint32_t JERR_RCVM_NOTDBLKALIGNED; ///< Offset is not data block (dblk)-aligned
 
         // class data_tok
         static const uint32_t JERR_DTOK_ILLEGALSTATE;   ///< Attempted to change to illegal state

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp Thu Jan 23 10:15:46 2014
@@ -39,16 +39,50 @@ txn_data_t::txn_data_t(const uint64_t ri
                        const uint16_t pfid,
                        const uint64_t foffs,
                        const bool enq_flag,
+                       const bool tpc_flag,
                        const bool commit_flag):
         rid_(rid),
         drid_(drid),
         pfid_(pfid),
         foffs_(foffs),
         enq_flag_(enq_flag),
+        tpc_flag_(tpc_flag),
         commit_flag_(commit_flag),
         aio_compl_(false)
 {}
 
+txn_op_stats_t::txn_op_stats_t(const txn_data_list_t& tdl) :
+        enqCnt(0U),
+        deqCnt(0U),
+        tpcCnt(0U),
+        abortCnt(0U),
+        commitCnt(0U),
+        rid(0ULL)
+{
+    for (tdl_const_itr_t i=tdl.begin(); i!=tdl.end(); ++i) {
+        if (i->enq_flag_) {
+            ++enqCnt;
+            rid = i->rid_;
+        } else {
+            ++deqCnt;
+            if (i->commit_flag_) {
+                ++commitCnt;
+            } else {
+                ++abortCnt;
+            }
+        }
+        if (i->tpc_flag_) {
+            ++tpcCnt;
+        }
+    }
+    if (tpcCnt > 0 && tpcCnt != tdl.size()) {
+        throw jexception("Inconsistent 2PC count"); // TODO: complete exception details
+    }
+    if (abortCnt > 0 && commitCnt > 0) {
+        throw jexception("Both abort and commit in same transaction"); // TODO: complete exception details
+    }
+}
+
 txn_map::txn_map():
         _map()/*,
         _pfid_txn_cnt()*/
@@ -64,7 +98,7 @@ txn_map::insert_txn_data(const std::stri
     xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // not found in map
     {
-        txn_data_list list;
+        txn_data_list_t list;
         list.push_back(td);
         std::pair<xmap_itr, bool> ret = _map.insert(xmap_param(xid, list));
         if (!ret.second) // duplicate
@@ -75,14 +109,14 @@ txn_map::insert_txn_data(const std::stri
     return ok;
 }
 
-const txn_data_list
+const txn_data_list_t
 txn_map::get_tdata_list(const std::string& xid)
 {
     slock s(_mutex);
     return get_tdata_list_nolock(xid);
 }
 
-const txn_data_list
+const txn_data_list_t
 txn_map::get_tdata_list_nolock(const std::string& xid)
 {
     xmap_itr itr = _map.find(xid);
@@ -91,14 +125,14 @@ txn_map::get_tdata_list_nolock(const std
     return itr->second;
 }
 
-const txn_data_list
+const txn_data_list_t
 txn_map::get_remove_tdata_list(const std::string& xid)
 {
     slock s(_mutex);
     xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // not found in map
         return _empty_data_list;
-    txn_data_list list = itr->second;
+    txn_data_list_t list = itr->second;
     _map.erase(itr);
     return list;
 }
@@ -130,7 +164,7 @@ txn_map::cnt(const bool enq_flag)
     uint32_t c = 0;
     for (xmap_itr i = _map.begin(); i != _map.end(); i++)
     {
-        for (tdl_itr j = i->second.begin(); j < i->second.end(); j++)
+        for (tdl_itr_t j = i->second.begin(); j < i->second.end(); j++)
         {
             if (j->enq_flag_ == enq_flag)
                 c++;
@@ -147,7 +181,7 @@ txn_map::is_txn_synced(const std::string
     if (itr == _map.end()) // not found in map
         return TMAP_XID_NOT_FOUND;
     bool is_synced = true;
-    for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
+    for (tdl_itr_t litr = itr->second.begin(); litr < itr->second.end(); litr++)
     {
         if (!litr->aio_compl_)
         {
@@ -165,7 +199,7 @@ txn_map::set_aio_compl(const std::string
     xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // xid not found in map
         return TMAP_XID_NOT_FOUND;
-    for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
+    for (tdl_itr_t litr = itr->second.begin(); litr < itr->second.end(); litr++)
     {
         if (litr->rid_ == rid)
         {
@@ -183,8 +217,8 @@ txn_map::data_exists(const std::string& 
     bool found = false;
     {
         slock s(_mutex);
-        txn_data_list tdl = get_tdata_list_nolock(xid);
-        tdl_itr itr = tdl.begin();
+        txn_data_list_t tdl = get_tdata_list_nolock(xid);
+        tdl_itr_t itr = tdl.begin();
         while (itr != tdl.end() && !found)
         {
             found = itr->rid_ == rid;
@@ -202,8 +236,8 @@ txn_map::is_enq(const uint64_t rid)
         slock s(_mutex);
         for (xmap_itr i = _map.begin(); i != _map.end() && !found; i++)
         {
-            txn_data_list list = i->second;
-            for (tdl_itr j = list.begin(); j < list.end() && !found; j++)
+            txn_data_list_t list = i->second;
+            for (tdl_itr_t j = list.begin(); j < list.end() && !found; j++)
             {
                 if (j->enq_flag_)
                     found = j->rid_ == rid;

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_map.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_map.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_map.h Thu Jan 23 10:15:46 2014
@@ -42,17 +42,31 @@ namespace journal {
         uint16_t pfid_;     ///< Physical file id, to be used when transferring to emap on commit
         uint64_t foffs_;    ///< Offset in file for this record
         bool enq_flag_;     ///< If true, enq op, otherwise deq op
-        bool commit_flag_;  ///< (2PC transactions) Records 2PC complete c/a mode
+        bool tpc_flag_;     ///< 2PC transaction if true
+        bool commit_flag_;  ///< TPL only: (2PC transactions) Records 2PC complete c/a mode
         bool aio_compl_;    ///< Initially false, set to true when record AIO returns
         txn_data_t(const uint64_t rid,
                    const uint64_t drid,
                    const uint16_t pfid,
                    const uint64_t foffs,
                    const bool enq_flag,
-                   const bool commit_flag = false);
+                   const bool tpc_flag,
+                   const bool commit_flag);
     } txn_data_t;
-    typedef std::vector<txn_data_t> txn_data_list;
-    typedef txn_data_list::iterator tdl_itr;
+    typedef std::vector<txn_data_t> txn_data_list_t;
+    typedef txn_data_list_t::iterator tdl_itr_t;
+    typedef txn_data_list_t::const_iterator tdl_const_itr_t;
+
+    typedef struct txn_op_stats_t
+    {
+        uint16_t enqCnt;
+        uint16_t deqCnt;
+        uint16_t tpcCnt;
+        uint16_t abortCnt;
+        uint16_t commitCnt;
+        uint64_t rid;
+        txn_op_stats_t(const txn_data_list_t& tdl);
+    } txn_op_stats_t;
 
     /**
     * \class txn_map
@@ -100,21 +114,21 @@ namespace journal {
         static int16_t TMAP_SYNCED;
 
     private:
-        typedef std::pair<std::string, txn_data_list> xmap_param;
-        typedef std::map<std::string, txn_data_list> xmap;
+        typedef std::pair<std::string, txn_data_list_t> xmap_param;
+        typedef std::map<std::string, txn_data_list_t> xmap;
         typedef xmap::iterator xmap_itr;
 
         xmap _map;
         smutex _mutex;
-        const txn_data_list _empty_data_list;
+        const txn_data_list_t _empty_data_list;
 
     public:
         txn_map();
         virtual ~txn_map();
 
         bool insert_txn_data(const std::string& xid, const txn_data_t& td);
-        const txn_data_list get_tdata_list(const std::string& xid);
-        const txn_data_list get_remove_tdata_list(const std::string& xid);
+        const txn_data_list_t get_tdata_list(const std::string& xid);
+        const txn_data_list_t get_remove_tdata_list(const std::string& xid);
         bool in_map(const std::string& xid);
         uint32_t enq_cnt();
         uint32_t deq_cnt();
@@ -128,7 +142,7 @@ namespace journal {
         void xid_list(std::vector<std::string>& xv);
     private:
         uint32_t cnt(const bool enq_flag);
-        const txn_data_list get_tdata_list_nolock(const std::string& xid);
+        const txn_data_list_t get_tdata_list_nolock(const std::string& xid);
     };
 
 }}}

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp Thu Jan 23 10:15:46 2014
@@ -220,11 +220,9 @@ txn_rec::decode(::rec_hdr_t& h, std::ifs
         }
     }
     ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
-    if (::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, 0)) { // TODO: add checksum
-        throw jexception(jerrno::JERR_JREC_BADRECTAIL); // TODO: complete exception detail
-    }
     assert(!ifsp->fail() && !ifsp->bad());
     assert(_txn_hdr._xidsize > 0);
+
     Checksum checksum;
     checksum.addData((unsigned char*)&_txn_hdr, sizeof(_txn_hdr));
     checksum.addData((unsigned char*)_buff, _txn_hdr._xidsize);

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c Thu Jan 23 10:15:46 2014
@@ -41,6 +41,6 @@ bool is_txn_coml_commit(const deq_hdr_t 
 }
 
 void set_txn_coml_commit(deq_hdr_t *dh, const bool commit) {
-    dh->_rhdr._uflag = commit ? dh->_rhdr._uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK :
-                                dh->_rhdr._uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK);
+    dh->_rhdr._uflag = commit ? dh->_rhdr._uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK : // set flag bit
+                                dh->_rhdr._uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK); // unset flag bit
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c Thu Jan 23 10:15:46 2014
@@ -75,6 +75,7 @@ void file_hdr_copy(file_hdr_t* dest, con
 
 void file_hdr_reset(file_hdr_t* target) {
     target->_rhdr._uflag = 0;
+    target->_rhdr._serial = 0;
     target->_rhdr._rid = 0;
     target->_fro = 0;
     target->_ts_sec = 0;
@@ -85,6 +86,7 @@ void file_hdr_reset(file_hdr_t* target) 
 
 int is_file_hdr_reset(file_hdr_t* target) {
     return target->_rhdr._uflag == 0 &&
+           target->_rhdr._serial == 0 &&
            target->_rhdr._rid == 0 &&
            target->_ts_sec == 0 &&
            target->_ts_nsec == 0 &&

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h Thu Jan 23 10:15:46 2014
@@ -51,7 +51,7 @@ extern "C"{
  * +---+---+---+---+---+---+---+---+   |
  * |              rid              |   |
  * +---+---+---+---+---+---+---+---+  -+
- * |  fs   | partn |   reserved    |
+ * |  fhs  | partn |   reserved    |
  * +---+---+---+---+---+---+---+---+
  * |           data-size           |
  * +---+---+---+---+---+---+---+---+
@@ -70,7 +70,7 @@ extern "C"{
  *
  * ver = Journal version
  * rid = Record ID
- * fs = File header size in sblks (defined by JRNL_SBLK_SIZE)
+ * fhs = File header size in sblks (defined by JRNL_SBLK_SIZE)
  * partn = EFP partition from which this file came
  * fro = First Record Offset
  * qnl = Length of the queue name in octets.

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/rec_hdr.c
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/rec_hdr.c?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/rec_hdr.c (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/rec_hdr.c Thu Jan 23 10:15:46 2014
@@ -1,3 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
 #include "rec_hdr.h"
 
 void rec_hdr_init(rec_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t serial, const uint64_t rid) {

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp Thu Jan 23 10:15:46 2014
@@ -108,6 +108,7 @@ wmgr::enqueue(const void* const data_buf
               data_tok* dtokp,
               const void* const xid_ptr,
               const std::size_t xid_len,
+              const bool tpc_flag,
               const bool transient,
               const bool external)
 {
@@ -196,7 +197,7 @@ wmgr::enqueue(const void* const data_buf
             if (xid_len) // If part of transaction, add to transaction map
             {
                 std::string xid((const char*)xid_ptr, xid_len);
-                _tmap.insert_txn_data(xid, txn_data_t(rid, 0, dtokp->fid(), 0, true));
+                _tmap.insert_txn_data(xid, txn_data_t(rid, 0, dtokp->fid(), 0, true, tpc_flag, false));
             }
             else
             {
@@ -228,6 +229,7 @@ iores
 wmgr::dequeue(data_tok* dtokp,
               const void* const xid_ptr,
               const std::size_t xid_len,
+              const bool tpc_flag,
               const bool txn_coml_commit)
 {
     if (xid_len)
@@ -312,7 +314,7 @@ wmgr::dequeue(data_tok* dtokp,
                 // If the enqueue is part of a pending txn, it will not yet be in emap
                 _emap.lock(dequeue_rid); // ignore rid not found error
                 std::string xid((const char*)xid_ptr, xid_len);
-                _tmap.insert_txn_data(xid, txn_data_t(rid, dequeue_rid, dtokp->fid(), 0, false));
+                _tmap.insert_txn_data(xid, txn_data_t(rid, dequeue_rid, dtokp->fid(), 0, false, tpc_flag, false));
             }
             else
             {
@@ -424,8 +426,8 @@ wmgr::abort(data_tok* dtokp,
 
             // Delete this txn from tmap, unlock any locked records in emap
             std::string xid((const char*)xid_ptr, xid_len);
-            txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
-            for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+            txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+            for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++)
             {
 				if (!itr->enq_flag_)
 				    _emap.unlock(itr->drid_); // ignore rid not found error
@@ -523,8 +525,8 @@ wmgr::commit(data_tok* dtokp,
 
             // Delete this txn from tmap, process records into emap
             std::string xid((const char*)xid_ptr, xid_len);
-            txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
-            for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+            txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+            for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++)
             {
                 if (itr->enq_flag_) // txn enqueue
                 {
@@ -621,8 +623,8 @@ wmgr::flush_check(iores& res,
         }
 
         // If file is full, rotate to next file
-        uint32_t fileSize_pgs = _lfc.fileSize_sblks() / _cache_pgsize_sblks;
-        if (_pg_cntr >= fileSize_pgs)
+        uint32_t dataSize_pgs = _lfc.dataSize_sblks() / _cache_pgsize_sblks;
+        if (_pg_cntr >= dataSize_pgs)
         {
 //std::cout << _pg_cntr << ">=" << fileSize_pgs << std::flush;
             get_next_file();
@@ -638,8 +640,8 @@ iores
 wmgr::flush()
 {
     iores res = write_flush();
-    uint32_t fileSize_pgs = _lfc.fileSize_sblks() / _cache_pgsize_sblks;
-    if (res == RHM_IORES_SUCCESS && _pg_cntr >= fileSize_pgs) {
+    uint32_t dataSize_pgs = _lfc.dataSize_sblks() / _cache_pgsize_sblks;
+    if (res == RHM_IORES_SUCCESS && _pg_cntr >= dataSize_pgs) {
         get_next_file();
     }
     return res;

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/wmgr.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/wmgr.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/wmgr.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/wmgr.h Thu Jan 23 10:15:46 2014
@@ -97,11 +97,13 @@ public:
                   data_tok* dtokp,
                   const void* const xid_ptr,
                   const std::size_t xid_len,
+                  const bool tpc_flag,
                   const bool transient,
                   const bool external);
     iores dequeue(data_tok* dtokp,
                   const void* const xid_ptr,
                   const std::size_t xid_len,
+                  const bool tpc_flag,
                   const bool txn_coml_commit);
     iores abort(data_tok* dtokp,
                 const void* const xid_ptr,

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Logger.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Logger.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Logger.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Logger.cpp Thu Jan 23 10:15:46 2014
@@ -68,7 +68,7 @@ Logger::Logger() : flags(0) {
     // Initialize myself from env variables so all programs
     // (e.g. tests) can use logging even if they don't parse
     // command line args.
-    Options opts("");
+    Options opts;
     opts.parse(0, 0);
     configure(opts);
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Options.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Options.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Options.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Options.cpp Thu Jan 23 10:15:46 2014
@@ -27,8 +27,6 @@
 namespace qpid {
 namespace log {
 
-using namespace std;
-
 Options::Options(const std::string& argv0_, const std::string& name_) :
     qpid::Options(name_),
     argv0(argv0_),
@@ -45,25 +43,15 @@ Options::Options(const std::string& argv
 {
     selectors.push_back("notice+");
 
-    ostringstream levels;
-    levels << LevelTraits::name(Level(0));
-    for (int i = 1; i < LevelTraits::COUNT; ++i)
-        levels << " " << LevelTraits::name(Level(i));
-
-    ostringstream categories;
-    categories << CategoryTraits::name(Category(0));
-    for (int i = 1; i < CategoryTraits::COUNT; ++i)
-        categories << " " << CategoryTraits::name(Category(i));
-
     addOptions()
         ("trace,t", optValue(trace), "Enables all logging" )
         ("log-enable", optValue(selectors, "RULE"),
          ("Enables logging for selected levels and components. "
           "RULE is in the form 'LEVEL[+-][:PATTERN]'\n"
-          "LEVEL is one of: \n\t "+levels.str()+"\n"
+          "LEVEL is one of: \n\t "+getLevels()+"\n"
           "PATTERN is a logging category name, or a namespace-qualified "
           "function name or name fragment. "
-          "Logging category names are: \n\t "+categories.str()+"\n"
+          "Logging category names are: \n\t "+getCategories()+"\n"
           "For example:\n"
           "\t'--log-enable warning+'\n"
           "logs all warning, error and critical messages.\n"
@@ -75,10 +63,10 @@ Options::Options(const std::string& argv
         ("log-disable", optValue(deselectors, "RULE"),
          ("Disables logging for selected levels and components. "
           "RULE is in the form 'LEVEL[+-][:PATTERN]'\n"
-          "LEVEL is one of: \n\t "+levels.str()+"\n"
+          "LEVEL is one of: \n\t "+getLevels()+"\n"
           "PATTERN is a logging category name, or a namespace-qualified "
           "function name or name fragment. "
-          "Logging category names are: \n\t "+categories.str()+"\n"
+          "Logging category names are: \n\t "+getCategories()+"\n"
           "For example:\n"
           "\t'--log-disable warning-'\n"
           "disables logging all warning, notice, info, debug, and trace messages.\n"
@@ -139,4 +127,22 @@ Options& Options::operator=(const Option
     return *this;
 }
 
+std::string getLevels()
+{
+    std::ostringstream levels;
+    levels << LevelTraits::name(Level(0));
+    for (int i = 1; i < LevelTraits::COUNT; ++i)
+        levels << " " << LevelTraits::name(Level(i));
+    return levels.str();
+}
+
+std::string getCategories()
+{
+    std::ostringstream categories;
+    categories << CategoryTraits::name(Category(0));
+    for (int i = 1; i < CategoryTraits::COUNT; ++i)
+        categories << " " << CategoryTraits::name(Category(i));
+    return categories.str();
+}
+
 }} // namespace qpid::log

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Options.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Options.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Options.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Options.h Thu Jan 23 10:15:46 2014
@@ -46,6 +46,12 @@ struct Options : public qpid::Options {
     std::auto_ptr<SinkOptions> sinkOptions;
 };
 
+/** Get a string list of the allowed levels */
+QPID_COMMON_EXTERN std::string getLevels();
+
+/** Get a string list of the allowed categories */
+QPID_COMMON_EXTERN std::string getCategories();
+
 }} // namespace qpid::log
 
 #endif  /*!QPID_LOG_OPTIONS_H*/

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Selector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Selector.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Selector.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Selector.cpp Thu Jan 23 10:15:46 2014
@@ -220,6 +220,8 @@ bool Selector::isDisabled(Level level, c
 // level/function/category set from an actual QPID_LOG Statement.
 //
 bool Selector::isEnabled(Level level, const char* function, Category category) {
+    if (level==critical)
+        return true;                    // critical cannot be disabled
     if (isDisabled(level, function))
         return false;                   // Disabled by function name
     if (disableFlags[level][category])

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Statement.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Statement.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Statement.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Statement.cpp Thu Jan 23 10:15:46 2014
@@ -200,7 +200,7 @@ const char* names[LevelTraits::COUNT] = 
 
 const char* catNames[CategoryTraits::COUNT] = {
     "Security", "Broker", "Management", "Protocol", "System", "HA", "Messaging",
-    "Store", "Network", "Test", "Client", "Model", "Unspecified"
+    "Store", "Network", "Test", "Client", "Application", "Model", "Unspecified"
 };
 
 } // namespace
@@ -235,4 +235,5 @@ Category CategoryTraits::category(const 
 const char* CategoryTraits::name(Category c) {
     return catNames[c];
 }
+
 }} // namespace qpid::log

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Statement.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Statement.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Statement.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Statement.h Thu Jan 23 10:15:46 2014
@@ -71,11 +71,12 @@ struct LevelTraits {
  * Store       store
  * Network     tcp rdma AsynchIO socket epoll
  * Test
+ * External_application <no directory - signifies log message from non qpid application code>
  * Model       <not related to a directory>
  * Unspecified <must be last in enum>
  */
 enum Category { security, broker, management, protocol, system, ha, messaging,
-    store, network, test, client, model, unspecified };
+    store, network, test, client, external_application, model, unspecified };
 struct CategoryTraits {
     static const int COUNT=unspecified+1;
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Thu Jan 23 10:15:46 2014
@@ -47,6 +47,18 @@ namespace qpid {
 namespace messaging {
 namespace amqp {
 namespace {
+
+std::string asString(const std::vector<std::string>& v) {
+    std::stringstream os;
+    os << "[";
+    for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) {
+        if (i != v.begin()) os << ", ";
+        os << '"' << *i << '"';
+    }
+    os << "]";
+    return os.str();
+}
+
 //remove conditional when 0.5 is no longer supported
 #ifdef HAVE_PROTON_TRACER
 void do_trace(pn_transport_t* transport, const char* message)
@@ -437,27 +449,33 @@ void ConnectionContext::reset()
     pn_transport_bind(engine, connection);
 }
 
-void ConnectionContext::check()
-{
-    if (state == DISCONNECTED) {
+void ConnectionContext::check() {
+    if (checkDisconnected()) {
         if (ConnectionOptions::reconnect) {
-            reset();
             autoconnect();
         } else {
             throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)");
         }
     }
-    if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
-        pn_condition_t* error = pn_connection_remote_condition(connection);
-        std::stringstream text;
-        if (pn_condition_is_set(error)) {
-            text << "Connection closed by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error);
-        } else {
-            text << "Connection closed by peer";
+}
+
+bool ConnectionContext::checkDisconnected() {
+    if (state == DISCONNECTED) {
+        reset();
+    } else {
+        if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+            pn_condition_t* error = pn_connection_remote_condition(connection);
+            std::stringstream text;
+            if (pn_condition_is_set(error)) {
+                text << "Connection closed by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error);
+            } else {
+                text << "Connection closed by peer";
+            }
+            pn_connection_close(connection);
+            throw qpid::messaging::ConnectionError(text.str());
         }
-        pn_connection_close(connection);
-        throw qpid::messaging::ConnectionError(text.str());
     }
+    return state == DISCONNECTED;
 }
 
 void ConnectionContext::wait()
@@ -843,16 +861,6 @@ void ConnectionContext::open()
 
 
 namespace {
-std::string asString(const std::vector<std::string>& v) {
-    std::stringstream os;
-    os << "[";
-    for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) {
-        if (i != v.begin()) os << ", ";
-        os << *i;
-    }
-    os << "]";
-    return os.str();
-}
 double FOREVER(std::numeric_limits<double>::max());
 bool expired(const sys::AbsTime& start, double timeout)
 {
@@ -894,6 +902,7 @@ bool ConnectionContext::tryConnect()
             if (tryConnect(qpid::Url(*i, protocol.empty() ? qpid::Address::TCP : protocol))) {
                 return true;
             }
+            QPID_LOG(info, "Failed to connect to " << *i);
         } catch (const qpid::messaging::TransportFailure& e) {
             QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
         }
@@ -923,6 +932,13 @@ void ConnectionContext::reconnect()
     }
 }
 
+void ConnectionContext::waitNoReconnect() {
+    if (!checkDisconnected()) {
+        lock.wait();
+        checkDisconnected();
+    }
+}
+
 bool ConnectionContext::tryConnect(const Url& url)
 {
     if (url.getUser().size()) username = url.getUser();
@@ -934,10 +950,11 @@ bool ConnectionContext::tryConnect(const
             setCurrentUrl(*i);
             if (sasl.get()) {
                 wakeupDriver();
-                while (!sasl->authenticated()) {
+                while (!sasl->authenticated() && state != DISCONNECTED) {
                     QPID_LOG(debug, id << " Waiting to be authenticated...");
-                    wait();
+                    waitNoReconnect();
                 }
+                if (state == DISCONNECTED) continue;
                 QPID_LOG(debug, id << " Authenticated");
             }
 
@@ -945,9 +962,10 @@ bool ConnectionContext::tryConnect(const
             setProperties();
             pn_connection_open(connection);
             wakeupDriver(); //want to write
-            while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
-                wait();
-            }
+            while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) &&
+                   state != DISCONNECTED)
+                waitNoReconnect();
+            if (state == DISCONNECTED) continue;
             if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
                 throw qpid::messaging::ConnectionError("Failed to open connection");
             }

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Thu Jan 23 10:15:46 2014
@@ -150,6 +150,8 @@ class ConnectionContext : public qpid::s
     CodecAdapter codecAdapter;
 
     void check();
+    bool checkDisconnected();
+    void waitNoReconnect();
     void wait();
     void waitUntil(qpid::sys::AbsTime until);
     void wait(boost::shared_ptr<SessionContext>);

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp Thu Jan 23 10:15:46 2014
@@ -145,7 +145,7 @@ void EncodedMessage::populate(qpid::type
             map["x-amqp-group-id"] = groupId.str();
         }
         if (!!groupSequence) {
-            map["x-amqp-qroup-sequence"] = groupSequence.get();
+            map["x-amqp-group-sequence"] = groupSequence.get();
         }
         if (replyToGroupId) {
             map["x-amqp-reply-to-group-id"] = replyToGroupId.str();

Propchange: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/tests:r1549895-1558036

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/CMakeLists.txt?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/CMakeLists.txt Thu Jan 23 10:15:46 2014
@@ -42,6 +42,11 @@ macro(remember_location testname)
   set (${testname}_LOCATION ${CMAKE_CURRENT_BINARY_DIR}/${testname}${CMAKE_EXECUTABLE_SUFFIX})
 endmacro(remember_location)
 
+# If we're using GCC allow variadic macros (even though they're c99 not c++01)
+if (CMAKE_COMPILER_IS_GNUCXX)
+  add_definitions(-Wno-variadic-macros)
+endif (CMAKE_COMPILER_IS_GNUCXX)
+
 # Windows uses some process-startup calls to ensure that errors, etc. don't
 # result in error boxes being thrown up. Since it's expected that most test
 # runs will be in scripts, the default is to force these outputs to stderr
@@ -175,6 +180,7 @@ set(all_unit_tests
     ManagementTest
     MessageReplayTracker
     MessageTest
+    MessagingLogger
     MessagingSessionTests
     PollableCondition
     ProxyTest
@@ -294,16 +300,12 @@ if (BUILD_SASL)
     remember_location(sasl_version)
 endif (BUILD_SASL)
 
-# This should ideally be done as part of the test run, but I don't know a way
-# to get these arguments and the working directory set like Makefile.am does,
-# and have that run during the test pass.
-#
-# Need to check to see that the python tools are included as part of the source
-# tree first and don't install them or run dependent tests if they are not there
-#
+# Always run the python install, setup.py is smart enough to do only what is needed.
 set (python_bld ${CMAKE_CURRENT_BINARY_DIR}/python)
-execute_process(COMMAND ${PYTHON_EXECUTABLE} setup.py install --prefix=${pythoon_bld} --install-lib=${python_bld} --install-scripts=${python_bld}/commands
-                WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/../python)
+set (python_src ${CMAKE_SOURCE_DIR}/../python)
+add_custom_target(python_bld ALL
+  COMMAND ${PYTHON_EXECUTABLE} setup.py install --prefix=${python_bld} --install-lib=${python_bld} --install-scripts=${python_bld}/commands
+  WORKING_DIRECTORY ${python_src})
 
 if (BUILD_SASL)
     add_test (sasl_fed ${test_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/sasl_fed${test_script_suffix})

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/brokertest.py?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/brokertest.py Thu Jan 23 10:15:46 2014
@@ -253,7 +253,7 @@ class Broker(Popen):
 
         self.test = test
         self._port=port
-        if BrokerTest.store_lib:
+        if BrokerTest.store_lib and not test_store:
             args = args + ['--load-module', BrokerTest.store_lib]
             if BrokerTest.sql_store_lib:
                 args = args + ['--load-module', BrokerTest.sql_store_lib]

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/config.null
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/config.null?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/config.null (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/config.null Thu Jan 23 10:15:46 2014
@@ -1 +1,21 @@
-# empty config
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Deliberately empty configuration file for tests.
+

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/failing-amqp0-10-python-tests
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/failing-amqp0-10-python-tests?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/failing-amqp0-10-python-tests (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/failing-amqp0-10-python-tests Thu Jan 23 10:15:46 2014
@@ -1,3 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
 #The following four tests fail the because pure python client excludes
 #the node type for queues from the reply-to address, weheras the swigged
 #client does not (as that prevents it resolving the node on every send)

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/failing-amqp1.0-python-tests
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/failing-amqp1.0-python-tests?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/failing-amqp1.0-python-tests (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/failing-amqp1.0-python-tests Thu Jan 23 10:15:46 2014
@@ -1,2 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
 qpid_tests.broker_0_10.new_api.GeneralTests.test_qpid_3481_acquired_to_alt_exchange_2_consumers
 qpid_tests.broker_0_10.new_api.GeneralTests.test_qpid_3481_acquired_to_alt_exchange

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/ha_tests.py?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/ha_tests.py Thu Jan 23 10:15:46 2014
@@ -233,6 +233,33 @@ class ReplicationTests(HaBrokerTest):
             c.close()
         finally: l.restore()
 
+
+    def test_heartbeat_python(self):
+        """Verify that a python client with a heartbeat specified disconnects
+        from a stalled broker and does not hang indefinitely."""
+
+        broker = Broker(self)
+        broker_addr = broker.host_port()
+
+        # Case 1: Connect before stalling the broker, use the connection after stalling.
+        c = Connection(broker_addr, heartbeat=1)
+        c.open()
+        os.kill(broker.pid, signal.SIGSTOP) # Stall the broker
+        self.assertRaises(ConnectionError, c.session().sender, "foo")
+
+        # Case 2: Connect to a stalled broker
+        c = Connection(broker_addr, heartbeat=1)
+        self.assertRaises(ConnectionError, c.open)
+
+        # Case 3: Re-connect to a stalled broker.
+        broker2 = Broker(self)
+        c = Connection(broker2.host_port(), heartbeat=1, reconnect_limit=1,
+                       reconnect=True, reconnect_urls=[broker_addr],
+                       reconnect_log=False) # Hide expected warnings
+        c.open()
+        broker2.kill()          # Cause re-connection to broker
+        self.assertRaises(ConnectionError, c.session().sender, "foo")
+
     def test_failover_cpp(self):
         """Verify that failover works in the C++ client."""
         cluster = HaCluster(self, 2)
@@ -253,8 +280,6 @@ class ReplicationTests(HaBrokerTest):
         """Verify that a backup broker fails over and recovers queue state"""
         brokers = HaCluster(self, 3)
         brokers[0].connect().session().sender("q;{create:always}").send("a")
-        for b in brokers[1:]: b.assert_browse_backup("q", ["a"], msg=b)
-        brokers[0].expect = EXPECT_EXIT_FAIL
         brokers.kill(0)
         brokers[1].connect().session().sender("q").send("b")
         brokers[2].assert_browse_backup("q", ["a","b"])
@@ -263,6 +288,13 @@ class ReplicationTests(HaBrokerTest):
         s.acknowledge()
         brokers[2].assert_browse_backup("q", ["b"])
 
+    def test_empty_backup_failover(self):
+        """Verify that a new primary becomes active with no queues.
+        Regression test for QPID-5430"""
+        brokers = HaCluster(self, 3)
+        brokers.kill(0)
+        brokers[1].wait_status("active")
+
     def test_qpid_config_replication(self):
         """Set up replication via qpid-config"""
         brokers = HaCluster(self,2)
@@ -272,33 +304,34 @@ class ReplicationTests(HaBrokerTest):
 
     def test_standalone_queue_replica(self):
         """Test replication of individual queues outside of cluster mode"""
-        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
-        try:
-            primary = HaBroker(self, name="primary", ha_cluster=False,
-                               args=["--ha-queue-replication=yes"]);
-            pc = primary.connect()
-            ps = pc.session().sender("q;{create:always}")
-            pr = pc.session().receiver("q;{create:always}")
-            backup = HaBroker(self, name="backup", ha_cluster=False,
-                              args=["--ha-queue-replication=yes"])
-            br = backup.connect().session().receiver("q;{create:always}")
+        primary = HaBroker(self, name="primary", ha_cluster=False,
+                           args=["--ha-queue-replication=yes"]);
+        pc = primary.connect()
+        ps = pc.session().sender("q;{create:always}")
+        pr = pc.session().receiver("q;{create:always}")
+        backup = HaBroker(self, name="backup", ha_cluster=False,
+                          args=["--ha-queue-replication=yes"])
+        bs = backup.connect().session()
+        br = bs.receiver("q;{create:always}")
+
+        def srange(*args): return [str(i) for i in xrange(*args)]
+
+        for m in srange(3): ps.send(m)
+        # Set up replication with qpid-ha
+        backup.replicate(primary.host_port(), "q")
+        backup.assert_browse_backup("q", srange(3))
+        for m in srange(3,6): ps.send(str(m))
+        backup.assert_browse_backup("q", srange(6))
+        self.assertEqual("0", pr.fetch().content)
+        pr.session.acknowledge()
+        backup.assert_browse_backup("q", srange(1,6))
+
+        # Set up replication with qpid-config
+        ps2 = pc.session().sender("q2;{create:always}")
+        backup.config_replicate(primary.host_port(), "q2");
+        ps2.send("x", timeout=1)
+        backup.assert_browse_backup("q2", ["x"])
 
-            # Set up replication with qpid-ha
-            backup.replicate(primary.host_port(), "q")
-            ps.send("a", timeout=1)
-            backup.assert_browse_backup("q", ["a"])
-            ps.send("b", timeout=1)
-            backup.assert_browse_backup("q", ["a", "b"])
-            self.assertEqual("a", pr.fetch().content)
-            pr.session.acknowledge()
-            backup.assert_browse_backup("q", ["b"])
-
-            # Set up replication with qpid-config
-            ps2 = pc.session().sender("q2;{create:always}")
-            backup.config_replicate(primary.host_port(), "q2");
-            ps2.send("x", timeout=1)
-            backup.assert_browse_backup("q2", ["x"])
-        finally: l.restore()
 
     def test_standalone_queue_replica_failover(self):
         """Test individual queue replication from a cluster to a standalone

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh Thu Jan 23 10:15:46 2014
@@ -1,5 +1,25 @@
 #!/usr/bin/env bash
 
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
 STORE_DIR=/tmp
 LINEARSTOREDIR=~/RedHat/linearstore
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/policy.acl
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/policy.acl?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/policy.acl (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/policy.acl Thu Jan 23 10:15:46 2014
@@ -1 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
 acl allow all all

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/qpid-send.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/qpid-send.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/qpid-send.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/qpid-send.cpp Thu Jan 23 10:15:46 2014
@@ -242,7 +242,7 @@ class GetlineContentGenerator : public C
     virtual bool setContent(Message& msg) {
         string content;
         bool got = getline(std::cin, content);
-        if (got) msg.setContent(content);
+        if (got) msg.setContentObject(content);
         return got;
     }
 };
@@ -251,7 +251,7 @@ class FixedContentGenerator   : public C
   public:
     FixedContentGenerator(const string& s) : content(s) {}
     virtual bool setContent(Message& msg) {
-        msg.setContent(content);
+        msg.setContentObject(content);
         return true;
     }
   private:

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/qpidd-empty.conf
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/qpidd-empty.conf?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/qpidd-empty.conf (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/qpidd-empty.conf Thu Jan 23 10:15:46 2014
@@ -1,3 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
 # An empty configuration file.
 # Used when running tests to avoid picking up configuration
 # installed in the default place.

Modified: qpid/branches/java-broker-bdb-ha/qpid/doc/book/Makefile
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/doc/book/Makefile?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/doc/book/Makefile (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/doc/book/Makefile Thu Jan 23 10:15:46 2014
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-DIRS = src/java-broker src/java-perftests src/cpp-broker src/programming
+DIRS = src/java-broker src/java-perftests src/cpp-broker src/programming src/jms-client-0-8
 
 
 .PHONY: all $(DIRS)

Modified: qpid/branches/java-broker-bdb-ha/qpid/doc/book/src/java-broker/Java-Broker-Close-On-No-Route.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/doc/book/src/java-broker/Java-Broker-Close-On-No-Route.xml?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/doc/book/src/java-broker/Java-Broker-Close-On-No-Route.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/doc/book/src/java-broker/Java-Broker-Close-On-No-Route.xml Thu Jan 23 10:15:46 2014
@@ -1,4 +1,24 @@
 <?xml version="1.0" encoding="utf-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
 <section id="Java-Broker-Close-Connection-When-No-Route">
   <title>Closing client connections on unroutable mandatory messages</title>
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message