author Karen Arutyunov <karen@codesynthesis.com> 2017-05-01 18:24:31 +0300
committer Karen Arutyunov <karen@codesynthesis.com> 2017-05-01 19:30:26 +0300
commit 70317569c6dcd9809ed4a8c425777e653ec6ca08 (patch)
tree 07a538b296933e9e2a1f81088f8fcc8da3f749ad /build2/scheduler
parent cbec9ea8841c8a58b2d50bb628b28aea7a6fe179 (diff)
Add hxx extension for headers
Diffstat (limited to 'build2/scheduler')
-rw-r--r-- build2/scheduler | 593
1 file changed, 0 insertions, 593 deletions
diff --git a/build2/scheduler b/build2/scheduler
deleted file mode 100644
index c6f4f7f..0000000
--- a/build2/scheduler
+++ /dev/null
@@ -1,593 +0,0 @@
-// file : build2/scheduler -*- C++ -*-
-// copyright : Copyright (c) 2014-2017 Code Synthesis Ltd
-// license : MIT; see accompanying LICENSE file
-
-#ifndef BUILD2_SCHEDULER
-#define BUILD2_SCHEDULER
-
-#include <list>
-#include <mutex>
-#include <tuple>
-#include <atomic>
-#include <thread> // hardware_concurrency()
-#include <type_traits> // aligned_storage, etc
-#include <condition_variable>
-
-#include <build2/types>
-#include <build2/utility>
-
-namespace build2
-{
- // Scheduler of tasks and threads. Works best for "substantial" tasks (e.g.,
- // running a process), where in comparison thread synchronization overhead
- // is negligible.
- //
- // A thread (called "master") may need to perform several tasks which can be
- // done in parallel (e.g., update all the prerequisites or run all the
- // tests). To accomplish this, the master, via a call to async(), can ask the
- // scheduler to run a task in another thread (called "helper"). If a helper
- // is available, then the task is executed asynchronously by such helper.
- // Otherwise, the task is (normally) executed synchronously as part of the
- // wait() call below. However, in certain cases (serial execution or full
- // queue), the task may be executed synchronously as part of the async()
- // call itself. Once the master thread has scheduled all the tasks, it calls
- // wait() to await their completion.
- //
- // The scheduler makes sure that only a certain number of threads (for
- // example, the number of available hardware threads) are "active" at any
- // given time. When a master thread calls wait(), it is "suspended" until
- // all its asynchronous tasks are completed (at which point it becomes
- // "ready"). A suspension of a master results in either another ready master
- // being "resumed" or another helper thread becoming available.
- //
- // On completion of a task a helper thread returns to the scheduler which
- // can again lead either to a ready master being resumed (in which case the
- // helper is suspended) or the helper becoming available to perform another
- // task.
- //
- // Note that suspended threads are not reused as helpers. Rather, a new
- // helper thread is always created if none is available. This is done to
- // allow a ready master to continue as soon as possible. If it were reused
- // as a helper, then it could be blocked on a nested wait() further down the
- // stack. All this means that the number of threads created by the scheduler
- // will normally exceed the maximum active allowed.
- //
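- // For illustration only, a minimal usage sketch (hypothetical code;
- // compute_one()/compute_two() stand in for real work):
- //
- //   scheduler s (4); // At most 4 active threads.
- //   scheduler::atomic_count tc (0);
- //
- //   size_t r1, r2;
- //   s.async (tc, [] (size_t& r) {r = compute_one ();}, std::ref (r1));
- //   s.async (tc, [] (size_t& r) {r = compute_two ();}, std::ref (r2));
- //   s.wait (tc); // r1 and r2 can now be examined.
- //
- //   scheduler::stat st (s.shutdown ()); // E.g., st.thread_helpers.
- //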
- class scheduler
- {
- public:
- using atomic_count = std::atomic<size_t>;
-
- // F should return void and not throw any exceptions. The way the result
- // of a task is communicated back to the master thread is ad hoc, usually
- // via "out" arguments. Such result(s) can only be retrieved by the master
- // once its task count reaches the start count.
- //
- // The argument passing semantics is the same as for std::thread. In
- // particular, lvalue-references are passed as copies (use ref()/cref()
- // for the by-reference semantics), except in the case where the task is
- // executed synchronously and as part of the async() call itself (this
- // subtlety can become important when passing shared locks; you would
- // only want it to be copied if the task is queued).
- //
- // Return true if the task was queued and false if it was executed
- // synchronously.
- //
- // If the scheduler is shutdown, throw system_error(ECANCELED).
- //
- template <typename F, typename... A>
- bool
- async (size_t start_count, atomic_count& task_count, F&&, A&&...);
-
- template <typename F, typename... A>
- bool
- async (atomic_count& task_count, F&& f, A&&... a)
- {
- return async (0, task_count, forward<F> (f), forward<A> (a)...);
- }
-
- // Wait until the task count reaches the start count or less. If the
- // scheduler is shutdown while waiting, throw system_error(ECANCELED).
- // Return the value of the task count. Note that this is a synchronization
- // point (i.e., the task count is checked with memory_order_acquire).
- //
- // Note that it is valid to wait on another thread's task count (that is,
- // without making any async() calls in this thread). However, if the start
- // count differs from the one passed to async(), then whoever sets the
- // start count to this alternative value must also call resume() below
- // in order to signal waiting threads.
- //
- // Note also that in this case (waiting on someone else's start count),
- // the async() call could execute the tasks synchronously without ever
- // incrementing the task count. Thus if waiting on another thread's start
- // count starts before/during async() calls, then it must be "gated" with
- // an alternative (lower) start count.
- //
- // Finally, if waiting on someone else's start count, it may be unsafe
- // (from the deadlock's point of view) to continue working through our own
- // queue (i.e., we may block waiting on a task that has been queued before
- // us which in turn may end up waiting on "us").
- //
- enum work_queue
- {
- work_none, // Don't work own queue.
- work_one, // Work own queue rechecking the task count after every task.
- work_all // Work own queue before rechecking the task count.
- };
-
- size_t
- wait (size_t start_count,
- const atomic_count& task_count,
- work_queue = work_all);
-
- size_t
- wait (const atomic_count& task_count, work_queue wq = work_all)
- {
- return wait (0, task_count, wq);
- }
-
- // Resume threads waiting on this task count.
- //
- void
- resume (const atomic_count& task_count);
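-
- // A sketch (hypothetical; s is some started scheduler) of the alternative
- // start count protocol described above: thread B waits on a count owned by
- // thread A and is signaled once A drops it to the agreed value.
- //
- //   scheduler::atomic_count c (1); // 1 means "not yet ready".
- //
- //   // Thread B (note work_none; see the deadlock note above):
- //   //
- //   s.wait (0, c, scheduler::work_none);
- //
- //   // Thread A, once ready:
- //   //
- //   c.fetch_sub (1, std::memory_order_release);
- //   s.resume (c); // Wake up threads waiting on c.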
-
- // An active thread that is about to wait for potentially significant time
- // on something other than task_count (e.g., mutex, condition variable)
- // should deactivate itself with the scheduler and then reactivate once
- // done waiting.
- //
- void
- deactivate ();
-
- void
- activate (bool collision = false);
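-
- // For example (a sketch; m is some unrelated mutex, s a started scheduler):
- //
- //   s.deactivate ();
- //   std::unique_lock<std::mutex> l (m); // Potentially long wait.
- //   s.activate ();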
-
- // Startup and shutdown.
- //
- public:
- // Unless already shut down, call shutdown() but ignore errors.
- //
- ~scheduler ();
-
- // Create a shut down scheduler.
- //
- scheduler () = default;
-
- // Create a started up scheduler.
- //
- // The initial active argument is the number of threads to assume are
- // already active (e.g., the calling thread). It must not be 0 (since
- // someone has to schedule the first task).
- //
- // If the maximum threads or task queue depth arguments are unspecified,
- // then appropriate defaults are used.
- //
- scheduler (size_t max_active,
- size_t init_active = 1,
- size_t max_threads = 0,
- size_t queue_depth = 0)
- {
- startup (max_active, init_active, max_threads, queue_depth);
- }
-
- // Start the scheduler.
- //
- void
- startup (size_t max_active,
- size_t init_active = 1,
- size_t max_threads = 0,
- size_t queue_depth = 0);
-
- // Return true if the scheduler was started up.
- //
- // Note: can only be called from threads that have observed creation,
- // startup, or shutdown.
- //
- bool
- started () const {return !shutdown_;}
-
- // Tune a started up scheduler.
- //
- // Currently one cannot increase max_active. Pass 0 to
- // restore the initial value.
- //
- // Note that tuning can only be done while the scheduler is inactive, that
- // is, no threads are executing a task or are suspended. For example, in a
- // setup with a single initial active thread that would be after a return
- // from the top-level wait() call.
- //
- void
- tune (size_t max_active);
-
- // Return true if the scheduler is running serial.
- //
- // Note: can only be called from threads that have observed startup.
- //
- bool
- serial () const {return max_active_ == 1;}
-
- // Wait for all the helper threads to terminate. Throw system_error on
- // failure. Note that the initially active threads are not waited for.
- // Return scheduling statistics.
- //
- struct stat
- {
- size_t thread_max_active = 0; // max # of active threads allowed.
- size_t thread_max_total = 0; // max # of total threads allowed.
- size_t thread_helpers = 0; // # of helper threads created.
- size_t thread_max_waiting = 0; // max # of waiters at any time.
-
- size_t task_queue_depth = 0; // # of entries in a queue (capacity).
- size_t task_queue_full = 0; // # of times task queue was full.
- size_t task_queue_remain = 0; // # of tasks remaining in queue.
-
- size_t wait_queue_slots = 0; // # of wait slots (buckets).
- size_t wait_queue_collisions = 0; // # of times slot had been occupied.
- };
-
- stat
- shutdown ();
-
- // If initially active thread(s) (besides the one that calls startup())
- // exist before the call to startup(), then they must call join() before
- // executing any tasks. The two common cases where you don't have to call
- // join are a single active thread that calls startup()/shutdown() or
- // active thread(s) that are created after startup().
- //
- void
- join ()
- {
- assert (task_queue_ == nullptr);
-
- // Lock the mutex to make sure the values set in startup() are visible
- // in this thread.
- //
- lock l (mutex_);
- }
-
- // If initially active thread(s) participate in multiple schedulers and/or
- // sessions (intervals between startup() and shutdown()), then they must
- // call leave() before joining another scheduler/session. Note that this
- // applies to the active thread that calls shutdown(). Note that a thread
- // can only participate in one scheduler at a time.
- //
- void
- leave ()
- {
- task_queue_ = nullptr;
- }
-
- // Return the number of hardware threads or 0 if unable to determine.
- //
- static size_t
- hardware_concurrency ()
- {
- return std::thread::hardware_concurrency ();
- }
-
- // Return a prime number that can be used as a lock shard size that's
- // appropriate for the scheduler's concurrency. Use power of two values
- // for mul for higher-contention shards and for div for lower-contention
- // ones. Always return 1 for serial execution.
- //
- // Note: can only be called from threads that have observed startup.
- //
- size_t
- shard_size (size_t mul = 1, size_t div = 1) const;
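-
- // For illustration (hypothetical): size a mutex shard and pick a slot by
- // hashing an address.
- //
- //   std::vector<std::mutex> shard (s.shard_size (4)); // Higher contention.
- //   std::mutex& m (shard[std::hash<const void*> () (p) % shard.size ()]);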
-
- private:
- using lock = std::unique_lock<std::mutex>;
-
- void
- activate_helper (lock&);
-
- void
- create_helper (lock&);
-
- // We restrict ourselves to a single pointer as an argument in hope of
- // a small object optimization.
- //
- static void
- helper (void*);
-
- size_t
- suspend (size_t start_count, const atomic_count& task_count);
-
- // Task encapsulation.
- //
- template <typename F, typename... A>
- struct task_type
- {
- using func_type = std::decay_t<F>;
- using args_type = std::tuple<std::decay_t<A>...>;
-
- atomic_count* task_count;
- size_t start_count;
- func_type func;
- args_type args;
-
- template <size_t... i>
- void
- thunk (std::index_sequence<i...>) noexcept
- {
- move (func) (std::get<i> (move (args))...);
- }
- };
-
- template <typename F, typename... A>
- static void
- task_thunk (scheduler&, lock&, void*);
-
- template <typename T>
- static std::decay_t<T>
- decay_copy (T&& x) {return forward<T> (x);}
-
- private:
- std::mutex mutex_;
-
- bool shutdown_ = true; // Shutdown flag.
-
- // The constraints that we must maintain:
- //
- // active <= max_active
- // (init_active + helpers) <= max_threads (soft; see activate_helper())
- //
- // Note that the first three variables below are immutable between startup() and
- // shutdown() so can be accessed without a lock (but see join()).
- //
- size_t init_active_ = 0; // Initially active threads.
- size_t max_active_ = 0; // Maximum number of active threads.
- size_t max_threads_ = 0; // Maximum number of total threads.
-
- size_t helpers_ = 0; // Number of helper threads created so far.
-
- // Every thread that we manage must be accounted for in one of these
- // counters. And their sum should equal (init_active + helpers).
- //
- size_t active_ = 0; // Active master threads executing a task.
- size_t idle_ = 0; // Idle helper threads waiting for a task.
- size_t waiting_ = 0; // Suspended master threads waiting for their tasks.
- size_t ready_ = 0; // Ready master thread waiting to become active.
- size_t starting_ = 0; // Helper threads starting up.
-
- // Original values (as specified during startup) that can be altered via
- // tuning.
- //
- size_t orig_max_active_ = 0;
-
- std::condition_variable idle_condv_; // Idle helpers queue.
- std::condition_variable ready_condv_; // Ready masters queue.
-
- // Statistics counters.
- //
- size_t stat_max_waiters_;
- size_t stat_wait_collisions_;
-
- // Wait queue.
- //
- // A wait slot blocks a bunch of threads. When they are (all) unblocked,
- // they re-examine their respective conditions and either carry on or
- // block again.
- //
- // The wait queue is a shard of slots. A thread picks a slot based on the
- // address of its task count variable. How many slots do we need? This
- // depends on the number of waiters that we can have which cannot be
- // greater than the total number of threads.
- //
- // The pointer to the task count is used to identify the already waiting
- // group of threads for collision statistics.
- //
- struct wait_slot
- {
- std::mutex mutex;
- std::condition_variable condv;
- size_t waiters = 0;
- const atomic_count* task_count;
- bool shutdown = true;
- };
-
- size_t wait_queue_size_; // Proportional to max_threads.
- unique_ptr<wait_slot[]> wait_queue_;
-
- // Task queue.
- //
- // Each queue has its own mutex plus we have an atomic total count of the
- // queued tasks. Note that it should only be modified while holding one
- // of the queue locks.
- //
- atomic_count queued_task_count_;
-
- // For now we only support trivially-destructible tasks.
- //
- struct task_data
- {
- std::aligned_storage<sizeof (void*) * 8>::type data;
- void (*thunk) (scheduler&, lock&, void*);
- };
-
- // We have two requirements: Firstly, we want to keep the master thread
- // (the one that called wait()) busy working through its own queue for as
- // long as possible before (if at all) it "reincarnates" as a helper. The
- // main reason for this is the limited number of helpers we can create.
- //
- // Secondly, we don't want to block wait() longer than necessary since the
- // master thread can do some work with the result. Plus, overall, we want
- // to "unwind" task hierarchies as soon as possible since they hold up
- // resources such as the thread's stack. All this means that the master thread
- // can only work through tasks that it has queued at this "level" of the
- // async()/wait() calls since we know that wait() cannot return until
- // they are done.
- //
- // To satisfy the first requirement, the master and helper threads get the
- // tasks from different ends of the queue: master from the back while
- // helpers from the front. And the master always adds new tasks to the
- // back.
- //
- // To satisfy the second requirement, the master thread stores the index
- // of the first task it has queued at this "level" and makes sure it
- // doesn't try to dequeue any task beyond that.
- //
- size_t task_queue_depth_; // Multiple of max_active.
-
- struct task_queue
- {
- std::mutex mutex;
- bool shutdown = false;
-
- size_t stat_full = 0; // Number of times push() returned NULL.
-
- // Our task queue is circular with head being the index of the first
- // element and tail -- of the last. Since this makes the empty and one
- // element cases indistinguishable, we also keep the size.
- //
- // The mark is an index somewhere between (figuratively speaking) head
- // and tail, if enabled. If the mark is hit, then it is disabled until
- // the queue becomes empty or it is reset by a push.
- //
- size_t head = 0;
- size_t mark = 0;
- size_t tail = 0;
- size_t size = 0;
-
- unique_ptr<task_data[]> data;
-
- task_queue (size_t depth): data (new task_data[depth]) {}
- };
-
- // Task queue API. Expects the queue mutex to be locked.
- //
-
- // Push a new task to the queue returning a pointer to the task data to be
- // filled or NULL if the queue is full.
- //
- task_data*
- push (task_queue& tq)
- {
- size_t& s (tq.size);
- size_t& t (tq.tail);
- size_t& m (tq.mark);
-
- if (s != task_queue_depth_)
- {
- // normal wrap empty
- // | | |
- t = s != 0 ? (t != task_queue_depth_ - 1 ? t + 1 : 0) : t;
- s++;
-
- if (m == task_queue_depth_) // Enable the mark if first push.
- m = t;
-
- queued_task_count_.fetch_add (1, std::memory_order_release);
- return &tq.data[t];
- }
-
- return nullptr;
- }
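-
- // A hypothetical caller-side sketch (note that push() expects the queue
- // mutex to be locked):
- //
- //   lock ql (tq.mutex);
- //
- //   if (task_data* td = push (tq))
- //   {
- //     // Fill td->data with the task and set td->thunk.
- //   }
- //   else
- //     tq.stat_full++; // Full: fall back to executing synchronously.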
-
- bool
- empty_front (task_queue& tq) const {return tq.size == 0;}
-
- void
- pop_front (task_queue& tq, lock& ql)
- {
- size_t& s (tq.size);
- size_t& h (tq.head);
- size_t& m (tq.mark);
-
- bool a (h == m); // Adjust mark?
- task_data& td (tq.data[h]);
-
- // normal wrap empty
- // | | |
- h = s != 1 ? (h != task_queue_depth_ - 1 ? h + 1 : 0) : h;
-
- if (--s == 0 || a)
- m = h; // Reset or adjust the mark.
-
- queued_task_count_.fetch_sub (1, std::memory_order_release);
-
- // The thunk moves the task data to its stack, releases the lock,
- // and continues to execute the task.
- //
- td.thunk (*this, ql, &td.data);
- ql.lock ();
- }
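-
- // A hypothetical helper-side drain loop (ql is locked on entry):
- //
- //   while (!tq.shutdown && !empty_front (tq))
- //     pop_front (tq, ql); // Unlocks while the task runs, then relocks.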
-
- bool
- empty_back (task_queue& tq) const
- {
- return tq.size == 0 || tq.mark == task_queue_depth_;
- }
-
- void
- pop_back (task_queue& tq, lock& ql)
- {
- size_t& s (tq.size);
- size_t& t (tq.tail);
- size_t& m (tq.mark);
-
- bool a (t == m); // Adjust mark?
-
- task_data& td (tq.data[t]);
-
- // Save the old queue mark and disable it in case the task we are about
- // to run adds sub-tasks. The first push(), if any, will reset it.
- //
- size_t om (m);
- m = task_queue_depth_;
-
- // normal wrap empty
- // | | |
- t = s != 1 ? (t != 0 ? t - 1 : task_queue_depth_ - 1) : t;
- --s;
-
- queued_task_count_.fetch_sub (1, std::memory_order_release);
-
- td.thunk (*this, ql, &td.data);
- ql.lock ();
-
- // Restore the old mark (which we might have to adjust).
- //
- if (s == 0)
- m = t; // Reset the mark.
- else if (a)
- m = task_queue_depth_; // Disable the mark.
- else
- // What happens if head goes past the old mark? In this case we will
- // get into the empty queue state before we end up making any (wrong)
- // decisions based on this value. Unfortunately there is no way to
- // detect this (and do some sanity asserts) since things can wrap
- // around.
- //
- // To put it another way, the understanding here is that after the
- // task returns we will either have an empty queue or there will still
- // be tasks between the old mark and the current tail, something along
- // these lines:
- //
- // OOOOOXXXXOOO
- // | | |
- // m h t
- //
- m = om;
- }
-
- // Each thread has its own queue; the queues are stored in this list.
- //
- std::list<task_queue> task_queues_;
-
- // TLS cache of thread's task queue.
- //
- static
-#ifdef __cpp_thread_local
- thread_local
-#else
- __thread
-#endif
- task_queue* task_queue_;
-
- task_queue&
- create_queue ();
- };
-}
-
-#include <build2/scheduler.txx>
-
-#endif // BUILD2_SCHEDULER