diff options
author | Boris Kolpackov <boris@codesynthesis.com> | 2017-03-14 14:12:01 +0200 |
---|---|---|
committer | Boris Kolpackov <boris@codesynthesis.com> | 2017-03-14 14:12:01 +0200 |
commit | 0275a8661dce5b89960d2baf6245bf08679fb596 (patch) | |
tree | 2f471e1528ea144c184abce0a6f3376aa344db82 | |
parent | 233255f0e14f364841751755958375fe27380ba6 (diff) |
Fix scheduler bug
-rw-r--r-- | build2/scheduler | 11 | ||||
-rw-r--r-- | build2/scheduler.txx | 57 |
2 files changed, 48 insertions, 20 deletions
diff --git a/build2/scheduler b/build2/scheduler index cfccefe..a8f81cd 100644 --- a/build2/scheduler +++ b/build2/scheduler @@ -463,6 +463,7 @@ namespace build2 { size_t& s (tq.size); size_t& t (tq.tail); + size_t& m (tq.mark); if (s != task_queue_depth_) { @@ -470,6 +471,10 @@ namespace build2 // | | | t = s != 0 ? (t != task_queue_depth_ - 1 ? t + 1 : 0) : t; s++; + + if (m == task_queue_depth_) // Enable the mark if first push. + m = t; + queued_task_count_.fetch_add (1, std::memory_order_release); return &tq.data[t]; } @@ -523,11 +528,11 @@ namespace build2 task_data& td (tq.data[t]); - // Save the old queue mark and set the new one in case the task we are - // about to run adds sub-tasks. + // Save the old queue mark and disable it in case the task we are about + // to run adds sub-tasks. The first push(), if any, will reset it. // size_t om (m); - m = t; // Where next push() will go. + m = task_queue_depth_; // normal wrap empty // | | | diff --git a/build2/scheduler.txx b/build2/scheduler.txx index c9d2d14..3410b6f 100644 --- a/build2/scheduler.txx +++ b/build2/scheduler.txx @@ -18,23 +18,29 @@ namespace build2 static_assert (std::is_trivially_destructible<task>::value, "not trivially destructible"); - // Push into the queue unless we are running serially or the queue is - // full. + // If running serially, then run the task synchronously. In this case + // there is no need to mess with task count. // - task_data* td (nullptr); - - if (max_active_ != 1) + if (max_active_ == 1) { - task_queue* tq (task_queue_); // Single load. - if (tq == nullptr) - tq = &create_queue (); + forward<F> (f) (forward<A> (a)...); + return false; + } + + // Try to push the task into the queue falling back to running serially + // if the queue is full. + // + task_queue* tq (task_queue_); // Single load. + if (tq == nullptr) + tq = &create_queue (); + { lock ql (tq->mutex); if (tq->shutdown) throw system_error (ECANCELED, std::system_category ()); - if ((td = push (*tq)) != nullptr) + if (task_data* td = push (*tq)) { // Package the task (under lock). // @@ -47,16 +53,33 @@ namespace build2 td->thunk = &task_thunk<F, A...>; } else + { tq->stat_full++; - } - // If serial/full, then run the task synchronously. In this case there is - // no need to mess with task count. - // - if (td == nullptr) - { - forward<F> (f) (forward<A> (a)...); - return false; + // We have to perform the same mark adjust/restore as in pop_back() + // since the task we are about to execute synchronously may try to + // work the queue. + // + // It would have been cleaner to package all this logic into push() + // but that would require dragging function/argument types into it. + // + size_t& s (tq->size); + size_t& t (tq->tail); + size_t& m (tq->mark); + + size_t om (m); + m = task_queue_depth_; + + { + ql.unlock (); + forward<F> (f) (forward<A> (a)...); // Should not throw. + ql.lock (); + } + + m = s == 0 ? t : om; + + return false; + } } // Increment the task count. |