qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1243579 - in /qpid/branches/qpid-3603-2/qpid/cpp: include/qpid/messaging/ include/qpid/types/ src/qpid/client/amqp0_10/ src/qpid/types/ src/tests/
Date Mon, 13 Feb 2012 16:18:14 GMT
Author: aconway
Date: Mon Feb 13 16:18:13 2012
New Revision: 1243579

URL: http://svn.apache.org/viewvc?rev=1243579&view=rev
Log:
QPID-3603: c++ messaging API: allow floating point reconnect durations in seconds.

Allow sub-second intervals, e.g. reconnect_interval_min=0.001 for a
millisecond interval.

Modified:
    qpid/branches/qpid-3603-2/qpid/cpp/include/qpid/messaging/Connection.h
    qpid/branches/qpid-3603-2/qpid/cpp/include/qpid/types/Variant.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/types/Variant.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603-2/qpid/cpp/include/qpid/messaging/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/include/qpid/messaging/Connection.h?rev=1243579&r1=1243578&r2=1243579&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/include/qpid/messaging/Connection.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/include/qpid/messaging/Connection.h Mon Feb 13 16:18:13 2012
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,7 +38,7 @@ template <class> class PrivateImplRef;
 class ConnectionImpl;
 class Session;
 
-/**  \ingroup messaging 
+/**  \ingroup messaging
  * A connection represents a network connection to a remote endpoint.
  */
 
@@ -48,40 +48,42 @@ class QPID_MESSAGING_CLASS_EXTERN Connec
     QPID_MESSAGING_EXTERN Connection(ConnectionImpl* impl);
     QPID_MESSAGING_EXTERN Connection(const Connection&);
     QPID_MESSAGING_EXTERN Connection();
-    /**  
+    /**
      * Current implementation supports the following options:
-     * 
-     *     username
-     *     password
-     *     heartbeat
-     *     tcp_nodelay
-     *     sasl_mechanisms
-     *     sasl_service
-     *     sasl_min_ssf
-     *     sasl_max_ssf
-     *     transport
-     * 
+     *
+     * - username
+     * - password
+     * - heartbeat
+     * - tcp_nodelay
+     * - sasl_mechanisms
+     * - sasl_service
+     * - sasl_min_ssf
+     * - sasl_max_ssf
+     * - transport
+     *
      * Reconnect behaviour can be controlled through the following options:
-     * 
-     *     reconnect: true/false (enables/disables reconnect entirely)
-     *     reconnect_timeout: number of seconds (give up and report failure after specified time)
-     *     reconnect_limit: n (give up and report failure after specified number of attempts)
-     *     reconnect_interval_min: number of seconds (initial delay between failed reconnection attempts)
-     *     reconnect_interval_max: number of seconds (maximum delay between failed reconnection attempts)
-     *     reconnect_interval: shorthand for setting the same reconnect_interval_min/max
-     *     reconnect_urls: list of alternate urls to try when connecting
-     *
-     *     The reconnect_interval is the time that the client waits
-     *     for after a failed attempt to reconnect before retrying. It
-     *     starts at the value of the min_retry_interval and is
-     *     doubled every failure until the value of max_retry_interval
-     *     is reached.
+     *
+     * - reconnect: true/false (enables/disables reconnect entirely)
+     * - reconnect_timeout: seconds (give up and report failure after specified time)
+     * - reconnect_limit: n (give up and report failure after specified number of attempts)
+     * - reconnect_interval_min: seconds (initial delay between failed reconnection attempts)
+     * - reconnect_interval_max: seconds (maximum delay between failed reconnection attempts)
+     * - reconnect_interval: shorthand for setting the same reconnect_interval_min/max
+     * - reconnect_urls: list of alternate urls to try when connecting
+     *
+     * The reconnect_interval is the time that the client waits for
+     * after a failed attempt to reconnect before retrying. It starts
+     * at the value of the min_retry_interval and is doubled every
+     * failure until the value of max_retry_interval is reached.
+     *
+     * Values in seconds can be fractional, for example 0.001 is a
+     * millisecond delay.
      */
     QPID_MESSAGING_EXTERN Connection(const std::string& url, const qpid::types::Variant::Map& options = qpid::types::Variant::Map());
     /**
      * Creates a connection using an option string of the form
      * {name:value,name2:value2...}, see above for options supported.
-     * 
+     *
      * @exception InvalidOptionString if the string does not match the correct syntax
      */
     QPID_MESSAGING_EXTERN Connection(const std::string& url, const std::string& options);

Modified: qpid/branches/qpid-3603-2/qpid/cpp/include/qpid/types/Variant.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/include/qpid/types/Variant.h?rev=1243579&r1=1243578&r2=1243579&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/include/qpid/types/Variant.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/include/qpid/types/Variant.h Mon Feb 13 16:18:13 2012
@@ -62,6 +62,8 @@ enum VariantType {
 
 std::string getTypeName(VariantType type);
 
+bool isIntegerType(VariantType type);
+
 class VariantImpl;
 
 /**

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1243579&r1=1243578&r2=1243579&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Mon Feb 13 16:18:13 2012
@@ -29,6 +29,7 @@
 #include <boost/intrusive_ptr.hpp>
 #include <vector>
 #include <sstream>
+#include <limits>
 
 namespace qpid {
 namespace client {
@@ -39,6 +40,16 @@ using qpid::types::VAR_LIST;
 using qpid::framing::Uuid;
 
 namespace {
+
+double FOREVER(std::numeric_limits<double>::max());
+
+// Time values in seconds can be specified as integer or floating point values.
+double timeValue(const Variant& value) {
+    if (types::isIntegerType(value.getType()))
+        return double(value.asInt64());
+    return value.asDouble();
+}
+
 void merge(const std::string& value, std::vector<std::string>& list) {
     if (std::find(list.begin(), list.end(), value) == list.end())
         list.push_back(value);
@@ -60,11 +71,21 @@ std::string asString(const std::vector<s
     os << "]";
     return os.str();
 }
+
+bool expired(const sys::AbsTime& start, double timeout)
+{
+    if (timeout == 0) return true;
+    if (timeout == FOREVER) return false;
+    sys::Duration used(start, sys::now());
+    sys::Duration allowed(int64_t(timeout*sys::TIME_SEC));
+    return allowed < used;
 }
 
+} // namespace
+
 ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
-    replaceUrls(false), reconnect(false), timeout(-1), limit(-1),
-    minReconnectInterval(3), maxReconnectInterval(60),
+    replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1),
+    minReconnectInterval(0.001), maxReconnectInterval(2),
     retries(0), reconnectOnLimitExceeded(true)
 {
     setOptions(options);
@@ -85,15 +106,15 @@ void ConnectionImpl::setOption(const std
     if (name == "reconnect") {
         reconnect = value;
     } else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
-        timeout = value;
+        timeout = timeValue(value);
     } else if (name == "reconnect-limit" || name == "reconnect_limit") {
         limit = value;
     } else if (name == "reconnect-interval" || name == "reconnect_interval") {
-        maxReconnectInterval = minReconnectInterval = value;
+        maxReconnectInterval = minReconnectInterval = timeValue(value);
     } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") {
-        minReconnectInterval = value;
+        minReconnectInterval = timeValue(value);
     } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") {
-        maxReconnectInterval = value;
+        maxReconnectInterval = timeValue(value);
     } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") {
         replaceUrls = value.asBool();
     } else if (name == "reconnect-urls" || name == "reconnect_urls") {
@@ -236,18 +257,9 @@ void ConnectionImpl::reopen()
 }
 
 
-bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
-{
-    if (timeout == 0) return true;
-    if (timeout < 0) return false;
-    qpid::sys::Duration used(start, qpid::sys::now());
-    qpid::sys::Duration allowed = timeout * qpid::sys::TIME_SEC;
-    return allowed < used;
-}
-
 void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
 {
-    for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) {
+    for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
         if (!reconnect) {
             throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
         }
@@ -257,7 +269,7 @@ void ConnectionImpl::connect(const qpid:
         if (expired(started, timeout)) {
             throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout");
         }
-        else qpid::sys::sleep(i);
+        qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds.
     }
     retries = 0;
 }
@@ -320,6 +332,7 @@ bool ConnectionImpl::backoff()
         return false;
     }
 }
+
 std::string ConnectionImpl::getAuthenticatedUsername()
 {
     return connection.getNegotiatedSettings().username;

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=1243579&r1=1243578&r2=1243579&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Mon Feb 13 16:18:13 2012
@@ -64,10 +64,10 @@ class ConnectionImpl : public qpid::mess
     std::vector<std::string> urls;
     qpid::client::ConnectionSettings settings;
     bool reconnect;
-    int64_t timeout;
+    double timeout;
     int32_t limit;
-    int64_t minReconnectInterval;
-    int64_t maxReconnectInterval;
+    double minReconnectInterval;
+    double maxReconnectInterval;
     int32_t retries;
     bool reconnectOnLimitExceeded;
 

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/types/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/types/Variant.cpp?rev=1243579&r1=1243578&r2=1243579&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/types/Variant.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/types/Variant.cpp Mon Feb 13 16:18:13 2012
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -88,7 +88,7 @@ class VariantImpl
     bool isEqualTo(VariantImpl&) const;
     bool isEquivalentTo(VariantImpl&) const;
 
-    static VariantImpl* create(const Variant&);    
+    static VariantImpl* create(const Variant&);
   private:
     const VariantType type;
     union {
@@ -150,7 +150,7 @@ VariantImpl::VariantImpl(const Variant::
 VariantImpl::VariantImpl(const Variant::List& l) : type(VAR_LIST) { value.v = new Variant::List(l); }
 VariantImpl::VariantImpl(const Uuid& u) : type(VAR_UUID) { value.v = new Uuid(u); }
 
-VariantImpl::~VariantImpl() { 
+VariantImpl::~VariantImpl() {
     switch (type) {
       case VAR_STRING:
         delete reinterpret_cast<std::string*>(value.v);
@@ -173,7 +173,7 @@ VariantType VariantImpl::getType() const
 
 namespace {
 
-bool same_char(char a, char b) 
+bool same_char(char a, char b)
 {
     return toupper(a) == toupper(b);
 }
@@ -191,7 +191,7 @@ bool toBool(const std::string& s)
     if (caseInsensitiveMatch(s, TRUE)) return true;
     if (caseInsensitiveMatch(s, FALSE)) return false;
     try { return boost::lexical_cast<int>(s); } catch(const boost::bad_lexical_cast&) {}
-    throw InvalidConversion(QPID_MSG("Cannot convert " << s << " to bool"));
   
+    throw InvalidConversion(QPID_MSG("Cannot convert " << s << " to bool"));
 }
 
 template <class T> std::string toString(const T& t)
@@ -531,9 +531,9 @@ bool VariantImpl::isEqualTo(VariantImpl&
           case VAR_INT64: return value.i64 == other.value.i64;
           case VAR_DOUBLE: return value.d == other.value.d;
           case VAR_FLOAT: return value.f == other.value.f;
-          case VAR_STRING: return *reinterpret_cast<std::string*>(value.v) 
+          case VAR_STRING: return *reinterpret_cast<std::string*>(value.v)
                 == *reinterpret_cast<std::string*>(other.value.v);
-          case VAR_UUID: return *reinterpret_cast<Uuid*>(value.v) 
+          case VAR_UUID: return *reinterpret_cast<Uuid*>(value.v)
                 == *reinterpret_cast<Uuid*>(other.value.v);
           case VAR_LIST: return equal(asList(), other.asList());
           case VAR_MAP: return equal(asMap(), other.asMap());
@@ -616,7 +616,25 @@ std::string getTypeName(VariantType type
     return "<unknown>";//should never happen
 }
 
-VariantImpl* VariantImpl::create(const Variant& v) 
+bool isIntegerType(VariantType type)
+{
+    switch (type) {
+      case VAR_BOOL:
+      case VAR_UINT8:
+      case VAR_UINT16:
+      case VAR_UINT32:
+      case VAR_UINT64:
+      case VAR_INT8:
+      case VAR_INT16:
+      case VAR_INT32:
+      case VAR_INT64:
+        return true;
+      default:
+        return false;
+    }
+}
+
+VariantImpl* VariantImpl::create(const Variant& v)
 {
     switch (v.getType()) {
       case VAR_BOOL: return new VariantImpl(v.asBool());
@@ -815,9 +833,9 @@ const Variant::List& Variant::asList() c
 Variant::List& Variant::asList() { if (!impl) throw InvalidConversion("Can't convert VOID to LIST"); return impl->asList(); }
 const std::string& Variant::getString() const { if (!impl) throw InvalidConversion("Can't convert VOID to STRING"); return impl->getString(); }
 std::string& Variant::getString() { if (!impl) throw InvalidConversion("Can't convert VOID to STRING"); return impl->getString(); }
-void Variant::setEncoding(const std::string& s) { 
+void Variant::setEncoding(const std::string& s) {
     if (!impl) impl = new VariantImpl();
-    impl->setEncoding(s); 
+    impl->setEncoding(s);
 }
 const std::string& Variant::getEncoding() const { return impl ? impl->getEncoding() : EMPTY; }
 
@@ -873,7 +891,7 @@ std::ostream& operator<<(std::ostream& o
         out << value.asString();
         break;
     }
-    return out;    
+    return out;
 }
 
 bool operator==(const Variant& a, const Variant& b)

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1243579&r1=1243578&r2=1243579&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Mon Feb 13 16:18:13 2012
@@ -236,7 +236,7 @@ class ShortTests(BrokerTest):
             print self.browse(self.connect_admin(backup2).session(), "q", transform=sn)
             raise
 
-    def test_failover(self):
+    def test_failover_python(self):
         """Verify that backups rejects connections and that fail-over works in python client"""
         getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages
         primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
@@ -263,6 +263,7 @@ class ShortTests(BrokerTest):
         c.close()
 
     def test_failover_cpp(self):
+        """Verify that failover works in the C++ client."""
         primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
         primary.promote()
         backup = HaBroker(self, name="backup", broker_url=primary.host_port())
@@ -281,8 +282,7 @@ class ShortTests(BrokerTest):
         assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
         backup.promote()
         n = receiver.received       # Make sure we are still running
-        # FIXME aconway 2012-02-01: c++ client has 1 sec min retry, hence long timeout
-        assert retry(lambda: receiver.received > n + 10, timeout=5)
+        assert retry(lambda: receiver.received > n + 10)
         sender.stop()
         receiver.stop()
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message