qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1560618 [3/5] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/ qpid/bin/ qpid/cpp/ qpid/cpp/bindings/qpid/dotnet/examples/msvc10/csharp.direct.receiver/ qpid/cpp/bindings/qpid/dotnet/examples/msvc10/csharp.direct.sender/ qpid/cpp/bindings/...
Date Thu, 23 Jan 2014 10:15:49 GMT
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.cpp Thu Jan 23 10:15:46 2014
@@ -104,8 +104,6 @@ Primary::Primary(HaBroker& hb, const Bro
     QueueReplicator::copy(hb.getBroker().getExchanges(), qrs);
     std::for_each(qrs.begin(), qrs.end(), boost::bind(&QueueReplicator::promoted, _1));
 
-    broker::QueueRegistry& queues = hb.getBroker().getQueues();
-    queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1));
     if (expect.empty()) {
         QPID_LOG(notice, logPrefix << "Promoted to primary. No expected backups.");
     }
@@ -140,15 +138,6 @@ Primary::~Primary() {
     haBroker.getObserver()->reset();
 }
 
-void Primary::initializeQueue(boost::shared_ptr<broker::Queue> q) {
-    if (replicationTest.useLevel(*q) == ALL) {
-        boost::shared_ptr<QueueReplicator> qr = haBroker.findQueueReplicator(q->getName());
-        ReplicationId firstId = qr ? qr->getMaxId()+1 : ReplicationId(1);
-        q->getMessageInterceptors().add(
-            boost::shared_ptr<IdSetter>(new IdSetter(q->getName(), firstId)));
-    }
-}
-
 void Primary::checkReady() {
     bool activate = false;
     {
@@ -261,7 +250,6 @@ void Primary::queueCreate(const QueuePtr
     if (level) {
         QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
                  << " replication: " << printable(level));
-        initializeQueue(q);
         // Give each queue a unique id. Used by backups to avoid confusion of
         // same-named queues.
         q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
@@ -348,6 +336,7 @@ void Primary::opened(broker::Connection&
         } else {
             QPID_LOG(info, logPrefix << "Known backup reconnection: " << info);
             i->second->setConnection(&connection);
+            backup = i->second;
         }
         if (info.getStatus() == JOINING) {
             info.setStatus(CATCHUP);

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.h Thu Jan 23 10:15:46 2014
@@ -125,7 +125,6 @@ class Primary : public Role
     RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&);
     void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&);
 
-    void initializeQueue(boost::shared_ptr<broker::Queue>);
     void checkReady();
     void checkReady(RemoteBackupPtr);
     void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueGuard.cpp Thu Jan 23 10:15:46 2014
@@ -55,8 +55,8 @@ QueueGuard::QueueGuard(broker::Queue& q,
     info.printId(os) << ": ";
     logPrefix = os.str();
     observer.reset(new QueueObserver(*this));
-    queue.addObserver(observer);
-    // Set first after calling addObserver so we know that the back of the
+    queue.getObservers().add(observer);
+    // Set first after adding the observer so we know that the back of the
     // queue+1 is (or will be) a guarded position.
     QueuePosition front, back;
     q.getRange(front, back, broker::REPLICATOR);
@@ -86,7 +86,7 @@ void QueueGuard::dequeued(const Message&
 }
 
 void QueueGuard::cancel() {
-    queue.removeObserver(observer);
+    queue.getObservers().remove(observer);
     Mutex::ScopedLock l(lock);
     if (cancelled) return;
     QPID_LOG(debug, logPrefix << "Cancelled");

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Jan 23 10:15:46 2014
@@ -21,8 +21,9 @@
 
 #include "Event.h"
 #include "HaBroker.h"
+#include "IdSetter.h"
 #include "QueueReplicator.h"
-#include "QueueSnapshots.h"
+#include "QueueSnapshot.h"
 #include "ReplicatingSubscription.h"
 #include "Settings.h"
 #include "types.h"
@@ -122,6 +123,11 @@ QueueReplicator::QueueReplicator(HaBroke
       settings(hb.getSettings()),
       nextId(0), maxId(0)
 {
+    // The QueueReplicator will take over setting replication IDs.
+    boost::shared_ptr<IdSetter> setter =
+        q->getMessageInterceptors().findType<IdSetter>();
+    if (setter) q->getMessageInterceptors().remove(setter);
+
     args.setString(QPID_REPLICATE, printable(NONE).str());
     Uuid uuid(true);
     bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -175,7 +181,7 @@ void QueueReplicator::activate() {
         boost::shared_ptr<ErrorListener>(new ErrorListener(shared_from_this())));
 
     // Enable callback to destroy()
-    queue->addObserver(
+    queue->getObservers().add(
         boost::shared_ptr<QueueObserver>(new QueueObserver(shared_from_this())));
 }
 
@@ -212,8 +218,9 @@ void QueueReplicator::initializeBridge(B
     arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType());
     arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
     arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
-    arguments.setString(ReplicatingSubscription::QPID_ID_SET,
-                        encodeStr(haBroker.getQueueSnapshots()->get(queue)->snapshot()));
+    boost::shared_ptr<QueueSnapshot> qs = queue->getObservers().findType<QueueSnapshot>();
+    if (qs) arguments.setString(ReplicatingSubscription::QPID_ID_SET, encodeStr(qs->getSnapshot()));
+
     try {
         peer.getMessage().subscribe(
             args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
@@ -254,6 +261,7 @@ void QueueReplicator::dequeueEvent(const
 }
 
 // Called in connection thread of the queues bridge to primary.
+
 void QueueReplicator::route(Deliverable& deliverable)
 {
     try {
@@ -293,11 +301,6 @@ void QueueReplicator::idEvent(const stri
     nextId = decodeStr<IdEvent>(data).id;
 }
 
-ReplicationId QueueReplicator::getMaxId() {
-    Mutex::ScopedLock l(lock);
-    return maxId;
-}
-
 void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) {
     if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) {
         // If the queue is destroyed at the same time we are subscribing, we may
@@ -320,14 +323,19 @@ bool QueueReplicator::hasBindings() { re
 std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; }
 
 void QueueReplicator::promoted() {
-    // Promoted to primary, deal with auto-delete now.
-    if (queue && queue->isAutoDelete() && subscribed) {
-        // Make a temporary shared_ptr to prevent premature deletion of queue.
-        // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
-        // which could delete the queue while it's still running it's destroyed logic.
-        boost::shared_ptr<Queue> q(queue);
-        q->releaseFromUse();
-        q->scheduleAutoDelete();
+    if (queue) {
+        // On primary QueueReplicator no longer sets IDs, start an IdSetter.
+        queue->getMessageInterceptors().add(
+            boost::shared_ptr<IdSetter>(new IdSetter(maxId+1)));
+        // Process auto-deletes
+        if (queue->isAutoDelete() && subscribed) {
+            // Make a temporary shared_ptr to prevent premature deletion of queue.
+            // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
+            // which could delete the queue while it's still running its destroyed logic.
+            boost::shared_ptr<Queue> q(queue);
+            q->releaseFromUse();
+            q->scheduleAutoDelete();
+        }
     }
 }
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.h Thu Jan 23 10:15:46 2014
@@ -85,8 +85,6 @@ class QueueReplicator : public broker::E
 
     boost::shared_ptr<broker::Queue> getQueue() const { return queue; }
 
-    ReplicationId getMaxId();
-
     // No-op unused Exchange virtual functions.
     bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
     bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueSnapshot.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueSnapshot.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueSnapshot.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueSnapshot.h Thu Jan 23 10:15:46 2014
@@ -53,7 +53,7 @@ class  QueueSnapshot : public broker::Qu
 
     void requeued(const broker::Message&) {}
 
-    ReplicationIdSet snapshot() {
+    ReplicationIdSet getSnapshot() {
         sys::Mutex::ScopedLock l(lock);
         return set;
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Thu Jan 23 10:15:46 2014
@@ -22,7 +22,7 @@
 #include "Event.h"
 #include "IdSetter.h"
 #include "QueueGuard.h"
-#include "QueueSnapshots.h"
+#include "QueueSnapshot.h"
 #include "ReplicatingSubscription.h"
 #include "TxReplicatingSubscription.h"
 #include "Primary.h"
@@ -129,17 +129,6 @@ void ReplicatingSubscription::initialize
         info.printId(os) << ": ";
         logPrefix = os.str();
 
-        // If this is a non-cluster standalone replication then we need to
-        // set up an IdSetter if there is not already one.
-        boost::shared_ptr<IdSetter> idSetter;
-        queue->getMessageInterceptors().each(
-            boost::bind(&copyIf, _1, boost::ref(idSetter)));
-        if (!idSetter) {
-            QPID_LOG(debug, logPrefix << "Standalone replication");
-            queue->getMessageInterceptors().add(
-                boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), 1)));
-        }
-
         // If there's already a guard (we are in failover) use it, else create one.
         if (primary) guard = primary->getGuard(queue, info);
         if (!guard) guard.reset(new QueueGuard(*queue, info));
@@ -150,16 +139,16 @@ void ReplicatingSubscription::initialize
         // However we must attach the observer _before_ we snapshot for
         // initial dequeues to be sure we don't miss any dequeues
         // between the snapshot and attaching the observer.
-        queue->addObserver(
+        queue->getObservers().add(
             boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
-        boost::shared_ptr<QueueSnapshot> snapshot = haBroker.getQueueSnapshots()->get(queue);
+        boost::shared_ptr<QueueSnapshot> snapshot = queue->getObservers().findType<QueueSnapshot>();
         // There may be no snapshot if the queue is being deleted concurrently.
         if (!snapshot) {
-            queue->removeObserver(
+            queue->getObservers().remove(
                 boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
             throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted");
         }
-        ReplicationIdSet primaryIds = snapshot->snapshot();
+        ReplicationIdSet primaryIds = snapshot->getSnapshot();
         std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET);
         ReplicationIdSet backupIds;
         if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr);
@@ -254,7 +243,7 @@ void ReplicatingSubscription::cancel()
     }
     QPID_LOG(debug, logPrefix << "Cancelled");
     if (primary) primary->removeReplica(*this);
-    getQueue()->removeObserver(
+    getQueue()->getObservers().remove(
         boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
     guard->cancel();
     ConsumerImpl::cancel();

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Thu Jan 23 10:15:46 2014
@@ -68,89 +68,89 @@ class Primary;
  *  ReplicatingSubscription makes calls on QueueGuard, but not vice-versa.
  */
 class ReplicatingSubscription :
-    public broker::SemanticState::ConsumerImpl,
-    public broker::QueueObserver
+        public broker::SemanticState::ConsumerImpl,
+        public broker::QueueObserver
 {
-public:
-typedef broker::SemanticState::ConsumerImpl ConsumerImpl;
+  public:
+    typedef broker::SemanticState::ConsumerImpl ConsumerImpl;
 
-class Factory : public broker::ConsumerFactory {
-public:
-Factory(HaBroker& hb) : haBroker(hb) {}
-
-HaBroker& getHaBroker() const { return haBroker; }
-
-boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
-broker::SemanticState* parent,
-    const std::string& name, boost::shared_ptr<broker::Queue> ,
-    bool ack, bool acquire, bool exclusive, const std::string& tag,
-    const std::string& resumeId, uint64_t resumeTtl,
-    const framing::FieldTable& arguments);
-private:
-HaBroker& haBroker;
-};
-
-// Argument names for consume command.
-static const std::string QPID_REPLICATING_SUBSCRIPTION;
-static const std::string QPID_BROKER_INFO;
-static const std::string QPID_ID_SET;
-// Replicator types: argument values for QPID_REPLICATING_SUBSCRIPTION argument.
-static const std::string QPID_QUEUE_REPLICATOR;
-static const std::string QPID_TX_REPLICATOR;
+    class Factory : public broker::ConsumerFactory {
+      public:
+        Factory(HaBroker& hb) : haBroker(hb) {}
+
+        HaBroker& getHaBroker() const { return haBroker; }
+
+        boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
+            broker::SemanticState* parent,
+            const std::string& name, boost::shared_ptr<broker::Queue> ,
+            bool ack, bool acquire, bool exclusive, const std::string& tag,
+            const std::string& resumeId, uint64_t resumeTtl,
+            const framing::FieldTable& arguments);
+      private:
+        HaBroker& haBroker;
+    };
+
+    // Argument names for consume command.
+    static const std::string QPID_REPLICATING_SUBSCRIPTION;
+    static const std::string QPID_BROKER_INFO;
+    static const std::string QPID_ID_SET;
+    // Replicator types: argument values for QPID_REPLICATING_SUBSCRIPTION argument.
+    static const std::string QPID_QUEUE_REPLICATOR;
+    static const std::string QPID_TX_REPLICATOR;
 
-ReplicatingSubscription(HaBroker& haBroker,
+    ReplicatingSubscription(HaBroker& haBroker,
                             broker::SemanticState* parent,
                             const std::string& name, boost::shared_ptr<broker::Queue> ,
                             bool ack, bool acquire, bool exclusive, const std::string& tag,
                             const std::string& resumeId, uint64_t resumeTtl,
                             const framing::FieldTable& arguments);
 
-~ReplicatingSubscription();
-
-
-// Consumer overrides.
-bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg);
-void cancel();
-void acknowledged(const broker::DeliveryRecord&);
-bool browseAcquired() const { return true; }
-// Hide the "queue deleted" error for a ReplicatingSubscription when a
-// queue is deleted, this is normal and not an error.
-bool hideDeletedError() { return true; }
-
-// QueueObserver overrides
-void enqueued(const broker::Message&) {}
-void dequeued(const broker::Message&);
-void acquired(const broker::Message&) {}
-void requeued(const broker::Message&) {}
-
-/** A ReplicatingSubscription is a passive observer, not counted for auto
- * deletion and immediate message purposes.
- */
-bool isCounted() { return false; }
-
-/** Initialization that must be done separately from construction
- * because it requires a shared_ptr to this to exist.
- */
-void initialize();
-
-BrokerInfo getBrokerInfo() const { return info; }
+    ~ReplicatingSubscription();
 
-/** Skip replicating enqueue of of ids. */
-void addSkip(const ReplicationIdSet& ids);
 
-protected:
-bool doDispatch();
-
-private:
-std::string logPrefix;
-QueuePosition position;
-ReplicationIdSet dequeues;  // Dequeues to be sent in next dequeue event.
-ReplicationIdSet skip;   // Skip enqueues: messages already on backup and tx enqueues.
-ReplicationIdSet unready;   // Unguarded, replicated and un-acknowledged.
-bool ready;
-bool cancelled;
-BrokerInfo info;
-boost::shared_ptr<QueueGuard> guard;
+    // Consumer overrides.
+    bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg);
+    void cancel();
+    void acknowledged(const broker::DeliveryRecord&);
+    bool browseAcquired() const { return true; }
+    // Hide the "queue deleted" error for a ReplicatingSubscription when a
+    // queue is deleted, this is normal and not an error.
+    bool hideDeletedError() { return true; }
+
+    // QueueObserver overrides
+    void enqueued(const broker::Message&) {}
+    void dequeued(const broker::Message&);
+    void acquired(const broker::Message&) {}
+    void requeued(const broker::Message&) {}
+
+    /** A ReplicatingSubscription is a passive observer, not counted for auto
+     * deletion and immediate message purposes.
+     */
+    bool isCounted() { return false; }
+
+    /** Initialization that must be done separately from construction
+     * because it requires a shared_ptr to this to exist.
+     */
+    void initialize();
+
+    BrokerInfo getBrokerInfo() const { return info; }
+
+    /** Skip replicating enqueue of ids. */
+    void addSkip(const ReplicationIdSet& ids);
+
+  protected:
+    bool doDispatch();
+
+  private:
+    std::string logPrefix;
+    QueuePosition position;
+    ReplicationIdSet dequeues;  // Dequeues to be sent in next dequeue event.
+    ReplicationIdSet skip;   // Skip enqueues: messages already on backup and tx enqueues.
+    ReplicationIdSet unready;   // Unguarded, replicated and un-acknowledged.
+    bool ready;
+    bool cancelled;
+    BrokerInfo info;
+    boost::shared_ptr<QueueGuard> guard;
     HaBroker& haBroker;
     boost::shared_ptr<Primary> primary;
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES Thu Jan 23 10:15:46 2014
@@ -1,3 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
 LinearStore issues:
 
 Store:
@@ -27,12 +46,23 @@ Store:
    * Store analysis and status
    * Recovery/reading of message content
 
+8. One journal file lost when queue deleted. All files except for one are recycled back to the EFP.
+
+9. Complete exceptions - several exceptions thrown using jexception have no exception numbers
+
 Current bugs and performance issues:
 ------------------------------------
-1. RH Bugzilla 1035843 - Slow performance for producers
+1. BZ 1035843 - Slow performance for producers
 2. (FIXED) QPID-5387 (BZ 1036071) - Crash when deleting queue
 3. (FIXED) QPID-5388 (BZ 1035802) - Segmentation fault when recovering empty queue
-4. RH Bugzilla 1036026 - Unable to create durable queue - framing error
+4. (UNABLE TO REPRODUCE) BZ 1036026 - Unable to create durable queue - framing error - possibly caused by running both stores at the same time
+5. (UNABLE TO REPRODUCE) BZ 1038599 - Abort when deleting used queue after restart - may be dup of QPID-5387 (BZ 1036071)
+6. BZ 1039522 - Crash during recovery - JournalFile::getFqFileName() -JERR_JREC_BADRECTAIL
+7. BZ 1039525 - Crash during recovery - journal::jexception - JERR_JREC_BADRECTAIL
+8. (FIXED) QPID-5442 (BZ 1039949) - DTX test failure - missing XIDs
+9. (FIXED) QPID-5460 (BZ 1051097) - Transactional messages lost during recovery
+10. QPID-5464 - Incompletely created journal files accumulate in EFP
+11. QPID-5473 (BZ 1051924) - Recovery where last record in file is truncated (i.e. spans files), but following file is uninitialized causes crash
 
 Code tidy-up
 ------------

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp Thu Jan 23 10:15:46 2014
@@ -30,23 +30,36 @@
 namespace qpid {
 namespace linearstore {
 
-InactivityFireEvent::InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
-    qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {}
+InactivityFireEvent::InactivityFireEvent(JournalImpl* p,
+                                         const ::qpid::sys::Duration timeout):
+        ::qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {}
 
-void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if (_parent) _parent->flushFire(); }
+void InactivityFireEvent::fire() {
+    ::qpid::sys::Mutex::ScopedLock sl(_ife_lock);
+    if (_parent) {
+        _parent->flushFire();
+    }
+}
 
-GetEventsFireEvent::GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
-    qpid::sys::TimerTask(timeout, "JournalGetEvents:"+p->id()), _parent(p) {}
+GetEventsFireEvent::GetEventsFireEvent(JournalImpl* p,
+                                       const ::qpid::sys::Duration timeout):
+        ::qpid::sys::TimerTask(timeout, "JournalGetEvents:"+p->id()), _parent(p)
+{}
 
-void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if (_parent) _parent->getEventsFire(); }
+void GetEventsFireEvent::fire() {
+    ::qpid::sys::Mutex::ScopedLock sl(_gefe_lock);
+    if (_parent) {
+        _parent->getEventsFire();
+    }
+}
 
-JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
+JournalImpl::JournalImpl(::qpid::sys::Timer& timer_,
                          const std::string& journalId,
                          const std::string& journalDirectory,
                          JournalLogImpl& journalLogRef,
-                         const qpid::sys::Duration getEventsTimeout,
-                         const qpid::sys::Duration flushTimeout,
-                         qpid::management::ManagementAgent* a,
+                         const ::qpid::sys::Duration getEventsTimeout,
+                         const ::qpid::sys::Duration flushTimeout,
+                         ::qpid::management::ManagementAgent* a,
                          DeleteCallback onDelete):
                          jcntl(journalId, journalDirectory, journalLogRef),
                          timer(timer_),
@@ -76,7 +89,7 @@ JournalImpl::~JournalImpl()
     if (deleteCallback) deleteCallback(*this);
     if (_init_flag && !_stop_flag){
     	try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete!
-        catch (const qpid::linearstore::journal::jexception& e) { QLS_LOG2(error, _jid, e.what()); }
+        catch (const ::qpid::linearstore::journal::jexception& e) { QLS_LOG2(error, _jid, e.what()); }
 	}
     getEventsFireEventsPtr->cancel();
     inactivityFireEventPtr->cancel();
@@ -90,7 +103,7 @@ JournalImpl::~JournalImpl()
 }
 
 void
-JournalImpl::initManagement(qpid::management::ManagementAgent* a)
+JournalImpl::initManagement(::qpid::management::ManagementAgent* a)
 {
     _agent = a;
     if (_agent != 0)
@@ -117,10 +130,10 @@ JournalImpl::initManagement(qpid::manage
 
 
 void
-JournalImpl::initialize(qpid::linearstore::journal::EmptyFilePool* efpp_,
+JournalImpl::initialize(::qpid::linearstore::journal::EmptyFilePool* efpp_,
                         const uint16_t wcache_num_pages,
                         const uint32_t wcache_pgsize_sblks,
-                        qpid::linearstore::journal::aio_callback* const cbp)
+                        ::qpid::linearstore::journal::aio_callback* const cbp)
 {
 //    efpp->createJournal(_jdir);
 //    QLS_LOG2(notice, _jid, "Initialized");
@@ -152,10 +165,10 @@ JournalImpl::initialize(qpid::linearstor
 }
 
 void
-JournalImpl::recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm,
+JournalImpl::recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm,
                      const uint16_t wcache_num_pages,
                      const uint32_t wcache_pgsize_sblks,
-                     qpid::linearstore::journal::aio_callback* const cbp,
+                     ::qpid::linearstore::journal::aio_callback* const cbp,
                      boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
                      uint64_t& highest_rid,
                      uint64_t queue_id)
@@ -180,6 +193,7 @@ JournalImpl::recover(boost::shared_ptr<q
     }
 */
 
+    // TODO: This is ugly, find a way for RecoveryManager to use boost::ptr_list<PreparedTransaction>* directly
     if (prep_tx_list_ptr) {
         // Create list of prepared xids
         std::vector<std::string> prep_xid_list;
@@ -196,8 +210,8 @@ JournalImpl::recover(boost::shared_ptr<q
     if (prep_tx_list_ptr)
     {
         for (PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
-            qpid::linearstore::journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
-            for (qpid::linearstore::journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+            ::qpid::linearstore::journal::txn_data_list_t tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
+            for (::qpid::linearstore::journal::tdl_itr_t tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
                 if (tdl_itr->enq_flag_) { // enqueue op
                     i->enqueues->add(queue_id, tdl_itr->rid_);
                 } else { // dequeue op
@@ -237,8 +251,11 @@ JournalImpl::recover_complete()
 
 
 void
-JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
-        const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const bool transient)
+JournalImpl::enqueue_data_record(const void* const data_buff,
+                                 const size_t tot_data_len,
+                                 const size_t this_data_len,
+                                 ::qpid::linearstore::journal::data_tok* dtokp,
+                                 const bool transient)
 {
     handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient));
 
@@ -250,8 +267,9 @@ JournalImpl::enqueue_data_record(const v
 }
 
 void
-JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp,
-        const bool transient)
+JournalImpl::enqueue_extern_data_record(const size_t tot_data_len,
+                                        ::qpid::linearstore::journal::data_tok* dtokp,
+                                        const bool transient)
 {
     handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient));
 
@@ -263,12 +281,17 @@ JournalImpl::enqueue_extern_data_record(
 }
 
 void
-JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
-        const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const std::string& xid, const bool transient)
+JournalImpl::enqueue_txn_data_record(const void* const data_buff,
+                                     const size_t tot_data_len,
+                                     const size_t this_data_len,
+                                     ::qpid::linearstore::journal::data_tok* dtokp,
+                                     const std::string& xid,
+                                     const bool tpc_flag,
+                                     const bool transient)
 {
     bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
 
-    handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, transient));
+    handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, tpc_flag, transient));
 
     if (_mgmtObject.get() != 0)
     {
@@ -281,12 +304,15 @@ JournalImpl::enqueue_txn_data_record(con
 }
 
 void
-JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp,
-        const std::string& xid, const bool transient)
+JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len,
+                                            ::qpid::linearstore::journal::data_tok* dtokp,
+                                            const std::string& xid,
+                                            const bool tpc_flag,
+                                            const bool transient)
 {
     bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
 
-    handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, transient));
+    handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, tpc_flag, transient));
 
     if (_mgmtObject.get() != 0)
     {
@@ -299,7 +325,8 @@ JournalImpl::enqueue_extern_txn_data_rec
 }
 
 void
-JournalImpl::dequeue_data_record(qpid::linearstore::journal::data_tok* const dtokp, const bool txn_coml_commit)
+JournalImpl::dequeue_data_record(::qpid::linearstore::journal::data_tok* const dtokp,
+                                 const bool txn_coml_commit)
 {
     handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit));
 
@@ -312,11 +339,14 @@ JournalImpl::dequeue_data_record(qpid::l
 }
 
 void
-JournalImpl::dequeue_txn_data_record(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
+JournalImpl::dequeue_txn_data_record(::qpid::linearstore::journal::data_tok* const dtokp,
+                                     const std::string& xid,
+                                     const bool tpc_flag,
+                                     const bool txn_coml_commit)
 {
     bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
 
-    handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, txn_coml_commit));
+    handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, tpc_flag, txn_coml_commit));
 
     if (_mgmtObject.get() != 0)
     {
@@ -329,7 +359,8 @@ JournalImpl::dequeue_txn_data_record(qpi
 }
 
 void
-JournalImpl::txn_abort(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid)
+JournalImpl::txn_abort(::qpid::linearstore::journal::data_tok* const dtokp,
+                       const std::string& xid)
 {
     handleIoResult(jcntl::txn_abort(dtokp, xid));
 
@@ -341,7 +372,8 @@ JournalImpl::txn_abort(qpid::linearstore
 }
 
 void
-JournalImpl::txn_commit(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid)
+JournalImpl::txn_commit(::qpid::linearstore::journal::data_tok* const dtokp,
+                        const std::string& xid)
 {
     handleIoResult(jcntl::txn_commit(dtokp, xid));
 
@@ -366,12 +398,12 @@ JournalImpl::stop(bool block_till_aio_cm
     }
 }
 
-qpid::linearstore::journal::iores
+::qpid::linearstore::journal::iores
 JournalImpl::flush(const bool block_till_aio_cmpl)
 {
-    const qpid::linearstore::journal::iores res = jcntl::flush(block_till_aio_cmpl);
+    const ::qpid::linearstore::journal::iores res = jcntl::flush(block_till_aio_cmpl);
     {
-        qpid::sys::Mutex::ScopedLock sl(_getf_lock);
+        ::qpid::sys::Mutex::ScopedLock sl(_getf_lock);
         if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); }
     }
     return res;
@@ -380,7 +412,7 @@ JournalImpl::flush(const bool block_till
 void
 JournalImpl::getEventsFire()
 {
-    qpid::sys::Mutex::ScopedLock sl(_getf_lock);
+    ::qpid::sys::Mutex::ScopedLock sl(_getf_lock);
     getEventsTimerSetFlag = false;
     if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(0); }
     if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
@@ -394,7 +426,7 @@ JournalImpl::flushFire()
         flushTriggeredFlag = false;
     } else {
         if (!flushTriggeredFlag) {
-            flush();
+            flush(false);
             flushTriggeredFlag = true;
         }
     }
@@ -405,20 +437,20 @@ JournalImpl::flushFire()
 }
 
 void
-JournalImpl::wr_aio_cb(std::vector<qpid::linearstore::journal::data_tok*>& dtokl)
+JournalImpl::wr_aio_cb(std::vector< ::qpid::linearstore::journal::data_tok*>& dtokl)
 {
-    for (std::vector<qpid::linearstore::journal::data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++)
+    for (std::vector< ::qpid::linearstore::journal::data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++)
     {
         DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(*i);
 	    if (/*!is_stopped() &&*/ dtokp->getSourceMessage())
 	    {
 		    switch (dtokp->wstate())
 		    {
- 			    case qpid::linearstore::journal::data_tok::ENQ:
+ 			    case ::qpid::linearstore::journal::data_tok::ENQ:
 //std::cout << "<<<>>> JournalImpl::wr_aio_cb() ENQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG
              	    dtokp->getSourceMessage()->enqueueComplete();
  				    break;
-			    case qpid::linearstore::journal::data_tok::DEQ:
+			    case ::qpid::linearstore::journal::data_tok::DEQ:
 //std::cout << "<<<>>> JournalImpl::wr_aio_cb() DEQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG
 /* Don't need to signal until we have a way to ack completion of dequeue in AMQP
                     dtokp->getSourceMessage()->dequeueComplete();
@@ -443,25 +475,25 @@ JournalImpl::createStore() {
 }
 
 void
-JournalImpl::handleIoResult(const qpid::linearstore::journal::iores r)
+JournalImpl::handleIoResult(const ::qpid::linearstore::journal::iores r)
 {
     writeActivityFlag = true;
     switch (r)
     {
-        case qpid::linearstore::journal::RHM_IORES_SUCCESS:
+        case ::qpid::linearstore::journal::RHM_IORES_SUCCESS:
             return;
         default:
             {
                 std::ostringstream oss;
-                oss << "Unexpected I/O response (" << qpid::linearstore::journal::iores_str(r) << ").";
+                oss << "Unexpected I/O response (" << ::qpid::linearstore::journal::iores_str(r) << ").";
                 QLS_LOG2(error, _jid, oss.str());
                 THROW_STORE_FULL_EXCEPTION(oss.str());
             }
     }
 }
 
-qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t /*methodId*/,
-                                                                      qpid::management::Args& /*args*/,
+::qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t /*methodId*/,
+                                                                      ::qpid::management::Args& /*args*/,
                                                                       std::string& /*text*/)
 {
     Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/JournalImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/JournalImpl.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/JournalImpl.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/JournalImpl.h Thu Jan 23 10:15:46 2014
@@ -44,86 +44,90 @@ namespace journal {
 class JournalImpl;
 class JournalLogImpl;
 
-class InactivityFireEvent : public qpid::sys::TimerTask
+class InactivityFireEvent : public ::qpid::sys::TimerTask
 {
     JournalImpl* _parent;
-    qpid::sys::Mutex _ife_lock;
+    ::qpid::sys::Mutex _ife_lock;
 
   public:
-    InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout);
+    InactivityFireEvent(JournalImpl* p,
+                        const ::qpid::sys::Duration timeout);
     virtual ~InactivityFireEvent() {}
     void fire();
-    inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
+    inline void cancel() { ::qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
 };
 
-class GetEventsFireEvent : public qpid::sys::TimerTask
+class GetEventsFireEvent : public ::qpid::sys::TimerTask
 {
     JournalImpl* _parent;
-    qpid::sys::Mutex _gefe_lock;
+    ::qpid::sys::Mutex _gefe_lock;
 
   public:
-    GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout);
+    GetEventsFireEvent(JournalImpl* p,
+                       const ::qpid::sys::Duration timeout);
     virtual ~GetEventsFireEvent() {}
     void fire();
-    inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
+    inline void cancel() { ::qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
 };
 
-class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::linearstore::journal::jcntl, public qpid::linearstore::journal::aio_callback
+class JournalImpl : public ::qpid::broker::ExternalQueueStore,
+                    public ::qpid::linearstore::journal::jcntl,
+                    public ::qpid::linearstore::journal::aio_callback
 {
   public:
     typedef boost::function<void (JournalImpl&)> DeleteCallback;
 
   protected:
-    qpid::sys::Timer& timer;
+    ::qpid::sys::Timer& timer;
     JournalLogImpl& _journalLogRef;
     bool getEventsTimerSetFlag;
-    boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
-    qpid::sys::Mutex _getf_lock;
-    qpid::sys::Mutex _read_lock;
+    boost::intrusive_ptr< ::qpid::sys::TimerTask> getEventsFireEventsPtr;
+    ::qpid::sys::Mutex _getf_lock;
+    ::qpid::sys::Mutex _read_lock;
 
     bool writeActivityFlag;
     bool flushTriggeredFlag;
-    boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr;
+    boost::intrusive_ptr< ::qpid::sys::TimerTask> inactivityFireEventPtr;
 
-    qpid::management::ManagementAgent* _agent;
-    qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject;
+    ::qpid::management::ManagementAgent* _agent;
+    ::qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject;
     DeleteCallback deleteCallback;
 
   public:
 
-    JournalImpl(qpid::sys::Timer& timer,
+    JournalImpl(::qpid::sys::Timer& timer,
                 const std::string& journalId,
                 const std::string& journalDirectory,
                 JournalLogImpl& journalLogRef,
-                const qpid::sys::Duration getEventsTimeout,
-                const qpid::sys::Duration flushTimeout,
-                qpid::management::ManagementAgent* agent,
+                const ::qpid::sys::Duration getEventsTimeout,
+                const ::qpid::sys::Duration flushTimeout,
+                ::qpid::management::ManagementAgent* agent,
                 DeleteCallback deleteCallback=DeleteCallback() );
 
     virtual ~JournalImpl();
 
-    void initManagement(qpid::management::ManagementAgent* agent);
+    void initManagement(::qpid::management::ManagementAgent* agent);
 
-    void initialize(qpid::linearstore::journal::EmptyFilePool* efp,
+    void initialize(::qpid::linearstore::journal::EmptyFilePool* efp,
                     const uint16_t wcache_num_pages,
                     const uint32_t wcache_pgsize_sblks,
-                    qpid::linearstore::journal::aio_callback* const cbp);
+                    ::qpid::linearstore::journal::aio_callback* const cbp);
 
-    inline void initialize(qpid::linearstore::journal::EmptyFilePool* efpp,
+    inline void initialize(::qpid::linearstore::journal::EmptyFilePool* efpp,
                            const uint16_t wcache_num_pages,
                            const uint32_t wcache_pgsize_sblks) {
         initialize(efpp, wcache_num_pages, wcache_pgsize_sblks, this);
     }
 
-    void recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm,
+    void recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm,
                  const uint16_t wcache_num_pages,
                  const uint32_t wcache_pgsize_sblks,
-                 qpid::linearstore::journal::aio_callback* const cbp,
+                 ::qpid::linearstore::journal::aio_callback* const cbp,
                  boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
                  uint64_t& highest_rid,
                  uint64_t queue_id);
 
-    inline void recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm,
+    inline void recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm,
                         const uint16_t wcache_num_pages,
                         const uint32_t wcache_pgsize_sblks,
                         boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
@@ -135,47 +139,62 @@ class JournalImpl : public qpid::broker:
     void recover_complete();
 
     // Overrides for write inactivity timer
-    void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
-                             const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp,
-                             const bool transient = false);
+    void enqueue_data_record(const void* const data_buff,
+                             const size_t tot_data_len,
+                             const size_t this_data_len,
+                             ::qpid::linearstore::journal::data_tok* dtokp,
+                             const bool transient);
+
+    void enqueue_extern_data_record(const size_t tot_data_len,
+                                    ::qpid::linearstore::journal::data_tok* dtokp,
+                                    const bool transient);
+
+    void enqueue_txn_data_record(const void* const data_buff,
+                                 const size_t tot_data_len,
+                                 const size_t this_data_len,
+                                 ::qpid::linearstore::journal::data_tok* dtokp,
+                                 const std::string& xid,
+                                 const bool tpc_flag,
+                                 const bool transient);
+
+    void enqueue_extern_txn_data_record(const size_t tot_data_len,
+                                        ::qpid::linearstore::journal::data_tok* dtokp,
+                                        const std::string& xid,
+                                        const bool tpc_flag,
+                                        const bool transient);
+
+    void dequeue_data_record(::qpid::linearstore::journal::data_tok*
+                             const dtokp,
+                             const bool txn_coml_commit);
+
+    void dequeue_txn_data_record(::qpid::linearstore::journal::data_tok* const dtokp,
+                                 const std::string& xid,
+                                 const bool tpc_flag,
+                                 const bool txn_coml_commit);
 
-    void enqueue_extern_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp,
-                                    const bool transient = false);
+    void txn_abort(::qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid);
 
-    void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
-                                 const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const std::string& xid,
-                                 const bool transient = false);
-
-    void enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp,
-                                        const std::string& xid, const bool transient = false);
-
-    void dequeue_data_record(qpid::linearstore::journal::data_tok* const dtokp, const bool txn_coml_commit = false);
-
-    void dequeue_txn_data_record(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
-
-    void txn_abort(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid);
-
-    void txn_commit(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid);
+    void txn_commit(::qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid);
 
     void stop(bool block_till_aio_cmpl = false);
 
     // Overrides for get_events timer
-    qpid::linearstore::journal::iores flush(const bool block_till_aio_cmpl = false);
+    ::qpid::linearstore::journal::iores flush(const bool block_till_aio_cmpl);
 
     // TimerTask callback
     void getEventsFire();
     void flushFire();
 
     // AIO callbacks
-    virtual void wr_aio_cb(std::vector<qpid::linearstore::journal::data_tok*>& dtokl);
+    virtual void wr_aio_cb(std::vector< ::qpid::linearstore::journal::data_tok*>& dtokl);
     virtual void rd_aio_cb(std::vector<uint16_t>& pil);
 
-    qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
+    ::qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
     { return _mgmtObject; }
 
-    qpid::management::Manageable::status_t ManagementMethod (uint32_t,
-                                                             qpid::management::Args&,
-                                                             std::string&);
+    ::qpid::management::Manageable::status_t ManagementMethod(uint32_t,
+                                                              ::qpid::management::Args&,
+                                                              std::string&);
 
     void resetDeleteCallback() { deleteCallback = DeleteCallback(); }
 
@@ -188,7 +207,7 @@ class JournalImpl : public qpid::broker:
         timer.add(getEventsFireEventsPtr);
         getEventsTimerSetFlag = true;
     }
-    void handleIoResult(const qpid::linearstore::journal::iores r);
+    void handleIoResult(const ::qpid::linearstore::journal::iores r);
 
     // Management instrumentation callbacks overridden from jcntl
     inline void instr_incr_outstanding_aio_cnt() {
@@ -203,23 +222,27 @@ class JournalImpl : public qpid::broker:
 class TplJournalImpl : public JournalImpl
 {
   public:
-    TplJournalImpl(qpid::sys::Timer& timer,
+    TplJournalImpl(::qpid::sys::Timer& timer,
                    const std::string& journalId,
                    const std::string& journalDirectory,
                    JournalLogImpl& journalLogRef,
-                   const qpid::sys::Duration getEventsTimeout,
-                   const qpid::sys::Duration flushTimeout,
-                   qpid::management::ManagementAgent* agent) :
+                   const ::qpid::sys::Duration getEventsTimeout,
+                   const ::qpid::sys::Duration flushTimeout,
+                   ::qpid::management::ManagementAgent* agent) :
         JournalImpl(timer, journalId, journalDirectory, journalLogRef, getEventsTimeout, flushTimeout, agent)
     {}
 
     virtual ~TplJournalImpl() {}
 
     // Special version of read_data_record that ignores transactions - needed when reading the TPL
-    inline qpid::linearstore::journal::iores read_data_record(void** const datapp, std::size_t& dsize,
-                                                void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
-                                                qpid::linearstore::journal::data_tok* const dtokp) {
-        return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
+    inline ::qpid::linearstore::journal::iores read_data_record(void** const datapp,
+                                                                std::size_t& dsize,
+                                                                void** const xidpp,
+                                                                std::size_t& xidsize,
+                                                                bool& transient,
+                                                                bool& external,
+                                                                ::qpid::linearstore::journal::data_tok* const dtokp) {
+        return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
     }
 }; // class TplJournalImpl
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp Thu Jan 23 10:15:46 2014
@@ -51,16 +51,6 @@ qpid::sys::Duration MessageStoreImpl::de
 qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
 qpid::sys::Mutex TxnCtxt::globalSerialiser;
 
-MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const uint64_t rid_,
-                                                     const bool deq_flag_,
-                                                     const bool commit_flag_,
-                                                     const bool tpc_flag_) :
-                                                     rid(rid_),
-                                                     deq_flag(deq_flag_),
-                                                     commit_flag(commit_flag_),
-                                                     tpc_flag(tpc_flag_)
-{}
-
 MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath_) :
                                    defaultEfpPartitionNumber(0),
                                    defaultEfpFileSize_kib(0),
@@ -351,7 +341,7 @@ void MessageStoreImpl::chkTplStoreInit()
     qpid::sys::Mutex::ScopedLock sl(tplInitLock);
     if (!tplStorePtr->is_ready()) {
         qpid::linearstore::journal::jdir::create_dir(getTplBaseDir());
-        tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks);
+        tplStorePtr->initialize(getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks);
         if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true);
     }
 }
@@ -594,6 +584,13 @@ void MessageStoreImpl::recover(qpid::bro
     txn_list prepared;
     recoverLockedMappings(prepared);
 
+    std::ostringstream oss;
+    oss << "Recovered transaction prepared list:";
+    for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
+        oss << std::endl << "     " << str2hexnum(i->xid);
+    }
+    QLS_LOG(debug, oss.str());
+
     queue_index queues;//id->queue
     exchange_index exchanges;//id->exchange
     message_index messages;//id->message
@@ -601,7 +598,7 @@ void MessageStoreImpl::recover(qpid::bro
     TxnCtxt txn;
     txn.begin(dbenv.get(), false);
     try {
-        //read all queues, calls recoversMessages
+        //read all queues, calls recoverMessages for each queue
         recoverQueues(txn, registry_, queues, prepared, messages);
 
         //recover exchange & bindings:
@@ -621,6 +618,7 @@ void MessageStoreImpl::recover(qpid::bro
     }
 
     //recover transactions:
+    qpid::linearstore::journal::txn_map& txn_map_ref = tplStorePtr->get_txn_map();
     for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
         const PreparedTransaction pt = *i;
         if (mgmtObject.get() != 0) {
@@ -629,20 +627,20 @@ void MessageStoreImpl::recover(qpid::bro
         }
 
         std::string xid = pt.xid;
-
-        // Restore data token state in TxnCtxt
-        TplRecoverMapCitr citr = tplRecoverMap.find(xid);
-        if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
+        qpid::linearstore::journal::txn_data_list_t tdl = txn_map_ref.get_tdata_list(xid);
+        if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map");
+        qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
+        bool commitFlag = txn_op_stats.abortCnt == 0;
 
         // If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
         // was interrupted part way through committing/aborting the impacted queues. Complete this process.
-        bool incomplTplTxnFlag = citr->second.deq_flag;
+        bool incomplTplTxnFlag = txn_op_stats.deqCnt > 0;
 
-        if (citr->second.tpc_flag) {
+        if (txn_op_stats.tpcCnt > 0) {
             // Dtx (2PC) transaction
             TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
             std::auto_ptr<qpid::broker::TPCTransactionContext> txn(tpcc);
-            tpcc->recoverDtok(citr->second.rid, xid);
+            tpcc->recoverDtok(txn_op_stats.rid, xid);
             tpcc->prepare(tplStorePtr.get());
 
             qpid::broker::RecoverableTransaction::shared_ptr dtx;
@@ -661,12 +659,12 @@ void MessageStoreImpl::recover(qpid::bro
             }
 
             if (incomplTplTxnFlag) {
-                tpcc->complete(citr->second.commit_flag);
+                tpcc->complete(commitFlag);
             }
         } else {
             // Local (1PC) transaction
             boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence));
-            opcc->recoverDtok(citr->second.rid, xid);
+            opcc->recoverDtok(txn_op_stats.rid, xid);
             opcc->prepare(tplStorePtr.get());
 
             if (pt.enqueues.get()) {
@@ -680,11 +678,12 @@ void MessageStoreImpl::recover(qpid::bro
                 }
             }
             if (incomplTplTxnFlag) {
-                opcc->complete(citr->second.commit_flag);
+                opcc->complete(commitFlag);
             } else {
-                completed(*opcc.get(), citr->second.commit_flag);
+                completed(*opcc.get(), commitFlag);
             }
         }
+
     }
     registry_.recoveryComplete();
 }
@@ -888,12 +887,13 @@ void MessageStoreImpl::recoverMessages(T
     bool externalFlag = false;
     DataTokenImpl dtok;
     dtok.set_wstate(DataTokenImpl::NONE);
+    qpid::linearstore::journal::txn_map& txn_map_ref = tplStorePtr->get_txn_map();
 
     // Read the message from the Journal.
     try {
         unsigned aio_sleep_cnt = 0;
         while (read) {
-            qpid::linearstore::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
+            qpid::linearstore::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok, false);
 
             switch (res)
             {
@@ -907,7 +907,7 @@ void MessageStoreImpl::recoverMessages(T
                     msg = getExternMessage(recovery, dtok.rid(), headerSize); // large message external to jrnl
                 } else {
                     headerSize = qpid::framing::Buffer(data, preambleLength).getLong();
-                    qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+                    qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize);
                     msg = recovery.recoverMessage(headerBuff);
                 }
                 msg->setPersistenceId(dtok.rid());
@@ -932,30 +932,30 @@ void MessageStoreImpl::recoverMessages(T
                 } else {
                     uint64_t rid = dtok.rid();
                     std::string xid(i->xid);
-                    TplRecoverMapCitr citr = tplRecoverMap.find(xid);
-                    if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
-
-                    // deq present in prepared list: this xid is part of incomplete txn commit/abort
-                    // or this is a 1PC txn that must be rolled forward
-                    if (citr->second.deq_flag || !citr->second.tpc_flag) {
+                    qpid::linearstore::journal::txn_data_list_t tdl = txn_map_ref.get_tdata_list(xid);
+                    if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map");
+                    qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
+                    if (txn_op_stats.deqCnt > 0 || txn_op_stats.tpcCnt == 0) {
                         if (jc->is_enqueued(rid, true)) {
                             // Enqueue is non-tx, dequeue tx
                             assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
-                            if (!citr->second.commit_flag) {
+                            if (txn_op_stats.abortCnt > 0) {
                                 rcnt++;
                                 queue->recover(msg); // recover message in abort case only
                             }
                         } else {
                             // Enqueue and/or dequeue tx
                             qpid::linearstore::journal::txn_map& tmap = jc->get_txn_map();
-                            qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
+                            qpid::linearstore::journal::txn_data_list_t txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
                             bool enq = false;
                             bool deq = false;
-                            for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
-                                if (j->enq_flag_ && j->rid_ == rid) enq = true;
-                                else if (!j->enq_flag_ && j->drid_ == rid) deq = true;
+                            for (qpid::linearstore::journal::tdl_itr_t j = txnList.begin(); j<txnList.end(); j++) {
+                                if (j->enq_flag_ && j->rid_ == rid)
+                                    enq = true;
+                                else if (!j->enq_flag_ && j->drid_ == rid)
+                                    deq = true;
                             }
-                            if (enq && !deq && citr->second.commit_flag) {
+                            if (enq && !deq && txn_op_stats.abortCnt == 0) {
                                 rcnt++;
                                 queue->recover(msg); // recover txn message in commit case only
                             }
@@ -969,10 +969,14 @@ void MessageStoreImpl::recoverMessages(T
                 dtok.reset();
                 dtok.set_wstate(DataTokenImpl::NONE);
 
-                if (xidbuff)
+                if (xidbuff) {
                     ::free(xidbuff);
-                else if (dbuff)
+                    xidbuff = NULL;
+                }
+                if (dbuff) {
                     ::free(dbuff);
+                    dbuff = NULL;
+                }
                 aio_sleep_cnt = 0;
                 break;
               }
@@ -1033,77 +1037,6 @@ int MessageStoreImpl::enqueueMessage(Txn
     return count;
 }
 
-void MessageStoreImpl::readTplStore()
-{
-    tplRecoverMap.clear();
-    qpid::linearstore::journal::txn_map& tmap = tplStorePtr->get_txn_map();
-    DataTokenImpl dtok;
-    void* dbuff = NULL; size_t dbuffSize = 0;
-    void* xidbuff = NULL; size_t xidbuffSize = 0;
-    bool transientFlag = false;
-    bool externalFlag = false;
-    bool done = false;
-    try {
-        unsigned aio_sleep_cnt = 0;
-        while (!done) {
-            dtok.reset();
-            dtok.set_wstate(DataTokenImpl::ENQ);
-            qpid::linearstore::journal::iores res = tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
-            switch (res) {
-              case qpid::linearstore::journal::RHM_IORES_SUCCESS: {
-                // Every TPL record contains both data and an XID
-                assert(dbuffSize>0);
-                assert(xidbuffSize>0);
-                std::string xid(static_cast<const char*>(xidbuff), xidbuffSize);
-                bool is2PC = *(static_cast<char*>(dbuff)) != 0;
-
-                // Check transaction details; add to recover map
-                qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); //  txnList will be empty if xid not found
-                if (!txnList.empty()) { // xid found in tmap
-                    unsigned enqCnt = 0;
-                    unsigned deqCnt = 0;
-                    uint64_t rid = 0;
-
-                    // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists.
-                    // Note: will apply to both 1PC and 2PC transactions.
-                    bool commitFlag = true;
-
-                    for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
-                        if (j->enq_flag_) {
-                            rid = j->rid_;
-                            enqCnt++;
-                        } else {
-                            commitFlag = j->commit_flag_;
-                            deqCnt++;
-                        }
-                    }
-                    assert(enqCnt == 1);
-                    assert(deqCnt <= 1);
-                    tplRecoverMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
-                }
-
-                ::free(xidbuff);
-                aio_sleep_cnt = 0;
-                break;
-                }
-              case qpid::linearstore::journal::RHM_IORES_PAGE_AIOWAIT:
-                if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
-                    THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverTplStore()");
-                ::usleep(AIO_SLEEP_TIME_US);
-                break;
-              case qpid::linearstore::journal::RHM_IORES_EMPTY:
-                done = true;
-                break; // done with all messages. (add call in jrnl to test that _emap is empty.)
-              default:
-                std::ostringstream oss;
-                oss << "readTplStore(): Unexpected result from journal read: " << qpid::linearstore::journal::iores_str(res);
-                THROW_STORE_EXCEPTION(oss.str());
-            } // switch
-        }
-    } catch (const qpid::linearstore::journal::jexception& e) {
-        THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
-    }
-}
 
 void MessageStoreImpl::recoverTplStore()
 {
@@ -1114,11 +1047,7 @@ void MessageStoreImpl::recoverTplStore()
             highestRid = thisHighestRid;
         else if (thisHighestRid - highestRid  < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
             highestRid = thisHighestRid;
-
-        // Load tplRecoverMap by reading the TPL store
-        readTplStore();
-
-        tplStorePtr->recover_complete(); // start journal.
+        tplStorePtr->recover_complete(); // start TPL
     }
 }
 
@@ -1126,28 +1055,32 @@ void MessageStoreImpl::recoverLockedMapp
 {
     if (!tplStorePtr->is_ready())
         recoverTplStore();
-
-    // Abort unprepared xids and populate the locked maps
-    for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
+    std::vector<std::string> xidList;
+    tplStorePtr->get_txn_map().xid_list(xidList);
+    for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) {
         LockedMappings::shared_ptr enq_ptr;
         enq_ptr.reset(new LockedMappings);
         LockedMappings::shared_ptr deq_ptr;
         deq_ptr.reset(new LockedMappings);
-        txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr));
+        txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
     }
 }
 
 void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids)
 {
-    if (tplStorePtr->is_ready()) {
-        readTplStore();
-    } else {
+    if (!tplStorePtr->is_ready()) {
         recoverTplStore();
     }
-    for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
-        // Discard all txns that are to be rolled forward/back and 1PC transactions
-        if (!i->second.deq_flag && i->second.tpc_flag)
-            xids.insert(i->first);
+    std::vector<std::string> xidList;
+    tplStorePtr->get_txn_map().xid_list(xidList);
+    for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) {
+        qpid::linearstore::journal::txn_data_list_t tdl = tplStorePtr->get_txn_map().get_tdata_list(*i);
+        qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
+        if (txn_op_stats.tpcCnt > 0) {
+            if (txn_op_stats.enqCnt - txn_op_stats.deqCnt > 0) {
+                xids.insert(*i);
+            }
+        }
     }
 }
 
@@ -1186,7 +1119,7 @@ void MessageStoreImpl::flush(const qpid:
         JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore());
         if (jc) {
             // TODO: check if this result should be used...
-            /*mrg::journal::iores res =*/ jc->flush();
+            /*mrg::journal::iores res =*/ jc->flush(false);
         }
     } catch (const qpid::linearstore::journal::jexception& e) {
         THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() );
@@ -1258,7 +1191,7 @@ void MessageStoreImpl::store(const qpid:
             if (txn_->getXid().empty()) {
                 jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message_->isPersistent());
             } else {
-                jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn_->getXid(), !message_->isPersistent());
+                jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn_->getXid(), txn_->isTPC(), !message_->isPersistent());
             }
         } else {
             THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL."));
@@ -1309,9 +1242,10 @@ void MessageStoreImpl::async_dequeue(qpi
     ddtokp->set_rid(messageIdSequence.next());
     ddtokp->set_dequeue_rid(msg_->getPersistenceId());
     ddtokp->set_wstate(DataTokenImpl::ENQ);
+    TxnCtxt* txn = 0;
     std::string tid;
     if (ctxt_) {
-        TxnCtxt* txn = check(ctxt_);
+        txn = check(ctxt_);
         tid = txn->getXid();
     }
     // Manually increase the ref count, as raw pointers are used beyond this point
@@ -1319,9 +1253,9 @@ void MessageStoreImpl::async_dequeue(qpi
     try {
         JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore());
         if (tid.empty()) {
-            jc->dequeue_data_record(ddtokp.get());
+            jc->dequeue_data_record(ddtokp.get(), false);
         } else {
-            jc->dequeue_txn_data_record(ddtokp.get(), tid);
+            jc->dequeue_txn_data_record(ddtokp.get(), tid, txn?txn->isTPC():false, false);
         }
     } catch (const qpid::linearstore::journal::jexception& e) {
         ddtokp->release();
@@ -1341,7 +1275,7 @@ void MessageStoreImpl::completed(TxnCtxt
             DataTokenImpl* dtokp = txn_.getDtok();
             dtokp->set_dequeue_rid(dtokp->rid());
             dtokp->set_rid(messageIdSequence.next());
-            tplStorePtr->dequeue_txn_data_record(txn_.getDtok(), txn_.getXid(), commit_);
+            tplStorePtr->dequeue_txn_data_record(txn_.getDtok(), txn_.getXid(), txn_.isTPC(), commit_);
         }
         txn_.complete(commit_);
         if (mgmtObject.get() != 0) {
@@ -1376,12 +1310,16 @@ void MessageStoreImpl::prepare(qpid::bro
 {
     checkInit();
     TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt_);
+//std::string xid=txn->getXid(); std::cout << "*** MessageStoreImpl::prepare() xid=" << std::hex;
+//for (unsigned i=0; i<xid.length(); ++i) std::cout << "\\" << (int)xid.at(i); std::cout << " ***" << std::dec << std::endl;
     if(!txn) throw qpid::broker::InvalidTransactionContextException();
     localPrepare(txn);
 }
 
 void MessageStoreImpl::localPrepare(TxnCtxt* ctxt_)
 {
+//std::string xid=ctxt_->getXid(); std::cout << "*** MessageStoreImpl::localPrepare() xid=" << std::hex;
+//for (unsigned i=0; i<xid.length(); ++i) std::cout << "\\" << (int)xid.at(i); std::cout << " ***" << std::dec << std::endl;
     try {
         chkTplStoreInit(); // Late initialize (if needed)
 
@@ -1394,7 +1332,7 @@ void MessageStoreImpl::localPrepare(TxnC
         dtokp->set_external_rid(true);
         dtokp->set_rid(messageIdSequence.next());
         char tpcFlag = static_cast<char>(ctxt_->isTPC());
-        tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt_->getXid(), false);
+        tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt_->getXid(), tpcFlag != 0, false);
         ctxt_->prepare(tplStorePtr.get());
         // make sure all the data is written to disk before returning
         ctxt_->sync();
@@ -1572,6 +1510,15 @@ void MessageStoreImpl::journalDeleted(Jo
     journalList.erase(j_.id());
 }
 
+std::string MessageStoreImpl::str2hexnum(const std::string& str) {
+    std::ostringstream oss;
+    oss << "(" << str.size() << ")0x" << std::hex;
+    for (unsigned i=str.size(); i>0; --i) {
+        oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1];
+    }
+    return oss.str();
+}
+
 MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) :
                                              qpid::Options(name_),
                                              truncateFlag(defTruncateFlag),

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h Thu Jan 23 10:15:46 2014
@@ -89,19 +89,6 @@ class MessageStoreImpl : public qpid::br
     typedef LockedMappings::map txn_lock_map;
     typedef boost::ptr_list<PreparedTransaction> txn_list;
 
-    // Structs for Transaction Recover List (TPL) recover state
-    struct TplRecoverStruct {
-        uint64_t rid; // rid of TPL record
-        bool deq_flag;
-        bool commit_flag;
-        bool tpc_flag;
-        TplRecoverStruct(const uint64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag);
-    };
-    typedef TplRecoverStruct TplRecover;
-    typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
-    typedef std::map<std::string, TplRecover> TplRecoverMap;
-    typedef TplRecoverMap::const_iterator TplRecoverMapCitr;
-
     typedef std::map<std::string, JournalImpl*> JournalListMap;
     typedef JournalListMap::iterator JournalListMapItr;
 
@@ -127,7 +114,6 @@ class MessageStoreImpl : public qpid::br
 
     // Pointer to Transaction Prepared List (TPL) journal instance
     boost::shared_ptr<TplJournalImpl> tplStorePtr;
-    TplRecoverMap tplRecoverMap;
     qpid::sys::Mutex tplInitLock;
     JournalListMap journalList;
     qpid::sys::Mutex journalListLock;
@@ -202,7 +188,6 @@ class MessageStoreImpl : public qpid::br
                        queue_index& index,
                        txn_list& locked,
                        message_index& prepared);
-    void readTplStore();
     void recoverTplStore();
     void recoverLockedMappings(txn_list& txns);
     TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
@@ -250,18 +235,7 @@ class MessageStoreImpl : public qpid::br
     }
     void chkTplStoreInit();
 
-    // debug aid for printing XIDs that may contain non-printable chars
-    static std::string xid2str(const std::string xid) {
-        std::ostringstream oss;
-        oss << std::hex << std::setfill('0');
-        for (unsigned i=0; i<xid.size(); i++) {
-            if (isprint(xid[i]))
-                oss << xid[i];
-            else
-                oss << "/" << std::setw(2) << (int)((char)xid[i]);
-        }
-        return oss.str();
-    }
+    static std::string str2hexnum(const std::string& str);
 
   public:
     typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp Thu Jan 23 10:15:46 2014
@@ -114,7 +114,7 @@ void TxnCtxt::sync() {
 
 void TxnCtxt::jrnl_flush(JournalImpl* jc) {
     if (jc && !(jc->is_txn_synced(getXid())))
-        jc->flush();
+        jc->flush(false);
 }
 
 void TxnCtxt::jrnl_sync(JournalImpl* jc, timespec* timeout) {

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp Thu Jan 23 10:15:46 2014
@@ -51,6 +51,10 @@ void LinearFileController::initialize(co
 }
 
 void LinearFileController::finalize() {
+    if (currentJournalFilePtr_) {
+        currentJournalFilePtr_->close();
+        currentJournalFilePtr_ = NULL;
+    }
     while (!journalFileList_.empty()) {
         delete journalFileList_.front();
         journalFileList_.pop_front();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message