From 052dc48939a063b19a13c10cb2c735b4b06a4c4b Mon Sep 17 00:00:00 2001 From: Boris Kolpackov Date: Tue, 13 Dec 2016 12:30:14 +0200 Subject: Various scheduler improvements and fixes --- build2/scheduler.txx | 91 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 build2/scheduler.txx (limited to 'build2/scheduler.txx') diff --git a/build2/scheduler.txx b/build2/scheduler.txx new file mode 100644 index 0000000..9202f0b --- /dev/null +++ b/build2/scheduler.txx @@ -0,0 +1,91 @@ +// file : build2/scheduler.txx -*- C++ -*- +// copyright : Copyright (c) 2014-2016 Code Synthesis Ltd +// license : MIT; see accompanying LICENSE file + +#include + +namespace build2 +{ + template + void scheduler:: + async (atomic_count& task_count, F&& f, A&&... a) + { + using task = task_type; + + static_assert (sizeof (task) <= sizeof (task_data::data), + "insufficient space"); + + static_assert (std::is_trivially_destructible::value, + "not trivially destructible"); + + // Push into the queue unless we are running serially or the queue is + // full. + // + task_data* td (nullptr); + + if (max_active_ != 1) + { + task_queue* tq (task_queue_); // Single load. + if (tq == nullptr) + tq = &create_queue (); + + lock ql (tq->mutex); + + if (tq->shutdown) + throw system_error (ECANCELED, std::system_category ()); + + if ((td = push (*tq)) != nullptr) + { + // Package the task. + // + new (&td->data) task { + &task_count, + decay_copy (forward (f)), + typename task::args_type (decay_copy (forward (a))...)}; + + td->thunk = &task_thunk; + } + else + tq->stat_full++; + } + + // If serial/full, then run the task synchronously. In this case + // there is no need to mess with task count. + // + if (td == nullptr) + { + forward (f) (forward (a)...); + return; + } + + // Increment the task count. + // + task_count.fetch_add (1, std::memory_order_release); + + lock l (mutex_); + task_ = true; + + // If there is a spare active thread, wake up (or create) the helper. + // + if (active_ < max_active_) + activate_helper (l); + } + + template + void scheduler:: + task_thunk (scheduler& s, lock& ql, void* td) + { + using task = task_type; + + // Move the data and release the lock. + // + task t (move (*static_cast (td))); + ql.unlock (); + + t.thunk (std::index_sequence_for ()); + + atomic_count& tc (*t.task_count); + if (--tc == 0) + s.resume (tc); // Resume a waiter, if any. + } +} -- cgit v1.1