aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaren Arutyunov <karen@codesynthesis.com>2019-04-15 16:08:33 +0300
committerKaren Arutyunov <karen@codesynthesis.com>2019-04-15 16:09:23 +0300
commitc371e6eaea1a4b6ce3bcd9b778cd5636b3304ea4 (patch)
treef12370b7fc83968c44f017dc280d2db3637a2120
parentea26421a8e02c17a6b59ba4565a115cbefb91370 (diff)
Add fdread() and fdselect()
-rw-r--r--libbutl/fdstream.cxx584
-rw-r--r--libbutl/fdstream.mxx93
-rw-r--r--tests/fdstream/buildfile3
-rw-r--r--tests/fdstream/driver.cxx139
4 files changed, 673 insertions, 146 deletions
diff --git a/libbutl/fdstream.cxx b/libbutl/fdstream.cxx
index 2d3ca45..c8b120e 100644
--- a/libbutl/fdstream.cxx
+++ b/libbutl/fdstream.cxx
@@ -9,12 +9,14 @@
#include <errno.h> // errno, E*
#ifndef _WIN32
-# include <fcntl.h> // open(), O_*, fcntl()
-# include <unistd.h> // close(), read(), write(), lseek(), dup(), pipe(),
- // ftruncate(), isatty(), ssize_t, STD*_FILENO
-# include <sys/uio.h> // writev(), iovec
-# include <sys/stat.h> // stat(), S_I*
-# include <sys/types.h> // stat, off_t
+# include <fcntl.h> // open(), O_*, fcntl()
+# include <unistd.h> // close(), read(), write(), lseek(), dup(), pipe(),
+ // ftruncate(), isatty(), ssize_t, STD*_FILENO
+# include <sys/uio.h> // writev(), iovec
+# include <sys/stat.h> // stat(), S_I*
+# include <sys/time.h> // timeval
+# include <sys/types.h> // stat, off_t
+# include <sys/select.h>
#else
# include <libbutl/win32-utility.hxx>
@@ -40,6 +42,7 @@
#include <ostream>
#include <utility>
#include <cstdint>
+#include <cstddef>
#include <ios> // ios_base::openmode, ios_base::failure
#include <new> // bad_alloc
@@ -65,6 +68,7 @@ import std.threading; // Clang wants it in purview (see process-details.hxx).
#endif
import butl.path;
import butl.filesystem;
+import butl.small_vector;
#endif
#endif
@@ -96,21 +100,94 @@ namespace butl
}
}
- // fdbuf
+#ifdef _WIN32
+ // Resolve a file descriptor to HANDLE. On the underlying OS error throw
+ // ios::failure or return INVALID_HANDLE_VALUE if ignore_error is true.
//
- void fdbuf::
- open (auto_fd&& fd, uint64_t pos)
+ // Note that such a handle is closed either when CloseHandle() is called on
+ // the HANDLE or when _close() is called on the associated file descriptor.
+ // So make sure that either the original file descriptor or the resulting
+ // HANDLE is closed but not both of them.
+ //
+ static HANDLE
+ fd_to_handle (int fd, bool ignore_error = false)
{
- close ();
+ HANDLE h (reinterpret_cast<HANDLE> (_get_osfhandle (fd)));
+ if (h == INVALID_HANDLE_VALUE && !ignore_error)
+ throw_generic_ios_failure (errno); // EBADF (POSIX value).
+ return h;
+ }
+
+ // Return the pipe handle and blocking mode or nullopt if the file
+ // descriptor is not a pipe. On the underlying OS error throw ios::failure
+ // or return nullopt if ignore_error is true.
+ //
+ struct handle_mode
+ {
+ HANDLE handle;
+ fdstream_mode mode;
+ };
+
+ static optional<handle_mode>
+ pipe_mode (int fd, bool ignore_error = false)
+ {
+ // We don't need to close it (see fd_to_handle()).
+ //
+ HANDLE h (fd_to_handle (fd, ignore_error));
+ if (h == INVALID_HANDLE_VALUE)
+ return nullopt;
+
+ // If we fail to obtain a pipe state for a valid handle, then we assume
+ // that this is not a pipe.
+ //
+ // Note that for other handle types GetLastError() may return different
+ // error codes depending on the type and if we run natively or under Wine.
+ //
+ DWORD m;
+ if (!GetNamedPipeHandleState (h,
+ &m,
+ nullptr /* lpCurInstances */,
+ nullptr /* lpMaxCollectionCount */,
+ nullptr /* lpCollectDataTimeout */,
+ nullptr /* lpUserName */,
+ 0 /* nMaxUserNameSize */))
+ return nullopt;
+
+ return handle_mode {h,
+ (m & PIPE_NOWAIT) != 0
+ ? fdstream_mode::non_blocking
+ : fdstream_mode::blocking};
+ }
+#endif
+
+ // fdbuf
+ //
+ // Return true if the file descriptor is in the non-blocking mode. Throw
+ // ios::failure on the underlying OS error.
+ //
+ static bool
+ non_blocking (int fd)
+ {
#ifndef _WIN32
- int flags (fcntl (fd.get (), F_GETFL));
+ int flags (fcntl (fd, F_GETFL));
if (flags == -1)
throw_generic_ios_failure (errno);
- non_blocking_ = (flags & O_NONBLOCK) == O_NONBLOCK;
+ return (flags & O_NONBLOCK) == O_NONBLOCK;
+#else
+ optional<handle_mode> m (pipe_mode (fd));
+ return m && m->mode == fdstream_mode::non_blocking;
#endif
+ }
+
+ void fdbuf::
+ open (auto_fd&& fd, uint64_t pos)
+ {
+ close ();
+
+ non_blocking_ = non_blocking (fd.get ());
setg (buf_, buf_, buf_);
setp (buf_, buf_ + sizeof (buf_) - 1); // Keep space for overflow's char.
@@ -118,6 +195,30 @@ namespace butl
fd_ = move (fd);
}
+ bool fdbuf::
+ blocking (bool m)
+ {
+ // Verify that the file descriptor is open.
+ //
+ if (!is_open ())
+ throw_generic_ios_failure (EBADF); // POSIX value.
+
+ // Bail out if we are already in the requested mode.
+ //
+ if (m != non_blocking_)
+ return m;
+
+ // Change the mode.
+ //
+ fdmode (fd (), m ? fdstream_mode::blocking : fdstream_mode::non_blocking);
+
+ // Get the effective file descriptor blocking mode (see fdmode() for
+ // details).
+ //
+ non_blocking_ = non_blocking (fd ());
+ return !m;
+ }
+
streamsize fdbuf::
showmanyc ()
{
@@ -129,10 +230,9 @@ namespace butl
if (n > 0)
return n;
-#ifndef _WIN32
if (non_blocking_)
{
- ssize_t n (read (fd_.get (), buf_, sizeof (buf_)));
+ streamsize n (fdread (fd_.get (), buf_, sizeof (buf_)));
if (n == -1)
{
@@ -150,7 +250,6 @@ namespace butl
return n;
}
-#endif
return 0;
}
@@ -177,14 +276,6 @@ namespace butl
return r;
}
-#ifdef _WIN32
- static inline int
- read (int fd, void* buf, size_t n)
- {
- return _read (fd, buf, static_cast<unsigned int> (n));
- }
-#endif
-
bool fdbuf::
load ()
{
@@ -192,7 +283,7 @@ namespace butl
//
assert (!non_blocking_);
- auto n (read (fd_.get (), buf_, sizeof (buf_)));
+ streamsize n (fdread (fd_.get (), buf_, sizeof (buf_)));
if (n == -1)
throw_generic_ios_failure (errno);
@@ -220,7 +311,7 @@ namespace butl
for (uint64_t n (off); n != 0; )
{
size_t m (n > sizeof (buf_) ? sizeof (buf_) : static_cast<size_t> (n));
- auto r (read (fd_.get (), buf_, m));
+ streamsize r (fdread (fd_.get (), buf_, m));
if (r == -1)
throw_generic_ios_failure (errno);
@@ -626,8 +717,8 @@ namespace butl
mode (auto_fd fd, fdstream_mode m)
{
if (fd.get () >= 0 &&
- (flag (m, fdstream_mode::text) ||
- flag (m, fdstream_mode::binary) ||
+ (flag (m, fdstream_mode::text) ||
+ flag (m, fdstream_mode::binary) ||
flag (m, fdstream_mode::blocking) ||
flag (m, fdstream_mode::non_blocking)))
fdmode (fd.get (), m);
@@ -712,7 +803,20 @@ namespace butl
// Clear the exception mask to prevent ignore() from throwing.
//
exceptions (goodbit);
- ignore (numeric_limits<streamsize>::max ());
+
+ // The ignore() function doesn't support the non-blocking semantics as
+ // it extracts and discards characters until the limit is reached or EOF
+ // is encountered. That's why we switch to the blocking mode.
+ //
+ // It's highly unlikely to be unable to set the blocking mode for a
+ // valid fd. If, however, that happens, we can't do much about it.
+ //
+ try
+ {
+ buf_.blocking (true);
+ ignore (numeric_limits<streamsize>::max ());
+ }
+ catch (const ios_base::failure&) {}
}
// Underlying file descriptor is closed by fdbuf dtor with errors (if any)
@@ -736,7 +840,13 @@ namespace butl
close ()
{
if (skip_ && is_open () && good ())
+ {
+ // The ignore() function doesn't support the non-blocking semantics (see
+ // above).
+ //
+ buf_.blocking (true);
ignore (numeric_limits<streamsize>::max ());
+ }
buf_.close ();
}
@@ -1012,11 +1122,11 @@ namespace butl
//
auto dup = [fd] () -> auto_fd
{
- auto_fd nfd (::dup (fd));
- if (nfd.get () == -1)
+ auto_fd r (::dup (fd));
+ if (r.get () == -1)
throw_generic_ios_failure (errno);
- return nfd;
+ return r;
};
int f (fcntl (fd, F_GETFD));
@@ -1030,13 +1140,13 @@ namespace butl
return dup ();
slock l (process_spawn_mutex);
- auto_fd nfd (dup ());
+ auto_fd r (dup ());
- f = fcntl (nfd.get (), F_GETFD);
- if (f == -1 || fcntl (nfd.get (), F_SETFD, f | FD_CLOEXEC) == -1)
+ f = fcntl (r.get (), F_GETFD);
+ if (f == -1 || fcntl (r.get (), F_SETFD, f | FD_CLOEXEC) == -1)
throw_generic_ios_failure (errno);
- return nfd;
+ return r;
}
bool
@@ -1064,6 +1174,10 @@ namespace butl
if (flags == -1)
throw_generic_ios_failure (errno);
+ fdstream_mode r ((flags & O_NONBLOCK) == O_NONBLOCK
+ ? fdstream_mode::non_blocking
+ : fdstream_mode::blocking);
+
if (flag (m, fdstream_mode::blocking) ||
flag (m, fdstream_mode::non_blocking))
{
@@ -1074,19 +1188,20 @@ namespace butl
if (m != fdstream_mode::blocking && m != fdstream_mode::non_blocking)
throw invalid_argument ("invalid blocking mode");
- int new_flags (
- m == fdstream_mode::non_blocking
- ? flags | O_NONBLOCK
- : flags & ~O_NONBLOCK);
+ // Set the new blocking mode if it differs from the current one.
+ //
+ if (m != r)
+ {
+ int new_flags (m == fdstream_mode::non_blocking
+ ? flags | O_NONBLOCK
+ : flags & ~O_NONBLOCK);
- if (fcntl (fd, F_SETFL, new_flags) == -1)
- throw_generic_ios_failure (errno);
+ if (fcntl (fd, F_SETFL, new_flags) == -1)
+ throw_generic_ios_failure (errno);
+ }
}
- return fdstream_mode::binary |
- ((flags & O_NONBLOCK) == O_NONBLOCK
- ? fdstream_mode::non_blocking
- : fdstream_mode::blocking);
+ return r | fdstream_mode::binary;
}
int
@@ -1163,6 +1278,99 @@ namespace butl
throw_generic_ios_failure (errno);
}
+ pair<size_t, size_t>
+ fdselect (fdselect_set& read, fdselect_set& write)
+ {
+ // Copy fdselect_set into the native fd_set, updating max_fd. Also clear
+ // the ready flag in the source set.
+ //
+ int max_fd (-1);
+
+ auto copy_set = [&max_fd] (fdselect_set& from, fd_set& to)
+ {
+ FD_ZERO (&to);
+
+ for (fdselect_state& s: from)
+ {
+ if (s.fd == nullfd)
+ continue;
+
+ if (s.fd < 0)
+ throw invalid_argument ("invalid file descriptor");
+
+ FD_SET (s.fd, &to);
+ s.ready = false;
+
+ if (max_fd < s.fd)
+ max_fd = s.fd;
+ }
+ };
+
+ fd_set rds;
+ fd_set wds;
+ copy_set (read, rds);
+ copy_set (write, wds);
+
+ if (max_fd == -1)
+ throw invalid_argument ("empty file descriptor set");
+
+ ++max_fd;
+
+ // Repeat the select() call while getting the EINTR error and throw on
+ // any other error.
+ //
+ // Note that select() doesn't modify the sets on failure (according to
+ // POSIX standard as well as to the Linux, FreeBSD and MacOS man pages).
+ //
+ for (;;)
+ {
+ int r (select (max_fd,
+ &rds,
+ &wds,
+ nullptr /* exceptfds */,
+ nullptr /* timeout */));
+
+ if (r == -1)
+ {
+ if (errno == EINTR)
+ continue;
+
+ throw_system_ios_failure (errno);
+ }
+
+ assert (r != 0); // We don't expect the timeout to occur.
+ break;
+ }
+
+ // Set the resulting ready states.
+ //
+ auto copy_states = [] (const fd_set& from, fdselect_set& to)
+ {
+ size_t r (0);
+ for (fdselect_state& s: to)
+ {
+ if (s.fd == nullfd)
+ continue;
+
+ if (FD_ISSET (s.fd, &from))
+ {
+ ++r;
+ s.ready = true;
+ }
+ }
+
+ return r;
+ };
+
+ return make_pair (copy_states (rds, read), copy_states (wds, write));
+ }
+
+ streamsize
+ fdread (int fd, void* buf, size_t n)
+ {
+ return read (fd, buf, n);
+ }
+
#else
auto_fd
@@ -1173,32 +1381,21 @@ namespace butl
// copy the flag. To prevent this we will acquire the process_spawn_mutex
// (see process-details header) prior to duplicating the descriptor.
//
- // We can not ammend file descriptors directly (nor obtain the flag value),
- // so need to resolve them to Windows HANDLE first. Such handles are closed
- // either when CloseHandle() is called for them or when _close() is called
- // for the associated file descriptors. Make sure that either the original
- // file descriptor or the resulting HANDLE is closed but not both of them.
- //
- auto handle = [] (int fd) -> HANDLE
- {
- HANDLE h (reinterpret_cast<HANDLE> (_get_osfhandle (fd)));
- if (h == INVALID_HANDLE_VALUE)
- throw_generic_ios_failure (errno); // EBADF (POSIX value).
-
- return h;
- };
-
auto dup = [fd] () -> auto_fd
{
- auto_fd nfd (_dup (fd));
- if (nfd.get () == -1)
+ auto_fd r (_dup (fd));
+ if (r.get () == -1)
throw_generic_ios_failure (errno);
- return nfd;
+ return r;
};
+ // We can not ammend file descriptors directly (nor obtain the flag value),
+ // so need to resolve them to Windows HANDLE first. Note that we don't
+ // need to close them (see fd_to_handle()).
+ //
DWORD f;
- if (!GetHandleInformation (handle (fd), &f))
+ if (!GetHandleInformation (fd_to_handle (fd), &f))
throw_system_ios_failure (GetLastError ());
// If the source handle is inheritable then no flag copy is required (as
@@ -1209,11 +1406,13 @@ namespace butl
slock l (process_spawn_mutex);
- auto_fd nfd (dup ());
- if (!SetHandleInformation (handle (nfd.get ()), HANDLE_FLAG_INHERIT, 0))
+ auto_fd r (dup ());
+ if (!SetHandleInformation (fd_to_handle (r.get ()),
+ HANDLE_FLAG_INHERIT,
+ 0 /* dwFlags */))
throw_system_ios_failure (GetLastError ());
- return nfd;
+ return r;
}
bool
@@ -1275,28 +1474,96 @@ namespace butl
fdstream_mode
fdmode (int fd, fdstream_mode m)
{
- m &= fdstream_mode::text | fdstream_mode::binary;
+ fdstream_mode r;
- // Should be exactly one translation flag specified.
+ // Get the current and set the new translation mode, if requested.
//
+ auto translation_mode = [fd] (fdstream_mode m)
+ {
+ assert (m == fdstream_mode::binary || m == fdstream_mode::text);
+
+ // Note that _setmode() preserves the _O_NOINHERIT flag value.
+ //
+ int r (_setmode (fd, m == fdstream_mode::binary ? _O_BINARY : _O_TEXT));
+ if (r == -1)
+ throw_generic_ios_failure (errno); // EBADF or EINVAL (POSIX values).
+
+ return (r & _O_BINARY) == _O_BINARY
+ ? fdstream_mode::binary
+ : fdstream_mode::text;
+ };
+
// It would have been natural not to change translation mode if none of
- // text or binary flags are passed. Unfortunatelly there is no (easy) way
+ // text or binary flags are passed. Unfortunately there is no (easy) way
// to obtain the current mode for the file descriptor without setting a
- // new one. This is why not specifying one of the modes is an error.
+ // new one. That's why we set the text mode (as the more common) to obtain
+ // the current mode and revert it back if we didn't guess it right.
//
- if (m != fdstream_mode::binary && m != fdstream_mode::text)
- throw invalid_argument ("invalid translation mode");
+ if (flag (m, fdstream_mode::text) || flag (m, fdstream_mode::binary))
+ {
+ fdstream_mode tm (m & (fdstream_mode::text | fdstream_mode::binary));
+
+ // Should be exactly one translation flag specified.
+ //
+ if (tm != fdstream_mode::binary && tm != fdstream_mode::text)
+ throw invalid_argument ("invalid translation mode");
- // Note that _setmode() preserves the _O_NOINHERIT flag value.
+ r = translation_mode (tm);
+ }
+ else
+ {
+ r = translation_mode (fdstream_mode::text);
+
+ // Restore the mode if misguessed.
+ //
+ if (r == fdstream_mode::binary)
+ translation_mode (r);
+ }
+
+ // Get the current and set the new blocking mode, if requested.
//
- int r (_setmode (fd, m == fdstream_mode::binary ? _O_BINARY : _O_TEXT));
- if (r == -1)
- throw_generic_ios_failure (errno); // EBADF or EINVAL (POSIX values).
+ // Note that we always assume the blocking (synchronous) mode for file
+ // descriptors other than pipes.
+ //
+ optional<handle_mode> pm (pipe_mode (fd));
+
+ if (pm)
+ {
+ r |= pm->mode;
+
+ if (flag (m, fdstream_mode::blocking) ||
+ flag (m, fdstream_mode::non_blocking))
+ {
+ fdstream_mode bm (
+ m & (fdstream_mode::blocking | fdstream_mode::non_blocking));
+
+ // Should be exactly one blocking mode flag specified.
+ //
+ if (bm != fdstream_mode::blocking && bm != fdstream_mode::non_blocking)
+ throw invalid_argument ("invalid blocking mode");
+
+ // Set the new blocking mode if it differs from the current one.
+ //
+ if (bm != pm->mode)
+ {
+ DWORD flags (bm == fdstream_mode::non_blocking
+ ? PIPE_NOWAIT
+ : PIPE_WAIT);
+
+ if (!SetNamedPipeHandleState (pm->handle,
+ &flags,
+ nullptr /* lpMaxCollectionCount */,
+ nullptr /* lpCollectDataTimeout */))
+ throw_system_ios_failure (GetLastError ());
+ }
+ }
+ }
+ else
+ {
+ r |= fdstream_mode::blocking;
+ }
- return fdstream_mode::blocking |
- ((r & _O_BINARY) == _O_BINARY
- ? fdstream_mode::binary
- : fdstream_mode::text);
+ return r;
}
int
@@ -1354,13 +1621,9 @@ namespace butl
bool
fdterm (int fd)
{
- // Resolve file descriptor to HANDLE. Note that the handle is closed either
- // when CloseHandle() is called for it or when _close() is called for the
- // associated file descriptor. So we don't need to close it.
+ // We don't need to close it (see fd_to_handle()).
//
- HANDLE h (reinterpret_cast<HANDLE> (_get_osfhandle (fd)));
- if (h == INVALID_HANDLE_VALUE)
- throw_generic_ios_failure (errno);
+ HANDLE h (fd_to_handle (fd));
// Obtain the descriptor type.
//
@@ -1442,5 +1705,152 @@ namespace butl
return false;
}
+
+ pair<size_t, size_t>
+ fdselect (fdselect_set& read, fdselect_set& write)
+ {
+ if (!write.empty ())
+ throw invalid_argument ("write file descriptor set is not supported");
+
+ // Validate and prepare the read set.
+ //
+ size_t n (0);
+
+ for (fdselect_state& s: read)
+ {
+ if (s.fd == nullfd)
+ continue;
+
+ if (s.fd < 0)
+ throw invalid_argument ("invalid file descriptor");
+
+ s.ready = false;
+ ++n;
+ }
+
+ if (n == 0)
+ throw invalid_argument ("empty file descriptor set");
+
+ // Keep iterating through the set until at least one byte can be read from
+ // any of the pipes or any of them get closed (so can read eof).
+ //
+ size_t r (0);
+
+ while (true)
+ {
+ for (fdselect_state& s: read)
+ {
+ if (s.fd == nullfd)
+ continue;
+
+ // Get the pipe handle. Note that PeekNamedPipe() doesn't care about
+ // the blocking mode.
+ //
+ optional<handle_mode> m (pipe_mode (s.fd));
+
+ if (!m)
+ throw invalid_argument ("file descriptor is not a pipe");
+
+ // Probe the pipe for data presence.
+ //
+ // Note that PeekNamedPipe() can block the thread execution in a
+ // multi-threaded program. It is not clear from the documentation
+ // under which circumstances and for how long this may happen. It
+ // seems to relate to some internal IO synchronization mechanism and
+ // probably depends on the contention level. But there doesn't appear
+ // to be a better way. Specifically, WaitForMultipleObjects() does not
+ // work on pipes (at least not on the non-overlapped ones).
+ //
+ char c;
+ DWORD n;
+ DWORD e (0);
+
+ if (!PeekNamedPipe (m->handle,
+ &c,
+ 1,
+ &n,
+ nullptr /* lpTotalBytesAvail */,
+ nullptr /* lpBytesLeftThisMessage */))
+ e = GetLastError ();
+
+ // If we successfully peeked a byte or failed due to the broken pipe
+ // (the writing end is closed), then the descriptor is ready for
+ // reading.
+ //
+ if ((e == 0 && n == 1) || e == ERROR_BROKEN_PIPE)
+ {
+ s.ready = true;
+ ++r;
+ continue;
+ }
+
+ // If we successfully read zero bytes, then there is no data available
+ // for reading.
+ //
+ // Note that we never get the ERROR_NO_DATA error, but let's check for
+ // good measure.
+ //
+ // As an observation, upon a successful call that peeks zero bytes,
+ // GetLastError() returns zero (ERROR_SUCCESS) if no data were read
+ // yet from the pipe and ERROR_NO_DATA otherwise.
+ //
+ if ((e == 0 && n == 0) || e == ERROR_NO_DATA)
+ continue;
+
+ // Both successful cases (n == 1 || n == 0) are already handled.
+ //
+ assert (e != 0);
+ throw_system_ios_failure (e);
+ }
+
+ // Bail out if some descriptors are ready for reading and sleep a bit
+ // and repeat otherwise.
+ //
+ if (r != 0)
+ break;
+
+ Sleep (50);
+ }
+
+ return make_pair (r, 0);
+ }
+
+ streamsize
+ fdread (int fd, void* buf, size_t n)
+ {
+ // The _read() call fails with the EINVAL errno code either because
+ // something is wrong with the arguments or there is no data to read from
+ // a non-blocking pipe fd. In the latter case we translate the errno code
+ // to EAGAIN.
+ //
+ // Note that this behavior is not really documented. However, it is both
+ // described on the web and is confirmed by the tests. The ERROR_NO_DATA
+ // system error is referred in the ReadFile() documentation in the context
+ // of non-blocking pipes and is presumably set by the underlying
+ // ReadFile() call.
+ //
+ // Also note that this doesn't work on Wine. We can probably implement the
+ // non-blocking behavior for Wine using PeekNamedPipe() that is claimed to
+ // also provide the number of total bytes available. More research is
+ // required.
+
+ // Clear last error to make sure ERROR_NO_DATA is set by _read().
+ //
+ SetLastError (0);
+
+ int r (_read (fd, buf, static_cast<unsigned int> (n)));
+
+ if (r == -1 && errno == EINVAL && GetLastError () == ERROR_NO_DATA)
+ {
+ // Let's also check that the file descriptor is really a non-blocking
+ // pipe.
+ //
+ optional<handle_mode> pm (pipe_mode (fd, true /* ignore_error */));
+ errno = pm && pm->mode == fdstream_mode::non_blocking ? EAGAIN : EINVAL;
+ }
+
+ return r;
+ }
+
#endif
}
diff --git a/libbutl/fdstream.mxx b/libbutl/fdstream.mxx
index e608a07..894f85b 100644
--- a/libbutl/fdstream.mxx
+++ b/libbutl/fdstream.mxx
@@ -9,12 +9,14 @@
#include <cassert>
#ifndef __cpp_lib_modules
+#include <ios> // streamsize
#include <vector>
#include <string>
#include <istream>
#include <ostream>
-#include <utility> // move()
+#include <utility> // move(), pair
#include <cstdint> // uint16_t, uint64_t
+#include <cstddef> // size_t
#include <iterator>
#endif
@@ -29,9 +31,11 @@ import std.io;
#endif
import butl.path;
import butl.filesystem; // permissions
+import butl.small_vector;
#else
#include <libbutl/path.mxx>
#include <libbutl/filesystem.mxx>
+#include <libbutl/small-vector.mxx>
#endif
#include <libbutl/export.hxx>
@@ -44,7 +48,12 @@ LIBBUTL_MODEXPORT namespace butl
// The descriptor can be negative. Such a descriptor is treated as unopened
// and is not closed.
//
- struct nullfd_t {constexpr explicit nullfd_t (int) {}};
+ struct nullfd_t
+ {
+ constexpr explicit nullfd_t (int) {}
+ constexpr operator int () const {return -1;}
+ };
+
#if defined(__cpp_modules) && defined(__clang__) //@@ MOD Clang duplicate sym.
inline
#endif
@@ -113,7 +122,7 @@ LIBBUTL_MODEXPORT namespace butl
// for reasoning and consider using non-standard tellg() and seekg() in
// fdbuf, instead)
// - non-blocking file descriptor is supported only by showmanyc() function
- // and only on POSIX
+ // and only for pipes on Windows, in contrast to POSIX systems
// - throws ios::failure in case of open(), read(), write(), close(),
// seek[gp](), or tell[gp]() errors
// - exception mask has at least badbit
@@ -154,6 +163,15 @@ LIBBUTL_MODEXPORT namespace butl
int
fd () const {return fd_.get ();}
+ // Set the file descriptor blocking mode returning the previous mode on
+ // success and throwing ios::failure otherwise (see fdmode() for details).
+ //
+ // Note that besides calling fdmode(fd()), this function also updating its
+ // internal state according to the new mode.
+ //
+ bool
+ blocking (bool);
+
public:
using base = std::basic_streambuf<char>;
@@ -253,10 +271,11 @@ LIBBUTL_MODEXPORT namespace butl
// The blocking/non_blocking flags determine whether the IO operation should
// block or return control if currently there is no data to read or no room
// to write. Only the istream::readsome() function supports the semantics of
- // non-blocking operations. We also only support this on POSIX (Windows does
- // not provide means for the non-blocking reading from a file descriptor so
- // these flags are noop there). IO stream operations other than readsome()
- // are illegal for non_blocking mode and result in the badbit being set.
+ // non-blocking operations. In contrast to POSIX systems, we only support
+ // this for pipes on Windows, always assuming the blocking mode for other
+ // file descriptors. IO stream operations other than readsome() are illegal
+ // in the non-blocking mode and result in the badbit being set (note that
+ // it is not the more appropriate failbit for implementation reasons).
//
enum class fdstream_mode: std::uint16_t
{
@@ -656,17 +675,20 @@ LIBBUTL_MODEXPORT namespace butl
LIBBUTL_SYMEXPORT auto_fd
fddup (int fd);
- // Set the translation mode for the file descriptor. Throw invalid_argument
- // for an invalid combination of flags. Return the previous mode on success,
- // throw ios::failure otherwise.
+ // Set the translation and/or blocking modes for the file descriptor. Throw
+ // invalid_argument for an invalid combination of flags. Return the previous
+ // mode on success, throw ios::failure otherwise.
//
- // The text and binary flags are mutually exclusive on Windows. Due to
- // implementation details at least one of them should be specified. On POSIX
+ // The text and binary flags are mutually exclusive on Windows. On POSIX
// system the two modes are the same and so no check is performed.
//
- // The blocking and non-blocking flags are mutually exclusive on POSIX
- // system. Non-blocking mode is not supported on Windows and so the blocking
- // mode is assumed regardless of the flags.
+ // The blocking and non-blocking flags are mutually exclusive. In contrast
+ // to POSIX systems, on Windows the non-blocking mode is only supported for
+ // pipes, with the blocking mode assumed for other file descriptors
+ // regardless of the flags.
+ //
+ // Note that on Wine currently pipes always behave as blocking regardless of
+ // the mode set.
//
LIBBUTL_SYMEXPORT fdstream_mode
fdmode (int, fdstream_mode);
@@ -781,6 +803,47 @@ LIBBUTL_MODEXPORT namespace butl
//
LIBBUTL_SYMEXPORT bool
fdterm (int);
+
+ // Wait until one or more file descriptors becomes ready for reading or
+ // writing. Return the pair of numbers of descriptors that are ready. Throw
+ // std::invalid_argument if anything is wrong with arguments (both sets are
+ // empty, invalid fd, etc). Throw ios::failure on the underlying OS error.
+ //
+ // Note that the function clears all the previously-ready entries on each
+ // call. Entries with nullfd are ignored.
+ //
+ // On Windows only pipes and only their read ends are supported.
+ //
+ struct fdselect_state
+ {
+ int fd;
+ bool ready;
+
+ // Note: intentionally non-explicit to allow implicit initialization when
+ // pushing to fdselect_set.
+ //
+ fdselect_state (int fd): fd (fd), ready (false) {}
+ };
+
+ using fdselect_set = small_vector<fdselect_state, 4>;
+
+ LIBBUTL_SYMEXPORT std::pair<std::size_t, std::size_t>
+ fdselect (fdselect_set& read, fdselect_set& write);
+
+ // As above but wait up to the specified timeout returning a pair of zeroes
+ // if none of the descriptors became ready.
+ //
+ // LIBBUTL_SYMEXPORT std::pair<std::size_t, std::size_t>
+ // fdselect (fdselect_set&, fdselect_set&, const duration& timeout);
+
+ // POSIX read() function wrapper. In particular, it supports the semantics
+ // of non-blocking read for pipes on Windows.
+ //
+ // Note that on Wine currently pipes always behave as blocking regardless of
+ // the mode.
+ //
+ LIBBUTL_SYMEXPORT std::streamsize
+ fdread (int, void*, std::size_t);
}
#include <libbutl/fdstream.ixx>
diff --git a/tests/fdstream/buildfile b/tests/fdstream/buildfile
index f494dcf..f69bc95 100644
--- a/tests/fdstream/buildfile
+++ b/tests/fdstream/buildfile
@@ -6,3 +6,6 @@ import libs = libbutl%lib{butl}
libs += $stdmod_lib
exe{driver}: {hxx cxx}{*} $libs
+
+if ($cxx.target.class != "windows")
+ cxx.libs += -lpthread
diff --git a/tests/fdstream/driver.cxx b/tests/fdstream/driver.cxx
index 4063ac6..ee75a31 100644
--- a/tests/fdstream/driver.cxx
+++ b/tests/fdstream/driver.cxx
@@ -2,17 +2,21 @@
// copyright : Copyright (c) 2014-2019 Code Synthesis Ltd
// license : MIT; see accompanying LICENSE file
+#ifdef _WIN32
+# include <libbutl/win32-utility.hxx>
+#endif
+
#include <cassert>
#ifndef __cpp_lib_modules
#ifndef _WIN32
# include <chrono>
-# include <thread> // this_thread::sleep_for()
#endif
#include <ios>
#include <string>
#include <vector>
+#include <thread>
#include <iomanip>
#include <sstream>
#include <fstream>
@@ -161,19 +165,19 @@ main (int argc, const char* argv[])
string s;
getline (cin, s, '\0');
- size_t n (10);
+ size_t n (1000);
for (size_t i (0); i < s.size (); i += n)
{
- cout.write (s.c_str () + i, min (n, s.size () - i));
- cout.flush ();
-
- // @@ MINGW GCC 4.9 doesn't implement this_thread. If ifdstream
- // non-blocking read will ever be implemented on Windows use Win32
- // Sleep() instead.
+ // MINGW GCC 4.9 doesn't implement this_thread so use Win32 Sleep().
//
#ifndef _WIN32
this_thread::sleep_for (chrono::milliseconds (50));
+#else
+ Sleep (50);
#endif
+
+ cout.write (s.c_str () + i, min (n, s.size () - i));
+ cout.flush ();
}
return 0;
@@ -247,8 +251,8 @@ main (int argc, const char* argv[])
{
// Fail to create if the file already exists.
//
- fdopen (
- f, fdopen_mode::out | fdopen_mode::create | fdopen_mode::exclusive);
+ fdopen (f,
+ fdopen_mode::out | fdopen_mode::create | fdopen_mode::exclusive);
assert (false);
}
@@ -433,54 +437,101 @@ main (int argc, const char* argv[])
{
}
+#else
+
+ // Check translation modes.
+ //
+ to_file (f, text1, fdopen_mode::truncate);
+ assert (from_file (f, fdopen_mode::binary) == text3);
+
+ to_file (f, text3, fdopen_mode::truncate | fdopen_mode::binary);
+ assert (from_file (f) == text1);
+
+#endif
+
// Test non-blocking reading.
//
- try
{
+ string s;
+ for (size_t i (0); i < 100; ++i)
+ s += "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz\n";
+
const char* args[] = {argv[0], "-c", nullptr};
- process pr (args, -1, -1);
- ofdstream os (move (pr.out_fd));
- ifdstream is (move (pr.in_ofd), fdstream_mode::non_blocking);
+ auto test_read = [&args, &s] ()
+ {
+ try
+ {
+ process pr (args, -1, -1);
+ ofdstream os (move (pr.out_fd));
+ ifdstream is (move (pr.in_ofd), fdstream_mode::non_blocking);
- const string s (
- "0123456789\nABCDEFGHIJKLMNOPQRSTUVWXYZ\nabcdefghijklmnopqrstuvwxyz");
+ os << s;
+ os.close ();
- os << s;
- os.close ();
+ fdselect_set rds ({fdselect_state (is.fd ())});
+ fdselect_set wds;
- string r;
- char buf[3];
- while (!is.eof ())
- {
- streamsize n (is.readsome (buf, sizeof (buf)));
- r.append (buf, n);
- }
+ string r;
+ char buf[300];
+ while (!is.eof ())
+ {
+ pair<size_t, size_t> nd (fdselect (rds, wds));
- is.close ();
+ assert (nd.first == 1 && nd.second == 0 && rds[0].ready);
- assert (r == s);
- }
- catch (const ios::failure&)
- {
- assert (false);
- }
- catch (const process_error&)
- {
- assert (false);
- }
+ for (streamsize n; (n = is.readsome (buf, sizeof (buf))) != 0; )
+ r.append (buf, n);
+ }
-#else
+ is.close ();
- // Check translation modes.
- //
- to_file (f, text1, fdopen_mode::truncate);
- assert (from_file (f, fdopen_mode::binary) == text3);
+ assert (r == s);
+ }
+ catch (const ios::failure&)
+ {
+ assert (false);
+ }
+ catch (const process_error&)
+ {
+ assert (false);
+ }
+ };
- to_file (f, text3, fdopen_mode::truncate | fdopen_mode::binary);
- assert (from_file (f) == text1);
+ vector<thread> threads;
+ for (size_t i (0); i < 10; ++i)
+ threads.emplace_back (test_read);
-#endif
+ // While the threads are busy, let's test the skip/non_blocking modes
+ // combination.
+ //
+ try
+ {
+ process pr (args, -1, -1);
+ ofdstream os (move (pr.out_fd));
+
+ ifdstream is (move (pr.in_ofd),
+ fdstream_mode::non_blocking | fdstream_mode::skip);
+
+ os << s;
+ os.close ();
+
+ is.close (); // Set the blocking mode, skip and close.
+ }
+ catch (const ios::failure&)
+ {
+ assert (false);
+ }
+ catch (const process_error&)
+ {
+ assert (false);
+ }
+
+ // Join the non-blocking reading test threads.
+ //
+ for (thread& t: threads)
+ t.join ();
+ }
// Test setting and getting position via the non-standard fdbuf interface.
//