mesos-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michael Park <mp...@apache.org>
Subject Re: [5/5] mesos git commit: Used `loop` in implementation of io::read and io::write.
Date Mon, 09 Jan 2017 23:45:25 GMT
Pushed a temporary fix in:

```
commit b492d4458c97b5e025e96ca229bc87a6e2500b40
Author: Jan Schlicht <jan@mesosphere.io>
Date:   Mon Jan 9 15:39:21 2017 -0800

    Removed unsupported `friend` declaration.

    The `friend` class declaration of nested, templated classes will raise
    an error when compiling with Clang. This is a temporary fix, resolved by
    making the constructor of `ControlFlow` public.

    Review: https://reviews.apache.org/r/55339/
```

On Mon, Jan 9, 2017 at 2:52 PM, Benjamin Mahler <bmahler@apache.org> wrote:

> Looks like there are some warnings for some of the loop changes:
>
> ../../../3rdparty/libprocess/src/io.cpp: In lambda function:
> ../../../3rdparty/libprocess/src/io.cpp:75:9: warning: control reaches
> end of non-void function [-Wreturn-type]
>          }();
>          ^
> mv -f .deps/libprocess_la-io.Tpo .deps/libprocess_la-io.Plo
> mv -f .deps/libprocess_la-metrics.Tpo .deps/libprocess_la-metrics.Plo
> ../../../3rdparty/libprocess/src/http.cpp: In lambda function:
> ../../../3rdparty/libprocess/src/http.cpp:1414:7: warning: control
> reaches end of non-void function [-Wreturn-type]
>        },
>        ^
> ../../../3rdparty/libprocess/src/http.cpp: In lambda function:
> ../../../3rdparty/libprocess/src/http.cpp:1646:13: warning: control
> reaches end of non-void function [-Wreturn-type]
>              }()
>              ^
>
> On Sun, Jan 8, 2017 at 7:27 PM, <benh@apache.org> wrote:
>
>> Used `loop` in implementation of io::read and io::write.
>>
>> Review: https://reviews.apache.org/r/54841
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d28c198
>> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d28c198
>> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d28c198
>>
>> Branch: refs/heads/master
>> Commit: 2d28c198a5c09308825b2771483b70ac42839d16
>> Parents: 9984449
>> Author: Benjamin Hindman <benjamin.hindman@gmail.com>
>> Authored: Mon Dec 5 10:48:56 2016 -0800
>> Committer: Benjamin Hindman <benjamin.hindman@gmail.com>
>> Committed: Sun Jan 8 19:27:02 2017 -0800
>>
>> ----------------------------------------------------------------------
>>  3rdparty/libprocess/src/io.cpp | 289 ++++++++++++++----------------------
>>  1 file changed, 114 insertions(+), 175 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/2d28c198/3rdparty/libprocess/src/io.cpp
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/io.cpp
>> b/3rdparty/libprocess/src/io.cpp
>> index d0b3ba1..8aa3576 100644
>> --- a/3rdparty/libprocess/src/io.cpp
>> +++ b/3rdparty/libprocess/src/io.cpp
>> @@ -42,132 +42,114 @@ enum ReadFlags
>>  };
>>
>>
>> -void read(
>> -    int fd,
>> -    void* data,
>> -    size_t size,
>> -    ReadFlags flags,
>> -    const std::shared_ptr<Promise<size_t>>& promise,
>> -    const Future<short>& future)
>> +Future<size_t> read(int fd, void* data, size_t size, ReadFlags flags =
>> NONE)
>>  {
>> -  // Ignore this function if the read operation has been discarded.
>> -  if (promise->future().hasDiscard()) {
>> -    CHECK(!future.isPending());
>> -    promise->discard();
>> -    return;
>> -  }
>> -
>> +  // TODO(benh): Let the system calls do what ever they're supposed to
>> +  // rather than return 0 here?
>>    if (size == 0) {
>> -    promise->set(0);
>> -    return;
>> +    return 0;
>>    }
>>
>> -  if (future.isDiscarded()) {
>> -    promise->fail("Failed to poll: discarded future");
>> -  } else if (future.isFailed()) {
>> -    promise->fail(future.failure());
>> -  } else {
>> -    ssize_t length;
>> -    if (flags == NONE) {
>> -      length = os::read(fd, data, size);
>> -    } else { // PEEK.
>> -      // In case 'fd' is not a socket ::recv() will fail with ENOTSOCK
>> and the
>> -      // error will be propagted out.
>> -      // NOTE: We cast to `char*` here because the function prototypes on
>> -      // Windows use `char*` instead of `void*`.
>> -      length = net::recv(fd, (char*) data, size, MSG_PEEK);
>> -    }
>> -
>> +  return loop(
>> +      None(),
>> +      [=]() -> Future<Option<size_t>> {
>> +        // Because the file descriptor is non-blocking, we call
>> +        // read()/recv() immediately. If no data is available than
>> +        // we'll call `poll` and block. We also observed that for some
>> +        // combination of libev and Linux kernel versions, the poll
>> +        // would block for non-deterministically long periods of
>> +        // time. This may be fixed in a newer version of libev (we use
>> +        // 3.8 at the time of writing this comment).
>> +        ssize_t length = [=]() {
>> +          switch (flags) {
>> +            case PEEK:
>> +              // In case `fd` is not a socket os::recv() will fail
>> +              // with ENOTSOCK and the error will be returned.
>> +              //
>> +              // NOTE: We cast to `char*` here because the function
>> +              // prototypes on Windows use `char*` instead of `void*`.
>> +              return net::recv(fd, (char*) data, size, MSG_PEEK);
>> +            case NONE:
>> +              return os::read(fd, data, size);
>> +          }
>> +        }();
>> +
>> +        if (length < 0) {
>>  #ifdef __WINDOWS__
>> -    int error = WSAGetLastError();
>> +          int error = WSAGetLastError();
>>  #else
>> -    int error = errno;
>> +          int error = errno;
>>  #endif // __WINDOWS__
>>
>> -    if (length < 0) {
>> -      if (net::is_restartable_error(error) ||
>> net::is_retryable_error(error)) {
>> -        // Restart the read operation.
>> -        Future<short> future =
>> -          io::poll(fd, process::io::READ).onAny(
>> -              lambda::bind(&internal::read,
>> -                           fd,
>> -                           data,
>> -                           size,
>> -                           flags,
>> -                           promise,
>> -                           lambda::_1));
>> -
>> -        // Stop polling if a discard occurs on our future.
>> -        promise->future().onDiscard(
>> -            lambda::bind(&process::internal::discard<short>,
>> -                         WeakFuture<short>(future)));
>> -      } else {
>> -        // Error occurred.
>> -        promise->fail(os::strerror(errno));
>> -      }
>> -    } else {
>> -      promise->set(length);
>> -    }
>> -  }
>> +          if (!net::is_restartable_error(error) &&
>> +              !net::is_retryable_error(error)) {
>> +            // TODO(benh): Confirm that `os::strerror` does the right
>> +            // thing for `error` on Windows.
>> +            return Failure(os::strerror(error));
>> +          }
>> +
>> +          return None();
>> +        }
>> +
>> +        return length;
>> +      },
>> +      [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
>> +        // Restart/retry if we don't yet have a result.
>> +        if (length.isNone()) {
>> +          return io::poll(fd, io::READ)
>> +            .then([](short event) -> ControlFlow<size_t> {
>> +              CHECK_EQ(io::READ, event);
>> +              return Continue();
>> +            });
>> +        }
>> +        return Break(length.get());
>> +      });
>>  }
>>
>>
>> -void write(
>> -    int fd,
>> -    const void* data,
>> -    size_t size,
>> -    const std::shared_ptr<Promise<size_t>>& promise,
>> -    const Future<short>& future)
>> +Future<size_t> write(int fd, const void* data, size_t size)
>>  {
>> -  // Ignore this function if the write operation has been discarded.
>> -  if (promise->future().hasDiscard()) {
>> -    promise->discard();
>> -    return;
>> -  }
>> -
>> +  // TODO(benh): Let the system calls do what ever they're supposed to
>> +  // rather than return 0 here?
>>    if (size == 0) {
>> -    promise->set(0);
>> -    return;
>> +    return 0;
>>    }
>>
>> -  if (future.isDiscarded()) {
>> -    promise->fail("Failed to poll: discarded future");
>> -  } else if (future.isFailed()) {
>> -    promise->fail(future.failure());
>> -  } else {
>> -    ssize_t length = os::write(fd, data, size);
>> +  return loop(
>> +      None(),
>> +      [=]() -> Future<Option<size_t>> {
>> +        ssize_t length = os::write(fd, data, size);
>>
>> +        if (length < 0) {
>>  #ifdef __WINDOWS__
>> -    int error = WSAGetLastError();
>> +          int error = WSAGetLastError();
>>  #else
>> -    int error = errno;
>> +          int error = errno;
>>  #endif // __WINDOWS__
>>
>> -    if (length < 0) {
>> -      if (net::is_restartable_error(error) ||
>> net::is_retryable_error(error)) {
>> -        // Restart the write operation.
>> -        Future<short> future =
>> -          io::poll(fd, process::io::WRITE).onAny(
>> -              lambda::bind(&internal::write,
>> -                           fd,
>> -                           data,
>> -                           size,
>> -                           promise,
>> -                           lambda::_1));
>> -
>> -        // Stop polling if a discard occurs on our future.
>> -        promise->future().onDiscard(
>> -            lambda::bind(&process::internal::discard<short>,
>> -                         WeakFuture<short>(future)));
>> -      } else {
>> -        // Error occurred.
>> -        promise->fail(os::strerror(errno));
>> -      }
>> -    } else {
>> -      // TODO(benh): Retry if 'length' is 0?
>> -      promise->set(length);
>> -    }
>> -  }
>> +          if (!net::is_restartable_error(error) &&
>> +              !net::is_retryable_error(error)) {
>> +            // TODO(benh): Confirm that `os::strerror` does the right
>> +            // thing for `error` on Windows.
>> +            return Failure(os::strerror(error));
>> +          }
>> +
>> +          return None();
>> +        }
>> +
>> +        return length;
>> +      },
>> +      [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
>> +        // Restart/retry if we don't yet have a result.
>> +        if (length.isNone()) {
>> +          return io::poll(fd, io::WRITE)
>> +            .then([](short event) -> ControlFlow<size_t> {
>> +              CHECK_EQ(io::WRITE, event);
>> +              return Continue();
>> +            });
>> +        }
>> +        return Break(length.get());
>> +      });
>>  }
>>
>>  } // namespace internal {
>> @@ -177,32 +159,18 @@ Future<size_t> read(int fd, void* data, size_t size)
>>  {
>>    process::initialize();
>>
>> -  std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>());
>> -
>>    // Check the file descriptor.
>>    Try<bool> nonblock = os::isNonblock(fd);
>>    if (nonblock.isError()) {
>>      // The file descriptor is not valid (e.g., has been closed).
>> -    promise->fail(
>> -        "Failed to check if file descriptor was non-blocking: " +
>> -        nonblock.error());
>> -    return promise->future();
>> +    return Failure("Failed to check if file descriptor was non-blocking: " +
>> +                   nonblock.error());
>>    } else if (!nonblock.get()) {
>>      // The file descriptor is not non-blocking.
>> -    promise->fail("Expected a non-blocking file descriptor");
>> -    return promise->future();
>> +    return Failure("Expected a non-blocking file descriptor");
>>    }
>>
>> -  // Because the file descriptor is non-blocking, we call read()
>> -  // immediately. The read may in turn call poll if necessary,
>> -  // avoiding unnecessary polling. We also observed that for some
>> -  // combination of libev and Linux kernel versions, the poll would
>> -  // block for non-deterministically long periods of time. This may be
>> -  // fixed in a newer version of libev (we use 3.8 at the time of
>> -  // writing this comment).
>> -  internal::read(fd, data, size, internal::NONE, promise, io::READ);
>> -
>> -  return promise->future();
>> +  return internal::read(fd, data, size);
>>  }
>>
>>
>> @@ -210,32 +178,19 @@ Future<size_t> write(int fd, const void* data,
>> size_t size)
>>  {
>>    process::initialize();
>>
>> -  std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>());
>> -
>>    // Check the file descriptor.
>>    Try<bool> nonblock = os::isNonblock(fd);
>>    if (nonblock.isError()) {
>>      // The file descriptor is not valid (e.g., has been closed).
>> -    promise->fail(
>> +    return Failure(
>>          "Failed to check if file descriptor was non-blocking: " +
>>          nonblock.error());
>> -    return promise->future();
>>    } else if (!nonblock.get()) {
>>      // The file descriptor is not non-blocking.
>> -    promise->fail("Expected a non-blocking file descriptor");
>> -    return promise->future();
>> +    return Failure("Expected a non-blocking file descriptor");
>>    }
>>
>> -  // Because the file descriptor is non-blocking, we call write()
>> -  // immediately. The write may in turn call poll if necessary,
>> -  // avoiding unnecessary polling. We also observed that for some
>> -  // combination of libev and Linux kernel versions, the poll would
>> -  // block for non-deterministically long periods of time. This may be
>> -  // fixed in a newer version of libev (we use 3.8 at the time of
>> -  // writing this comment).
>> -  internal::write(fd, data, size, promise, io::WRITE);
>> -
>> -  return promise->future();
>> +  return internal::write(fd, data, size);
>>  }
>>
>>
>> @@ -280,43 +235,15 @@ Future<size_t> peek(int fd, void* data, size_t
>> size, size_t limit)
>>          nonblock.error());
>>    }
>>
>> -  std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>());
>> -
>> -  // Because the file descriptor is non-blocking, we call read()
>> -  // immediately. The read may in turn call poll if necessary,
>> -  // avoiding unnecessary polling. We also observed that for some
>> -  // combination of libev and Linux kernel versions, the poll would
>> -  // block for non-deterministically long periods of time. This may be
>> -  // fixed in a newer version of libev (we use 3.8 at the time of
>> -  // writing this comment).
>> -  internal::read(fd, data, limit, internal::PEEK, promise, io::READ);
>> -
>> -  // NOTE: We wrap `os::close` in a lambda to disambiguate on Windows.
>> -  promise->future().onAny([fd]() { os::close(fd); });
>> -
>> -  return promise->future();
>> +  return internal::read(fd, data, limit, internal::PEEK)
>> +    .onAny([fd]() {
>> +      os::close(fd);
>> +    });
>>  }
>>
>>
>>  namespace internal {
>>
>> -Future<string> _read(
>> -    int fd,
>> -    const std::shared_ptr<string>& buffer,
>> -    const boost::shared_array<char>& data,
>> -    size_t length)
>> -{
>> -  return io::read(fd, data.get(), length)
>> -    .then([=](size_t size) -> Future<string> {
>> -      if (size == 0) { // EOF.
>> -        return string(*buffer);
>> -      }
>> -      buffer->append(data.get(), size);
>> -      return _read(fd, buffer, data, length);
>> -    });
>> -}
>> -
>> -
>>  Future<Nothing> splice(
>>      int from,
>>      int to,
>> @@ -392,9 +319,21 @@ Future<string> read(int fd)
>>    std::shared_ptr<string> buffer(new string());
>>    boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
>>
>> -  // NOTE: We wrap `os::close` in a lambda to disambiguate on Windows.
>> -  return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE)
>> -    .onAny([fd]() { os::close(fd); });
>> +  return loop(
>> +      None(),
>> +      [=]() {
>> +        return io::read(fd, data.get(), BUFFERED_READ_SIZE);
>> +      },
>> +      [=](size_t length) -> ControlFlow<string> {
>> +        if (length == 0) { // EOF.
>> +          return Break(std::move(*buffer));
>> +        }
>> +        buffer->append(data.get(), length);
>> +        return Continue();
>> +      })
>> +    .onAny([fd]() {
>> +      os::close(fd);
>> +    });
>>  }
>>
>>
>>
>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message