diff options
Diffstat (limited to 'libbuild2/scheduler.hxx')
-rw-r--r-- | libbuild2/scheduler.hxx | 209 |
1 files changed, 165 insertions, 44 deletions
diff --git a/libbuild2/scheduler.hxx b/libbuild2/scheduler.hxx index e1bb715..3cc206e 100644 --- a/libbuild2/scheduler.hxx +++ b/libbuild2/scheduler.hxx @@ -5,11 +5,10 @@ #define LIBBUILD2_SCHEDULER_HXX #include <list> -#include <mutex> #include <tuple> #include <atomic> -#include <type_traits> // aligned_storage, etc -#include <condition_variable> +#include <cstddef> // max_align_t +#include <type_traits> // decay, etc #include <libbuild2/types.hxx> #include <libbuild2/utility.hxx> @@ -119,11 +118,49 @@ namespace build2 const atomic_count& task_count, work_queue = work_all); + // As above but assume 0 start_count. + // + size_t + wait (const atomic_count& task_count, work_queue wq = work_all); + + // As above but call lock.unlock() before suspending (can be used to + // unlock the phase). + // + template <typename L> size_t - wait (const atomic_count& task_count, work_queue wq = work_all) + wait (size_t start_count, + const atomic_count& task_count, + L& lock, + work_queue = work_all); + + // Sub-phases. + // + // Note that these functions should be called while holding the lock + // protecting the phase transition, when there are no longer any threads + // in the old phase nor yet any threads in the new phase (or, equivalently, + // for example, if the old phase does not utilize the scheduler). + // + // In particular, for push, while we don't expect any further calls to + // async() or wait() in the old phase (until pop), there could still be + // active threads that haven't had a chance to deactivate themselves yet. + // For pop there should be no remaining tasks queued corresponding to the + // phase being popped. + // + void + push_phase (); + + void + pop_phase (); + + struct phase_guard { - return wait (0, task_count, wq); - } + explicit + phase_guard (scheduler& s): s_ (s) {s_.push_phase ();} + ~phase_guard () {s_.pop_phase ();} + + private: + scheduler& s_; + }; // Mark the queue so that we don't work any tasks that may already be // there. 
In the normal "bunch of async() calls followed by wait()" @@ -155,13 +192,15 @@ namespace build2 // // The external flag indicates whether the wait is for an event external // to the scheduler, that is, triggered by something other than one of the - // threads managed by the scheduler. + // threads managed by the scheduler. This is used to suspend deadlock + // detection (which is progress-based and which cannot be measured for + // external events). // void deactivate (bool external); void - activate (bool external, bool = false); + activate (bool external); // Sleep for the specified duration, deactivating the thread before going // to sleep and re-activating it after waking up (which means this @@ -180,7 +219,7 @@ namespace build2 // Allocate additional active thread count to the current active thread, // for example, to be "passed" to an external program: // - // scheduler::alloc_guard ag (ctx.sched, ctx.sched.max_active () / 2); + // scheduler::alloc_guard ag (*ctx.sched, ctx.sched->max_active () / 2); // args.push_back ("-flto=" + to_string (1 + ag.n)); // run (args); // ag.deallocate (); @@ -205,14 +244,38 @@ namespace build2 void deallocate (size_t); + // Similar to allocate() but reserve all the available threads blocking + // until this becomes possible. Call unlock() on the specified lock before + // deactivating and lock() after activating (can be used to unlock the + // phase). 
Typical usage: + // + // scheduler::alloc_guard ag (*ctx.sched, + // phase_unlock (ctx, true /* delay */)); + // + // Or, without unlocking the phase: + // + // scheduler::alloc_guard ag (*ctx.sched, phase_unlock (nullptr)); + // + template <typename L> + size_t + serialize (L& lock); + struct alloc_guard { size_t n; alloc_guard (): n (0), s_ (nullptr) {} alloc_guard (scheduler& s, size_t m): n (s.allocate (m)), s_ (&s) {} - alloc_guard (alloc_guard&& x): n (x.n), s_ (x.s_) {x.s_ = nullptr;} - alloc_guard& operator= (alloc_guard&& x) + + template <typename L, + typename std::enable_if<!std::is_integral<L>::value, int>::type = 0> + alloc_guard (scheduler& s, L&& l): n (s.serialize (l)), s_ (&s) {} + + alloc_guard (alloc_guard&& x) noexcept + : n (x.n), s_ (x.s_) {x.s_ = nullptr;} + + alloc_guard& + operator= (alloc_guard&& x) noexcept { if (&x != this) { @@ -263,14 +326,25 @@ namespace build2 // If the maximum threads or task queue depth arguments are unspecified, // then appropriate defaults are used. // + // Passing non-zero orig_max_active (normally the real max active) allows + // starting up a pre-tuned scheduler. In particular, starting a pre-tuned + // to serial scheduler is relatively cheap since starting the deadlock + // detection thread is delayed until the scheduler is re-tuned. + // explicit scheduler (size_t max_active, size_t init_active = 1, size_t max_threads = 0, size_t queue_depth = 0, - optional<size_t> max_stack = nullopt) + optional<size_t> max_stack = nullopt, + size_t orig_max_active = 0) { - startup (max_active, init_active, max_threads, queue_depth, max_stack); + startup (max_active, + init_active, + max_threads, + queue_depth, + max_stack, + orig_max_active); } // Start the scheduler. 
@@ -280,7 +354,8 @@ namespace build2 size_t init_active = 1, size_t max_threads = 0, size_t queue_depth = 0, - optional<size_t> max_stack = nullopt); + optional<size_t> max_stack = nullopt, + size_t orig_max_active = 0); // Return true if the scheduler was started up. // @@ -305,12 +380,19 @@ namespace build2 size_t tune (size_t max_active); + bool + tuned () const {return max_active_ != orig_max_active_;} + struct tune_guard { tune_guard (): s_ (nullptr), o_ (0) {} tune_guard (scheduler& s, size_t ma): s_ (&s), o_ (s_->tune (ma)) {} - tune_guard (tune_guard&& x): s_ (x.s_), o_ (x.o_) {x.s_ = nullptr;} - tune_guard& operator= (tune_guard&& x) + + tune_guard (tune_guard&& x) noexcept + : s_ (x.s_), o_ (x.o_) {x.s_ = nullptr;} + + tune_guard& + operator= (tune_guard&& x) noexcept { if (&x != this) { @@ -378,8 +460,8 @@ namespace build2 { explicit monitor_guard (scheduler* s = nullptr): s_ (s) {} - monitor_guard (monitor_guard&& x): s_ (x.s_) {x.s_ = nullptr;} - monitor_guard& operator= (monitor_guard&& x) + monitor_guard (monitor_guard&& x) noexcept: s_ (x.s_) {x.s_ = nullptr;} + monitor_guard& operator= (monitor_guard&& x) noexcept { if (&x != this) { @@ -442,7 +524,7 @@ namespace build2 static size_t hardware_concurrency () { - return std::thread::hardware_concurrency (); + return build2::thread::hardware_concurrency (); } // Return a prime number that can be used as a lock shard size that's @@ -459,7 +541,7 @@ namespace build2 // to become idle. Return the lock over the scheduler mutex. Normally you // don't need to call this function directly. // - using lock = std::unique_lock<std::mutex>; + using lock = build2::mlock; lock wait_idle (); @@ -495,8 +577,8 @@ namespace build2 atomic_count* task_count; size_t start_count; - func_type func; args_type args; + func_type func; template <size_t... i> void @@ -521,7 +603,7 @@ namespace build2 size_t monitor_init_; // Initial count. 
function<size_t (size_t)> monitor_func_; - std::mutex mutex_; + build2::mutex mutex_; bool shutdown_ = true; // Shutdown flag. optional<size_t> max_stack_; @@ -561,8 +643,8 @@ namespace build2 // size_t orig_max_active_ = 0; - std::condition_variable idle_condv_; // Idle helpers queue. - std::condition_variable ready_condv_; // Ready masters queue. + build2::condition_variable idle_condv_; // Idle helpers queue. + build2::condition_variable ready_condv_; // Ready masters queue. // Statistics counters. // @@ -581,8 +663,8 @@ namespace build2 // Deadlock detection. // - std::thread dead_thread_; - std::condition_variable dead_condv_; + build2::thread dead_thread_; + build2::condition_variable dead_condv_; static void* deadlock_monitor (void*); @@ -603,8 +685,8 @@ namespace build2 // struct wait_slot { - std::mutex mutex; - std::condition_variable condv; + build2::mutex mutex; + build2::condition_variable condv; size_t waiters = 0; const atomic_count* task_count; bool shutdown = true; @@ -625,7 +707,11 @@ namespace build2 // struct task_data { - std::aligned_storage<sizeof (void*) * 8>::type data; + static const size_t data_size = (sizeof (void*) == 4 + ? sizeof (void*) * 16 + : sizeof (void*) * 8); + + alignas (std::max_align_t) unsigned char data[data_size]; void (*thunk) (scheduler&, lock&, void*); }; @@ -653,29 +739,46 @@ namespace build2 // size_t task_queue_depth_; // Multiple of max_active. - struct task_queue + // Our task queue is circular with head being the index of the first + // element and tail -- of the last. Since this makes the empty and one + // element cases indistinguishable, we also keep the size. + // + // The mark is an index somewhere between (figuratively speaking) head and + // tail, if enabled. If the mark is hit, then it is disabled until the + // queue becomes empty or it is reset by a push. + // + // Note also that the data array can be NULL (lazy allocation) and one + // must make sure it's allocated before calling push(). 
+ // + struct task_queue_data { - std::mutex mutex; - bool shutdown = false; - - size_t stat_full = 0; // Number of times push() returned NULL. - - // Our task queue is circular with head being the index of the first - // element and tail -- of the last. Since this makes the empty and one - // element cases indistinguishable, we also keep the size. - // - // The mark is an index somewhere between (figuratively speaking) head - // and tail, if enabled. If the mark is hit, then it is disabled until - // the queue becomes empty or it is reset by a push. - // size_t head = 0; size_t mark = 0; size_t tail = 0; size_t size = 0; unique_ptr<task_data[]> data; + }; + + struct task_queue: task_queue_data + { + build2::mutex mutex; + bool shutdown = false; + + size_t stat_full = 0; // Number of times push() returned NULL. - task_queue (size_t depth): data (new task_data[depth]) {} + task_queue (size_t depth) {data.reset (new task_data[depth]);} + + void + swap (task_queue_data& d) + { + using std::swap; + swap (head, d.head); + swap (mark, d.mark); + swap (tail, d.tail); + swap (size, d.size); + swap (data, d.data); + } }; // Task queue API. Expects the queue mutex to be locked. @@ -846,6 +949,24 @@ namespace build2 static void queue (task_queue*) noexcept; + + // Sub-phases. + // + small_vector<vector<task_queue_data>, 2> phase_; + + size_t idle_reserve_; + size_t old_max_threads_; + size_t old_eff_max_threads_; + + private: + optional<size_t> + wait_impl (size_t, const atomic_count&, work_queue); + + void + deactivate_impl (bool, lock&&); + + lock + activate_impl (bool, bool); }; } |