Diffstat (limited to 'libbuild2/scheduler.txx')
 libbuild2/scheduler.txx | 138 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 138 insertions(+), 0 deletions(-)
diff --git a/libbuild2/scheduler.txx b/libbuild2/scheduler.txx
new file mode 100644
index 0000000..805a072
--- /dev/null
+++ b/libbuild2/scheduler.txx
@@ -0,0 +1,138 @@
+// file : libbuild2/scheduler.txx -*- C++ -*-
+// copyright : Copyright (c) 2014-2019 Code Synthesis Ltd
+// license : MIT; see accompanying LICENSE file
+
+#include <cerrno>
+
+namespace build2
+{
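+  // Start an asynchronous execution of the task unless we are running
+  // serially or the queue is full, in which case the task is executed
+  // synchronously. Return true if the task was queued and false if it was
+  // executed.
+  //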
+ template <typename F, typename... A>
+ bool scheduler::
+ async (size_t start_count, atomic_count& task_count, F&& f, A&&... a)
+ {
+ using task = task_type<F, A...>;
+
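+    // Note that the task is constructed in place in the task_data buffer
+    // (see the placement new below) and, after being moved out in
+    // task_thunk(), is never explicitly destroyed, hence the requirements
+    // below.
+    //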
+ static_assert (sizeof (task) <= sizeof (task_data::data),
+ "insufficient space");
+
+ static_assert (std::is_trivially_destructible<task>::value,
+ "not trivially destructible");
+
+ // If running serially, then run the task synchronously. In this case
+ // there is no need to mess with task count.
+ //
+ if (max_active_ == 1)
+ {
+ forward<F> (f) (forward<A> (a)...);
+
+ // See if we need to call the monitor (see the concurrent version in
+ // execute() for details).
+ //
+ if (monitor_count_ != nullptr)
+ {
+ size_t v (monitor_count_->load (memory_order_relaxed));
+ if (v != monitor_init_)
+ {
+ size_t t (monitor_tshold_.load (memory_order_relaxed));
+ if (v > monitor_init_ ? (v >= t) : (v <= t))
+ monitor_tshold_.store (monitor_func_ (v), memory_order_relaxed);
+ }
+ }
+
+ return false;
+ }
+
+ // Try to push the task into the queue falling back to running serially
+ // if the queue is full.
+ //
+ task_queue* tq (queue ()); // Single load.
+ if (tq == nullptr)
+ tq = &create_queue ();
+
+ {
+ lock ql (tq->mutex);
+
+ if (tq->shutdown)
+ throw_generic_error (ECANCELED);
+
+ if (task_data* td = push (*tq))
+ {
+ // Package the task (under lock).
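+        // Note that the function and its arguments are decay-copied into
+        // the task since its execution may outlive the caller's stack
+        // frame.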
+ //
+ new (&td->data) task {
+ &task_count,
+ start_count,
+ decay_copy (forward<F> (f)),
+ typename task::args_type (decay_copy (forward<A> (a))...)};
+
+ td->thunk = &task_thunk<F, A...>;
+
+ // Increment the task count. This has to be done under lock to prevent
+ // the task from decrementing the count before we had a chance to
+ // increment it.
+ //
+ task_count.fetch_add (1, std::memory_order_release);
+ }
+ else
+ {
+ tq->stat_full++;
+
+ // We have to perform the same mark adjust/restore as in pop_back()
+ // since the task we are about to execute synchronously may try to
+ // work the queue.
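+      //
+      // Note that a mark equal to task_queue_depth_ means "no mark"; the
+      // original value is restored below (or the mark is reset if the
+      // queue has become empty in the meantime).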
+ //
+ // It would have been cleaner to package all this logic into push()
+ // but that would require dragging function/argument types into it.
+ //
+ size_t& s (tq->size);
+ size_t& t (tq->tail);
+ size_t& m (tq->mark);
+
+ size_t om (m);
+ m = task_queue_depth_;
+
+ ql.unlock ();
+ forward<F> (f) (forward<A> (a)...); // Should not throw.
+
+ if (om != task_queue_depth_)
+ {
+ ql.lock ();
+ m = s == 0 ? t : om;
+ }
+
+ return false;
+ }
+ }
+
+ // If there is a spare active thread, wake up (or create) the helper
+ // (unless someone already snatched the task).
+ //
+ if (queued_task_count_.load (std::memory_order_consume) != 0)
+ {
+ lock l (mutex_);
+
+ if (active_ < max_active_)
+ activate_helper (l);
+ }
+
+ return true;
+ }
+
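+  // The thunk that async() above stores in task_data::thunk: unpack the
+  // type-erased task, release the queue lock, execute the task, and
+  // decrement the task count, resuming any waiters.
+  //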
+ template <typename F, typename... A>
+ void scheduler::
+ task_thunk (scheduler& s, lock& ql, void* td)
+ {
+ using task = task_type<F, A...>;
+
+ // Move the data and release the lock.
+ //
+ task t (move (*static_cast<task*> (td)));
+ ql.unlock ();
+
+ t.thunk (std::index_sequence_for<A...> ());
+
+ atomic_count& tc (*t.task_count);
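+    // Note that fetch_sub() returns the previous value, so subtracting one
+    // yields the new count.
+    //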
+ if (tc.fetch_sub (1, memory_order_release) - 1 <= t.start_count)
+ s.resume (tc); // Resume waiters, if any.
+ }
+}
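
For context, the intended usage pattern is roughly as follows. This is a
minimal sketch: process() and example() are hypothetical, and the wait()
counterpart (which blocks until task_count drops back to start_count) is
assumed rather than shown in this diff.

  #include <vector>
  #include <functional> // std::ref

  #include <libbuild2/scheduler.hxx>

  using namespace build2;

  // Hypothetical unit of work: double the input.
  //
  static void
  process (size_t& result, size_t input)
  {
    result = input * 2;
  }

  static void
  example (scheduler& sched,
           const std::vector<size_t>& inputs,
           std::vector<size_t>& results)
  {
    atomic_count task_count (0);

    // Note that the arguments are decay-copied into each task so references
    // have to be wrapped (std::ref()).
    //
    for (size_t i (0); i != inputs.size (); ++i)
      sched.async (0 /* start_count */,
                   task_count,
                   &process,
                   std::ref (results[i]),
                   inputs[i]);

    // Assumed counterpart: block until every task started above has
    // completed (task_count is back to start_count), executing queued
    // tasks while waiting.
    //
    sched.wait (0 /* start_count */, task_count);
  }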