mesos-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Benjamin Mahler <bmah...@apache.org>
Subject Re: [5/5] mesos git commit: Used `loop` in implementation of io::read and io::write.
Date Mon, 09 Jan 2017 22:52:30 GMT
Looks like there are some warnings for some of the loop changes:

../../../3rdparty/libprocess/src/io.cpp: In lambda function:
../../../3rdparty/libprocess/src/io.cpp:75:9: warning: control reaches end of non-void function [-Wreturn-type]
         }();
         ^
mv -f .deps/libprocess_la-io.Tpo .deps/libprocess_la-io.Plo
mv -f .deps/libprocess_la-metrics.Tpo .deps/libprocess_la-metrics.Plo
../../../3rdparty/libprocess/src/http.cpp: In lambda function:
../../../3rdparty/libprocess/src/http.cpp:1414:7: warning: control reaches end of non-void function [-Wreturn-type]
       },
       ^
../../../3rdparty/libprocess/src/http.cpp: In lambda function:
../../../3rdparty/libprocess/src/http.cpp:1646:13: warning: control reaches end of non-void function [-Wreturn-type]
             }()
             ^

On Sun, Jan 8, 2017 at 7:27 PM, <benh@apache.org> wrote:

> Used `loop` in implementation of io::read and io::write.
>
> Review: https://reviews.apache.org/r/54841
>
>
> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d28c198
> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d28c198
> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d28c198
>
> Branch: refs/heads/master
> Commit: 2d28c198a5c09308825b2771483b70ac42839d16
> Parents: 9984449
> Author: Benjamin Hindman <benjamin.hindman@gmail.com>
> Authored: Mon Dec 5 10:48:56 2016 -0800
> Committer: Benjamin Hindman <benjamin.hindman@gmail.com>
> Committed: Sun Jan 8 19:27:02 2017 -0800
>
> ----------------------------------------------------------------------
>  3rdparty/libprocess/src/io.cpp | 289 ++++++++++++++----------------------
>  1 file changed, 114 insertions(+), 175 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/2d28c198/3rdparty/libprocess/src/io.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp
> index d0b3ba1..8aa3576 100644
> --- a/3rdparty/libprocess/src/io.cpp
> +++ b/3rdparty/libprocess/src/io.cpp
> @@ -42,132 +42,114 @@ enum ReadFlags
>  };
>
>
> -void read(
> -    int fd,
> -    void* data,
> -    size_t size,
> -    ReadFlags flags,
> -    const std::shared_ptr<Promise<size_t>>& promise,
> -    const Future<short>& future)
> +Future<size_t> read(int fd, void* data, size_t size, ReadFlags flags = NONE)
>  {
> -  // Ignore this function if the read operation has been discarded.
> -  if (promise->future().hasDiscard()) {
> -    CHECK(!future.isPending());
> -    promise->discard();
> -    return;
> -  }
> -
> +  // TODO(benh): Let the system calls do what ever they're supposed to
> +  // rather than return 0 here?
>    if (size == 0) {
> -    promise->set(0);
> -    return;
> +    return 0;
>    }
>
> -  if (future.isDiscarded()) {
> -    promise->fail("Failed to poll: discarded future");
> -  } else if (future.isFailed()) {
> -    promise->fail(future.failure());
> -  } else {
> -    ssize_t length;
> -    if (flags == NONE) {
> -      length = os::read(fd, data, size);
> -    } else { // PEEK.
> -      // In case 'fd' is not a socket ::recv() will fail with ENOTSOCK and the
> -      // error will be propagted out.
> -      // NOTE: We cast to `char*` here because the function prototypes on
> -      // Windows use `char*` instead of `void*`.
> -      length = net::recv(fd, (char*) data, size, MSG_PEEK);
> -    }
> -
> +  return loop(
> +      None(),
> +      [=]() -> Future<Option<size_t>> {
> +        // Because the file descriptor is non-blocking, we call
> +        // read()/recv() immediately. If no data is available than
> +        // we'll call `poll` and block. We also observed that for some
> +        // combination of libev and Linux kernel versions, the poll
> +        // would block for non-deterministically long periods of
> +        // time. This may be fixed in a newer version of libev (we use
> +        // 3.8 at the time of writing this comment).
> +        ssize_t length = [=]() {
> +          switch (flags) {
> +            case PEEK:
> +              // In case `fd` is not a socket os::recv() will fail
> +              // with ENOTSOCK and the error will be returned.
> +              //
> +              // NOTE: We cast to `char*` here because the function
> +              // prototypes on Windows use `char*` instead of `void*`.
> +              return net::recv(fd, (char*) data, size, MSG_PEEK);
> +            case NONE:
> +              return os::read(fd, data, size);
> +          }
> +        }();
> +
> +        if (length < 0) {
>  #ifdef __WINDOWS__
> -    int error = WSAGetLastError();
> +          int error = WSAGetLastError();
>  #else
> -    int error = errno;
> +          int error = errno;
>  #endif // __WINDOWS__
>
> -    if (length < 0) {
> -      if (net::is_restartable_error(error) || net::is_retryable_error(error)) {
> -        // Restart the read operation.
> -        Future<short> future =
> -          io::poll(fd, process::io::READ).onAny(
> -              lambda::bind(&internal::read,
> -                           fd,
> -                           data,
> -                           size,
> -                           flags,
> -                           promise,
> -                           lambda::_1));
> -
> -        // Stop polling if a discard occurs on our future.
> -        promise->future().onDiscard(
> -            lambda::bind(&process::internal::discard<short>,
> -                         WeakFuture<short>(future)));
> -      } else {
> -        // Error occurred.
> -        promise->fail(os::strerror(errno));
> -      }
> -    } else {
> -      promise->set(length);
> -    }
> -  }
> +          if (!net::is_restartable_error(error) &&
> +              !net::is_retryable_error(error)) {
> +            // TODO(benh): Confirm that `os::strerror` does the right
> +            // thing for `error` on Windows.
> +            return Failure(os::strerror(error));
> +          }
> +
> +          return None();
> +        }
> +
> +        return length;
> +      },
> +      [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
> +        // Restart/retry if we don't yet have a result.
> +        if (length.isNone()) {
> +          return io::poll(fd, io::READ)
> +            .then([](short event) -> ControlFlow<size_t> {
> +              CHECK_EQ(io::READ, event);
> +              return Continue();
> +            });
> +        }
> +        return Break(length.get());
> +      });
>  }
>
>
> -void write(
> -    int fd,
> -    const void* data,
> -    size_t size,
> -    const std::shared_ptr<Promise<size_t>>& promise,
> -    const Future<short>& future)
> +Future<size_t> write(int fd, const void* data, size_t size)
>  {
> -  // Ignore this function if the write operation has been discarded.
> -  if (promise->future().hasDiscard()) {
> -    promise->discard();
> -    return;
> -  }
> -
> +  // TODO(benh): Let the system calls do what ever they're supposed to
> +  // rather than return 0 here?
>    if (size == 0) {
> -    promise->set(0);
> -    return;
> +    return 0;
>    }
>
> -  if (future.isDiscarded()) {
> -    promise->fail("Failed to poll: discarded future");
> -  } else if (future.isFailed()) {
> -    promise->fail(future.failure());
> -  } else {
> -    ssize_t length = os::write(fd, data, size);
> +  return loop(
> +      None(),
> +      [=]() -> Future<Option<size_t>> {
> +        ssize_t length = os::write(fd, data, size);
>
> +        if (length < 0) {
>  #ifdef __WINDOWS__
> -    int error = WSAGetLastError();
> +          int error = WSAGetLastError();
>  #else
> -    int error = errno;
> +          int error = errno;
>  #endif // __WINDOWS__
>
> -    if (length < 0) {
> -      if (net::is_restartable_error(error) || net::is_retryable_error(error)) {
> -        // Restart the write operation.
> -        Future<short> future =
> -          io::poll(fd, process::io::WRITE).onAny(
> -              lambda::bind(&internal::write,
> -                           fd,
> -                           data,
> -                           size,
> -                           promise,
> -                           lambda::_1));
> -
> -        // Stop polling if a discard occurs on our future.
> -        promise->future().onDiscard(
> -            lambda::bind(&process::internal::discard<short>,
> -                         WeakFuture<short>(future)));
> -      } else {
> -        // Error occurred.
> -        promise->fail(os::strerror(errno));
> -      }
> -    } else {
> -      // TODO(benh): Retry if 'length' is 0?
> -      promise->set(length);
> -    }
> -  }
> +          if (!net::is_restartable_error(error) &&
> +              !net::is_retryable_error(error)) {
> +            // TODO(benh): Confirm that `os::strerror` does the right
> +            // thing for `error` on Windows.
> +            return Failure(os::strerror(error));
> +          }
> +
> +          return None();
> +        }
> +
> +        return length;
> +      },
> +      [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
> +        // Restart/retry if we don't yet have a result.
> +        if (length.isNone()) {
> +          return io::poll(fd, io::WRITE)
> +            .then([](short event) -> ControlFlow<size_t> {
> +              CHECK_EQ(io::WRITE, event);
> +              return Continue();
> +            });
> +        }
> +        return Break(length.get());
> +      });
>  }
>
>  } // namespace internal {
> @@ -177,32 +159,18 @@ Future<size_t> read(int fd, void* data, size_t size)
>  {
>    process::initialize();
>
> -  std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>());
> -
>    // Check the file descriptor.
>    Try<bool> nonblock = os::isNonblock(fd);
>    if (nonblock.isError()) {
>      // The file descriptor is not valid (e.g., has been closed).
> -    promise->fail(
> -        "Failed to check if file descriptor was non-blocking: " +
> -        nonblock.error());
> -    return promise->future();
> +    return Failure("Failed to check if file descriptor was non-blocking: " +
> +                   nonblock.error());
>    } else if (!nonblock.get()) {
>      // The file descriptor is not non-blocking.
> -    promise->fail("Expected a non-blocking file descriptor");
> -    return promise->future();
> +    return Failure("Expected a non-blocking file descriptor");
>    }
>
> -  // Because the file descriptor is non-blocking, we call read()
> -  // immediately. The read may in turn call poll if necessary,
> -  // avoiding unnecessary polling. We also observed that for some
> -  // combination of libev and Linux kernel versions, the poll would
> -  // block for non-deterministically long periods of time. This may be
> -  // fixed in a newer version of libev (we use 3.8 at the time of
> -  // writing this comment).
> -  internal::read(fd, data, size, internal::NONE, promise, io::READ);
> -
> -  return promise->future();
> +  return internal::read(fd, data, size);
>  }
>
>
> @@ -210,32 +178,19 @@ Future<size_t> write(int fd, const void* data, size_t size)
>  {
>    process::initialize();
>
> -  std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>());
> -
>    // Check the file descriptor.
>    Try<bool> nonblock = os::isNonblock(fd);
>    if (nonblock.isError()) {
>      // The file descriptor is not valid (e.g., has been closed).
> -    promise->fail(
> +    return Failure(
>          "Failed to check if file descriptor was non-blocking: " +
>          nonblock.error());
> -    return promise->future();
>    } else if (!nonblock.get()) {
>      // The file descriptor is not non-blocking.
> -    promise->fail("Expected a non-blocking file descriptor");
> -    return promise->future();
> +    return Failure("Expected a non-blocking file descriptor");
>    }
>
> -  // Because the file descriptor is non-blocking, we call write()
> -  // immediately. The write may in turn call poll if necessary,
> -  // avoiding unnecessary polling. We also observed that for some
> -  // combination of libev and Linux kernel versions, the poll would
> -  // block for non-deterministically long periods of time. This may be
> -  // fixed in a newer version of libev (we use 3.8 at the time of
> -  // writing this comment).
> -  internal::write(fd, data, size, promise, io::WRITE);
> -
> -  return promise->future();
> +  return internal::write(fd, data, size);
>  }
>
>
> @@ -280,43 +235,15 @@ Future<size_t> peek(int fd, void* data, size_t size, size_t limit)
>          nonblock.error());
>    }
>
> -  std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>());
> -
> -  // Because the file descriptor is non-blocking, we call read()
> -  // immediately. The read may in turn call poll if necessary,
> -  // avoiding unnecessary polling. We also observed that for some
> -  // combination of libev and Linux kernel versions, the poll would
> -  // block for non-deterministically long periods of time. This may be
> -  // fixed in a newer version of libev (we use 3.8 at the time of
> -  // writing this comment).
> -  internal::read(fd, data, limit, internal::PEEK, promise, io::READ);
> -
> -  // NOTE: We wrap `os::close` in a lambda to disambiguate on Windows.
> -  promise->future().onAny([fd]() { os::close(fd); });
> -
> -  return promise->future();
> +  return internal::read(fd, data, limit, internal::PEEK)
> +    .onAny([fd]() {
> +      os::close(fd);
> +    });
>  }
>
>
>  namespace internal {
>
> -Future<string> _read(
> -    int fd,
> -    const std::shared_ptr<string>& buffer,
> -    const boost::shared_array<char>& data,
> -    size_t length)
> -{
> -  return io::read(fd, data.get(), length)
> -    .then([=](size_t size) -> Future<string> {
> -      if (size == 0) { // EOF.
> -        return string(*buffer);
> -      }
> -      buffer->append(data.get(), size);
> -      return _read(fd, buffer, data, length);
> -    });
> -}
> -
> -
>  Future<Nothing> splice(
>      int from,
>      int to,
> @@ -392,9 +319,21 @@ Future<string> read(int fd)
>    std::shared_ptr<string> buffer(new string());
>    boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
>
> -  // NOTE: We wrap `os::close` in a lambda to disambiguate on Windows.
> -  return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE)
> -    .onAny([fd]() { os::close(fd); });
> +  return loop(
> +      None(),
> +      [=]() {
> +        return io::read(fd, data.get(), BUFFERED_READ_SIZE);
> +      },
> +      [=](size_t length) -> ControlFlow<string> {
> +        if (length == 0) { // EOF.
> +          return Break(std::move(*buffer));
> +        }
> +        buffer->append(data.get(), length);
> +        return Continue();
> +      })
> +    .onAny([fd]() {
> +      os::close(fd);
> +    });
>  }
>
>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message