diff options
author | Boris Kolpackov <boris@codesynthesis.com> | 2017-02-03 08:17:21 +0200 |
---|---|---|
committer | Boris Kolpackov <boris@codesynthesis.com> | 2017-02-13 12:42:41 +0200 |
commit | 21c5af3dc018ec1f2218a3df3c8195cfcfe3aefa (patch) | |
tree | d0f0ac84592ff12144b7b52dea8ac2dbb2aaa04d | |
parent | 0d3ce80a2f0cd8398225e7ef7a1abbe7e77a38fc (diff) |
Add support for passing alternative task start counts to scheduler
-rw-r--r-- | build2/scheduler | 45 | ||||
-rw-r--r-- | build2/scheduler.cxx | 13 | ||||
-rw-r--r-- | build2/scheduler.txx | 5 |
3 files changed, 45 insertions, 18 deletions
diff --git a/build2/scheduler b/build2/scheduler index 42f82e0..5b0f566 100644 --- a/build2/scheduler +++ b/build2/scheduler @@ -58,7 +58,7 @@ namespace build2 // F should return void and not throw any exceptions. The way the result // of a task is communicated back to the master thread is ad hoc, usually // via "out" arguments. Such result(s) can only be retrieved by the master - // once its task count reaches 0. + // once its task count reaches the start count. // // The argument passing semantics is the same as for std::thread. In // particular, lvalue-references are passed as copies (use ref()/cref() @@ -71,16 +71,43 @@ namespace build2 // template <typename F, typename... A> void - async (atomic_count& task_count, F&&, A&&...); + async (size_t start_count, atomic_count& task_count, F&&, A&&...); - // Wait until the task count reaches 0. If the scheduler is shutdown while - // waiting, throw system_error(ECANCELED). + template <typename F, typename... A> + void + async (atomic_count& task_count, F&& f, A&&... a) + { + async (0, task_count, forward<F> (f), forward<A> (a)...); + } + + // Wait until the task count reaches the start count. If the scheduler is + // shutdown while waiting, throw system_error(ECANCELED). // // Note that it is valid to wait on another thread's task count (that is, - // without making any async() calls in this thread). + // without making any async() calls in this thread). However, if the start + // count differs from the one passed to async(), then whomever sets the + // start count to this alternative value must also call resume() below + // in order to signal waiting threads. + // + // Note also that in this case (waiting on someone else's start count), + // the async() call could execute the tasks synchronously without ever + // incrementing the task count. Thus if waiting on another thread's start + // count starts before/during async() calls, then it must be "gated" with + // an alternative (lower) start count. // void - wait (atomic_count& task_count); + wait (size_t start_count, atomic_count& task_count); + + void + wait (atomic_count& task_count) + { + wait (0, task_count); + } + + // Resume threads waiting on this task count. + // + void + resume (atomic_count& task_count); // Startup and shutdown. // @@ -205,10 +232,7 @@ namespace build2 helper (void*); void - suspend (atomic_count& task_count); - - void - resume (atomic_count& task_count); + suspend (size_t start_count, atomic_count& task_count); // Task encapsulation. // @@ -219,6 +243,7 @@ namespace build2 using args_type = std::tuple<std::decay_t<A>...>; atomic_count* task_count; + size_t start_count; func_type func; args_type args; diff --git a/build2/scheduler.cxx b/build2/scheduler.cxx index c151847..ee0f4a2 100644 --- a/build2/scheduler.cxx +++ b/build2/scheduler.cxx @@ -11,9 +11,9 @@ using namespace std; namespace build2 { void scheduler:: - wait (atomic_count& task_count) + wait (size_t start_count, atomic_count& task_count) { - if (task_count == 0) + if (task_count == start_count) return; // See if we can run some of our own tasks. @@ -29,15 +29,15 @@ namespace build2 // Note that empty task queue doesn't automatically mean the task count // is zero (some might still be executing asynchronously). // - if (task_count == 0) + if (task_count == start_count) return; } - suspend (task_count); + suspend (start_count, task_count); } void scheduler:: - suspend (atomic_count& tc) + suspend (size_t start_count, atomic_count& tc) { wait_slot& s ( wait_queue_[std::hash<atomic_count*> () (&tc) % wait_queue_size_]); @@ -85,7 +85,8 @@ namespace build2 // Since we use a mutex for synchronization, we can relax the atomic // access. // - while (!s.shutdown && tc.load (std::memory_order_relaxed) != 0) + while (!s.shutdown && + tc.load (std::memory_order_relaxed) != start_count) s.condv.wait (l); s.waiters--; diff --git a/build2/scheduler.txx b/build2/scheduler.txx index 127ce48..965813f 100644 --- a/build2/scheduler.txx +++ b/build2/scheduler.txx @@ -8,7 +8,7 @@ namespace build2 { template <typename F, typename... A> void scheduler:: - async (atomic_count& task_count, F&& f, A&&... a) + async (size_t start_count, atomic_count& task_count, F&& f, A&&... a) { using task = task_type<F, A...>; @@ -40,6 +40,7 @@ namespace build2 // new (&td->data) task { &task_count, + start_count, decay_copy (forward<F> (f)), typename task::args_type (decay_copy (forward<A> (a))...)}; @@ -85,7 +86,7 @@ namespace build2 t.thunk (std::index_sequence_for<A...> ()); atomic_count& tc (*t.task_count); - if (--tc == 0) + if (--tc == t.start_count) s.resume (tc); // Resume a waiter, if any. } } |