qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r667603 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: SessionState.cpp amqp_0_10/SessionHandler.cpp broker/Broker.cpp framing/SequenceNumber.cpp log/Logger.cpp
Date Fri, 13 Jun 2008 17:36:24 GMT
Author: aconway
Date: Fri Jun 13 10:36:23 2008
New Revision: 667603

URL: http://svn.apache.org/viewvc?rev=667603&view=rev
Log:
Fix for broker wraparound problem.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=667603&r1=667602&r2=667603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Fri Jun 13 10:36:23 2008
@@ -124,18 +124,20 @@
         throw ResourceLimitExceededException("Replay buffer exceeeded hard limit");
 }
 
+static const uint32_t SPONTANEOUS_REQUEST_INTERVAL = 65536; 
+
 bool SessionState::senderNeedFlush() const {
-    return config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit;
+    return (sender.sendPoint.command % SPONTANEOUS_REQUEST_INTERVAL == 0) ||
+        (config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit);
 }
 
 void SessionState::senderRecordFlush() {
-    assert(sender.flushPoint <= sender.sendPoint);
     sender.flushPoint = sender.sendPoint;
     sender.unflushedSize = 0;
 }
 
 bool SessionState::senderNeedKnownCompleted() const {
-    return sender.bytesSinceKnownCompleted >= config.replayFlushLimit;
+    return config.replayFlushLimit && sender.bytesSinceKnownCompleted >= config.replayFlushLimit;
 }
 
 void SessionState::senderRecordKnownCompleted() {
@@ -214,7 +216,8 @@
 }
 
 bool SessionState::receiverNeedKnownCompleted() const {
-    return receiver.bytesSinceKnownCompleted >= config.replayFlushLimit;
+    return (receiver.expected.command % SPONTANEOUS_REQUEST_INTERVAL == 0) ||
+        (config.replayFlushLimit && receiver.bytesSinceKnownCompleted >= config.replayFlushLimit);
 }
         
 const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected;
}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=667603&r1=667602&r2=667603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Fri Jun 13 10:36:23
2008
@@ -75,6 +75,8 @@
                 throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not
ready to receive data"));
             if (!getState()->receiverRecord(f))
                 return; // Ignore duplicates.
+            if (getState()->receiverNeedKnownCompleted())
+                sendCompletion();
             getInHandler()->handle(f);
         }
     }
@@ -94,13 +96,22 @@
     }
 }
 
+namespace {
+bool isControl(const AMQFrame& f) {
+    return f.getMethod() && f.getMethod()->type() == framing::CONTROL;
+}
+bool isCommand(const AMQFrame& f) {
+    return f.getMethod() && f.getMethod()->type() == framing::COMMAND;
+}
+} // namespace
+
 void SessionHandler::handleOut(AMQFrame& f) {
     checkAttached();
     if (!sendReady)
         throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready
to send data"));
     getState()->senderRecord(f); 
-    if (getState()->senderNeedFlush()) {
-        peer.flush(false, true, true); 
+    if (isCommand(f) && getState()->senderNeedFlush()) {
+        peer.flush(false, false, true); 
         getState()->senderRecordFlush();
     }
     channel.handle(f);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=667603&r1=667602&r2=667603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Jun 13 10:36:23 2008
@@ -85,7 +85,7 @@
     mgmtPubInterval(10),
     auth(AUTH_DEFAULT),
     realm("QPID"),
-    replayFlushLimit(1024),
+    replayFlushLimit(0),
     replayHardLimit(0)
 {
     int c = sys::SystemInfo::concurrency();
@@ -109,9 +109,7 @@
         ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
         ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
         ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming
connections will be trusted")
-        ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication")
        
-        ("replay-flush-limit", optValue(replayFlushLimit, "KB"), "Send flush request when
the replay buffer reaches this limit. 0 means no limit.")
-        ("replay-hard-limit", optValue(replayHardLimit, "KB"), "Kill a session if its replay
buffer exceeds this limit. 0 means no limit.");
+        ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication");
 }
 
 const std::string empty;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp?rev=667603&r1=667602&r2=667603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp Fri Jun 13 10:36:23
2008
@@ -26,7 +26,7 @@
 using qpid::framing::SequenceNumber;
 using qpid::framing::Buffer;
 
-SequenceNumber::SequenceNumber() : value(0 - 1) {}
+SequenceNumber::SequenceNumber() : value(0) {}
 
 SequenceNumber::SequenceNumber(uint32_t v) : value((int32_t) v) {}
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp?rev=667603&r1=667602&r2=667603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp Fri Jun 13 10:36:23 2008
@@ -48,7 +48,8 @@
     OstreamOutput(std::ostream& o) : out(&o) {}
 
     OstreamOutput(const string& file)
-        : out(new ofstream(file.c_str())), mine(out)
+        : out(new ofstream(file.c_str(), ios_base::out | ios_base::app)),
+          mine(out)
     {
         if (!out->good())
             throw std::runtime_error("Can't open log file: "+file);



Mime
View raw message