about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBoris Kolpackov <boris@codesynthesis.com>2017-03-14 14:12:01 +0200
committerBoris Kolpackov <boris@codesynthesis.com>2017-03-14 14:12:01 +0200
commit0275a8661dce5b89960d2baf6245bf08679fb596 (patch)
tree2f471e1528ea144c184abce0a6f3376aa344db82
parent233255f0e14f364841751755958375fe27380ba6 (diff)
Fix scheduler bug
-rw-r--r--build2/scheduler11
-rw-r--r--build2/scheduler.txx57
2 files changed, 48 insertions, 20 deletions
diff --git a/build2/scheduler b/build2/scheduler
index cfccefe..a8f81cd 100644
--- a/build2/scheduler
+++ b/build2/scheduler
@@ -463,6 +463,7 @@ namespace build2
{
size_t& s (tq.size);
size_t& t (tq.tail);
+ size_t& m (tq.mark);
if (s != task_queue_depth_)
{
@@ -470,6 +471,10 @@ namespace build2
// | | |
t = s != 0 ? (t != task_queue_depth_ - 1 ? t + 1 : 0) : t;
s++;
+
+ if (m == task_queue_depth_) // Enable the mark if first push.
+ m = t;
+
queued_task_count_.fetch_add (1, std::memory_order_release);
return &tq.data[t];
}
@@ -523,11 +528,11 @@ namespace build2
task_data& td (tq.data[t]);
- // Save the old queue mark and set the new one in case the task we are
- // about to run adds sub-tasks.
+ // Save the old queue mark and disable it in case the task we are about
+ // to run adds sub-tasks. The first push(), if any, will reset it.
//
size_t om (m);
- m = t; // Where next push() will go.
+ m = task_queue_depth_;
// normal wrap empty
// | | |
diff --git a/build2/scheduler.txx b/build2/scheduler.txx
index c9d2d14..3410b6f 100644
--- a/build2/scheduler.txx
+++ b/build2/scheduler.txx
@@ -18,23 +18,29 @@ namespace build2
static_assert (std::is_trivially_destructible<task>::value,
"not trivially destructible");
- // Push into the queue unless we are running serially or the queue is
- // full.
+ // If running serially, then run the task synchronously. In this case
+ // there is no need to mess with task count.
//
- task_data* td (nullptr);
-
- if (max_active_ != 1)
+ if (max_active_ == 1)
{
- task_queue* tq (task_queue_); // Single load.
- if (tq == nullptr)
- tq = &create_queue ();
+ forward<F> (f) (forward<A> (a)...);
+ return false;
+ }
+
+ // Try to push the task into the queue falling back to running serially
+ // if the queue is full.
+ //
+ task_queue* tq (task_queue_); // Single load.
+ if (tq == nullptr)
+ tq = &create_queue ();
+ {
lock ql (tq->mutex);
if (tq->shutdown)
throw system_error (ECANCELED, std::system_category ());
- if ((td = push (*tq)) != nullptr)
+ if (task_data* td = push (*tq))
{
// Package the task (under lock).
//
@@ -47,16 +53,33 @@ namespace build2
td->thunk = &task_thunk<F, A...>;
}
else
+ {
tq->stat_full++;
- }
- // If serial/full, then run the task synchronously. In this case there is
- // no need to mess with task count.
- //
- if (td == nullptr)
- {
- forward<F> (f) (forward<A> (a)...);
- return false;
+ // We have to perform the same mark adjust/restore as in pop_back()
+ // since the task we are about to execute synchronously may try to
+ // work the queue.
+ //
+ // It would have been cleaner to package all this logic into push()
+ // but that would require dragging function/argument types into it.
+ //
+ size_t& s (tq->size);
+ size_t& t (tq->tail);
+ size_t& m (tq->mark);
+
+ size_t om (m);
+ m = task_queue_depth_;
+
+ {
+ ql.unlock ();
+ forward<F> (f) (forward<A> (a)...); // Should not throw.
+ ql.lock ();
+ }
+
+ m = s == 0 ? t : om;
+
+ return false;
+ }
}
// Increment the task count.