aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBoris Kolpackov <boris@codesynthesis.com>2017-02-10 07:56:33 +0200
committerBoris Kolpackov <boris@codesynthesis.com>2017-02-13 12:42:42 +0200
commitbcb2a89e111a918a48a132a2a29e0c26d724591d (patch)
tree282bd5358f32fa8983c81c0d746d7d05bd5c3ddf
parent107357ca4d02341810f47dee20df034e9c4574e0 (diff)
Redo scheduler task flag as atomic counter
Makes for simpler code and also seems to perform better.
-rw-r--r--build2/scheduler14
-rw-r--r--build2/scheduler.cxx60
-rw-r--r--build2/scheduler.txx17
3 files changed, 38 insertions, 53 deletions
diff --git a/build2/scheduler b/build2/scheduler
index 1ce98e8..d2eb0fc 100644
--- a/build2/scheduler
+++ b/build2/scheduler
@@ -282,7 +282,6 @@ namespace build2
std::mutex mutex_;
bool shutdown_ = true; // Shutdown flag.
- bool task_ = false; // Task queued flag (see below).
// The constraints that we must maintain:
//
@@ -348,10 +347,12 @@ namespace build2
// Task queue.
//
- // Each queue has its own mutex. If the task_ flag above is true then
- // there *might* be a task in one of the queues. If it is false, then
- // it means there are definitely no tasks.
+ // Each queue has its own mutex plus we have an atomic total count of the
+ // queued tasks. Note that it should only be modified while holding one
+ // of the queue lock.
//
+ atomic_count queued_task_count_;
+
// For now we only support trivially-destructible tasks.
//
struct task_data
@@ -427,6 +428,7 @@ namespace build2
// | | |
t = s != 0 ? (t != task_queue_depth_ - 1 ? t + 1 : 0) : t;
s++;
+ queued_task_count_.fetch_add (1, std::memory_order_release);
return &tq.data[t];
}
@@ -453,6 +455,8 @@ namespace build2
if (--s == 0 || a)
m = h; // Reset or adjust the mark.
+ queued_task_count_.fetch_sub (1, std::memory_order_release);
+
// The thunk moves the task data to its stack, releases the lock,
// and continues to execute the task.
//
@@ -488,6 +492,8 @@ namespace build2
t = s != 1 ? (t != 0 ? t - 1 : task_queue_depth_ - 1) : t;
--s;
+ queued_task_count_.fetch_sub (1, std::memory_order_release);
+
td.thunk (*this, ql, &td.data);
ql.lock ();
diff --git a/build2/scheduler.cxx b/build2/scheduler.cxx
index 0e5e280..308310e 100644
--- a/build2/scheduler.cxx
+++ b/build2/scheduler.cxx
@@ -46,7 +46,7 @@ namespace build2
suspend (size_t start_count, const atomic_count& tc)
{
wait_slot& s (
- wait_queue_[std::hash<const atomic_count*> () (&tc) % wait_queue_size_]);
+ wait_queue_[hash<const atomic_count*> () (&tc) % wait_queue_size_]);
// This thread is no longer active.
//
@@ -64,7 +64,7 @@ namespace build2
//
if (ready_ != 0)
ready_condv_.notify_one ();
- else if (task_)
+ else if (queued_task_count_ != 0)
activate_helper (l);
}
@@ -132,7 +132,7 @@ namespace build2
return;
wait_slot& s (
- wait_queue_[std::hash<const atomic_count*> () (&tc) % wait_queue_size_]);
+ wait_queue_[hash<const atomic_count*> () (&tc) % wait_queue_size_]);
// See suspend() for why we must hold the lock.
//
@@ -185,6 +185,8 @@ namespace build2
if (max_active != 1)
task_queues_.reserve (max_threads_);
+ queued_task_count_.store (0, memory_order_relaxed);
+
// Pick a nice prime for common max_threads/2 numbers. Experience shows
// that we want something close to 2x for small numbers, then reduce to
// 1.5x in-between, and 1x for large ones.
@@ -237,11 +239,10 @@ namespace build2
stat_max_waiters_ = 0;
stat_wait_collisions_ = 0;
- task_ = false;
- shutdown_ = false;
-
for (size_t i (0); i != wait_queue_size_; ++i)
wait_queue_[i].shutdown = false;
+
+ shutdown_ = false;
}
void scheduler::
@@ -406,50 +407,25 @@ namespace build2
{
s.active_++;
- while (s.task_) // There might be a task.
+ while (s.queued_task_count_ != 0)
{
- // The tricky part here is to clear the task_ flag with confidence.
- // So we quickly scan the queues for any tasks while holding the
- // scheduler lock. If all of them are empty, then we can clear the
- // task_ flag.
+ // Queues are never removed and there shouldn't be any reallocations
+ // since we reserve maximum possible size upfront. Which means we
+ // can get the current number of queues and release the main lock
+ // while examining each of them.
//
- bool empty (true);
+ size_t n (s.task_queues_.size ());
+ l.unlock ();
- for (size_t i (0), n (s.task_queues_.size ()); i != n; ++i)
+ for (size_t i (0); i != n; ++i)
{
task_queue& tq (*s.task_queues_[i]);
- lock ql (tq.mutex); // Lock the queue.
-
- if (tq.shutdown)
- break;
-
- if (!s.empty_front (tq))
- {
- if (empty)
- {
- empty = false;
- l.unlock (); // Release scheduler lock.
- }
-
- // Work on this queue for as long as we can then continue with
- // the rest (in which case empty will be false and scheduler
- // lock is already released).
- //
- // Note that the queues are never removed and there shouldn't be
- // any reallocations since we reserve the maximum possible size
- // upfront.
- //
- do
- s.pop_front (tq, ql);
- while (!tq.shutdown && !s.empty_front (tq));
- }
+ for (lock ql (tq.mutex); !tq.shutdown && !s.empty_front (tq); )
+ s.pop_front (tq, ql);
}
- if (empty)
- s.task_ = false; // Scheduler still locked.
- else
- l.lock (); // Relock and rescan.
+ l.lock ();
}
s.active_--;
diff --git a/build2/scheduler.txx b/build2/scheduler.txx
index d5e9afa..f53c044 100644
--- a/build2/scheduler.txx
+++ b/build2/scheduler.txx
@@ -36,7 +36,7 @@ namespace build2
if ((td = push (*tq)) != nullptr)
{
- // Package the task.
+ // Package the task (under lock).
//
new (&td->data) task {
&task_count,
@@ -63,13 +63,16 @@ namespace build2
//
task_count.fetch_add (1, std::memory_order_release);
- lock l (mutex_);
- task_ = true;
-
- // If there is a spare active thread, wake up (or create) the helper.
+ // If there is a spare active thread, wake up (or create) the helper
+ // (unless someone already snatched it).
//
- if (active_ < max_active_)
- activate_helper (l);
+ if (queued_task_count_ != 0)
+ {
+ lock l (mutex_);
+
+ if (active_ < max_active_)
+ activate_helper (l);
+ }
return true;
}