aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBoris Kolpackov <boris@codesynthesis.com>2017-02-03 08:17:21 +0200
committerBoris Kolpackov <boris@codesynthesis.com>2017-02-13 12:42:41 +0200
commit21c5af3dc018ec1f2218a3df3c8195cfcfe3aefa (patch)
treed0f0ac84592ff12144b7b52dea8ac2dbb2aaa04d
parent0d3ce80a2f0cd8398225e7ef7a1abbe7e77a38fc (diff)
Add support for passing alternative task start counts to scheduler
-rw-r--r--build2/scheduler45
-rw-r--r--build2/scheduler.cxx13
-rw-r--r--build2/scheduler.txx5
3 files changed, 45 insertions, 18 deletions
diff --git a/build2/scheduler b/build2/scheduler
index 42f82e0..5b0f566 100644
--- a/build2/scheduler
+++ b/build2/scheduler
@@ -58,7 +58,7 @@ namespace build2
// F should return void and not throw any exceptions. The way the result
// of a task is communicated back to the master thread is ad hoc, usually
// via "out" arguments. Such result(s) can only be retrieved by the master
- // once its task count reaches 0.
+ // once its task count reaches the start count.
//
// The argument passing semantics is the same as for std::thread. In
// particular, lvalue-references are passed as copies (use ref()/cref()
@@ -71,16 +71,43 @@ namespace build2
//
template <typename F, typename... A>
void
- async (atomic_count& task_count, F&&, A&&...);
+ async (size_t start_count, atomic_count& task_count, F&&, A&&...);
- // Wait until the task count reaches 0. If the scheduler is shutdown while
- // waiting, throw system_error(ECANCELED).
+ template <typename F, typename... A>
+ void
+ async (atomic_count& task_count, F&& f, A&&... a)
+ {
+ async (0, task_count, forward<F> (f), forward<A> (a)...);
+ }
+
+ // Wait until the task count reaches the start count. If the scheduler is
+ // shutdown while waiting, throw system_error(ECANCELED).
//
// Note that it is valid to wait on another thread's task count (that is,
- // without making any async() calls in this thread).
+ // without making any async() calls in this thread). However, if the start
+ // count differs from the one passed to async(), then whomever sets the
+ // start count to this alternative value must also call resume() below
+ // in order to signal waiting threads.
+ //
+ // Note also that in this case (waiting on someone else's start count),
+ // the async() call could execute the tasks synchronously without ever
+ // incrementing the task count. Thus if waiting on another thread's start
+ // count starts before/during async() calls, then it must be "gated" with
+ // an alternative (lower) start count.
//
void
- wait (atomic_count& task_count);
+ wait (size_t start_count, atomic_count& task_count);
+
+ void
+ wait (atomic_count& task_count)
+ {
+ wait (0, task_count);
+ }
+
+ // Resume threads waiting on this task count.
+ //
+ void
+ resume (atomic_count& task_count);
// Startup and shutdown.
//
@@ -205,10 +232,7 @@ namespace build2
helper (void*);
void
- suspend (atomic_count& task_count);
-
- void
- resume (atomic_count& task_count);
+ suspend (size_t start_count, atomic_count& task_count);
// Task encapsulation.
//
@@ -219,6 +243,7 @@ namespace build2
using args_type = std::tuple<std::decay_t<A>...>;
atomic_count* task_count;
+ size_t start_count;
func_type func;
args_type args;
diff --git a/build2/scheduler.cxx b/build2/scheduler.cxx
index c151847..ee0f4a2 100644
--- a/build2/scheduler.cxx
+++ b/build2/scheduler.cxx
@@ -11,9 +11,9 @@ using namespace std;
namespace build2
{
void scheduler::
- wait (atomic_count& task_count)
+ wait (size_t start_count, atomic_count& task_count)
{
- if (task_count == 0)
+ if (task_count == start_count)
return;
// See if we can run some of our own tasks.
@@ -29,15 +29,15 @@ namespace build2
// Note that empty task queue doesn't automatically mean the task count
// is zero (some might still be executing asynchronously).
//
- if (task_count == 0)
+ if (task_count == start_count)
return;
}
- suspend (task_count);
+ suspend (start_count, task_count);
}
void scheduler::
- suspend (atomic_count& tc)
+ suspend (size_t start_count, atomic_count& tc)
{
wait_slot& s (
wait_queue_[std::hash<atomic_count*> () (&tc) % wait_queue_size_]);
@@ -85,7 +85,8 @@ namespace build2
// Since we use a mutex for synchronization, we can relax the atomic
// access.
//
- while (!s.shutdown && tc.load (std::memory_order_relaxed) != 0)
+ while (!s.shutdown &&
+ tc.load (std::memory_order_relaxed) != start_count)
s.condv.wait (l);
s.waiters--;
diff --git a/build2/scheduler.txx b/build2/scheduler.txx
index 127ce48..965813f 100644
--- a/build2/scheduler.txx
+++ b/build2/scheduler.txx
@@ -8,7 +8,7 @@ namespace build2
{
template <typename F, typename... A>
void scheduler::
- async (atomic_count& task_count, F&& f, A&&... a)
+ async (size_t start_count, atomic_count& task_count, F&& f, A&&... a)
{
using task = task_type<F, A...>;
@@ -40,6 +40,7 @@ namespace build2
//
new (&td->data) task {
&task_count,
+ start_count,
decay_copy (forward<F> (f)),
typename task::args_type (decay_copy (forward<A> (a))...)};
@@ -85,7 +86,7 @@ namespace build2
t.thunk (std::index_sequence_for<A...> ());
atomic_count& tc (*t.task_count);
- if (--tc == 0)
+ if (--tc == t.start_count)
s.resume (tc); // Resume a waiter, if any.
}
}