mesos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bmah...@apache.org
Subject [4/4] mesos git commit: Refactored http::internal::request to use http::connect.
Date Tue, 06 Oct 2015 00:11:26 GMT
Refactored http::internal::request to use http::connect.

Review: https://reviews.apache.org/r/38609


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2199a599
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2199a599
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2199a599

Branch: refs/heads/master
Commit: 2199a599d4e57cce0c9209660e488f530156e07b
Parents: 0064543
Author: Benjamin Mahler <benjamin.mahler@gmail.com>
Authored: Mon Sep 21 18:45:21 2015 -0700
Committer: Benjamin Mahler <benjamin.mahler@gmail.com>
Committed: Mon Oct 5 16:41:16 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/http.hpp |   3 +
 3rdparty/libprocess/src/http.cpp             | 173 ++--------------------
 2 files changed, 19 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2199a599/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index dfcc188..591c1a9 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -769,6 +769,9 @@ Future<Connection> connect(const URL& url);
 // TODO(bmahler): Consolidate these functions into a single
 // http::request function that takes a 'Request' object.
 
+// TODO(bmahler): Support discarding the future responses;
+// discarding should disconnect from the server.
+
 // TODO(joerg84): Make names consistent (see Mesos-3256).
 
 // Asynchronously sends an HTTP GET request to the specified URL

http://git-wip-us.apache.org/repos/asf/mesos/blob/2199a599/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index d1ff13e..ebf7609 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -1070,169 +1070,28 @@ Future<Connection> connect(const URL& url)
 
 namespace internal {
 
-// Forward declaration.
-void _decode(
-    Socket socket,
-    Owned<StreamingResponseDecoder> decoder,
-    const Future<string>& data);
-
-
-Future<Response> decode(
-    Socket socket,
-    Owned<StreamingResponseDecoder> decoder,
-    const string& data)
-{
-  deque<Response*> responses = decoder->decode(data.c_str(), data.length());
-
-  if (decoder->failed() || responses.size() > 1) {
-    foreach (Response* response, responses) {
-      delete response;
-    }
-    return Failure(string("Failed to decode HTTP response") +
-        (responses.size() > 1 ? ": more than one response received" : ""));
-  }
-
-  if (responses.empty()) {
-    // Keep reading until the headers are complete.
-    return socket.recv(None())
-      .then(lambda::bind(&decode, socket, decoder, lambda::_1));
-  }
-
-  // Keep feeding data to the decoder until EOF or a 'recv' failure.
-  if (!data.empty()) {
-    socket.recv(None())
-      .onAny(lambda::bind(&_decode, socket, decoder, lambda::_1));
-  }
-
-  Response response = *responses[0];
-  delete responses[0];
-  return response;
-}
-
-
-void _decode(
-    Socket socket,
-    Owned<StreamingResponseDecoder> decoder,
-    const Future<string>& data)
-{
-  deque<Response*> responses;
-
-  if (!data.isReady()) {
-    // Let the decoder process EOF if a failure
-    // or discard is encountered.
-    responses = decoder->decode("", 0);
-  } else {
-    responses = decoder->decode(data.get().c_str(), data.get().length());
-  }
-
-  // We're not expecting more responses to arrive on this socket!
-  if (!responses.empty() || decoder->failed()) {
-    VLOG(1) << "Failed to decode HTTP response: "
-            << (responses.size() > 1
-                ? ": more than one response received"
-                : "");
-
-    foreach (Response* response, responses) {
-      delete response;
-    }
-
-    return;
-  }
-
-  // Keep reading if the socket has more data.
-  if (data.isReady() && !data.get().empty()) {
-    socket.recv(None())
-      .onAny(lambda::bind(&_decode, socket, decoder, lambda::_1));
-  }
-}
-
-
-// Forward declaration.
-Future<Response> _request(
-    Socket socket,
-    const Address& address,
-    const Request& request,
-    bool streamedResponse);
-
-
 Future<Response> request(const Request& request, bool streamedResponse)
 {
-  Try<Socket> socket = [&request]() -> Try<Socket> {
-    // Default to 'http' if no scheme was specified.
-    if (request.url.scheme.isNone() ||
-        request.url.scheme == string("http")) {
-      return Socket::create(Socket::POLL);
-    }
-
-    if (request.url.scheme == string("https")) {
-#ifdef USE_SSL_SOCKET
-      return Socket::create(Socket::SSL);
-#else
-      return Error("'https' scheme requires SSL enabled");
-#endif
-    }
-
-    return Error("Unsupported URL scheme");
-  }();
-
-  if (socket.isError()) {
-    return Failure("Failed to create socket: " + socket.error());
-  }
-
-  Address address;
-
-  if (request.url.ip.isSome()) {
-    address.ip = request.url.ip.get();
-  } else if (request.url.domain.isNone()) {
-    return Failure("Expecting url.ip or url.domain to be set");
-  } else {
-    Try<net::IP> ip = net::getIP(request.url.domain.get(), AF_INET);
-
-    if (ip.isError()) {
-      return Failure("Failed to determine IP of domain '" +
-                     request.url.domain.get() + "': " + ip.error());
-    }
-
-    address.ip = ip.get();
-  }
-
-  if (request.url.port.isNone()) {
-    return Failure("Expecting url.port to be set");
-  }
-
-  address.port = request.url.port.get();
-
-  return socket->connect(address)
-    .then(lambda::bind(&_request,
-                       socket.get(),
-                       address,
-                       request,
-                       streamedResponse));
-}
+  // We rely on the connection closing after the response.
+  CHECK(!request.keepAlive);
 
+  // This is a one time request which will close the connection when
+  // the response is received. Since 'Connection' is reference-counted,
+  // we must keep a copy around until the disconnection occurs. Note
+  // that in order to avoid a deadlock (Connection destruction occuring
+  // from the ConnectionProcess execution context), we use 'async'.
+  return http::connect(request.url)
+    .then([=](Connection connection) {
+      Future<Response> response = connection.send(request, streamedResponse);
 
-Future<Response> _request(
-    Socket socket,
-    const Address& address,
-    const Request& request,
-    bool streamedResponse)
-{
-  // Need to disambiguate the Socket::recv for binding below.
-  Future<string> (Socket::*recv)(const Option<ssize_t>&) = &Socket::recv;
+      Connection* copy = new Connection(std::move(connection));
+      auto deleter = [copy](){ delete copy; };
 
-  Owned<StreamingResponseDecoder> decoder(new StreamingResponseDecoder());
+      copy->disconnected()
+        .onAny([=]() { async(deleter); });
 
-  Future<Response> pipeResponse = socket.send(encode(request))
-    .then(lambda::function<Future<string>(void)>(
-              lambda::bind(recv, socket, None())))
-    .then(lambda::bind(&internal::decode, socket, decoder, lambda::_1));
-
-  if (streamedResponse) {
-    return pipeResponse;
-  } else {
-    return pipeResponse
-      .then(lambda::bind(&internal::convert, lambda::_1));
-  }
+      return response;
+    });
 }
 
 } // namespace internal {


Mime
View raw message