diff options
Diffstat (limited to 'libbuild2/scheduler.cxx')
-rw-r--r-- | libbuild2/scheduler.cxx | 229 |
1 files changed, 193 insertions, 36 deletions
diff --git a/libbuild2/scheduler.cxx b/libbuild2/scheduler.cxx index deb5399..e3fbcc1 100644 --- a/libbuild2/scheduler.cxx +++ b/libbuild2/scheduler.cxx @@ -5,8 +5,11 @@ #if defined(__linux__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__APPLE__) # include <pthread.h> -# ifdef __FreeBSD__ +# if defined(__FreeBSD__) # include <pthread_np.h> // pthread_attr_get_np() (in <pthread.h> on NetBSD) +# elif defined(__OpenBSD__) +# include <sys/signal.h> +# include <pthread_np.h> // pthread_stackseg_np() # endif #endif @@ -50,16 +53,9 @@ namespace build2 scheduler_queue = q; } - size_t scheduler:: - wait (size_t start_count, const atomic_count& task_count, work_queue wq) + optional<size_t> scheduler:: + wait_impl (size_t start_count, const atomic_count& task_count, work_queue wq) { - // Note that task_count is a synchronization point. - // - size_t tc; - - if ((tc = task_count.load (memory_order_acquire)) <= start_count) - return tc; - assert (max_active_ != 1); // Serial execution, nobody to wait for. // See if we can run some of our own tasks. @@ -71,6 +67,8 @@ namespace build2 // if (task_queue* tq = queue ()) { + size_t tc; + for (lock ql (tq->mutex); !tq->shutdown && !empty_back (*tq); ) { pop_back (*tq, ql); @@ -91,16 +89,15 @@ namespace build2 } } - return suspend (start_count, task_count); + return nullopt; } void scheduler:: - deactivate (bool external) + deactivate_impl (bool external, lock&& rl) { - if (max_active_ == 1) // Serial execution. - return; + // Note: assume non-serial execution. - lock l (mutex_); + lock l (move (rl)); // Make sure unlocked on exception. active_--; waiting_++; @@ -118,7 +115,7 @@ namespace build2 { ready_condv_.notify_one (); } - else if (queued_task_count_.load (std::memory_order_consume) != 0 && + else if (queued_task_count_.load (memory_order_consume) != 0 && activate_helper (l)) ; else if (active_ == 0 && external_ == 0) @@ -133,11 +130,10 @@ namespace build2 } } - void scheduler:: - activate (bool external, bool collision) + scheduler::lock scheduler:: + activate_impl (bool external, bool collision) { - if (max_active_ == 1) // Serial execution. - return; + // Note: assume non-serial execution. lock l (mutex_); @@ -162,6 +158,8 @@ namespace build2 if (shutdown_) throw_generic_error (ECANCELED); + + return l; } void scheduler:: @@ -209,7 +207,10 @@ namespace build2 deallocate (size_t n) { if (max_active_ == 1) // Serial execution. + { + assert (n == 0); return; + } lock l (mutex_); active_ -= n; @@ -218,13 +219,15 @@ namespace build2 size_t scheduler:: suspend (size_t start_count, const atomic_count& task_count) { + assert (max_active_ != 1); // Suspend during serial execution? + wait_slot& s ( wait_queue_[ hash<const atomic_count*> () (&task_count) % wait_queue_size_]); // This thread is no longer active. // - deactivate (false /* external */); + deactivate_impl (false /* external */, lock (mutex_)); // Note that the task count is checked while holding the lock. We also // have to notify while holding the lock (see resume()). The aim here @@ -261,7 +264,7 @@ namespace build2 // This thread is no longer waiting. // - activate (false /* external */, collision); + activate_impl (false /* external */, collision); return tc; } @@ -367,8 +370,14 @@ namespace build2 size_t init_active, size_t max_threads, size_t queue_depth, - optional<size_t> max_stack) + optional<size_t> max_stack, + size_t orig_max_active) { + if (orig_max_active == 0) + orig_max_active = max_active; + else + assert (max_active <= orig_max_active); + // Lock the mutex to make sure our changes are visible in (other) active // threads. // @@ -380,16 +389,18 @@ namespace build2 // were asked to run serially. // if (max_threads == 0) - max_threads = (max_active == 1 ? 1 : - sizeof (void*) < 8 ? 8 : 32) * max_active; + max_threads = (orig_max_active == 1 + ? 1 + : (sizeof (void*) < 8 ? 8 : 32) * orig_max_active); assert (shutdown_ && init_active != 0 && init_active <= max_active && - max_active <= max_threads); + orig_max_active <= max_threads); active_ = init_active_ = init_active; - max_active_ = orig_max_active_ = max_active; + max_active_ = max_active; + orig_max_active_ = orig_max_active; max_threads_ = max_threads; // This value should be proportional to the amount of hardware concurrency @@ -403,15 +414,19 @@ namespace build2 // task_queue_depth_ = queue_depth != 0 ? queue_depth - : max_active * 8; + : orig_max_active_ * 8; queued_task_count_.store (0, memory_order_relaxed); if ((wait_queue_size_ = max_threads == 1 ? 0 : shard_size ()) != 0) wait_queue_.reset (new wait_slot[wait_queue_size_]); - // Reset counters. + // Reset other state. // + phase_.clear (); + + idle_reserve_ = 0; + stat_max_waiters_ = 0; stat_wait_collisions_ = 0; @@ -422,6 +437,8 @@ namespace build2 shutdown_ = false; + // Delay thread startup if serial. + // if (max_active_ != 1) dead_thread_ = thread (deadlock_monitor, this); } @@ -430,7 +447,7 @@ namespace build2 tune (size_t max_active) { // Note that if we tune a parallel scheduler to run serially, we will - // still have the deadlock monitoring thread running. + // still have the deadlock monitoring thread loitering around. // With multiple initial active threads we will need to make changes to // max_active_ visible to other threads and which we currently say can be @@ -452,6 +469,11 @@ namespace build2 lock l (wait_idle ()); swap (max_active_, max_active); + + // Start the deadlock thread if its startup was delayed. + // + if (max_active_ != 1 && !dead_thread_.joinable ()) + dead_thread_ = thread (deadlock_monitor, this); } return max_active == orig_max_active_ ? 0 : max_active; @@ -520,7 +542,7 @@ namespace build2 // Wait for the deadlock monitor (the only remaining thread). // - if (orig_max_active_ != 1) // See tune() for why not max_active_. + if (dead_thread_.joinable ()) { l.unlock (); dead_condv_.notify_one (); @@ -546,6 +568,130 @@ namespace build2 return r; } + void scheduler:: + push_phase () + { + if (max_active_ == 1) // Serial execution. + return; + + // Note that we cannot "wait out" until all the old phase threads + // deactivate themselves because we are called while holding the phase + // transition lock which may prevent that from happening. + // + lock l (mutex_); + + // Here is the problem: the old phase is likely to have a bunch of waiting + // threads with non-empty queues and after switching the phase new helpers + // are going to start working those queues (and immediately get blocked + // trying to lock the "old" phase). This is further exacerbated by the + // fact that helpers get tasks from the front of the queue while new tasks + // are added at the back. Which means helpers won't see any "new" phase + // tasks until enough of them get "sacrificed" (i.e., blocked) to clear + // the old phase backlog (or more like front-log in this case). + // + // Since none of the old phase tasks can make any progress until we return + // to the old phase, we need to somehow "hide" their tasks from the new + // phase helpers. The way we are going to do it is to temporarily (until + // pop) replace such queues with empty ones. This should be ok since a + // thread with such a "shadowed" queue won't wake up until we return to + // the old phase (but the shadow queue may be used if the thread in + // question is also switching to the new phase). + // + // Note also that the assumption here is that while we may still have + // "phase-less" threads milling around (e.g., transitioning from active to + // waiting), they do not access the queue (helpers are a special case that + // we deal with by locking the queue). + // + phase_.emplace_back (task_queues_.size ()); + vector<task_queue_data>& ph (phase_.back ()); + + auto j (ph.begin ()); + for (auto i (task_queues_.begin ()); i != task_queues_.end (); ++i, ++j) + { + task_queue& tq (*i); + lock ql (tq.mutex); + + if (tq.size != 0) + { + // Note that task_queue::data will be allocated lazily (there is a + // good chance this queue is not going to be used in the new phase). + // + queued_task_count_.fetch_sub (tq.size, memory_order_release); + tq.swap (*j); + } + } + + assert (queued_task_count_.load (memory_order_consume) == 0); + + // Boost the max_threads limit for the first sub-phase. + // + // Ideally/long-term we want to redo this by waking up one of the old + // phase waiting threads to serve as a helper. + // + if (phase_.size () == 1) + { + size_t cur_threads (init_active_ + helpers_ - idle_reserve_); + + old_eff_max_threads_ = (cur_threads > max_threads_ + ? cur_threads + : max_threads_); + old_max_threads_ = max_threads_; + + max_threads_ = old_eff_max_threads_ + max_threads_ / 2; + idle_reserve_ = 0; + } + } + + void scheduler:: + pop_phase () + { + if (max_active_ == 1) // Serial execution. + return; + + lock l (mutex_); + assert (!phase_.empty ()); + + // Restore the queue sizes. + // + assert (queued_task_count_.load (memory_order_consume) == 0); + + vector<task_queue_data>& ph (phase_.back ()); + + auto i (task_queues_.begin ()); + for (auto j (ph.begin ()); j != ph.end (); ++i, ++j) + { + if (j->size != 0) + { + task_queue& tq (*i); + lock ql (tq.mutex); + tq.swap (*j); + queued_task_count_.fetch_add (tq.size, memory_order_release); + } + } + + phase_.pop_back (); + + // Restore the original limit and reserve idle helpers that we created + // above the old (effective) limit. + // + if (phase_.size () == 0) + { + size_t cur_threads (init_active_ + helpers_); + + if (cur_threads > old_eff_max_threads_) + { + idle_reserve_ = cur_threads - old_eff_max_threads_; + + // Not necessarily the case since some helpers may have still picked + // up tasks from the old phase and are now in waiting_. + // + //assert (idle_reserve_ <= idle_); + } + + max_threads_ = old_max_threads_; + } + } + scheduler::monitor_guard scheduler:: monitor (atomic_count& c, size_t t, function<size_t (size_t)> f) { @@ -571,18 +717,18 @@ namespace build2 if (shutdown_) return false; - if (idle_ != 0) + if (idle_ > idle_reserve_) { idle_condv_.notify_one (); } // // Ignore the max_threads value if we have queued tasks but no active // threads. This means everyone is waiting for something to happen but - // nobody is doing anything (e.g., working the queues). This, for - // example, can happen if a thread waits for a task that is in its queue - // but is below the mark. + // nobody is doing anything (e.g., working the queues). This, for example, + // can happen if a thread waits for a task that is in its queue but is + // below the mark. // - else if (init_active_ + helpers_ < max_threads_ || + else if (init_active_ + helpers_ - idle_reserve_ < max_threads_ || (active_ == 0 && queued_task_count_.load (memory_order_consume) != 0)) { @@ -712,6 +858,15 @@ namespace build2 if (r != 0) throw_system_error (r); +#elif defined(__OpenBSD__) + stack_t s; + int r (pthread_stackseg_np (pthread_self (), &s)); + + if (r != 0) + throw_system_error (r); + + stack_size = s.ss_size; + #else // defined(__APPLE__) stack_size = pthread_get_stacksize_np (pthread_self ()); #endif @@ -783,6 +938,8 @@ namespace build2 { s.active_++; + // Note: see the push_phase() logic if changing anything here. + // while (s.queued_task_count_.load (memory_order_consume) != 0) { // Queues are never removed which means we can get the current range |