diff options
-rw-r--r-- | libbutl/fdstream.cxx | 107 | ||||
-rw-r--r-- | libbutl/fdstream.ixx | 19 | ||||
-rw-r--r-- | libbutl/fdstream.mxx | 24 | ||||
-rw-r--r-- | libbutl/filesystem.cxx | 2 | ||||
-rw-r--r-- | tests/fdstream/driver.cxx | 37 |
5 files changed, 168 insertions, 21 deletions
diff --git a/libbutl/fdstream.cxx b/libbutl/fdstream.cxx index fad65f7..4948052 100644 --- a/libbutl/fdstream.cxx +++ b/libbutl/fdstream.cxx @@ -37,6 +37,7 @@ #ifndef __cpp_lib_modules_ts #include <vector> #include <string> +#include <chrono> #include <istream> #include <ostream> #include <utility> @@ -72,8 +73,10 @@ import butl.small_vector; #endif import butl.utility; // throw_*_ios_failure(), function_cast() +import butl.timestamp; #else #include <libbutl/utility.mxx> +#include <libbutl/timestamp.mxx> #endif using namespace std; @@ -1389,9 +1392,13 @@ namespace butl throw_generic_ios_failure (errno); } - pair<size_t, size_t> - fdselect (fdselect_set& read, fdselect_set& write) + static pair<size_t, size_t> + fdselect (fdselect_set& read, + fdselect_set& write, + const chrono::milliseconds* timeout) { + using namespace chrono; + // Copy fdselect_set into the native fd_set, updating max_fd. Also clear // the ready flag in the source set. // @@ -1427,29 +1434,64 @@ namespace butl ++max_fd; + // Note that if the timeout is not NULL, then the select timeout needs to + // be recalculated for each select() call (of which we can potentially + // have multiple due to EINTR). So the timeout can be used as bool. + // + timestamp now; + timestamp deadline; + + if (timeout) + { + now = system_clock::now (); + deadline = now + *timeout; + } + // Repeat the select() call while getting the EINTR error and throw on // any other error. // // Note that select() doesn't modify the sets on failure (according to // POSIX standard as well as to the Linux, FreeBSD and MacOS man pages). // - for (;;) + for (timeval tv;;) { + if (timeout) + { + if (now < deadline) + { + microseconds t (duration_cast<microseconds> (deadline - now)); + tv.tv_sec = t.count () / 1000000; + tv.tv_usec = t.count () % 1000000; + } + else + { + tv.tv_sec = 0; + tv.tv_usec = 0; + } + } + int r (select (max_fd, &rds, &wds, nullptr /* exceptfds */, - nullptr /* timeout */)); + timeout ? &tv : nullptr)); if (r == -1) { if (errno == EINTR) + { + if (timeout) + now = system_clock::now (); + continue; + } throw_system_ios_failure (errno); } - assert (r != 0); // We don't expect the timeout to occur. + if (!timeout) + assert (r != 0); // We don't expect the timeout to occur. + break; } @@ -1825,9 +1867,13 @@ namespace butl return false; } - pair<size_t, size_t> - fdselect (fdselect_set& read, fdselect_set& write) + static pair<size_t, size_t> + fdselect (fdselect_set& read, + fdselect_set& write, + const chrono::milliseconds* timeout) { + using namespace chrono; + if (!write.empty ()) throw invalid_argument ("write file descriptor set is not supported"); @@ -1850,6 +1896,15 @@ namespace butl if (n == 0) throw invalid_argument ("empty file descriptor set"); + // Note that if the timeout is not NULL, then the deadline needs to be + // checked prior to re-probing the pipe for data presence. So the timeout + // can be used as bool. + // + timestamp deadline; + + if (timeout) + deadline = system_clock::now () + *timeout; + // Keep iterating through the set until at least one byte can be read from // any of the pipes or any of them get closed (so can read eof). // @@ -1922,13 +1977,30 @@ namespace butl throw_system_ios_failure (e); } - // Bail out if some descriptors are ready for reading and sleep a bit - // and repeat otherwise. + // Bail out if some descriptors are ready for reading or the deadline + // has been reached, if specified, and sleep a bit and repeat otherwise. // if (r != 0) break; - Sleep (50); + DWORD t (50); + + if (timeout) + { + timestamp now (system_clock::now ()); + + if (now < deadline) + { + milliseconds tm (duration_cast<milliseconds> (deadline - now)); + + if (t > tm.count ()) + t = static_cast<DWORD> (tm.count ()); + } + else + break; + } + + Sleep (t); } return make_pair (r, 0); @@ -1972,4 +2044,19 @@ namespace butl } #endif + + pair<size_t, size_t> + fdselect (fdselect_set& read, fdselect_set& write) + { + return fdselect (read, write, nullptr /* timeout */); + } + + template <> + pair<size_t, size_t> + fdselect (fdselect_set& read, + fdselect_set& write, + const chrono::milliseconds& timeout) + { + return fdselect (read, write, &timeout); + } } diff --git a/libbutl/fdstream.ixx b/libbutl/fdstream.ixx index fb57b99..4ef5b1d 100644 --- a/libbutl/fdstream.ixx +++ b/libbutl/fdstream.ixx @@ -353,4 +353,23 @@ namespace butl { return fdmode (stderr_fd (), m); } + + // fdselect + // + // Implement fdselect() function templates in terms of their milliseconds + // specialization. + // + template <> + LIBBUTL_SYMEXPORT std::pair<std::size_t, std::size_t> + fdselect (fdselect_set&, fdselect_set&, const std::chrono::milliseconds&); + + template <typename R, typename P> + inline std::pair<std::size_t, std::size_t> + fdselect (fdselect_set& ifds, + fdselect_set& ofds, + const std::chrono::duration<R, P>& timeout) + { + using namespace std::chrono; + return fdselect (ifds, ofds, duration_cast<milliseconds> (timeout)); + } } diff --git a/libbutl/fdstream.mxx b/libbutl/fdstream.mxx index 86ada51..c863d2c 100644 --- a/libbutl/fdstream.mxx +++ b/libbutl/fdstream.mxx @@ -11,6 +11,7 @@ #include <ios> // streamsize #include <vector> #include <string> +#include <chrono> #include <istream> #include <ostream> #include <utility> // move(), pair @@ -911,10 +912,25 @@ LIBBUTL_MODEXPORT namespace butl // As above but wait up to the specified timeout returning a pair of zeroes // if none of the descriptors became ready. // - // @@ Maybe merge it with the above via a default/optional value? - // - // LIBBUTL_SYMEXPORT std::pair<std::size_t, std::size_t> - // fdselect (fdselect_set&, fdselect_set&, const duration& timeout); + template <typename R, typename P> + std::pair<std::size_t, std::size_t> + fdselect (fdselect_set&, fdselect_set&, const std::chrono::duration<R, P>&); + + template <typename R, typename P> + inline std::size_t + ifdselect (fdselect_set& ifds, const std::chrono::duration<R, P>& timeout) + { + fdselect_set ofds; + return fdselect (ifds, ofds, timeout).first; + } + + template <typename R, typename P> + inline std::size_t + ofdselect (fdselect_set& ofds, const std::chrono::duration<R, P>& timeout) + { + fdselect_set ifds; + return fdselect (ifds, ofds, timeout).second; + } // POSIX read() function wrapper. In particular, it supports the semantics // of non-blocking read for pipes on Windows. diff --git a/libbutl/filesystem.cxx b/libbutl/filesystem.cxx index 9e8a232..18be8a9 100644 --- a/libbutl/filesystem.cxx +++ b/libbutl/filesystem.cxx @@ -1447,7 +1447,7 @@ namespace butl path r (p); bool exists; - for (size_t i (0); true; ++i) + for (size_t i (0);; ++i) { pair<bool, entry_stat> pe (path_entry (r)); diff --git a/tests/fdstream/driver.cxx b/tests/fdstream/driver.cxx index 3215e02..76fc64a 100644 --- a/tests/fdstream/driver.cxx +++ b/tests/fdstream/driver.cxx @@ -470,12 +470,12 @@ main (int argc, const char* argv[]) // { string s; - for (size_t i (0); i < 100; ++i) + for (size_t i (0); i < 300; ++i) s += "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz\n"; const char* args[] = {argv[0], "-c", nullptr}; - auto test_read = [&args, &s] () + auto test_read = [&args, &s] (bool timeout) { try { @@ -491,11 +491,29 @@ main (int argc, const char* argv[]) string r; char buf[300]; + bool timedout (false); while (!is.eof ()) { - pair<size_t, size_t> nd (fdselect (rds, wds)); - - assert (nd.first == 1 && nd.second == 0 && rds[0].ready); + if (timeout) + { + pair<size_t, size_t> nd ( + fdselect (rds, wds, chrono::milliseconds (3))); + + assert (((nd.first == 0 && !rds[0].ready) || + (nd.first == 1 && rds[0].ready)) && + nd.second == 0); + + if (nd.first == 0) + { + timedout = true; + continue; + } + } + else + { + pair<size_t, size_t> nd (fdselect (rds, wds)); + assert (nd.first == 1 && nd.second == 0 && rds[0].ready); + } for (streamsize n; (n = is.readsome (buf, sizeof (buf))) != 0; ) r.append (buf, static_cast<size_t> (n)); @@ -504,6 +522,10 @@ main (int argc, const char* argv[]) is.close (); assert (r == s); + + // If timeout is used, then it most likely timedout, at least once. + // + assert (timedout == timeout); } catch (const ios::failure&) { @@ -517,7 +539,10 @@ main (int argc, const char* argv[]) vector<thread> threads; for (size_t i (0); i < 10; ++i) - threads.emplace_back (test_read); + { + threads.emplace_back ([&test_read] {test_read (true /* timeout */);}); + threads.emplace_back ([&test_read] {test_read (false /* timeout */);}); + } // While the threads are busy, let's test the skip/non_blocking modes // combination. |