Diffstat (limited to 'libbuild2/scheduler.cxx')
-rw-r--r--  libbuild2/scheduler.cxx | 229
1 file changed, 193 insertions(+), 36 deletions(-)
diff --git a/libbuild2/scheduler.cxx b/libbuild2/scheduler.cxx
index deb5399..e3fbcc1 100644
--- a/libbuild2/scheduler.cxx
+++ b/libbuild2/scheduler.cxx
@@ -5,8 +5,11 @@
#if defined(__linux__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__APPLE__)
# include <pthread.h>
-# ifdef __FreeBSD__
+# if defined(__FreeBSD__)
# include <pthread_np.h> // pthread_attr_get_np() (in <pthread.h> on NetBSD)
+# elif defined(__OpenBSD__)
+# include <sys/signal.h>
+# include <pthread_np.h> // pthread_stackseg_np()
# endif
#endif
@@ -50,16 +53,9 @@ namespace build2
scheduler_queue = q;
}
- size_t scheduler::
- wait (size_t start_count, const atomic_count& task_count, work_queue wq)
+ optional<size_t> scheduler::
+ wait_impl (size_t start_count, const atomic_count& task_count, work_queue wq)
{
- // Note that task_count is a synchronization point.
- //
- size_t tc;
-
- if ((tc = task_count.load (memory_order_acquire)) <= start_count)
- return tc;
-
assert (max_active_ != 1); // Serial execution, nobody to wait for.
// See if we can run some of our own tasks.
@@ -71,6 +67,8 @@ namespace build2
//
if (task_queue* tq = queue ())
{
+ size_t tc;
+
for (lock ql (tq->mutex); !tq->shutdown && !empty_back (*tq); )
{
pop_back (*tq, ql);
@@ -91,16 +89,15 @@ namespace build2
}
}
- return suspend (start_count, task_count);
+ return nullopt;
}
void scheduler::
- deactivate (bool external)
+ deactivate_impl (bool external, lock&& rl)
{
- if (max_active_ == 1) // Serial execution.
- return;
+ // Note: assume non-serial execution.
- lock l (mutex_);
+ lock l (move (rl)); // Make sure unlocked on exception.
active_--;
waiting_++;
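
The removed fast-path check of task_count presumably moves into an inline
wait() wrapper (most likely in scheduler.hxx/ixx, which is not part of this
diff). A minimal sketch of what such a wrapper could look like, assuming it
keeps the old signature and chains wait_impl() and suspend():

  size_t scheduler::
  wait (size_t start_count, const atomic_count& task_count, work_queue wq)
  {
    // Note that task_count is a synchronization point.
    //
    size_t tc;
    if ((tc = task_count.load (memory_order_acquire)) <= start_count)
      return tc;

    // Try to work our own queue first; nullopt means there is nothing left
    // to do except wait.
    //
    if (optional<size_t> r = wait_impl (start_count, task_count, wq))
      return *r;

    return suspend (start_count, task_count);
  }
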
@@ -118,7 +115,7 @@ namespace build2
{
ready_condv_.notify_one ();
}
- else if (queued_task_count_.load (std::memory_order_consume) != 0 &&
+ else if (queued_task_count_.load (memory_order_consume) != 0 &&
activate_helper (l))
;
else if (active_ == 0 && external_ == 0)
@@ -133,11 +130,10 @@ namespace build2
}
}
- void scheduler::
- activate (bool external, bool collision)
+ scheduler::lock scheduler::
+ activate_impl (bool external, bool collision)
{
- if (max_active_ == 1) // Serial execution.
- return;
+ // Note: assume non-serial execution.
lock l (mutex_);
@@ -162,6 +158,8 @@ namespace build2
if (shutdown_)
throw_generic_error (ECANCELED);
+
+ return l;
}
void scheduler::
@@ -209,7 +207,10 @@ namespace build2
deallocate (size_t n)
{
if (max_active_ == 1) // Serial execution.
+ {
+ assert (n == 0);
return;
+ }
lock l (mutex_);
active_ -= n;
@@ -218,13 +219,15 @@ namespace build2
size_t scheduler::
suspend (size_t start_count, const atomic_count& task_count)
{
+ assert (max_active_ != 1); // Suspend during serial execution?
+
wait_slot& s (
wait_queue_[
hash<const atomic_count*> () (&task_count) % wait_queue_size_]);
// This thread is no longer active.
//
- deactivate (false /* external */);
+ deactivate_impl (false /* external */, lock (mutex_));
// Note that the task count is checked while holding the lock. We also
// have to notify while holding the lock (see resume()). The aim here
@@ -261,7 +264,7 @@ namespace build2
// This thread is no longer waiting.
//
- activate (false /* external */, collision);
+ activate_impl (false /* external */, collision);
return tc;
}
@@ -367,8 +370,14 @@ namespace build2
size_t init_active,
size_t max_threads,
size_t queue_depth,
- optional<size_t> max_stack)
+ optional<size_t> max_stack,
+ size_t orig_max_active)
{
+ if (orig_max_active == 0)
+ orig_max_active = max_active;
+ else
+ assert (max_active <= orig_max_active);
+
// Lock the mutex to make sure our changes are visible in (other) active
// threads.
//
@@ -380,16 +389,18 @@ namespace build2
// were asked to run serially.
//
if (max_threads == 0)
- max_threads = (max_active == 1 ? 1 :
- sizeof (void*) < 8 ? 8 : 32) * max_active;
+ max_threads = (orig_max_active == 1
+ ? 1
+ : (sizeof (void*) < 8 ? 8 : 32) * orig_max_active);
assert (shutdown_ &&
init_active != 0 &&
init_active <= max_active &&
- max_active <= max_threads);
+ orig_max_active <= max_threads);
active_ = init_active_ = init_active;
- max_active_ = orig_max_active_ = max_active;
+ max_active_ = max_active;
+ orig_max_active_ = orig_max_active;
max_threads_ = max_threads;
// This value should be proportional to the amount of hardware concurrency
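
To make the default concrete (a standalone sketch, not part of the diff; the
helper name is made up): with orig_max_active == 8 on a 64-bit build the
default max_threads becomes 32 * 8 == 256, on a 32-bit build 8 * 8 == 64, and
a serial scheduler (orig_max_active == 1) is capped at a single thread:

  #include <cassert>
  #include <cstddef>

  static std::size_t
  default_max_threads (std::size_t orig_max_active)
  {
    return orig_max_active == 1
           ? 1
           : (sizeof (void*) < 8 ? 8 : 32) * orig_max_active;
  }

  int main ()
  {
    assert (default_max_threads (1) == 1);
    assert (sizeof (void*) < 8 || default_max_threads (8) == 256);
  }
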
@@ -403,15 +414,19 @@ namespace build2
//
task_queue_depth_ = queue_depth != 0
? queue_depth
- : max_active * 8;
+ : orig_max_active_ * 8;
queued_task_count_.store (0, memory_order_relaxed);
if ((wait_queue_size_ = max_threads == 1 ? 0 : shard_size ()) != 0)
wait_queue_.reset (new wait_slot[wait_queue_size_]);
- // Reset counters.
+ // Reset other state.
//
+ phase_.clear ();
+
+ idle_reserve_ = 0;
+
stat_max_waiters_ = 0;
stat_wait_collisions_ = 0;
@@ -422,6 +437,8 @@ namespace build2
shutdown_ = false;
+ // Delay thread startup if serial.
+ //
if (max_active_ != 1)
dead_thread_ = thread (deadlock_monitor, this);
}
@@ -430,7 +447,7 @@ namespace build2
tune (size_t max_active)
{
// Note that if we tune a parallel scheduler to run serially, we will
- // still have the deadlock monitoring thread running.
+ // still have the deadlock monitoring thread loitering around.
// With multiple initial active threads we will need to make changes to
// max_active_ visible to other threads and which we currently say can be
@@ -452,6 +469,11 @@ namespace build2
lock l (wait_idle ());
swap (max_active_, max_active);
+
+ // Start the deadlock thread if its startup was delayed.
+ //
+ if (max_active_ != 1 && !dead_thread_.joinable ())
+ dead_thread_ = thread (deadlock_monitor, this);
}
return max_active == orig_max_active_ ? 0 : max_active;
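
The "startup was delayed" test relies on the fact that a default-constructed
std::thread is not joinable until a thread is actually started on it, so
dead_thread_.joinable() doubles as a "monitor is running" flag (the same test
is used during shutdown below). A minimal illustration:

  #include <cassert>
  #include <thread>

  int main ()
  {
    std::thread t;                      // Startup delayed: not joinable.
    assert (!t.joinable ());

    t = std::thread ([] {/* monitor loop */});
    assert (t.joinable ());             // Started: must eventually be joined.

    t.join ();
  }
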
@@ -520,7 +542,7 @@ namespace build2
// Wait for the deadlock monitor (the only remaining thread).
//
- if (orig_max_active_ != 1) // See tune() for why not max_active_.
+ if (dead_thread_.joinable ())
{
l.unlock ();
dead_condv_.notify_one ();
@@ -546,6 +568,130 @@ namespace build2
return r;
}
+ void scheduler::
+ push_phase ()
+ {
+ if (max_active_ == 1) // Serial execution.
+ return;
+
+ // Note that we cannot "wait out" until all the old phase threads
+ // deactivate themselves because we are called while holding the phase
+ // transition lock which may prevent that from happening.
+ //
+ lock l (mutex_);
+
+ // Here is the problem: the old phase is likely to have a bunch of waiting
+ // threads with non-empty queues and after switching the phase new helpers
+ // are going to start working those queues (and immediately get blocked
+ // trying to lock the "old" phase). This is further exacerbated by the
+ // fact that helpers get tasks from the front of the queue while new tasks
+ // are added at the back. Which means helpers won't see any "new" phase
+ // tasks until enough of them get "sacrificed" (i.e., blocked) to clear
+ // the old phase backlog (or more like front-log in this case).
+ //
+ // Since none of the old phase tasks can make any progress until we return
+ // to the old phase, we need to somehow "hide" their tasks from the new
+ // phase helpers. The way we are going to do it is to temporarily (until
+ // pop) replace such queues with empty ones. This should be ok since a
+ // thread with such a "shadowed" queue won't wake up until we return to
+ // the old phase (but the shadow queue may be used if the thread in
+ // question is also switching to the new phase).
+ //
+ // Note also that the assumption here is that while we may still have
+ // "phase-less" threads milling around (e.g., transitioning from active to
+ // waiting), they do not access the queue (helpers are a special case that
+ // we deal with by locking the queue).
+ //
+ phase_.emplace_back (task_queues_.size ());
+ vector<task_queue_data>& ph (phase_.back ());
+
+ auto j (ph.begin ());
+ for (auto i (task_queues_.begin ()); i != task_queues_.end (); ++i, ++j)
+ {
+ task_queue& tq (*i);
+ lock ql (tq.mutex);
+
+ if (tq.size != 0)
+ {
+ // Note that task_queue::data will be allocated lazily (there is a
+ // good chance this queue is not going to be used in the new phase).
+ //
+ queued_task_count_.fetch_sub (tq.size, memory_order_release);
+ tq.swap (*j);
+ }
+ }
+
+ assert (queued_task_count_.load (memory_order_consume) == 0);
+
+ // Boost the max_threads limit for the first sub-phase.
+ //
+ // Ideally/long-term we want to redo this by waking up one of the old
+ // phase waiting threads to serve as a helper.
+ //
+ if (phase_.size () == 1)
+ {
+ size_t cur_threads (init_active_ + helpers_ - idle_reserve_);
+
+ old_eff_max_threads_ = (cur_threads > max_threads_
+ ? cur_threads
+ : max_threads_);
+ old_max_threads_ = max_threads_;
+
+ max_threads_ = old_eff_max_threads_ + max_threads_ / 2;
+ idle_reserve_ = 0;
+ }
+ }
+
+ void scheduler::
+ pop_phase ()
+ {
+ if (max_active_ == 1) // Serial execution.
+ return;
+
+ lock l (mutex_);
+ assert (!phase_.empty ());
+
+ // Restore the queue sizes.
+ //
+ assert (queued_task_count_.load (memory_order_consume) == 0);
+
+ vector<task_queue_data>& ph (phase_.back ());
+
+ auto i (task_queues_.begin ());
+ for (auto j (ph.begin ()); j != ph.end (); ++i, ++j)
+ {
+ if (j->size != 0)
+ {
+ task_queue& tq (*i);
+ lock ql (tq.mutex);
+ tq.swap (*j);
+ queued_task_count_.fetch_add (tq.size, memory_order_release);
+ }
+ }
+
+ phase_.pop_back ();
+
+ // Restore the original limit and reserve idle helpers that we created
+ // above the old (effective) limit.
+ //
+ if (phase_.size () == 0)
+ {
+ size_t cur_threads (init_active_ + helpers_);
+
+ if (cur_threads > old_eff_max_threads_)
+ {
+ idle_reserve_ = cur_threads - old_eff_max_threads_;
+
+ // Not necessarily the case since some helpers may have still picked
+ // up tasks from the old phase and are now in waiting_.
+ //
+ //assert (idle_reserve_ <= idle_);
+ }
+
+ max_threads_ = old_max_threads_;
+ }
+ }
+
scheduler::monitor_guard scheduler::
monitor (atomic_count& c, size_t t, function<size_t (size_t)> f)
{
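
The queue "shadowing" described in push_phase() boils down to swapping each
non-empty per-thread queue with empty per-phase storage and swapping it back
in pop_phase(). A simplified model of just that mechanism (illustrative only;
the real task_queue/task_queue_data types and counters differ):

  #include <deque>
  #include <mutex>
  #include <vector>

  struct queue
  {
    std::mutex mutex;
    std::deque<int> tasks; // Stand-in for the real task slots.
  };

  struct model
  {
    std::vector<queue> queues;                          // task_queues_
    std::vector<std::vector<std::deque<int>>> phases;   // phase_

    void push_phase ()
    {
      phases.emplace_back (queues.size ());
      auto j (phases.back ().begin ());
      for (auto i (queues.begin ()); i != queues.end (); ++i, ++j)
      {
        std::lock_guard<std::mutex> ql (i->mutex);
        if (!i->tasks.empty ())
          i->tasks.swap (*j); // Hide old-phase backlog behind an empty queue.
      }
    }

    void pop_phase ()
    {
      auto i (queues.begin ());
      auto& ph (phases.back ());
      for (auto j (ph.begin ()); j != ph.end (); ++i, ++j)
      {
        if (!j->empty ())
        {
          std::lock_guard<std::mutex> ql (i->mutex);
          i->tasks.swap (*j); // Restore the old-phase backlog.
        }
      }
      phases.pop_back ();
    }
  };
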
@@ -571,18 +717,18 @@ namespace build2
if (shutdown_)
return false;
- if (idle_ != 0)
+ if (idle_ > idle_reserve_)
{
idle_condv_.notify_one ();
}
//
// Ignore the max_threads value if we have queued tasks but no active
// threads. This means everyone is waiting for something to happen but
- // nobody is doing anything (e.g., working the queues). This, for
- // example, can happen if a thread waits for a task that is in its queue
- // but is below the mark.
+ // nobody is doing anything (e.g., working the queues). This, for example,
+ // can happen if a thread waits for a task that is in its queue but is
+ // below the mark.
//
- else if (init_active_ + helpers_ < max_threads_ ||
+ else if (init_active_ + helpers_ - idle_reserve_ < max_threads_ ||
(active_ == 0 &&
queued_task_count_.load (memory_order_consume) != 0))
{
@@ -712,6 +858,15 @@ namespace build2
if (r != 0)
throw_system_error (r);
+#elif defined(__OpenBSD__)
+ stack_t s;
+ int r (pthread_stackseg_np (pthread_self (), &s));
+
+ if (r != 0)
+ throw_system_error (r);
+
+ stack_size = s.ss_size;
+
#else // defined(__APPLE__)
stack_size = pthread_get_stacksize_np (pthread_self ());
#endif
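
For reference, the OpenBSD branch above uses pthread_stackseg_np(), which
fills a stack_t (hence the <sys/signal.h> include added at the top of this
diff). A standalone, OpenBSD-only sketch of the same query:

  #include <pthread.h>
  #include <sys/signal.h>  // stack_t
  #include <pthread_np.h>  // pthread_stackseg_np()

  #include <cstddef>
  #include <cstdio>

  int main ()
  {
    stack_t s;
    if (pthread_stackseg_np (pthread_self (), &s) != 0)
      return 1;

    std::size_t stack_size (s.ss_size);
    std::printf ("stack size: %zu bytes\n", stack_size);
  }
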
@@ -783,6 +938,8 @@ namespace build2
{
s.active_++;
+ // Note: see the push_phase() logic if changing anything here.
+ //
while (s.queued_task_count_.load (memory_order_consume) != 0)
{
// Queues are never removed which means we can get the current range