From: Michael Park
Date: Mon, 9 Jan 2017 15:45:25 -0800
Subject: Re: [5/5] 
mesos git commit: Used `loop` in implementation of io::read and io::write.
To: dev
Cc: benh@apache.org, commits@mesos.apache.org
Content-Type: multipart/alternative; boundary=94eb2c098c8e6885350545b1f756
archived-at: Mon, 09 Jan 2017 23:45:50 -0000

--94eb2c098c8e6885350545b1f756
Content-Type: text/plain; charset=UTF-8

Pushed a temporary fix in:

```
commit b492d4458c97b5e025e96ca229bc87a6e2500b40
Author: Jan Schlicht
Date:   Mon Jan 9 15:39:21 2017 -0800

    Removed unsupported `friend` declaration.

    The `friend` class declaration of nested, templated classes will raise
    an error when compiling with Clang. This is a temporary fix, resolved
    by making the constructor of `ControlFlow` public.

    Review: https://reviews.apache.org/r/55339/
```

On Mon, Jan 9, 2017 at 2:52 PM, Benjamin Mahler wrote:

> Looks like there are some warnings for some of the loop changes:
>
> ../../../3rdparty/libprocess/src/io.cpp: In lambda function:
> ../../../3rdparty/libprocess/src/io.cpp:75:9: warning: control reaches end of non-void function [-Wreturn-type]
>          }();
>          ^
> mv -f .deps/libprocess_la-io.Tpo .deps/libprocess_la-io.Plo
> mv -f .deps/libprocess_la-metrics.Tpo .deps/libprocess_la-metrics.Plo
> ../../../3rdparty/libprocess/src/http.cpp: In lambda function:
> ../../../3rdparty/libprocess/src/http.cpp:1414:7: warning: control reaches end of non-void function [-Wreturn-type]
>        },
>        ^
> ../../../3rdparty/libprocess/src/http.cpp: In lambda function:
> ../../../3rdparty/libprocess/src/http.cpp:1646:13: warning: control reaches end of non-void function [-Wreturn-type]
>              }()
>              ^
>
> On Sun, Jan 8, 2017 at 7:27 PM, wrote:
>
>> Used `loop` in implementation of io::read and io::write.
>>
>> Review: https://reviews.apache.org/r/54841
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d28c198
>> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d28c198
>> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d28c198
>>
>> Branch: refs/heads/master
>> Commit: 2d28c198a5c09308825b2771483b70ac42839d16
>> Parents: 9984449
>> Author: Benjamin Hindman
>> Authored: Mon Dec 5 10:48:56 2016 -0800
>> Committer: Benjamin Hindman
>> Committed: Sun Jan 8 19:27:02 2017 -0800
>>
>> ----------------------------------------------------------------------
>>  3rdparty/libprocess/src/io.cpp | 289 ++++++++++++++----------------------
>>  1 file changed, 114 insertions(+), 175 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/2d28c198/3rdparty/libprocess/src/io.cpp
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp
>> index d0b3ba1..8aa3576 100644
>> --- a/3rdparty/libprocess/src/io.cpp
>> +++ b/3rdparty/libprocess/src/io.cpp
>> @@ -42,132 +42,114 @@ enum ReadFlags
>>  };
>>
>>
>> -void read(
>> -    int fd,
>> -    void* data,
>> -    size_t size,
>> -    ReadFlags flags,
>> -    const std::shared_ptr<Promise<size_t>>& promise,
>> -    const Future<short>& future)
>> +Future<size_t> read(int fd, void* data, size_t size, ReadFlags flags = NONE)
>>  {
>> -  // Ignore this function if the read operation has been discarded.
>> -  if (promise->future().hasDiscard()) {
>> -    CHECK(!future.isPending());
>> -    promise->discard();
>> -    return;
>> -  }
>> -
>> +  // TODO(benh): Let the system calls do what ever they're supposed to
>> +  // rather than return 0 here?
>>    if (size == 0) {
>> -    promise->set(0);
>> -    return;
>> +    return 0;
>>    }
>>
>> -  if (future.isDiscarded()) {
>> -    promise->fail("Failed to poll: discarded future");
>> -  } else if (future.isFailed()) {
>> -    promise->fail(future.failure());
>> -  } else {
>> -    ssize_t length;
>> -    if (flags == NONE) {
>> -      length = os::read(fd, data, size);
>> -    } else { // PEEK.
>> -      // In case 'fd' is not a socket ::recv() will fail with ENOTSOCK and the
>> -      // error will be propagted out.
>> -      // NOTE: We cast to `char*` here because the function prototypes on
>> -      // Windows use `char*` instead of `void*`.
>> -      length = net::recv(fd, (char*) data, size, MSG_PEEK);
>> -    }
>> -
>> +  return loop(
>> +      None(),
>> +      [=]() -> Future<Option<size_t>> {
>> +        // Because the file descriptor is non-blocking, we call
>> +        // read()/recv() immediately. If no data is available than
>> +        // we'll call `poll` and block. We also observed that for some
>> +        // combination of libev and Linux kernel versions, the poll
>> +        // would block for non-deterministically long periods of
>> +        // time. This may be fixed in a newer version of libev (we use
>> +        // 3.8 at the time of writing this comment).
>> +        ssize_t length = [=]() {
>> +          switch (flags) {
>> +            case PEEK:
>> +              // In case `fd` is not a socket os::recv() will fail
>> +              // with ENOTSOCK and the error will be returned.
>> +              //
>> +              // NOTE: We cast to `char*` here because the function
>> +              // prototypes on Windows use `char*` instead of `void*`.
>> +              return net::recv(fd, (char*) data, size, MSG_PEEK);
>> +            case NONE:
>> +              return os::read(fd, data, size);
>> +          }
>> +        }();
>> +
>> +        if (length < 0) {
>>  #ifdef __WINDOWS__
>> -    int error = WSAGetLastError();
>> +          int error = WSAGetLastError();
>>  #else
>> -    int error = errno;
>> +          int error = errno;
>>  #endif // __WINDOWS__
>>
>> -    if (length < 0) {
>> -      if (net::is_restartable_error(error) || net::is_retryable_error(error)) {
>> -        // Restart the read operation.
>> -        Future<short> future =
>> -          io::poll(fd, process::io::READ).onAny(
>> -              lambda::bind(&internal::read,
>> -                           fd,
>> -                           data,
>> -                           size,
>> -                           flags,
>> -                           promise,
>> -                           lambda::_1));
>> -
>> -        // Stop polling if a discard occurs on our future.
>> -        promise->future().onDiscard(
>> -            lambda::bind(&process::internal::discard,
>> -                         WeakFuture<short>(future)));
>> -      } else {
>> -        // Error occurred.
>> -        promise->fail(os::strerror(errno));
>> -      }
>> -    } else {
>> -      promise->set(length);
>> -    }
>> -  }
>> +          if (!net::is_restartable_error(error) &&
>> +              !net::is_retryable_error(error)) {
>> +            // TODO(benh): Confirm that `os::strerror` does the right
>> +            // thing for `error` on Windows.
>> +            return Failure(os::strerror(error));
>> +          }
>> +
>> +          return None();
>> +        }
>> +
>> +        return length;
>> +      },
>> +      [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
>> +        // Restart/retry if we don't yet have a result.
>> +        if (length.isNone()) {
>> +          return io::poll(fd, io::READ)
>> +            .then([](short event) -> ControlFlow<size_t> {
>> +              CHECK_EQ(io::READ, event);
>> +              return Continue();
>> +            });
>> +        }
>> +        return Break(length.get());
>> +      });
>>  }
>>
>>
>> -void write(
>> -    int fd,
>> -    const void* data,
>> -    size_t size,
>> -    const std::shared_ptr<Promise<size_t>>& promise,
>> -    const Future<short>& future)
>> +Future<size_t> write(int fd, const void* data, size_t size)
>>  {
>> -  // Ignore this function if the write operation has been discarded.
>> -  if (promise->future().hasDiscard()) {
>> -    promise->discard();
>> -    return;
>> -  }
>> -
>> +  // TODO(benh): Let the system calls do what ever they're supposed to
>> +  // rather than return 0 here?
>>    if (size == 0) {
>> -    promise->set(0);
>> -    return;
>> +    return 0;
>>    }
>>
>> -  if (future.isDiscarded()) {
>> -    promise->fail("Failed to poll: discarded future");
>> -  } else if (future.isFailed()) {
>> -    promise->fail(future.failure());
>> -  } else {
>> -    ssize_t length = os::write(fd, data, size);
>> +  return loop(
>> +      None(),
>> +      [=]() -> Future<Option<size_t>> {
>> +        ssize_t length = os::write(fd, data, size);
>>
>> +        if (length < 0) {
>>  #ifdef __WINDOWS__
>> -    int error = WSAGetLastError();
>> +          int error = WSAGetLastError();
>>  #else
>> -    int error = errno;
>> +          int error = errno;
>>  #endif // __WINDOWS__
>>
>> -    if (length < 0) {
>> -      if (net::is_restartable_error(error) || net::is_retryable_error(error)) {
>> -        // Restart the write operation.
>> -        Future<short> future =
>> -          io::poll(fd, process::io::WRITE).onAny(
>> -              lambda::bind(&internal::write,
>> -                           fd,
>> -                           data,
>> -                           size,
>> -                           promise,
>> -                           lambda::_1));
>> -
>> -        // Stop polling if a discard occurs on our future.
>> -        promise->future().onDiscard(
>> -            lambda::bind(&process::internal::discard,
>> -                         WeakFuture<short>(future)));
>> -      } else {
>> -        // Error occurred.
>> -        promise->fail(os::strerror(errno));
>> -      }
>> -    } else {
>> -      // TODO(benh): Retry if 'length' is 0?
>> -      promise->set(length);
>> -    }
>> -  }
>> +          if (!net::is_restartable_error(error) &&
>> +              !net::is_retryable_error(error)) {
>> +            // TODO(benh): Confirm that `os::strerror` does the right
>> +            // thing for `error` on Windows.
>> +            return Failure(os::strerror(error));
>> +          }
>> +
>> +          return None();
>> +        }
>> +
>> +        return length;
>> +      },
>> +      [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
>> +        // Restart/retry if we don't yet have a result.
>> +        if (length.isNone()) {
>> +          return io::poll(fd, io::WRITE)
>> +            .then([](short event) -> ControlFlow<size_t> {
>> +              CHECK_EQ(io::WRITE, event);
>> +              return Continue();
>> +            });
>> +        }
>> +        return Break(length.get());
>> +      });
>>  }
>>
>>  } // namespace internal {
>> @@ -177,32 +159,18 @@ Future<size_t> read(int fd, void* data, size_t size)
>>  {
>>    process::initialize();
>>
>> -  std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>());
>> -
>>    // Check the file descriptor.
>>    Try<bool> nonblock = os::isNonblock(fd);
>>    if (nonblock.isError()) {
>>      // The file descriptor is not valid (e.g., has been closed).
>> -    promise->fail(
>> -        "Failed to check if file descriptor was non-blocking: " +
>> -        nonblock.error());
>> -    return promise->future();
>> +    return Failure("Failed to check if file descriptor was non-blocking: " +
>> +                   nonblock.error());
>>    } else if (!nonblock.get()) {
>>      // The file descriptor is not non-blocking.
>> -    promise->fail("Expected a non-blocking file descriptor");
>> -    return promise->future();
>> +    return Failure("Expected a non-blocking file descriptor");
>>    }
>>
>> -  // Because the file descriptor is non-blocking, we call read()
>> -  // immediately. The read may in turn call poll if necessary,
>> -  // avoiding unnecessary polling. We also observed that for some
>> -  // combination of libev and Linux kernel versions, the poll would
>> -  // block for non-deterministically long periods of time. This may be
>> -  // fixed in a newer version of libev (we use 3.8 at the time of
>> -  // writing this comment).
>> -  internal::read(fd, data, size, internal::NONE, promise, io::READ);
>> -
>> -  return promise->future();
>> +  return internal::read(fd, data, size);
>>  }
>>
>>
>> @@ -210,32 +178,19 @@ Future<size_t> write(int fd, const void* data, size_t size)
>>  {
>>    process::initialize();
>>
>> -  std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>());
>> -
>>    // Check the file descriptor.
>>    Try<bool> nonblock = os::isNonblock(fd);
>>    if (nonblock.isError()) {
>>      // The file descriptor is not valid (e.g., has been closed).
>> -    promise->fail(
>> +    return Failure(
>>          "Failed to check if file descriptor was non-blocking: " +
>>          nonblock.error());
>> -    return promise->future();
>>    } else if (!nonblock.get()) {
>>      // The file descriptor is not non-blocking.
>> -    promise->fail("Expected a non-blocking file descriptor");
>> -    return promise->future();
>> +    return Failure("Expected a non-blocking file descriptor");
>>    }
>>
>> -  // Because the file descriptor is non-blocking, we call write()
>> -  // immediately. The write may in turn call poll if necessary,
>> -  // avoiding unnecessary polling. We also observed that for some
>> -  // combination of libev and Linux kernel versions, the poll would
>> -  // block for non-deterministically long periods of time. This may be
>> -  // fixed in a newer version of libev (we use 3.8 at the time of
>> -  // writing this comment).
>> -  internal::write(fd, data, size, promise, io::WRITE);
>> -
>> -  return promise->future();
>> +  return internal::write(fd, data, size);
>>  }
>>
>>
>> @@ -280,43 +235,15 @@ Future<size_t> peek(int fd, void* data, size_t size, size_t limit)
>>          nonblock.error());
>>    }
>>
>> -  std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>());
>> -
>> -  // Because the file descriptor is non-blocking, we call read()
>> -  // immediately. The read may in turn call poll if necessary,
>> -  // avoiding unnecessary polling. We also observed that for some
>> -  // combination of libev and Linux kernel versions, the poll would
>> -  // block for non-deterministically long periods of time. This may be
>> -  // fixed in a newer version of libev (we use 3.8 at the time of
>> -  // writing this comment).
>> -  internal::read(fd, data, limit, internal::PEEK, promise, io::READ);
>> -
>> -  // NOTE: We wrap `os::close` in a lambda to disambiguate on Windows.
>> -  promise->future().onAny([fd]() { os::close(fd); });
>> -
>> -  return promise->future();
>> +  return internal::read(fd, data, limit, internal::PEEK)
>> +    .onAny([fd]() {
>> +      os::close(fd);
>> +    });
>>  }
>>
>>
>>  namespace internal {
>>
>> -Future<string> _read(
>> -    int fd,
>> -    const std::shared_ptr<string>& buffer,
>> -    const boost::shared_array<char>& data,
>> -    size_t length)
>> -{
>> -  return io::read(fd, data.get(), length)
>> -    .then([=](size_t size) -> Future<string> {
>> -      if (size == 0) { // EOF.
>> -        return string(*buffer);
>> -      }
>> -      buffer->append(data.get(), size);
>> -      return _read(fd, buffer, data, length);
>> -    });
>> -}
>> -
>> -
>>  Future<Nothing> splice(
>>      int from,
>>      int to,
>> @@ -392,9 +319,21 @@ Future<string> read(int fd)
>>    std::shared_ptr<string> buffer(new string());
>>    boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
>>
>> -  // NOTE: We wrap `os::close` in a lambda to disambiguate on Windows.
>> -  return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE)
>> -    .onAny([fd]() { os::close(fd); });
>> +  return loop(
>> +      None(),
>> +      [=]() {
>> +        return io::read(fd, data.get(), BUFFERED_READ_SIZE);
>> +      },
>> +      [=](size_t length) -> ControlFlow<string> {
>> +        if (length == 0) { // EOF.
>> +          return Break(std::move(*buffer));
>> +        }
>> +        buffer->append(data.get(), length);
>> +        return Continue();
>> +      })
>> +    .onAny([fd]() {
>> +      os::close(fd);
>> +    });
>>  }
>>
>>
>>
>
--94eb2c098c8e6885350545b1f756--
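
For reference, the `-Wreturn-type` warnings quoted in this thread point at lambdas whose body is a `switch` over an enum with a `return` in every case. GCC can still warn there, because an enum object may hold a value that matches none of its enumerators, so control could in principle fall out of the `switch`. The following is only a standalone sketch of that pattern and of one possible way to silence the warning; it is not the libprocess code and not necessarily the fix that was applied. `readLength` and the returned placeholder values are hypothetical, and `ReadFlags` here is a stand-in for the enum in io.cpp.

```cpp
#include <cassert>
#include <cstdlib>

// Hypothetical stand-in for the `ReadFlags` enum in io.cpp.
enum ReadFlags { NONE, PEEK };

// Hypothetical helper mirroring the shape of the immediately-invoked
// lambda from the diff: a `switch` that returns in every case.
long readLength(ReadFlags flags)
{
  long length = [=]() -> long {
    switch (flags) {
      case PEEK:
        return 2; // Placeholder for the `net::recv(...)` call.
      case NONE:
        return 1; // Placeholder for the `os::read(...)` call.
    }

    // Without a statement here, GCC may report "control reaches end
    // of non-void function". `std::abort` is declared noreturn, so
    // the compiler knows this path never falls off the end.
    std::abort();
  }();

  return length;
}
```

Calling `readLength(NONE)` or `readLength(PEEK)` takes one of the two `return` paths; the `std::abort()` line is only reached if `flags` somehow holds a value outside the enumerators.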