diff options
author | Karen Arutyunov <karen@codesynthesis.com> | 2019-04-15 16:08:33 +0300 |
---|---|---|
committer | Karen Arutyunov <karen@codesynthesis.com> | 2019-04-15 16:09:23 +0300 |
commit | c371e6eaea1a4b6ce3bcd9b778cd5636b3304ea4 (patch) | |
tree | f12370b7fc83968c44f017dc280d2db3637a2120 | |
parent | ea26421a8e02c17a6b59ba4565a115cbefb91370 (diff) |
Add fdread() and fdselect()
-rw-r--r-- | libbutl/fdstream.cxx | 584 | ||||
-rw-r--r-- | libbutl/fdstream.mxx | 93 | ||||
-rw-r--r-- | tests/fdstream/buildfile | 3 | ||||
-rw-r--r-- | tests/fdstream/driver.cxx | 139 |
4 files changed, 673 insertions, 146 deletions
diff --git a/libbutl/fdstream.cxx b/libbutl/fdstream.cxx index 2d3ca45..c8b120e 100644 --- a/libbutl/fdstream.cxx +++ b/libbutl/fdstream.cxx @@ -9,12 +9,14 @@ #include <errno.h> // errno, E* #ifndef _WIN32 -# include <fcntl.h> // open(), O_*, fcntl() -# include <unistd.h> // close(), read(), write(), lseek(), dup(), pipe(), - // ftruncate(), isatty(), ssize_t, STD*_FILENO -# include <sys/uio.h> // writev(), iovec -# include <sys/stat.h> // stat(), S_I* -# include <sys/types.h> // stat, off_t +# include <fcntl.h> // open(), O_*, fcntl() +# include <unistd.h> // close(), read(), write(), lseek(), dup(), pipe(), + // ftruncate(), isatty(), ssize_t, STD*_FILENO +# include <sys/uio.h> // writev(), iovec +# include <sys/stat.h> // stat(), S_I* +# include <sys/time.h> // timeval +# include <sys/types.h> // stat, off_t +# include <sys/select.h> #else # include <libbutl/win32-utility.hxx> @@ -40,6 +42,7 @@ #include <ostream> #include <utility> #include <cstdint> +#include <cstddef> #include <ios> // ios_base::openmode, ios_base::failure #include <new> // bad_alloc @@ -65,6 +68,7 @@ import std.threading; // Clang wants it in purview (see process-details.hxx). #endif import butl.path; import butl.filesystem; +import butl.small_vector; #endif #endif @@ -96,21 +100,94 @@ namespace butl } } - // fdbuf +#ifdef _WIN32 + // Resolve a file descriptor to HANDLE. On the underlying OS error throw + // ios::failure or return INVALID_HANDLE_VALUE if ignore_error is true. // - void fdbuf:: - open (auto_fd&& fd, uint64_t pos) + // Note that such a handle is closed either when CloseHandle() is called on + // the HANDLE or when _close() is called on the associated file descriptor. + // So make sure that either the original file descriptor or the resulting + // HANDLE is closed but not both of them. + // + static HANDLE + fd_to_handle (int fd, bool ignore_error = false) { - close (); + HANDLE h (reinterpret_cast<HANDLE> (_get_osfhandle (fd))); + if (h == INVALID_HANDLE_VALUE && !ignore_error) + throw_generic_ios_failure (errno); // EBADF (POSIX value). + return h; + } + + // Return the pipe handle and blocking mode or nullopt if the file + // descriptor is not a pipe. On the underlying OS error throw ios::failure + // or return nullopt if ignore_error is true. + // + struct handle_mode + { + HANDLE handle; + fdstream_mode mode; + }; + + static optional<handle_mode> + pipe_mode (int fd, bool ignore_error = false) + { + // We don't need to close it (see fd_to_handle()). + // + HANDLE h (fd_to_handle (fd, ignore_error)); + if (h == INVALID_HANDLE_VALUE) + return nullopt; + + // If we fail to obtain a pipe state for a valid handle, then we assume + // that this is not a pipe. + // + // Note that for other handle types GetLastError() may return different + // error codes depending on the type and if we run natively or under Wine. + // + DWORD m; + if (!GetNamedPipeHandleState (h, + &m, + nullptr /* lpCurInstances */, + nullptr /* lpMaxCollectionCount */, + nullptr /* lpCollectDataTimeout */, + nullptr /* lpUserName */, + 0 /* nMaxUserNameSize */)) + return nullopt; + + return handle_mode {h, + (m & PIPE_NOWAIT) != 0 + ? fdstream_mode::non_blocking + : fdstream_mode::blocking}; + } +#endif + + // fdbuf + // + // Return true if the file descriptor is in the non-blocking mode. Throw + // ios::failure on the underlying OS error. + // + static bool + non_blocking (int fd) + { #ifndef _WIN32 - int flags (fcntl (fd.get (), F_GETFL)); + int flags (fcntl (fd, F_GETFL)); if (flags == -1) throw_generic_ios_failure (errno); - non_blocking_ = (flags & O_NONBLOCK) == O_NONBLOCK; + return (flags & O_NONBLOCK) == O_NONBLOCK; +#else + optional<handle_mode> m (pipe_mode (fd)); + return m && m->mode == fdstream_mode::non_blocking; #endif + } + + void fdbuf:: + open (auto_fd&& fd, uint64_t pos) + { + close (); + + non_blocking_ = non_blocking (fd.get ()); setg (buf_, buf_, buf_); setp (buf_, buf_ + sizeof (buf_) - 1); // Keep space for overflow's char. @@ -118,6 +195,30 @@ namespace butl fd_ = move (fd); } + bool fdbuf:: + blocking (bool m) + { + // Verify that the file descriptor is open. + // + if (!is_open ()) + throw_generic_ios_failure (EBADF); // POSIX value. + + // Bail out if we are already in the requested mode. + // + if (m != non_blocking_) + return m; + + // Change the mode. + // + fdmode (fd (), m ? fdstream_mode::blocking : fdstream_mode::non_blocking); + + // Get the effective file descriptor blocking mode (see fdmode() for + // details). + // + non_blocking_ = non_blocking (fd ()); + return !m; + } + streamsize fdbuf:: showmanyc () { @@ -129,10 +230,9 @@ namespace butl if (n > 0) return n; -#ifndef _WIN32 if (non_blocking_) { - ssize_t n (read (fd_.get (), buf_, sizeof (buf_))); + streamsize n (fdread (fd_.get (), buf_, sizeof (buf_))); if (n == -1) { @@ -150,7 +250,6 @@ namespace butl return n; } -#endif return 0; } @@ -177,14 +276,6 @@ namespace butl return r; } -#ifdef _WIN32 - static inline int - read (int fd, void* buf, size_t n) - { - return _read (fd, buf, static_cast<unsigned int> (n)); - } -#endif - bool fdbuf:: load () { @@ -192,7 +283,7 @@ namespace butl // assert (!non_blocking_); - auto n (read (fd_.get (), buf_, sizeof (buf_))); + streamsize n (fdread (fd_.get (), buf_, sizeof (buf_))); if (n == -1) throw_generic_ios_failure (errno); @@ -220,7 +311,7 @@ namespace butl for (uint64_t n (off); n != 0; ) { size_t m (n > sizeof (buf_) ? sizeof (buf_) : static_cast<size_t> (n)); - auto r (read (fd_.get (), buf_, m)); + streamsize r (fdread (fd_.get (), buf_, m)); if (r == -1) throw_generic_ios_failure (errno); @@ -626,8 +717,8 @@ namespace butl mode (auto_fd fd, fdstream_mode m) { if (fd.get () >= 0 && - (flag (m, fdstream_mode::text) || - flag (m, fdstream_mode::binary) || + (flag (m, fdstream_mode::text) || + flag (m, fdstream_mode::binary) || flag (m, fdstream_mode::blocking) || flag (m, fdstream_mode::non_blocking))) fdmode (fd.get (), m); @@ -712,7 +803,20 @@ namespace butl // Clear the exception mask to prevent ignore() from throwing. // exceptions (goodbit); - ignore (numeric_limits<streamsize>::max ()); + + // The ignore() function doesn't support the non-blocking semantics as + // it extracts and discards characters until the limit is reached or EOF + // is encountered. That's why we switch to the blocking mode. + // + // It's highly unlikely to be unable to set the blocking mode for a + // valid fd. If, however, that happens, we can't do much about it. + // + try + { + buf_.blocking (true); + ignore (numeric_limits<streamsize>::max ()); + } + catch (const ios_base::failure&) {} } // Underlying file descriptor is closed by fdbuf dtor with errors (if any) @@ -736,7 +840,13 @@ namespace butl close () { if (skip_ && is_open () && good ()) + { + // The ignore() function doesn't support the non-blocking semantics (see + // above). + // + buf_.blocking (true); ignore (numeric_limits<streamsize>::max ()); + } buf_.close (); } @@ -1012,11 +1122,11 @@ namespace butl // auto dup = [fd] () -> auto_fd { - auto_fd nfd (::dup (fd)); - if (nfd.get () == -1) + auto_fd r (::dup (fd)); + if (r.get () == -1) throw_generic_ios_failure (errno); - return nfd; + return r; }; int f (fcntl (fd, F_GETFD)); @@ -1030,13 +1140,13 @@ namespace butl return dup (); slock l (process_spawn_mutex); - auto_fd nfd (dup ()); + auto_fd r (dup ()); - f = fcntl (nfd.get (), F_GETFD); - if (f == -1 || fcntl (nfd.get (), F_SETFD, f | FD_CLOEXEC) == -1) + f = fcntl (r.get (), F_GETFD); + if (f == -1 || fcntl (r.get (), F_SETFD, f | FD_CLOEXEC) == -1) throw_generic_ios_failure (errno); - return nfd; + return r; } bool @@ -1064,6 +1174,10 @@ namespace butl if (flags == -1) throw_generic_ios_failure (errno); + fdstream_mode r ((flags & O_NONBLOCK) == O_NONBLOCK + ? fdstream_mode::non_blocking + : fdstream_mode::blocking); + if (flag (m, fdstream_mode::blocking) || flag (m, fdstream_mode::non_blocking)) { @@ -1074,19 +1188,20 @@ namespace butl if (m != fdstream_mode::blocking && m != fdstream_mode::non_blocking) throw invalid_argument ("invalid blocking mode"); - int new_flags ( - m == fdstream_mode::non_blocking - ? flags | O_NONBLOCK - : flags & ~O_NONBLOCK); + // Set the new blocking mode if it differs from the current one. + // + if (m != r) + { + int new_flags (m == fdstream_mode::non_blocking + ? flags | O_NONBLOCK + : flags & ~O_NONBLOCK); - if (fcntl (fd, F_SETFL, new_flags) == -1) - throw_generic_ios_failure (errno); + if (fcntl (fd, F_SETFL, new_flags) == -1) + throw_generic_ios_failure (errno); + } } - return fdstream_mode::binary | - ((flags & O_NONBLOCK) == O_NONBLOCK - ? fdstream_mode::non_blocking - : fdstream_mode::blocking); + return r | fdstream_mode::binary; } int @@ -1163,6 +1278,99 @@ namespace butl throw_generic_ios_failure (errno); } + pair<size_t, size_t> + fdselect (fdselect_set& read, fdselect_set& write) + { + // Copy fdselect_set into the native fd_set, updating max_fd. Also clear + // the ready flag in the source set. + // + int max_fd (-1); + + auto copy_set = [&max_fd] (fdselect_set& from, fd_set& to) + { + FD_ZERO (&to); + + for (fdselect_state& s: from) + { + if (s.fd == nullfd) + continue; + + if (s.fd < 0) + throw invalid_argument ("invalid file descriptor"); + + FD_SET (s.fd, &to); + s.ready = false; + + if (max_fd < s.fd) + max_fd = s.fd; + } + }; + + fd_set rds; + fd_set wds; + copy_set (read, rds); + copy_set (write, wds); + + if (max_fd == -1) + throw invalid_argument ("empty file descriptor set"); + + ++max_fd; + + // Repeat the select() call while getting the EINTR error and throw on + // any other error. + // + // Note that select() doesn't modify the sets on failure (according to + // POSIX standard as well as to the Linux, FreeBSD and MacOS man pages). + // + for (;;) + { + int r (select (max_fd, + &rds, + &wds, + nullptr /* exceptfds */, + nullptr /* timeout */)); + + if (r == -1) + { + if (errno == EINTR) + continue; + + throw_system_ios_failure (errno); + } + + assert (r != 0); // We don't expect the timeout to occur. + break; + } + + // Set the resulting ready states. + // + auto copy_states = [] (const fd_set& from, fdselect_set& to) + { + size_t r (0); + for (fdselect_state& s: to) + { + if (s.fd == nullfd) + continue; + + if (FD_ISSET (s.fd, &from)) + { + ++r; + s.ready = true; + } + } + + return r; + }; + + return make_pair (copy_states (rds, read), copy_states (wds, write)); + } + + streamsize + fdread (int fd, void* buf, size_t n) + { + return read (fd, buf, n); + } + #else auto_fd @@ -1173,32 +1381,21 @@ namespace butl // copy the flag. To prevent this we will acquire the process_spawn_mutex // (see process-details header) prior to duplicating the descriptor. // - // We can not ammend file descriptors directly (nor obtain the flag value), - // so need to resolve them to Windows HANDLE first. Such handles are closed - // either when CloseHandle() is called for them or when _close() is called - // for the associated file descriptors. Make sure that either the original - // file descriptor or the resulting HANDLE is closed but not both of them. - // - auto handle = [] (int fd) -> HANDLE - { - HANDLE h (reinterpret_cast<HANDLE> (_get_osfhandle (fd))); - if (h == INVALID_HANDLE_VALUE) - throw_generic_ios_failure (errno); // EBADF (POSIX value). - - return h; - }; - auto dup = [fd] () -> auto_fd { - auto_fd nfd (_dup (fd)); - if (nfd.get () == -1) + auto_fd r (_dup (fd)); + if (r.get () == -1) throw_generic_ios_failure (errno); - return nfd; + return r; }; + // We can not ammend file descriptors directly (nor obtain the flag value), + // so need to resolve them to Windows HANDLE first. Note that we don't + // need to close them (see fd_to_handle()). + // DWORD f; - if (!GetHandleInformation (handle (fd), &f)) + if (!GetHandleInformation (fd_to_handle (fd), &f)) throw_system_ios_failure (GetLastError ()); // If the source handle is inheritable then no flag copy is required (as @@ -1209,11 +1406,13 @@ namespace butl slock l (process_spawn_mutex); - auto_fd nfd (dup ()); - if (!SetHandleInformation (handle (nfd.get ()), HANDLE_FLAG_INHERIT, 0)) + auto_fd r (dup ()); + if (!SetHandleInformation (fd_to_handle (r.get ()), + HANDLE_FLAG_INHERIT, + 0 /* dwFlags */)) throw_system_ios_failure (GetLastError ()); - return nfd; + return r; } bool @@ -1275,28 +1474,96 @@ namespace butl fdstream_mode fdmode (int fd, fdstream_mode m) { - m &= fdstream_mode::text | fdstream_mode::binary; + fdstream_mode r; - // Should be exactly one translation flag specified. + // Get the current and set the new translation mode, if requested. // + auto translation_mode = [fd] (fdstream_mode m) + { + assert (m == fdstream_mode::binary || m == fdstream_mode::text); + + // Note that _setmode() preserves the _O_NOINHERIT flag value. + // + int r (_setmode (fd, m == fdstream_mode::binary ? _O_BINARY : _O_TEXT)); + if (r == -1) + throw_generic_ios_failure (errno); // EBADF or EINVAL (POSIX values). + + return (r & _O_BINARY) == _O_BINARY + ? fdstream_mode::binary + : fdstream_mode::text; + }; + // It would have been natural not to change translation mode if none of - // text or binary flags are passed. Unfortunatelly there is no (easy) way + // text or binary flags are passed. Unfortunately there is no (easy) way // to obtain the current mode for the file descriptor without setting a - // new one. This is why not specifying one of the modes is an error. + // new one. That's why we set the text mode (as the more common) to obtain + // the current mode and revert it back if we didn't guess it right. // - if (m != fdstream_mode::binary && m != fdstream_mode::text) - throw invalid_argument ("invalid translation mode"); + if (flag (m, fdstream_mode::text) || flag (m, fdstream_mode::binary)) + { + fdstream_mode tm (m & (fdstream_mode::text | fdstream_mode::binary)); + + // Should be exactly one translation flag specified. + // + if (tm != fdstream_mode::binary && tm != fdstream_mode::text) + throw invalid_argument ("invalid translation mode"); - // Note that _setmode() preserves the _O_NOINHERIT flag value. + r = translation_mode (tm); + } + else + { + r = translation_mode (fdstream_mode::text); + + // Restore the mode if misguessed. + // + if (r == fdstream_mode::binary) + translation_mode (r); + } + + // Get the current and set the new blocking mode, if requested. // - int r (_setmode (fd, m == fdstream_mode::binary ? _O_BINARY : _O_TEXT)); - if (r == -1) - throw_generic_ios_failure (errno); // EBADF or EINVAL (POSIX values). + // Note that we always assume the blocking (synchronous) mode for file + // descriptors other than pipes. + // + optional<handle_mode> pm (pipe_mode (fd)); + + if (pm) + { + r |= pm->mode; + + if (flag (m, fdstream_mode::blocking) || + flag (m, fdstream_mode::non_blocking)) + { + fdstream_mode bm ( + m & (fdstream_mode::blocking | fdstream_mode::non_blocking)); + + // Should be exactly one blocking mode flag specified. + // + if (bm != fdstream_mode::blocking && bm != fdstream_mode::non_blocking) + throw invalid_argument ("invalid blocking mode"); + + // Set the new blocking mode if it differs from the current one. + // + if (bm != pm->mode) + { + DWORD flags (bm == fdstream_mode::non_blocking + ? PIPE_NOWAIT + : PIPE_WAIT); + + if (!SetNamedPipeHandleState (pm->handle, + &flags, + nullptr /* lpMaxCollectionCount */, + nullptr /* lpCollectDataTimeout */)) + throw_system_ios_failure (GetLastError ()); + } + } + } + else + { + r |= fdstream_mode::blocking; + } - return fdstream_mode::blocking | - ((r & _O_BINARY) == _O_BINARY - ? fdstream_mode::binary - : fdstream_mode::text); + return r; } int @@ -1354,13 +1621,9 @@ namespace butl bool fdterm (int fd) { - // Resolve file descriptor to HANDLE. Note that the handle is closed either - // when CloseHandle() is called for it or when _close() is called for the - // associated file descriptor. So we don't need to close it. + // We don't need to close it (see fd_to_handle()). // - HANDLE h (reinterpret_cast<HANDLE> (_get_osfhandle (fd))); - if (h == INVALID_HANDLE_VALUE) - throw_generic_ios_failure (errno); + HANDLE h (fd_to_handle (fd)); // Obtain the descriptor type. // @@ -1442,5 +1705,152 @@ namespace butl return false; } + + pair<size_t, size_t> + fdselect (fdselect_set& read, fdselect_set& write) + { + if (!write.empty ()) + throw invalid_argument ("write file descriptor set is not supported"); + + // Validate and prepare the read set. + // + size_t n (0); + + for (fdselect_state& s: read) + { + if (s.fd == nullfd) + continue; + + if (s.fd < 0) + throw invalid_argument ("invalid file descriptor"); + + s.ready = false; + ++n; + } + + if (n == 0) + throw invalid_argument ("empty file descriptor set"); + + // Keep iterating through the set until at least one byte can be read from + // any of the pipes or any of them get closed (so can read eof). + // + size_t r (0); + + while (true) + { + for (fdselect_state& s: read) + { + if (s.fd == nullfd) + continue; + + // Get the pipe handle. Note that PeekNamedPipe() doesn't care about + // the blocking mode. + // + optional<handle_mode> m (pipe_mode (s.fd)); + + if (!m) + throw invalid_argument ("file descriptor is not a pipe"); + + // Probe the pipe for data presence. + // + // Note that PeekNamedPipe() can block the thread execution in a + // multi-threaded program. It is not clear from the documentation + // under which circumstances and for how long this may happen. It + // seems to relate to some internal IO synchronization mechanism and + // probably depends on the contention level. But there doesn't appear + // to be a better way. Specifically, WaitForMultipleObjects() does not + // work on pipes (at least not on the non-overlapped ones). + // + char c; + DWORD n; + DWORD e (0); + + if (!PeekNamedPipe (m->handle, + &c, + 1, + &n, + nullptr /* lpTotalBytesAvail */, + nullptr /* lpBytesLeftThisMessage */)) + e = GetLastError (); + + // If we successfully peeked a byte or failed due to the broken pipe + // (the writing end is closed), then the descriptor is ready for + // reading. + // + if ((e == 0 && n == 1) || e == ERROR_BROKEN_PIPE) + { + s.ready = true; + ++r; + continue; + } + + // If we successfully read zero bytes, then there is no data available + // for reading. + // + // Note that we never get the ERROR_NO_DATA error, but let's check for + // good measure. + // + // As an observation, upon a successful call that peeks zero bytes, + // GetLastError() returns zero (ERROR_SUCCESS) if no data were read + // yet from the pipe and ERROR_NO_DATA otherwise. + // + if ((e == 0 && n == 0) || e == ERROR_NO_DATA) + continue; + + // Both successful cases (n == 1 || n == 0) are already handled. + // + assert (e != 0); + throw_system_ios_failure (e); + } + + // Bail out if some descriptors are ready for reading and sleep a bit + // and repeat otherwise. + // + if (r != 0) + break; + + Sleep (50); + } + + return make_pair (r, 0); + } + + streamsize + fdread (int fd, void* buf, size_t n) + { + // The _read() call fails with the EINVAL errno code either because + // something is wrong with the arguments or there is no data to read from + // a non-blocking pipe fd. In the latter case we translate the errno code + // to EAGAIN. + // + // Note that this behavior is not really documented. However, it is both + // described on the web and is confirmed by the tests. The ERROR_NO_DATA + // system error is referred in the ReadFile() documentation in the context + // of non-blocking pipes and is presumably set by the underlying + // ReadFile() call. + // + // Also note that this doesn't work on Wine. We can probably implement the + // non-blocking behavior for Wine using PeekNamedPipe() that is claimed to + // also provide the number of total bytes available. More research is + // required. + + // Clear last error to make sure ERROR_NO_DATA is set by _read(). + // + SetLastError (0); + + int r (_read (fd, buf, static_cast<unsigned int> (n))); + + if (r == -1 && errno == EINVAL && GetLastError () == ERROR_NO_DATA) + { + // Let's also check that the file descriptor is really a non-blocking + // pipe. + // + optional<handle_mode> pm (pipe_mode (fd, true /* ignore_error */)); + errno = pm && pm->mode == fdstream_mode::non_blocking ? EAGAIN : EINVAL; + } + + return r; + } + #endif } diff --git a/libbutl/fdstream.mxx b/libbutl/fdstream.mxx index e608a07..894f85b 100644 --- a/libbutl/fdstream.mxx +++ b/libbutl/fdstream.mxx @@ -9,12 +9,14 @@ #include <cassert> #ifndef __cpp_lib_modules +#include <ios> // streamsize #include <vector> #include <string> #include <istream> #include <ostream> -#include <utility> // move() +#include <utility> // move(), pair #include <cstdint> // uint16_t, uint64_t +#include <cstddef> // size_t #include <iterator> #endif @@ -29,9 +31,11 @@ import std.io; #endif import butl.path; import butl.filesystem; // permissions +import butl.small_vector; #else #include <libbutl/path.mxx> #include <libbutl/filesystem.mxx> +#include <libbutl/small-vector.mxx> #endif #include <libbutl/export.hxx> @@ -44,7 +48,12 @@ LIBBUTL_MODEXPORT namespace butl // The descriptor can be negative. Such a descriptor is treated as unopened // and is not closed. // - struct nullfd_t {constexpr explicit nullfd_t (int) {}}; + struct nullfd_t + { + constexpr explicit nullfd_t (int) {} + constexpr operator int () const {return -1;} + }; + #if defined(__cpp_modules) && defined(__clang__) //@@ MOD Clang duplicate sym. inline #endif @@ -113,7 +122,7 @@ LIBBUTL_MODEXPORT namespace butl // for reasoning and consider using non-standard tellg() and seekg() in // fdbuf, instead) // - non-blocking file descriptor is supported only by showmanyc() function - // and only on POSIX + // and only for pipes on Windows, in contrast to POSIX systems // - throws ios::failure in case of open(), read(), write(), close(), // seek[gp](), or tell[gp]() errors // - exception mask has at least badbit @@ -154,6 +163,15 @@ LIBBUTL_MODEXPORT namespace butl int fd () const {return fd_.get ();} + // Set the file descriptor blocking mode returning the previous mode on + // success and throwing ios::failure otherwise (see fdmode() for details). + // + // Note that besides calling fdmode(fd()), this function also updating its + // internal state according to the new mode. + // + bool + blocking (bool); + public: using base = std::basic_streambuf<char>; @@ -253,10 +271,11 @@ LIBBUTL_MODEXPORT namespace butl // The blocking/non_blocking flags determine whether the IO operation should // block or return control if currently there is no data to read or no room // to write. Only the istream::readsome() function supports the semantics of - // non-blocking operations. We also only support this on POSIX (Windows does - // not provide means for the non-blocking reading from a file descriptor so - // these flags are noop there). IO stream operations other than readsome() - // are illegal for non_blocking mode and result in the badbit being set. + // non-blocking operations. In contrast to POSIX systems, we only support + // this for pipes on Windows, always assuming the blocking mode for other + // file descriptors. IO stream operations other than readsome() are illegal + // in the non-blocking mode and result in the badbit being set (note that + // it is not the more appropriate failbit for implementation reasons). // enum class fdstream_mode: std::uint16_t { @@ -656,17 +675,20 @@ LIBBUTL_MODEXPORT namespace butl LIBBUTL_SYMEXPORT auto_fd fddup (int fd); - // Set the translation mode for the file descriptor. Throw invalid_argument - // for an invalid combination of flags. Return the previous mode on success, - // throw ios::failure otherwise. + // Set the translation and/or blocking modes for the file descriptor. Throw + // invalid_argument for an invalid combination of flags. Return the previous + // mode on success, throw ios::failure otherwise. // - // The text and binary flags are mutually exclusive on Windows. Due to - // implementation details at least one of them should be specified. On POSIX + // The text and binary flags are mutually exclusive on Windows. On POSIX // system the two modes are the same and so no check is performed. // - // The blocking and non-blocking flags are mutually exclusive on POSIX - // system. Non-blocking mode is not supported on Windows and so the blocking - // mode is assumed regardless of the flags. + // The blocking and non-blocking flags are mutually exclusive. In contrast + // to POSIX systems, on Windows the non-blocking mode is only supported for + // pipes, with the blocking mode assumed for other file descriptors + // regardless of the flags. + // + // Note that on Wine currently pipes always behave as blocking regardless of + // the mode set. // LIBBUTL_SYMEXPORT fdstream_mode fdmode (int, fdstream_mode); @@ -781,6 +803,47 @@ LIBBUTL_MODEXPORT namespace butl // LIBBUTL_SYMEXPORT bool fdterm (int); + + // Wait until one or more file descriptors becomes ready for reading or + // writing. Return the pair of numbers of descriptors that are ready. Throw + // std::invalid_argument if anything is wrong with arguments (both sets are + // empty, invalid fd, etc). Throw ios::failure on the underlying OS error. + // + // Note that the function clears all the previously-ready entries on each + // call. Entries with nullfd are ignored. + // + // On Windows only pipes and only their read ends are supported. + // + struct fdselect_state + { + int fd; + bool ready; + + // Note: intentionally non-explicit to allow implicit initialization when + // pushing to fdselect_set. + // + fdselect_state (int fd): fd (fd), ready (false) {} + }; + + using fdselect_set = small_vector<fdselect_state, 4>; + + LIBBUTL_SYMEXPORT std::pair<std::size_t, std::size_t> + fdselect (fdselect_set& read, fdselect_set& write); + + // As above but wait up to the specified timeout returning a pair of zeroes + // if none of the descriptors became ready. + // + // LIBBUTL_SYMEXPORT std::pair<std::size_t, std::size_t> + // fdselect (fdselect_set&, fdselect_set&, const duration& timeout); + + // POSIX read() function wrapper. In particular, it supports the semantics + // of non-blocking read for pipes on Windows. + // + // Note that on Wine currently pipes always behave as blocking regardless of + // the mode. + // + LIBBUTL_SYMEXPORT std::streamsize + fdread (int, void*, std::size_t); } #include <libbutl/fdstream.ixx> diff --git a/tests/fdstream/buildfile b/tests/fdstream/buildfile index f494dcf..f69bc95 100644 --- a/tests/fdstream/buildfile +++ b/tests/fdstream/buildfile @@ -6,3 +6,6 @@ import libs = libbutl%lib{butl} libs += $stdmod_lib exe{driver}: {hxx cxx}{*} $libs + +if ($cxx.target.class != "windows") + cxx.libs += -lpthread diff --git a/tests/fdstream/driver.cxx b/tests/fdstream/driver.cxx index 4063ac6..ee75a31 100644 --- a/tests/fdstream/driver.cxx +++ b/tests/fdstream/driver.cxx @@ -2,17 +2,21 @@ // copyright : Copyright (c) 2014-2019 Code Synthesis Ltd // license : MIT; see accompanying LICENSE file +#ifdef _WIN32 +# include <libbutl/win32-utility.hxx> +#endif + #include <cassert> #ifndef __cpp_lib_modules #ifndef _WIN32 # include <chrono> -# include <thread> // this_thread::sleep_for() #endif #include <ios> #include <string> #include <vector> +#include <thread> #include <iomanip> #include <sstream> #include <fstream> @@ -161,19 +165,19 @@ main (int argc, const char* argv[]) string s; getline (cin, s, '\0'); - size_t n (10); + size_t n (1000); for (size_t i (0); i < s.size (); i += n) { - cout.write (s.c_str () + i, min (n, s.size () - i)); - cout.flush (); - - // @@ MINGW GCC 4.9 doesn't implement this_thread. If ifdstream - // non-blocking read will ever be implemented on Windows use Win32 - // Sleep() instead. + // MINGW GCC 4.9 doesn't implement this_thread so use Win32 Sleep(). // #ifndef _WIN32 this_thread::sleep_for (chrono::milliseconds (50)); +#else + Sleep (50); #endif + + cout.write (s.c_str () + i, min (n, s.size () - i)); + cout.flush (); } return 0; @@ -247,8 +251,8 @@ main (int argc, const char* argv[]) { // Fail to create if the file already exists. // - fdopen ( - f, fdopen_mode::out | fdopen_mode::create | fdopen_mode::exclusive); + fdopen (f, + fdopen_mode::out | fdopen_mode::create | fdopen_mode::exclusive); assert (false); } @@ -433,54 +437,101 @@ main (int argc, const char* argv[]) { } +#else + + // Check translation modes. + // + to_file (f, text1, fdopen_mode::truncate); + assert (from_file (f, fdopen_mode::binary) == text3); + + to_file (f, text3, fdopen_mode::truncate | fdopen_mode::binary); + assert (from_file (f) == text1); + +#endif + // Test non-blocking reading. // - try { + string s; + for (size_t i (0); i < 100; ++i) + s += "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz\n"; + const char* args[] = {argv[0], "-c", nullptr}; - process pr (args, -1, -1); - ofdstream os (move (pr.out_fd)); - ifdstream is (move (pr.in_ofd), fdstream_mode::non_blocking); + auto test_read = [&args, &s] () + { + try + { + process pr (args, -1, -1); + ofdstream os (move (pr.out_fd)); + ifdstream is (move (pr.in_ofd), fdstream_mode::non_blocking); - const string s ( - "0123456789\nABCDEFGHIJKLMNOPQRSTUVWXYZ\nabcdefghijklmnopqrstuvwxyz"); + os << s; + os.close (); - os << s; - os.close (); + fdselect_set rds ({fdselect_state (is.fd ())}); + fdselect_set wds; - string r; - char buf[3]; - while (!is.eof ()) - { - streamsize n (is.readsome (buf, sizeof (buf))); - r.append (buf, n); - } + string r; + char buf[300]; + while (!is.eof ()) + { + pair<size_t, size_t> nd (fdselect (rds, wds)); - is.close (); + assert (nd.first == 1 && nd.second == 0 && rds[0].ready); - assert (r == s); - } - catch (const ios::failure&) - { - assert (false); - } - catch (const process_error&) - { - assert (false); - } + for (streamsize n; (n = is.readsome (buf, sizeof (buf))) != 0; ) + r.append (buf, n); + } -#else + is.close (); - // Check translation modes. - // - to_file (f, text1, fdopen_mode::truncate); - assert (from_file (f, fdopen_mode::binary) == text3); + assert (r == s); + } + catch (const ios::failure&) + { + assert (false); + } + catch (const process_error&) + { + assert (false); + } + }; - to_file (f, text3, fdopen_mode::truncate | fdopen_mode::binary); - assert (from_file (f) == text1); + vector<thread> threads; + for (size_t i (0); i < 10; ++i) + threads.emplace_back (test_read); -#endif + // While the threads are busy, let's test the skip/non_blocking modes + // combination. + // + try + { + process pr (args, -1, -1); + ofdstream os (move (pr.out_fd)); + + ifdstream is (move (pr.in_ofd), + fdstream_mode::non_blocking | fdstream_mode::skip); + + os << s; + os.close (); + + is.close (); // Set the blocking mode, skip and close. + } + catch (const ios::failure&) + { + assert (false); + } + catch (const process_error&) + { + assert (false); + } + + // Join the non-blocking reading test threads. + // + for (thread& t: threads) + t.join (); + } // Test setting and getting position via the non-standard fdbuf interface. // |