aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libbutl/fdstream.cxx107
-rw-r--r--libbutl/fdstream.ixx19
-rw-r--r--libbutl/fdstream.mxx24
-rw-r--r--libbutl/filesystem.cxx2
-rw-r--r--tests/fdstream/driver.cxx37
5 files changed, 168 insertions, 21 deletions
diff --git a/libbutl/fdstream.cxx b/libbutl/fdstream.cxx
index fad65f7..4948052 100644
--- a/libbutl/fdstream.cxx
+++ b/libbutl/fdstream.cxx
@@ -37,6 +37,7 @@
#ifndef __cpp_lib_modules_ts
#include <vector>
#include <string>
+#include <chrono>
#include <istream>
#include <ostream>
#include <utility>
@@ -72,8 +73,10 @@ import butl.small_vector;
#endif
import butl.utility; // throw_*_ios_failure(), function_cast()
+import butl.timestamp;
#else
#include <libbutl/utility.mxx>
+#include <libbutl/timestamp.mxx>
#endif
using namespace std;
@@ -1389,9 +1392,13 @@ namespace butl
throw_generic_ios_failure (errno);
}
- pair<size_t, size_t>
- fdselect (fdselect_set& read, fdselect_set& write)
+ static pair<size_t, size_t>
+ fdselect (fdselect_set& read,
+ fdselect_set& write,
+ const chrono::milliseconds* timeout)
{
+ using namespace chrono;
+
// Copy fdselect_set into the native fd_set, updating max_fd. Also clear
// the ready flag in the source set.
//
@@ -1427,29 +1434,64 @@ namespace butl
++max_fd;
+ // Note that if the timeout is not NULL, then the select timeout needs to
+ // be recalculated for each select() call (of which we can potentially
+ // have multiple due to EINTR). So the timeout can be used as bool.
+ //
+ timestamp now;
+ timestamp deadline;
+
+ if (timeout)
+ {
+ now = system_clock::now ();
+ deadline = now + *timeout;
+ }
+
// Repeat the select() call while getting the EINTR error and throw on
// any other error.
//
// Note that select() doesn't modify the sets on failure (according to
// POSIX standard as well as to the Linux, FreeBSD and MacOS man pages).
//
- for (;;)
+ for (timeval tv;;)
{
+ if (timeout)
+ {
+ if (now < deadline)
+ {
+ microseconds t (duration_cast<microseconds> (deadline - now));
+ tv.tv_sec = t.count () / 1000000;
+ tv.tv_usec = t.count () % 1000000;
+ }
+ else
+ {
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ }
+ }
+
int r (select (max_fd,
&rds,
&wds,
nullptr /* exceptfds */,
- nullptr /* timeout */));
+ timeout ? &tv : nullptr));
if (r == -1)
{
if (errno == EINTR)
+ {
+ if (timeout)
+ now = system_clock::now ();
+
continue;
+ }
throw_system_ios_failure (errno);
}
- assert (r != 0); // We don't expect the timeout to occur.
+ if (!timeout)
+ assert (r != 0); // We don't expect the timeout to occur.
+
break;
}
@@ -1825,9 +1867,13 @@ namespace butl
return false;
}
- pair<size_t, size_t>
- fdselect (fdselect_set& read, fdselect_set& write)
+ static pair<size_t, size_t>
+ fdselect (fdselect_set& read,
+ fdselect_set& write,
+ const chrono::milliseconds* timeout)
{
+ using namespace chrono;
+
if (!write.empty ())
throw invalid_argument ("write file descriptor set is not supported");
@@ -1850,6 +1896,15 @@ namespace butl
if (n == 0)
throw invalid_argument ("empty file descriptor set");
+ // Note that if the timeout is not NULL, then the deadline needs to be
+ // checked prior to re-probing the pipe for data presence. So the timeout
+ // can be used as bool.
+ //
+ timestamp deadline;
+
+ if (timeout)
+ deadline = system_clock::now () + *timeout;
+
// Keep iterating through the set until at least one byte can be read from
// any of the pipes or any of them get closed (so can read eof).
//
@@ -1922,13 +1977,30 @@ namespace butl
throw_system_ios_failure (e);
}
- // Bail out if some descriptors are ready for reading and sleep a bit
- // and repeat otherwise.
+ // Bail out if some descriptors are ready for reading or the deadline
+ // has been reached, if specified, and sleep a bit and repeat otherwise.
//
if (r != 0)
break;
- Sleep (50);
+ DWORD t (50);
+
+ if (timeout)
+ {
+ timestamp now (system_clock::now ());
+
+ if (now < deadline)
+ {
+ milliseconds tm (duration_cast<milliseconds> (deadline - now));
+
+ if (t > tm.count ())
+ t = static_cast<DWORD> (tm.count ());
+ }
+ else
+ break;
+ }
+
+ Sleep (t);
}
return make_pair (r, 0);
@@ -1972,4 +2044,19 @@ namespace butl
}
#endif
+
+ pair<size_t, size_t>
+ fdselect (fdselect_set& read, fdselect_set& write)
+ {
+ return fdselect (read, write, nullptr /* timeout */);
+ }
+
+ template <>
+ pair<size_t, size_t>
+ fdselect (fdselect_set& read,
+ fdselect_set& write,
+ const chrono::milliseconds& timeout)
+ {
+ return fdselect (read, write, &timeout);
+ }
}
diff --git a/libbutl/fdstream.ixx b/libbutl/fdstream.ixx
index fb57b99..4ef5b1d 100644
--- a/libbutl/fdstream.ixx
+++ b/libbutl/fdstream.ixx
@@ -353,4 +353,23 @@ namespace butl
{
return fdmode (stderr_fd (), m);
}
+
+ // fdselect
+ //
+ // Implement fdselect() function templates in terms of their milliseconds
+ // specialization.
+ //
+ template <>
+ LIBBUTL_SYMEXPORT std::pair<std::size_t, std::size_t>
+ fdselect (fdselect_set&, fdselect_set&, const std::chrono::milliseconds&);
+
+ template <typename R, typename P>
+ inline std::pair<std::size_t, std::size_t>
+ fdselect (fdselect_set& ifds,
+ fdselect_set& ofds,
+ const std::chrono::duration<R, P>& timeout)
+ {
+ using namespace std::chrono;
+ return fdselect (ifds, ofds, duration_cast<milliseconds> (timeout));
+ }
}
diff --git a/libbutl/fdstream.mxx b/libbutl/fdstream.mxx
index 86ada51..c863d2c 100644
--- a/libbutl/fdstream.mxx
+++ b/libbutl/fdstream.mxx
@@ -11,6 +11,7 @@
#include <ios> // streamsize
#include <vector>
#include <string>
+#include <chrono>
#include <istream>
#include <ostream>
#include <utility> // move(), pair
@@ -911,10 +912,25 @@ LIBBUTL_MODEXPORT namespace butl
// As above but wait up to the specified timeout returning a pair of zeroes
// if none of the descriptors became ready.
//
- // @@ Maybe merge it with the above via a default/optional value?
- //
- // LIBBUTL_SYMEXPORT std::pair<std::size_t, std::size_t>
- // fdselect (fdselect_set&, fdselect_set&, const duration& timeout);
+ template <typename R, typename P>
+ std::pair<std::size_t, std::size_t>
+ fdselect (fdselect_set&, fdselect_set&, const std::chrono::duration<R, P>&);
+
+ template <typename R, typename P>
+ inline std::size_t
+ ifdselect (fdselect_set& ifds, const std::chrono::duration<R, P>& timeout)
+ {
+ fdselect_set ofds;
+ return fdselect (ifds, ofds, timeout).first;
+ }
+
+ template <typename R, typename P>
+ inline std::size_t
+ ofdselect (fdselect_set& ofds, const std::chrono::duration<R, P>& timeout)
+ {
+ fdselect_set ifds;
+ return fdselect (ifds, ofds, timeout).second;
+ }
// POSIX read() function wrapper. In particular, it supports the semantics
// of non-blocking read for pipes on Windows.
diff --git a/libbutl/filesystem.cxx b/libbutl/filesystem.cxx
index 9e8a232..18be8a9 100644
--- a/libbutl/filesystem.cxx
+++ b/libbutl/filesystem.cxx
@@ -1447,7 +1447,7 @@ namespace butl
path r (p);
bool exists;
- for (size_t i (0); true; ++i)
+ for (size_t i (0);; ++i)
{
pair<bool, entry_stat> pe (path_entry (r));
diff --git a/tests/fdstream/driver.cxx b/tests/fdstream/driver.cxx
index 3215e02..76fc64a 100644
--- a/tests/fdstream/driver.cxx
+++ b/tests/fdstream/driver.cxx
@@ -470,12 +470,12 @@ main (int argc, const char* argv[])
//
{
string s;
- for (size_t i (0); i < 100; ++i)
+ for (size_t i (0); i < 300; ++i)
s += "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz\n";
const char* args[] = {argv[0], "-c", nullptr};
- auto test_read = [&args, &s] ()
+ auto test_read = [&args, &s] (bool timeout)
{
try
{
@@ -491,11 +491,29 @@ main (int argc, const char* argv[])
string r;
char buf[300];
+ bool timedout (false);
while (!is.eof ())
{
- pair<size_t, size_t> nd (fdselect (rds, wds));
-
- assert (nd.first == 1 && nd.second == 0 && rds[0].ready);
+ if (timeout)
+ {
+ pair<size_t, size_t> nd (
+ fdselect (rds, wds, chrono::milliseconds (3)));
+
+ assert (((nd.first == 0 && !rds[0].ready) ||
+ (nd.first == 1 && rds[0].ready)) &&
+ nd.second == 0);
+
+ if (nd.first == 0)
+ {
+ timedout = true;
+ continue;
+ }
+ }
+ else
+ {
+ pair<size_t, size_t> nd (fdselect (rds, wds));
+ assert (nd.first == 1 && nd.second == 0 && rds[0].ready);
+ }
for (streamsize n; (n = is.readsome (buf, sizeof (buf))) != 0; )
r.append (buf, static_cast<size_t> (n));
@@ -504,6 +522,10 @@ main (int argc, const char* argv[])
is.close ();
assert (r == s);
+
+ // If timeout is used, then it most likely timedout, at least once.
+ //
+ assert (timedout == timeout);
}
catch (const ios::failure&)
{
@@ -517,7 +539,10 @@ main (int argc, const char* argv[])
vector<thread> threads;
for (size_t i (0); i < 10; ++i)
- threads.emplace_back (test_read);
+ {
+ threads.emplace_back ([&test_read] {test_read (true /* timeout */);});
+ threads.emplace_back ([&test_read] {test_read (false /* timeout */);});
+ }
// While the threads are busy, let's test the skip/non_blocking modes
// combination.