-rw-r--r--  libbuild2/adhoc-rule-cxx.cxx |   4
-rw-r--r--  libbuild2/context.cxx        |  60
-rw-r--r--  libbuild2/context.hxx        |  12
-rw-r--r--  libbuild2/module.cxx         |   4
-rw-r--r--  libbuild2/scheduler.cxx      | 146
-rw-r--r--  libbuild2/scheduler.hxx      |  81
6 files changed, 261 insertions(+), 46 deletions(-)
diff --git a/libbuild2/adhoc-rule-cxx.cxx b/libbuild2/adhoc-rule-cxx.cxx
index ece687f..1066f0a 100644
--- a/libbuild2/adhoc-rule-cxx.cxx
+++ b/libbuild2/adhoc-rule-cxx.cxx
@@ -297,6 +297,10 @@ namespace build2
auto_thread_env penv (nullptr);
context& ctx (*t.ctx.module_context);
+ // Enter a scheduler sub-phase.
+ //
+ scheduler::phase_guard pg (ctx.sched);
+
// Mark the queue so that we don't work any tasks that may already be
// there.
//
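For illustration, here is a minimal, self-contained sketch of the RAII pattern this hunk relies on: a guard that enters a scheduler sub-phase on construction and leaves it on destruction, so the nested match/execute of the recipe's build cannot leak a pushed sub-phase on the error path. The toy_scheduler type below is made up for the sketch; the real interface is the push_phase()/pop_phase()/phase_guard trio added in the scheduler.hxx hunk further down.

#include <vector>
#include <cassert>

// Stand-in for build2's scheduler: only tracks sub-phase nesting depth.
//
struct toy_scheduler
{
  std::vector<int> phases;

  void push_phase () {phases.push_back (0);}
  void pop_phase  () {assert (!phases.empty ()); phases.pop_back ();}
};

// RAII guard: enter the sub-phase now, leave it on scope exit (including
// via an exception thrown by the nested build).
//
struct phase_guard
{
  explicit phase_guard (toy_scheduler& s): s_ (s) {s_.push_phase ();}
  ~phase_guard () {s_.pop_phase ();}

  phase_guard (const phase_guard&) = delete;
  phase_guard& operator= (const phase_guard&) = delete;

private:
  toy_scheduler& s_;
};

int main ()
{
  toy_scheduler sched;
  {
    phase_guard pg (sched);         // As in the hunk above.
    assert (sched.phases.size () == 1);
    // ... nested match/execute of the recipe would run here ...
  }
  assert (sched.phases.empty ());   // Popped even if the body had thrown.
}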
diff --git a/libbuild2/context.cxx b/libbuild2/context.cxx
index 3ae8a07..e8232c7 100644
--- a/libbuild2/context.cxx
+++ b/libbuild2/context.cxx
@@ -621,7 +621,7 @@ namespace build2
}
bool run_phase_mutex::
- lock (run_phase p)
+ lock (run_phase n)
{
bool r;
@@ -632,7 +632,7 @@ namespace build2
// Increment the counter.
//
condition_variable* v (nullptr);
- switch (p)
+ switch (n)
{
case run_phase::load: lc_++; v = &lv_; break;
case run_phase::match: mc_++; v = &mv_; break;
@@ -645,13 +645,13 @@ namespace build2
//
if (u)
{
- ctx_.phase = p;
+ ctx_.phase = n;
r = !fail_;
}
- else if (ctx_.phase != p)
+ else if (ctx_.phase != n)
{
ctx_.sched.deactivate (false /* external */);
- for (; ctx_.phase != p; v->wait (l)) ;
+ for (; ctx_.phase != n; v->wait (l)) ;
r = !fail_;
l.unlock (); // Important: activate() can block.
ctx_.sched.activate (false /* external */);
@@ -662,7 +662,7 @@ namespace build2
// In case of load, acquire the exclusive access mutex.
//
- if (p == run_phase::load)
+ if (n == run_phase::load)
{
if (!lm_.try_lock ())
{
@@ -677,11 +677,11 @@ namespace build2
}
void run_phase_mutex::
- unlock (run_phase p)
+ unlock (run_phase o)
{
// In case of load, release the exclusive access mutex.
//
- if (p == run_phase::load)
+ if (o == run_phase::load)
lm_.unlock ();
{
@@ -690,25 +690,35 @@ namespace build2
// Decrement the counter and see if this phase has become unlocked.
//
bool u (false);
- switch (p)
+ switch (o)
{
case run_phase::load: u = (--lc_ == 0); break;
case run_phase::match: u = (--mc_ == 0); break;
case run_phase::execute: u = (--ec_ == 0); break;
}
- // If the phase is unlocked, pick a new phase and notify the waiters.
- // Note that we notify all load waiters so that they can all serialize
- // behind the second-level mutex.
+ // If the phase became unlocked, pick a new phase and notify the
+ // waiters. Note that we notify all load waiters so that they can all
+ // serialize behind the second-level mutex.
//
if (u)
{
+ run_phase n;
condition_variable* v;
+ if (lc_ != 0) {n = run_phase::load; v = &lv_;}
+ else if (mc_ != 0) {n = run_phase::match; v = &mv_;}
+ else if (ec_ != 0) {n = run_phase::execute; v = &ev_;}
+ else {n = run_phase::load; v = nullptr;}
- if (lc_ != 0) {ctx_.phase = run_phase::load; v = &lv_;}
- else if (mc_ != 0) {ctx_.phase = run_phase::match; v = &mv_;}
- else if (ec_ != 0) {ctx_.phase = run_phase::execute; v = &ev_;}
- else {ctx_.phase = run_phase::load; v = nullptr;}
+ ctx_.phase = n;
+
+ // Enter/leave scheduler sub-phase. See also the other half in
+ // relock().
+ //
+ if (o == run_phase::match && n == run_phase::execute)
+ ctx_.sched.push_phase ();
+ else if (o == run_phase::execute && n == run_phase::match)
+ ctx_.sched.pop_phase ();
if (v != nullptr)
{
@@ -758,6 +768,14 @@ namespace build2
ctx_.phase = n;
r = !fail_;
+ // Enter/leave scheduler sub-phase. See also the other half in
+ // unlock().
+ //
+ if (o == run_phase::match && n == run_phase::execute)
+ ctx_.sched.push_phase ();
+ else if (o == run_phase::execute && n == run_phase::match)
+ ctx_.sched.pop_phase ();
+
// Notify others that could be waiting for this phase.
//
if (v != nullptr)
@@ -846,7 +864,7 @@ namespace build2
phase_lock_instance = prev;
ctx.phase_mutex.unlock (phase);
- //text << this_thread::get_id () << " phase release " << p;
+ //text << this_thread::get_id () << " phase release " << phase;
}
}
@@ -913,10 +931,13 @@ namespace build2
if (new_phase == run_phase::load) // Note: load lock is exclusive.
ctx.load_generation++;
- //text << this_thread::get_id () << " phase switch " << o << " " << n;
+ //text << this_thread::get_id () << " phase switch "
+ // << old_phase << " " << new_phase;
}
#if 0
+ // NOTE: see push/pop_phase() logic if trying to enable this.
+ //
phase_switch::
phase_switch (phase_unlock&& u, phase_lock&& l)
: old_phase (u.l->phase), new_phase (l.phase)
@@ -950,6 +971,7 @@ namespace build2
if (!r && !uncaught_exception ())
throw failed ();
- //text << this_thread::get_id () << " phase restore " << n << " " << o;
+ //text << this_thread::get_id () << " phase restore "
+ // << new_phase << " " << old_phase;
}
}
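The two new fragments above (in run_phase_mutex::unlock() and relock()) apply the same rule: a scheduler sub-phase is pushed whenever the context transitions match to execute and popped on execute to match, so the push/pop calls always pair up around an interrupting execute phase. Below is a compact, self-contained sketch of just that transition rule; run_phase matches the source, while stub_scheduler and on_transition() are illustrative stand-ins.

#include <cassert>

enum class run_phase {load, match, execute};

struct stub_scheduler
{
  int depth = 0;
  void push_phase () {++depth;}
  void pop_phase  () {assert (depth > 0); --depth;}
};

// The transition rule used by both unlock() and relock() above.
//
static void
on_transition (stub_scheduler& s, run_phase o, run_phase n)
{
  if (o == run_phase::match && n == run_phase::execute)
    s.push_phase ();
  else if (o == run_phase::execute && n == run_phase::match)
    s.pop_phase ();
}

int main ()
{
  stub_scheduler s;

  // An interrupting execute during match pushes a sub-phase; switching
  // back to match pops it.
  //
  on_transition (s, run_phase::match, run_phase::execute);
  assert (s.depth == 1);
  on_transition (s, run_phase::execute, run_phase::match);
  assert (s.depth == 0);

  // Transitions involving load leave the sub-phase stack alone.
  //
  on_transition (s, run_phase::match, run_phase::load);
  assert (s.depth == 0);
}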
diff --git a/libbuild2/context.hxx b/libbuild2/context.hxx
index ad89f58..c93c1c9 100644
--- a/libbuild2/context.hxx
+++ b/libbuild2/context.hxx
@@ -38,8 +38,7 @@ namespace build2
void
unlock (run_phase);
- // Switch from one phase to another. Semantically, just unlock() followed
- // by lock() but more efficient.
+ // Switch from one phase to another.
//
bool
relock (run_phase unlock, run_phase lock);
@@ -116,7 +115,10 @@ namespace build2
// we have switched to another task).
//
// Note that sharing the same scheduler between multiple top-level contexts
- // can currently be problematic due to operation-specific scheduler tuning.
+ // can currently be problematic due to operation-specific scheduler tuning
+ // as well as phase pushing/popping (perhaps this suggests that we should
+ // instead go the multiple communicating schedulers route, a la the job
+ // server).
//
// The loaded_modules state (module.hxx) is shared among all the contexts
// (there is no way to have multiple shared library loading "contexts") and
@@ -635,12 +637,12 @@ namespace build2
wait_guard (context&,
atomic_count& task_count,
- bool phase = false);
+ bool unlock_phase = false);
wait_guard (context&,
size_t start_count,
atomic_count& task_count,
- bool phase = false);
+ bool unlock_phase = false);
void
wait ();
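The parameter rename above (phase to unlock_phase) spells out what the flag does: when true, wait() releases the phase lock for the duration of the wait so that other threads can lock or switch phases, reacquiring it before returning. Here is a toy illustration of that behavior; the types below are stand-ins for the sketch, not the build2 wait_guard.

#include <atomic>
#include <cstddef>
#include <mutex>
#include <thread>
#include <cassert>

// Toy stand-in for wait_guard: optionally drop the "phase" lock while
// waiting for the task count to reach zero, then take it back.
//
struct toy_wait_guard
{
  std::mutex& phase;
  std::atomic<std::size_t>& task_count;
  bool unlock_phase;

  void
  wait ()
  {
    if (unlock_phase) phase.unlock ();

    while (task_count.load () != 0)   // The real code blocks in the
      std::this_thread::yield ();     // scheduler rather than spinning.

    if (unlock_phase) phase.lock ();
  }
};

int main ()
{
  std::mutex phase;
  std::atomic<std::size_t> tasks (1);

  phase.lock ();
  toy_wait_guard wg {phase, tasks, true /* unlock_phase */};

  std::thread helper ([&tasks] {tasks--;}); // A queued "task" completing.
  wg.wait ();                               // Phase unlocked while waiting.
  helper.join ();

  assert (tasks == 0);
  phase.unlock ();
}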
diff --git a/libbuild2/module.cxx b/libbuild2/module.cxx
index fc50aef..2ee29d6 100644
--- a/libbuild2/module.cxx
+++ b/libbuild2/module.cxx
@@ -418,6 +418,10 @@ namespace build2
auto_thread_env penv (nullptr);
context& ctx (*bs.ctx.module_context);
+ // Enter a scheduler sub-phase.
+ //
+ scheduler::phase_guard pg (ctx.sched);
+
// Mark the queue so that we don't work any tasks that may already be
// there (we could be called in strange ways, for example, as part of
// match via dir_search()).
diff --git a/libbuild2/scheduler.cxx b/libbuild2/scheduler.cxx
index 43da681..8c0ea17 100644
--- a/libbuild2/scheduler.cxx
+++ b/libbuild2/scheduler.cxx
@@ -113,7 +113,7 @@ namespace build2
{
ready_condv_.notify_one ();
}
- else if (queued_task_count_.load (std::memory_order_consume) != 0 &&
+ else if (queued_task_count_.load (memory_order_consume) != 0 &&
activate_helper (l))
;
else if (active_ == 0 && external_ == 0)
@@ -405,8 +405,12 @@ namespace build2
if ((wait_queue_size_ = max_threads == 1 ? 0 : shard_size ()) != 0)
wait_queue_.reset (new wait_slot[wait_queue_size_]);
- // Reset counters.
+ // Reset other state.
//
+ phase_.clear ();
+
+ idle_reserve_ = 0;
+
stat_max_waiters_ = 0;
stat_wait_collisions_ = 0;
@@ -541,6 +545,132 @@ namespace build2
return r;
}
+ void scheduler::
+ push_phase ()
+ {
+ if (max_active_ == 1) // Serial execution.
+ return;
+
+ // Note that we cannot "wait out" until all the old phase threads
+ // deactivate themselves because we are called while holding the phase
+ // transition lock which may prevent that from happening.
+ //
+ lock l (mutex_);
+
+ // Here is the problem: the old phase is likely to have a bunch of waiting
+ // threads with non-empty queues and after switching the phase new helpers
+ // are going to start working those queues (and immediately get blocked
+ // trying to lock the "old" phase). This is further exacerbated by the
+ // fact that helpers get tasks from the front of the queue while new tasks
+ // are added at the back. Which means helpers won't see any "new" phase
+ // tasks until enough of them get "sacrificed" (i.e., blocked) to clear
+ // the old phase backlog (or more like front-log in this case).
+ //
+ // Since none of the old phase tasks can make any progress until we return
+ // to the old phase, we need to somehow "hide" their tasks from the new
+ // phase helpers. The way we are going to do it is to temporarily (until
+ // pop) replace such queues with empty ones. This should be ok since a
+ // thread with such a "shadowed" queue won't wake up until we return to
+ // the old phase.
+ //
+ // Note also that the assumption here is that while we may still have
+ // "phase-less" threads milling around (e.g., transitioning from active to
+ // waiting), they do not access the queue (helpers are a special case that
+ // we deal with by locking the queue).
+ //
+ phase_.emplace_back (task_queues_.size ());
+ vector<task_queue_data>& ph (phase_.back ());
+
+ auto j (ph.begin ());
+ for (auto i (task_queues_.begin ()); i != task_queues_.end (); ++i, ++j)
+ {
+ task_queue& tq (*i);
+ lock ql (tq.mutex);
+
+ if (tq.size != 0)
+ {
+ queued_task_count_.fetch_sub (tq.size, memory_order_release);
+
+ // @@ TODO: should we make task_queue::data allocation lazy? On the
+ // other hand, we don't seem to get many non-empty queues here on
+ // real-world projects.
+ //
+ j->data.reset (new task_data[task_queue_depth_]);
+ tq.swap (*j);
+ }
+ }
+
+ assert (queued_task_count_.load (memory_order_consume) == 0);
+
+ // Boost the max_threads limit for the first sub-phase.
+ //
+ // Ideally/long-term we want to redo this by waking up one of the old
+ // phase waiting threads to serve as a helper.
+ //
+ if (phase_.size () == 1)
+ {
+ size_t cur_threads (init_active_ + helpers_ - idle_reserve_);
+
+ old_eff_max_threads_ = (cur_threads > max_threads_
+ ? cur_threads
+ : max_threads_);
+ old_max_threads_ = max_threads_;
+
+ max_threads_ = old_eff_max_threads_ + max_threads_ / 2;
+ idle_reserve_ = 0;
+ }
+ }
+
+ void scheduler::
+ pop_phase ()
+ {
+ if (max_active_ == 1) // Serial execution.
+ return;
+
+ lock l (mutex_);
+ assert (!phase_.empty ());
+
+ // Restore the queue sizes.
+ //
+ assert (queued_task_count_.load (memory_order_consume) == 0);
+
+ vector<task_queue_data>& ph (phase_.back ());
+
+ auto i (task_queues_.begin ());
+ for (auto j (ph.begin ()); j != ph.end (); ++i, ++j)
+ {
+ if (j->size != 0)
+ {
+ task_queue& tq (*i);
+ lock ql (tq.mutex);
+ tq.swap (*j);
+ queued_task_count_.fetch_add (tq.size, memory_order_release);
+ }
+ }
+
+ phase_.pop_back ();
+
+ // Restore the original limit and reserve idle helpers that we created
+ // above the old (effective) limit.
+ //
+ if (phase_.size () == 0)
+ {
+ size_t cur_threads (init_active_ + helpers_);
+
+ if (cur_threads > old_eff_max_threads_)
+ {
+ idle_reserve_ = cur_threads - old_eff_max_threads_;
+
+ // Not necessarily the case since some helpers may have still picked
+ // up tasks from the old phase and are now in waiting_.
+ //
+ //assert (idle_reserve_ <= idle_);
+ }
+
+ max_threads_ = old_max_threads_;
+ }
+ }
+
scheduler::monitor_guard scheduler::
monitor (atomic_count& c, size_t t, function<size_t (size_t)> f)
{
@@ -566,18 +696,18 @@ namespace build2
if (shutdown_)
return false;
- if (idle_ != 0)
+ if (idle_ > idle_reserve_)
{
idle_condv_.notify_one ();
}
//
// Ignore the max_threads value if we have queued tasks but no active
// threads. This means everyone is waiting for something to happen but
- // nobody is doing anything (e.g., working the queues). This, for
- // example, can happen if a thread waits for a task that is in its queue
- // but is below the mark.
+ // nobody is doing anything (e.g., working the queues). This, for example,
+ // can happen if a thread waits for a task that is in its queue but is
+ // below the mark.
//
- else if (init_active_ + helpers_ < max_threads_ ||
+ else if (init_active_ + helpers_ - idle_reserve_ < max_threads_ ||
(active_ == 0 &&
queued_task_count_.load (memory_order_consume) != 0))
{
@@ -778,6 +908,8 @@ namespace build2
{
s.active_++;
+ // Note: see the push_phase() logic if changing anything here.
+ //
while (s.queued_task_count_.load (memory_order_consume) != 0)
{
// Queues are never removed which means we can get the current range
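The heart of push_phase()/pop_phase() above is the queue "shadowing" trick: every non-empty old-phase queue is swapped with an empty one so that new-phase helpers never pick up (and get blocked on) old-phase tasks, and the originals are swapped back on pop. The following is a stripped-down, single-threaded sketch of just that mechanism with made-up names (no locking, no max_threads_ boosting or idle_reserve_ bookkeeping, task_data reduced to int):

#include <vector>
#include <utility>
#include <cstddef>
#include <cassert>

struct queue {std::vector<int> tasks;};       // Toy per-thread task queue.

struct toy_scheduler
{
  std::vector<queue> queues;                  // task_queues_ analog.
  std::vector<std::vector<queue>> phase;      // Shadowed queues, per sub-phase.
  std::size_t queued = 0;                     // queued_task_count_ analog.

  void
  push_phase ()
  {
    // Hide the old phase's backlog: swap every non-empty queue with an
    // empty one.
    //
    phase.emplace_back (queues.size ());
    for (std::size_t i (0); i != queues.size (); ++i)
      if (!queues[i].tasks.empty ())
      {
        queued -= queues[i].tasks.size ();
        std::swap (queues[i], phase.back ()[i]);
      }
    assert (queued == 0);
  }

  void
  pop_phase ()
  {
    // Restore the shadowed queues of the phase we are returning to.
    //
    assert (!phase.empty () && queued == 0);
    for (std::size_t i (0); i != queues.size (); ++i)
      if (!phase.back ()[i].tasks.empty ())
      {
        std::swap (queues[i], phase.back ()[i]);
        queued += queues[i].tasks.size ();
      }
    phase.pop_back ();
  }
};

int main ()
{
  toy_scheduler s;
  s.queues.resize (2);
  s.queues[0].tasks = {1, 2, 3};              // Old-phase backlog.
  s.queued = 3;

  s.push_phase ();                            // match -> execute
  assert (s.queued == 0 && s.queues[0].tasks.empty ());

  s.pop_phase ();                             // execute -> match
  assert (s.queued == 3 && s.queues[0].tasks.size () == 3);
}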
diff --git a/libbuild2/scheduler.hxx b/libbuild2/scheduler.hxx
index 9556caa..b85d353 100644
--- a/libbuild2/scheduler.hxx
+++ b/libbuild2/scheduler.hxx
@@ -135,6 +135,35 @@ namespace build2
L& lock,
work_queue = work_all);
+ // Sub-phases.
+ //
+ // Note that these functions should be called while holding the lock
+ // protecting the phase transition, when there are no longer any threads
+ // in the old phase nor yet any threads in the new phase (or, equivalently,
+ // for example, if the old phase does not utilize the scheduler).
+ //
+ // In particular, for push, while we don't expect any further calls to
+ // async() or wait() in the old phase (until pop), there could still be
+ // active threads that haven't had a chance to deactivate themselves yet.
+ // For pop there should be no remaining tasks queued corresponding to the
+ // phase being popped.
+ //
+ void
+ push_phase ();
+
+ void
+ pop_phase ();
+
+ struct phase_guard
+ {
+ explicit
+ phase_guard (scheduler& s): s_ (s) {s_.push_phase ();}
+ ~phase_guard () {s_.pop_phase ();}
+
+ private:
+ scheduler& s_;
+ };
+
// Mark the queue so that we don't work any tasks that may already be
// there. In the normal "bunch of async() calls followed by wait()"
// cases this happens automatically but in special cases where async()
@@ -663,29 +692,43 @@ namespace build2
//
size_t task_queue_depth_; // Multiple of max_active.
- struct task_queue
+ // Our task queue is circular with head being the index of the first
+ // element and tail -- of the last. Since this makes the empty and one
+ // element cases indistinguishable, we also keep the size.
+ //
+ // The mark is an index somewhere between (figuratively speaking) head and
+ // tail, if enabled. If the mark is hit, then it is disabled until the
+ // queue becomes empty or it is reset by a push.
+ //
+ struct task_queue_data
{
- std::mutex mutex;
- bool shutdown = false;
-
- size_t stat_full = 0; // Number of times push() returned NULL.
-
- // Our task queue is circular with head being the index of the first
- // element and tail -- of the last. Since this makes the empty and one
- // element cases indistinguishable, we also keep the size.
- //
- // The mark is an index somewhere between (figuratively speaking) head
- // and tail, if enabled. If the mark is hit, then it is disabled until
- // the queue becomes empty or it is reset by a push.
- //
size_t head = 0;
size_t mark = 0;
size_t tail = 0;
size_t size = 0;
unique_ptr<task_data[]> data;
+ };
+
+ struct task_queue: task_queue_data
+ {
+ std::mutex mutex;
+ bool shutdown = false;
- task_queue (size_t depth): data (new task_data[depth]) {}
+ size_t stat_full = 0; // Number of times push() returned NULL.
+
+ task_queue (size_t depth) {data.reset (new task_data[depth]);}
+
+ void
+ swap (task_queue_data& d)
+ {
+ using std::swap;
+ swap (head, d.head);
+ swap (mark, d.mark);
+ swap (tail, d.tail);
+ swap (size, d.size);
+ swap (data, d.data);
+ }
};
// Task queue API. Expects the queue mutex to be locked.
@@ -857,6 +900,14 @@ namespace build2
static void
queue (task_queue*) noexcept;
+ // Sub-phases.
+ //
+ small_vector<vector<task_queue_data>, 2> phase_;
+
+ size_t idle_reserve_;
+ size_t old_max_threads_;
+ size_t old_eff_max_threads_;
+
private:
optional<size_t>
wait_impl (size_t, const atomic_count&, work_queue);
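Finally, the task_queue refactoring above exists to support the shadowing: the swappable state (head, mark, tail, size, data) is split out into task_queue_data so that push_phase() can exchange it wholesale while the mutex, shutdown flag, and statistics stay with the owning queue. A small self-contained sketch of that split and the member-wise swap (task_data reduced to int; otherwise it mirrors the hunk above):

#include <memory>
#include <mutex>
#include <utility>
#include <cstddef>
#include <cassert>

using task_data = int;                 // Stand-in for the real task_data.

// The swappable part: circular buffer indices plus the storage itself.
//
struct task_queue_data
{
  std::size_t head = 0, mark = 0, tail = 0, size = 0;
  std::unique_ptr<task_data[]> data;
};

// The queue proper adds the non-swappable bits, which stay put while the
// data is shadowed and later restored across sub-phases.
//
struct task_queue: task_queue_data
{
  std::mutex mutex;
  bool shutdown = false;
  std::size_t stat_full = 0;

  explicit task_queue (std::size_t depth) {data.reset (new task_data[depth]);}

  void
  swap (task_queue_data& d)
  {
    using std::swap;
    swap (head, d.head);
    swap (mark, d.mark);
    swap (tail, d.tail);
    swap (size, d.size);
    swap (data, d.data);
  }
};

int main ()
{
  task_queue tq (4);
  tq.data[0] = 42; tq.size = 1;        // "Push" one task.

  task_queue_data shadow;
  shadow.data.reset (new task_data[4]);
  tq.swap (shadow);                    // Hide the backlog (push_phase()).
  assert (tq.size == 0 && shadow.size == 1);

  tq.swap (shadow);                    // Restore it (pop_phase()).
  assert (tq.size == 1 && tq.data[0] == 42);
}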