author     Boris Kolpackov <boris@codesynthesis.com>  2021-05-12 10:42:42 +0200
committer  Boris Kolpackov <boris@codesynthesis.com>  2021-05-12 16:36:20 +0200
commit     330db1f15d95537e288b4c371a26e43b5a9b2196 (patch)
tree       48cd3779bc7849bd0252da28e7b957a2a6c6c615 /libbuild2
parent     88379eedeae654391711d8cdda17dfc2be6367ef (diff)
Deal with helper thread starvation during phase switching
The implemented solution entails shadowing the old phase's queues so that helpers don't pick up old phase tasks, and boosting the max_threads count so that we can create more helpers if all the existing ones are stuck in the old phase.
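For callers, the shadowing and the limit boost are wrapped in the new RAII-style scheduler::phase_guard (see the scheduler.hxx hunk below). A minimal usage sketch, in which everything except phase_guard, push_phase(), and pop_phase() is illustrative:

// Sketch: bracket work done in a nested (module) context with a
// phase_guard so that helpers are not "sacrificed" to the outer
// phase's queued tasks while this work runs.
//
void
run_in_module_context (context& ctx) // hypothetical caller
{
  scheduler::phase_guard pg (ctx.sched); // push_phase ()

  // ... mark the queue, async()/wait() the nested work here ...

} // pop_phase () in ~phase_guard (), even if an exception is thrown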
Diffstat (limited to 'libbuild2')
-rw-r--r--  libbuild2/adhoc-rule-cxx.cxx |   4
-rw-r--r--  libbuild2/context.cxx        |  60
-rw-r--r--  libbuild2/context.hxx        |  12
-rw-r--r--  libbuild2/module.cxx         |   4
-rw-r--r--  libbuild2/scheduler.cxx      | 146
-rw-r--r--  libbuild2/scheduler.hxx      |  81
6 files changed, 261 insertions, 46 deletions
diff --git a/libbuild2/adhoc-rule-cxx.cxx b/libbuild2/adhoc-rule-cxx.cxx
index ece687f..1066f0a 100644
--- a/libbuild2/adhoc-rule-cxx.cxx
+++ b/libbuild2/adhoc-rule-cxx.cxx
@@ -297,6 +297,10 @@ namespace build2
auto_thread_env penv (nullptr);
context& ctx (*t.ctx.module_context);
+ // Enter a scheduler sub-phase.
+ //
+ scheduler::phase_guard pg (ctx.sched);
+
// Mark the queue so that we don't work any tasks that may already be
// there.
//
diff --git a/libbuild2/context.cxx b/libbuild2/context.cxx
index 3ae8a07..e8232c7 100644
--- a/libbuild2/context.cxx
+++ b/libbuild2/context.cxx
@@ -621,7 +621,7 @@ namespace build2
}
bool run_phase_mutex::
- lock (run_phase p)
+ lock (run_phase n)
{
bool r;
@@ -632,7 +632,7 @@ namespace build2
// Increment the counter.
//
condition_variable* v (nullptr);
- switch (p)
+ switch (n)
{
case run_phase::load: lc_++; v = &lv_; break;
case run_phase::match: mc_++; v = &mv_; break;
@@ -645,13 +645,13 @@ namespace build2
//
if (u)
{
- ctx_.phase = p;
+ ctx_.phase = n;
r = !fail_;
}
- else if (ctx_.phase != p)
+ else if (ctx_.phase != n)
{
ctx_.sched.deactivate (false /* external */);
- for (; ctx_.phase != p; v->wait (l)) ;
+ for (; ctx_.phase != n; v->wait (l)) ;
r = !fail_;
l.unlock (); // Important: activate() can block.
ctx_.sched.activate (false /* external */);
@@ -662,7 +662,7 @@ namespace build2
// In case of load, acquire the exclusive access mutex.
//
- if (p == run_phase::load)
+ if (n == run_phase::load)
{
if (!lm_.try_lock ())
{
@@ -677,11 +677,11 @@ namespace build2
}
void run_phase_mutex::
- unlock (run_phase p)
+ unlock (run_phase o)
{
// In case of load, release the exclusive access mutex.
//
- if (p == run_phase::load)
+ if (o == run_phase::load)
lm_.unlock ();
{
@@ -690,25 +690,35 @@ namespace build2
// Decrement the counter and see if this phase has become unlocked.
//
bool u (false);
- switch (p)
+ switch (o)
{
case run_phase::load: u = (--lc_ == 0); break;
case run_phase::match: u = (--mc_ == 0); break;
case run_phase::execute: u = (--ec_ == 0); break;
}
- // If the phase is unlocked, pick a new phase and notify the waiters.
- // Note that we notify all load waiters so that they can all serialize
- // behind the second-level mutex.
+ // If the phase became unlocked, pick a new phase and notify the
+ // waiters. Note that we notify all load waiters so that they can all
+ // serialize behind the second-level mutex.
//
if (u)
{
+ run_phase n;
condition_variable* v;
+ if (lc_ != 0) {n = run_phase::load; v = &lv_;}
+ else if (mc_ != 0) {n = run_phase::match; v = &mv_;}
+ else if (ec_ != 0) {n = run_phase::execute; v = &ev_;}
+ else {n = run_phase::load; v = nullptr;}
- if (lc_ != 0) {ctx_.phase = run_phase::load; v = &lv_;}
- else if (mc_ != 0) {ctx_.phase = run_phase::match; v = &mv_;}
- else if (ec_ != 0) {ctx_.phase = run_phase::execute; v = &ev_;}
- else {ctx_.phase = run_phase::load; v = nullptr;}
+ ctx_.phase = n;
+
+ // Enter/leave scheduler sub-phase. See also the other half in
+ // relock().
+ //
+ if (o == run_phase::match && n == run_phase::execute)
+ ctx_.sched.push_phase ();
+ else if (o == run_phase::execute && n == run_phase::match)
+ ctx_.sched.pop_phase ();
if (v != nullptr)
{
@@ -758,6 +768,14 @@ namespace build2
ctx_.phase = n;
r = !fail_;
+ // Enter/leave scheduler sub-phase. See also the other half in
+ // unlock().
+ //
+ if (o == run_phase::match && n == run_phase::execute)
+ ctx_.sched.push_phase ();
+ else if (o == run_phase::execute && n == run_phase::match)
+ ctx_.sched.pop_phase ();
+
// Notify others that could be waiting for this phase.
//
if (v != nullptr)
@@ -846,7 +864,7 @@ namespace build2
phase_lock_instance = prev;
ctx.phase_mutex.unlock (phase);
- //text << this_thread::get_id () << " phase release " << p;
+ //text << this_thread::get_id () << " phase release " << phase;
}
}
@@ -913,10 +931,13 @@ namespace build2
if (new_phase == run_phase::load) // Note: load lock is exclusive.
ctx.load_generation++;
- //text << this_thread::get_id () << " phase switch " << o << " " << n;
+ //text << this_thread::get_id () << " phase switch "
+ // << old_phase << " " << new_phase;
}
#if 0
+ // NOTE: see push/pop_phase() logic if trying to enable this.
+ //
phase_switch::
phase_switch (phase_unlock&& u, phase_lock&& l)
: old_phase (u.l->phase), new_phase (l.phase)
@@ -950,6 +971,7 @@ namespace build2
if (!r && !uncaught_exception ())
throw failed ();
- //text << this_thread::get_id () << " phase restore " << n << " " << o;
+ //text << this_thread::get_id () << " phase restore "
+ // << new_phase << " " << old_phase;
}
}
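In both unlock() and relock() above, only the match/execute boundary maps to scheduler sub-phases. A condensed restatement of that hook, with the helper name made up for illustration:

// Only match <-> execute transitions enter/leave a scheduler sub-phase
// (as in unlock()/relock() above); load transitions are left alone.
//
static void
sync_sched_phase (scheduler& s, run_phase o, run_phase n) // illustrative
{
  if (o == run_phase::match && n == run_phase::execute)
    s.push_phase ();
  else if (o == run_phase::execute && n == run_phase::match)
    s.pop_phase ();
}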
diff --git a/libbuild2/context.hxx b/libbuild2/context.hxx
index ad89f58..c93c1c9 100644
--- a/libbuild2/context.hxx
+++ b/libbuild2/context.hxx
@@ -38,8 +38,7 @@ namespace build2
void
unlock (run_phase);
- // Switch from one phase to another. Semantically, just unlock() followed
- // by lock() but more efficient.
+ // Switch from one phase to another.
//
bool
relock (run_phase unlock, run_phase lock);
@@ -116,7 +115,10 @@ namespace build2
// we have switched to another task).
//
// Note that sharing the same scheduler between multiple top-level contexts
- // can currently be problematic due to operation-specific scheduler tuning.
+ // can currently be problematic due to operation-specific scheduler tuning
+ as well as phase pushing/popping (perhaps this suggests that we should
+ // instead go the multiple communicating schedulers route, a la the job
+ // server).
//
// The loaded_modules state (module.hxx) is shared among all the contexts
// (there is no way to have multiple shared library loading "contexts") and
@@ -635,12 +637,12 @@ namespace build2
wait_guard (context&,
atomic_count& task_count,
- bool phase = false);
+ bool unlock_phase = false);
wait_guard (context&,
size_t start_count,
atomic_count& task_count,
- bool phase = false);
+ bool unlock_phase = false);
void
wait ();
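Based only on the declarations above (and assuming the libbuild2 headers and namespace), a hedged sketch of how the renamed unlock_phase flag is meant to be used; the counter and the surrounding task setup are illustrative:

// Sketch: with unlock_phase = true the guard is expected to release the
// current phase while blocked in wait() and re-acquire it on return, so
// that other threads can switch phases in the meantime.
//
atomic_count task_count (0);                       // illustrative counter
wait_guard wg (ctx, task_count, true /* unlock_phase */);

// ... start asynchronous tasks tracked via task_count ...

wg.wait (); // the phase is not held while blocked here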
diff --git a/libbuild2/module.cxx b/libbuild2/module.cxx
index fc50aef..2ee29d6 100644
--- a/libbuild2/module.cxx
+++ b/libbuild2/module.cxx
@@ -418,6 +418,10 @@ namespace build2
auto_thread_env penv (nullptr);
context& ctx (*bs.ctx.module_context);
+ // Enter a scheduler sub-phase.
+ //
+ scheduler::phase_guard pg (ctx.sched);
+
// Mark the queue so that we don't work any tasks that may already be
// there (we could be called in strange ways, for example, as part of
// match via dir_search()).
diff --git a/libbuild2/scheduler.cxx b/libbuild2/scheduler.cxx
index 43da681..8c0ea17 100644
--- a/libbuild2/scheduler.cxx
+++ b/libbuild2/scheduler.cxx
@@ -113,7 +113,7 @@ namespace build2
{
ready_condv_.notify_one ();
}
- else if (queued_task_count_.load (std::memory_order_consume) != 0 &&
+ else if (queued_task_count_.load (memory_order_consume) != 0 &&
activate_helper (l))
;
else if (active_ == 0 && external_ == 0)
@@ -405,8 +405,12 @@ namespace build2
if ((wait_queue_size_ = max_threads == 1 ? 0 : shard_size ()) != 0)
wait_queue_.reset (new wait_slot[wait_queue_size_]);
- // Reset counters.
+ // Reset other state.
//
+ phase_.clear ();
+
+ idle_reserve_ = 0;
+
stat_max_waiters_ = 0;
stat_wait_collisions_ = 0;
@@ -541,6 +545,132 @@ namespace build2
return r;
}
+ void scheduler::
+ push_phase ()
+ {
+ if (max_active_ == 1) // Serial execution.
+ return;
+
+ // Note that we cannot "wait out" until all the old phase threads
+ // deactivate themselves because we are called while holding the phase
+ // transition lock which may prevent that from happening.
+ //
+ lock l (mutex_);
+
+ // Here is the problem: the old phase is likely to have a bunch of waiting
+ // threads with non-empty queues and after switching the phase new helpers
+ // are going to start working those queues (and immediately get blocked
+ // trying to lock the "old" phase). This is further exacerbated by the
+ // fact that helpers get tasks from the front of the queue while new tasks
+ // are added at the back. Which means helpers won't see any "new" phase
+ // tasks until enough of them get "sacrificed" (i.e., blocked) to clear
+ // the old phase backlog (or more like front-log in this case).
+ //
+ // Since none of the old phase tasks can make any progress until we return
+ // to the old phase, we need to somehow "hide" their tasks from the new
+ // phase helpers. The way we are going to do it is to temporarily (until
+ // pop) replace such queues with empty ones. This should be ok since a
+ // thread with such a "shadowed" queue won't wake up until we return to
+ // the old phase.
+ //
+ // Note also that the assumption here is that while we may still have
+ // "phase-less" threads milling around (e.g., transitioning from active to
+ // waiting), they do not access the queue (helpers are a special case that
+ // we deal with by locking the queue).
+ //
+ phase_.emplace_back (task_queues_.size ());
+ vector<task_queue_data>& ph (phase_.back ());
+
+ auto j (ph.begin ());
+ for (auto i (task_queues_.begin ()); i != task_queues_.end (); ++i, ++j)
+ {
+ task_queue& tq (*i);
+ lock ql (tq.mutex);
+
+ if (tq.size != 0)
+ {
+ queued_task_count_.fetch_sub (tq.size, memory_order_release);
+
+ // @@ TODO: should we make task_queue::data allocation lazy? On the
+ // other hand, we don't seem to get many non-empty queues here on
+ // real-world projects.
+ //
+ j->data.reset (new task_data[task_queue_depth_]);
+ tq.swap (*j);
+ }
+ }
+
+ assert (queued_task_count_.load (memory_order_consume) == 0);
+
+ // Boost the max_threads limit for the first sub-phase.
+ //
+ // Ideally/long-term we want to redo this by waking up one of the old
+ // phase waiting threads to serve as a helper.
+ //
+ if (phase_.size () == 1)
+ {
+ size_t cur_threads (init_active_ + helpers_ - idle_reserve_);
+
+ old_eff_max_threads_ = (cur_threads > max_threads_
+ ? cur_threads
+ : max_threads_);
+ old_max_threads_ = max_threads_;
+
+ max_threads_ = old_eff_max_threads_ + max_threads_ / 2;
+ idle_reserve_ = 0;
+ }
+ }
+
+ void scheduler::
+ pop_phase ()
+ {
+ if (max_active_ == 1) // Serial execution.
+ return;
+
+ lock l (mutex_);
+ assert (!phase_.empty ());
+
+ // Restore the queue sizes.
+ //
+ assert (queued_task_count_.load (memory_order_consume) == 0);
+
+ vector<task_queue_data>& ph (phase_.back ());
+
+ auto i (task_queues_.begin ());
+ for (auto j (ph.begin ()); j != ph.end (); ++i, ++j)
+ {
+ if (j->size != 0)
+ {
+ task_queue& tq (*i);
+ lock ql (tq.mutex);
+ tq.swap (*j);
+ queued_task_count_.fetch_add (tq.size, memory_order_release);
+ }
+ }
+
+ phase_.pop_back ();
+
+ // Restore the original limit and reserve idle helpers that we created
+ // above the old (effective) limit.
+ //
+ if (phase_.size () == 0)
+ {
+ size_t cur_threads (init_active_ + helpers_);
+
+ if (cur_threads > old_eff_max_threads_)
+ {
+ idle_reserve_ = cur_threads - old_eff_max_threads_;
+
+ // Not necessarily the case since some helpers may have still picked
+ // up tasks from the old phase and are now in waiting_.
+ //
+ //assert (idle_reserve_ <= idle_);
+ }
+
+ max_threads_ = old_max_threads_;
+ }
+ }
+
scheduler::monitor_guard scheduler::
monitor (atomic_count& c, size_t t, function<size_t (size_t)> f)
{
@@ -566,18 +696,18 @@ namespace build2
if (shutdown_)
return false;
- if (idle_ != 0)
+ if (idle_ > idle_reserve_)
{
idle_condv_.notify_one ();
}
//
// Ignore the max_threads value if we have queued tasks but no active
// threads. This means everyone is waiting for something to happen but
- // nobody is doing anything (e.g., working the queues). This, for
- // example, can happen if a thread waits for a task that is in its queue
- // but is below the mark.
+ // nobody is doing anything (e.g., working the queues). This, for example,
+ // can happen if a thread waits for a task that is in its queue but is
+ // below the mark.
//
- else if (init_active_ + helpers_ < max_threads_ ||
+ else if (init_active_ + helpers_ - idle_reserve_ < max_threads_ ||
(active_ == 0 &&
queued_task_count_.load (memory_order_consume) != 0))
{
@@ -778,6 +908,8 @@ namespace build2
{
s.active_++;
+ // Note: see the push_phase() logic if changing anything here.
+ //
while (s.queued_task_count_.load (memory_order_consume) != 0)
{
// Queues are never removed which means we can get the current range
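To make the limit boosting in push_phase()/pop_phase() concrete, a worked example with assumed thread counts:

// Assumed numbers for the first push_phase():
//
//   init_active_ = 8, helpers_ = 12, idle_reserve_ = 0  -> cur_threads = 20
//   max_threads_ = 16
//
//   old_eff_max_threads_ = max (20, 16) = 20
//   old_max_threads_     = 16
//   max_threads_         = 20 + 16 / 2  = 28   (room for new-phase helpers)
//
// On the matching pop_phase(), if helpers grew so that
// init_active_ + helpers_ = 26, then idle_reserve_ = 26 - 20 = 6 and
// max_threads_ is restored to 16; activate_helper() then leaves that many
// idle helpers unused (the idle_ > idle_reserve_ check) so the effective
// limit returns to the old value.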
diff --git a/libbuild2/scheduler.hxx b/libbuild2/scheduler.hxx
index 9556caa..b85d353 100644
--- a/libbuild2/scheduler.hxx
+++ b/libbuild2/scheduler.hxx
@@ -135,6 +135,35 @@ namespace build2
L& lock,
work_queue = work_all);
+ // Sub-phases.
+ //
+ // Note that these functions should be called while holding the lock
+ // protecting the phase transition, when there are no longer any threads
+ in the old phase nor yet any threads in the new phase (or, equivalently,
+ // for example, if the old phase does not utilize the scheduler).
+ //
+ // In particular, for push, while we don't expect any further calls to
+ // async() or wait() in the old phase (until pop), there could still be
+ active threads that haven't had a chance to deactivate themselves yet.
+ // For pop there should be no remaining tasks queued corresponding to the
+ // phase being popped.
+ //
+ void
+ push_phase ();
+
+ void
+ pop_phase ();
+
+ struct phase_guard
+ {
+ explicit
+ phase_guard (scheduler& s): s_ (s) {s_.push_phase ();}
+ ~phase_guard () {s_.pop_phase ();}
+
+ private:
+ scheduler& s_;
+ };
+
// Mark the queue so that we don't work any tasks that may already be
// there. In the normal "bunch of async() calls followed by wait()"
// cases this happens automatically but in special cases where async()
@@ -663,29 +692,43 @@ namespace build2
//
size_t task_queue_depth_; // Multiple of max_active.
- struct task_queue
+ // Our task queue is circular with head being the index of the first
+ // element and tail -- of the last. Since this makes the empty and one
+ // element cases indistinguishable, we also keep the size.
+ //
+ // The mark is an index somewhere between (figuratively speaking) head and
+ // tail, if enabled. If the mark is hit, then it is disabled until the
+ // queue becomes empty or it is reset by a push.
+ //
+ struct task_queue_data
{
- std::mutex mutex;
- bool shutdown = false;
-
- size_t stat_full = 0; // Number of times push() returned NULL.
-
- // Our task queue is circular with head being the index of the first
- // element and tail -- of the last. Since this makes the empty and one
- // element cases indistinguishable, we also keep the size.
- //
- // The mark is an index somewhere between (figuratively speaking) head
- // and tail, if enabled. If the mark is hit, then it is disabled until
- // the queue becomes empty or it is reset by a push.
- //
size_t head = 0;
size_t mark = 0;
size_t tail = 0;
size_t size = 0;
unique_ptr<task_data[]> data;
+ };
+
+ struct task_queue: task_queue_data
+ {
+ std::mutex mutex;
+ bool shutdown = false;
- task_queue (size_t depth): data (new task_data[depth]) {}
+ size_t stat_full = 0; // Number of times push() returned NULL.
+
+ task_queue (size_t depth) {data.reset (new task_data[depth]);}
+
+ void
+ swap (task_queue_data& d)
+ {
+ using std::swap;
+ swap (head, d.head);
+ swap (mark, d.mark);
+ swap (tail, d.tail);
+ swap (size, d.size);
+ swap (data, d.data);
+ }
};
// Task queue API. Expects the queue mutex to be locked.
@@ -857,6 +900,14 @@ namespace build2
static void
queue (task_queue*) noexcept;
+ // Sub-phases.
+ //
+ small_vector<vector<task_queue_data>, 2> phase_;
+
+ size_t idle_reserve_;
+ size_t old_max_threads_;
+ size_t old_eff_max_threads_;
+
private:
optional<size_t>
wait_impl (size_t, const atomic_count&, work_queue);
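The queue shadowing in push_phase()/pop_phase() reduces to the swap() above. A self-contained toy sketch of the idea, with simplified types that are not the scheduler's actual ones:

#include <cstddef>
#include <memory>
#include <utility>

// Simplified stand-in for task_queue_data: swapping the members with a
// freshly allocated empty instance "hides" the queued tasks from anyone
// scanning the queue; swapping back restores them untouched.
//
struct queue_data
{
  std::size_t head = 0, mark = 0, tail = 0, size = 0;
  std::unique_ptr<int[]> data; // int stands in for task_data

  void
  swap (queue_data& d)
  {
    using std::swap;
    swap (head, d.head);
    swap (mark, d.mark);
    swap (tail, d.tail);
    swap (size, d.size);
    swap (data, d.data);
  }
};

int
main ()
{
  const std::size_t depth (8);

  queue_data q;                       // the "old phase" queue
  q.data.reset (new int[depth]);
  q.data[0] = 42;                     // pretend one task is queued
  q.size = 1;

  // push_phase(): shadow the non-empty queue with an empty one.
  //
  queue_data shadow;
  shadow.data.reset (new int[depth]);
  q.swap (shadow);                    // q now looks empty to helpers

  // ... the new sub-phase queues and works its own tasks via q ...

  // pop_phase(): swap back; the old task (42) reappears unmodified.
  //
  q.swap (shadow);
}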