From 92dd62e0e565f177ab5861a9511bc0e303f61a79 Mon Sep 17 00:00:00 2001 From: Boris Kolpackov Date: Fri, 23 Aug 2019 14:54:53 +0200 Subject: scheduler --- build2/b.cxx | 18 ++--- libbuild2/algorithm.cxx | 127 +++++++++++++++++++--------------- libbuild2/algorithm.ixx | 6 +- libbuild2/context.cxx | 30 ++++---- libbuild2/context.hxx | 14 ++-- libbuild2/context.ixx | 2 +- libbuild2/function.test.cxx | 3 +- libbuild2/operation.cxx | 12 ++-- libbuild2/test/rule.cxx | 32 ++++----- libbuild2/test/script/builtin.cxx | 4 +- libbuild2/test/script/parser.cxx | 26 +++---- libbuild2/test/script/parser.test.cxx | 2 +- tests/libbuild2/driver.cxx | 2 +- 13 files changed, 144 insertions(+), 134 deletions(-) diff --git a/build2/b.cxx b/build2/b.cxx index 3b19d07..d78da84 100644 --- a/build2/b.cxx +++ b/build2/b.cxx @@ -225,6 +225,8 @@ main (int argc, char* argv[]) << system_error (errno, generic_category ()); // Sanitize. #endif + scheduler sched; + // Parse the command line. // try @@ -584,13 +586,13 @@ main (int argc, char* argv[]) fail << "invalid --max-jobs|-J value"; } - sched.startup (jobs, - 1, - max_jobs, - jobs * ops.queue_depth (), - (ops.max_stack_specified () - ? optional (ops.max_stack () * 1024) - : nullopt)); + sched.startup (jobs, + 1, + max_jobs, + jobs * ops.queue_depth (), + (ops.max_stack_specified () + ? optional (ops.max_stack () * 1024) + : nullopt)); // @@ CTX: should these be per-context? // @@ -615,7 +617,7 @@ main (int argc, char* argv[]) // below). // unique_ptr ctx; - auto new_context = [&ctx, &cmd_vars] + auto new_context = [&ctx, &sched, &cmd_vars] { ctx = nullptr; // Free first. ctx.reset (new context (sched, diff --git a/libbuild2/algorithm.cxx b/libbuild2/algorithm.cxx index 2d2c940..7a616a5 100644 --- a/libbuild2/algorithm.cxx +++ b/libbuild2/algorithm.cxx @@ -169,12 +169,14 @@ namespace build2 target_lock lock_impl (action a, const target& ct, optional wq) { - assert (ct.ctx.phase == run_phase::match); + context& ctx (ct.ctx); + + assert (ctx.phase == run_phase::match); // Most likely the target's state is (count_touched - 1), that is, 0 or // previously executed, so let's start with that. // - size_t b (ct.ctx.count_base ()); + size_t b (ctx.count_base ()); size_t e (b + target::offset_touched - 1); size_t appl (b + target::offset_applied); @@ -210,7 +212,7 @@ namespace build2 // unless we release the phase. // phase_unlock ul (ct.ctx); - e = sched.wait (busy - 1, task_count, *wq); + e = ctx.sched.wait (busy - 1, task_count, *wq); } // We don't lock already applied or executed targets. @@ -248,15 +250,17 @@ namespace build2 void unlock_impl (action a, target& t, size_t offset) { - assert (t.ctx.phase == run_phase::match); + context& ctx (t.ctx); + + assert (ctx.phase == run_phase::match); atomic_count& task_count (t[a].task_count); // Set the task count and wake up any threads that might be waiting for // this target. // - task_count.store (offset + t.ctx.count_base (), memory_order_release); - sched.resume (task_count); + task_count.store (offset + ctx.count_base (), memory_order_release); + ctx.sched.resume (task_count); } target& @@ -632,32 +636,33 @@ namespace build2 // Also pass our diagnostics and lock stacks (this is safe since we // expect the caller to wait for completion before unwinding its stack). // - if (sched.async (start_count, - *task_count, - [a, try_match] (const diag_frame* ds, - const target_lock* ls, - target& t, size_t offset) - { - // Switch to caller's diag and lock stacks. - // - diag_frame::stack_guard dsg (ds); - target_lock::stack_guard lsg (ls); - - try - { - phase_lock pl (t.ctx, run_phase::match); // Throws. - { - target_lock l {a, &t, offset}; // Reassemble. - match_impl (l, false /* step */, try_match); - // Unlock within the match phase. - } - } - catch (const failed&) {} // Phase lock failure. - }, - diag_frame::stack (), - target_lock::stack (), - ref (*ld.target), - ld.offset)) + if (ct.ctx.sched.async ( + start_count, + *task_count, + [a, try_match] (const diag_frame* ds, + const target_lock* ls, + target& t, size_t offset) + { + // Switch to caller's diag and lock stacks. + // + diag_frame::stack_guard dsg (ds); + target_lock::stack_guard lsg (ls); + + try + { + phase_lock pl (t.ctx, run_phase::match); // Throws. + { + target_lock l {a, &t, offset}; // Reassemble. + match_impl (l, false /* step */, try_match); + // Unlock within the match phase. + } + } + catch (const failed&) {} // Phase lock failure. + }, + diag_frame::stack (), + target_lock::stack (), + ref (*ld.target), + ld.offset)) return make_pair (true, target_state::postponed); // Queued. // Matched synchronously, fall through. @@ -1527,6 +1532,8 @@ namespace build2 static target_state execute_impl (action a, target& t) { + context& ctx (t.ctx); + target::opstate& s (t[a]); assert (s.task_count.load (memory_order_consume) == t.ctx.count_busy () @@ -1574,7 +1581,7 @@ namespace build2 { recipe_function** f (s.recipe.target ()); if (f == nullptr || *f != &group_action) - t.ctx.target_count.fetch_sub (1, memory_order_relaxed); + ctx.target_count.fetch_sub (1, memory_order_relaxed); } // Decrement the task count (to count_executed) and wake up any threads @@ -1583,8 +1590,8 @@ namespace build2 size_t tc (s.task_count.fetch_sub ( target::offset_busy - target::offset_executed, memory_order_release)); - assert (tc == t.ctx.count_busy ()); - sched.resume (s.task_count); + assert (tc == ctx.count_busy ()); + ctx.sched.resume (s.task_count); return ts; } @@ -1654,7 +1661,7 @@ namespace build2 execute_recipe (a, t, nullptr /* recipe */); s.task_count.store (exec, memory_order_release); - sched.resume (s.task_count); + ctx.sched.resume (s.task_count); } else { @@ -1664,15 +1671,15 @@ namespace build2 // Pass our diagnostics stack (this is safe since we expect the // caller to wait for completion before unwinding its diag stack). // - if (sched.async (start_count, - *task_count, - [a] (const diag_frame* ds, target& t) - { - diag_frame::stack_guard dsg (ds); - execute_impl (a, t); - }, - diag_frame::stack (), - ref (t))) + if (ctx.sched.async (start_count, + *task_count, + [a] (const diag_frame* ds, target& t) + { + diag_frame::stack_guard dsg (ds); + execute_impl (a, t); + }, + diag_frame::stack (), + ref (t))) return target_state::unknown; // Queued. // Executed synchronously, fall through. @@ -1692,15 +1699,17 @@ namespace build2 target_state execute_direct (action a, const target& ct) { + context& ctx (ct.ctx); + target& t (const_cast (ct)); // MT-aware. target::opstate& s (t[a]); // Similar logic to match() above except we execute synchronously. // - size_t tc (t.ctx.count_applied ()); + size_t tc (ctx.count_applied ()); - size_t exec (t.ctx.count_executed ()); - size_t busy (t.ctx.count_busy ()); + size_t exec (ctx.count_executed ()); + size_t busy (ctx.count_busy ()); if (s.task_count.compare_exchange_strong ( tc, @@ -1722,15 +1731,17 @@ namespace build2 } s.task_count.store (exec, memory_order_release); - sched.resume (s.task_count); + ctx.sched.resume (s.task_count); } } else { // If the target is busy, wait for it. // - if (tc >= busy) sched.wait (exec, s.task_count, scheduler::work_none); - else assert (tc == exec); + if (tc >= busy) + ctx.sched.wait (exec, s.task_count, scheduler::work_none); + else + assert (tc == exec); } return t.executed_state (a); @@ -1796,7 +1807,7 @@ namespace build2 // const auto& tc (mt[a].task_count); if (tc.load (memory_order_acquire) >= busy) - sched.wait (exec, tc, scheduler::work_none); + ctx.sched.wait (exec, tc, scheduler::work_none); r |= mt.executed_state (a); @@ -1848,7 +1859,7 @@ namespace build2 const auto& tc (mt[a].task_count); if (tc.load (memory_order_acquire) >= busy) - sched.wait (exec, tc, scheduler::work_none); + ctx.sched.wait (exec, tc, scheduler::work_none); r |= mt.executed_state (a); @@ -1932,7 +1943,7 @@ namespace build2 const auto& tc (pt[a].task_count); if (tc.load (memory_order_acquire) >= busy) - sched.wait (exec, tc, scheduler::work_none); + ctx.sched.wait (exec, tc, scheduler::work_none); target_state s (pt.executed_state (a)); rs |= s; @@ -1989,6 +2000,8 @@ namespace build2 target_state group_action (action a, const target& t) { + context& ctx (t.ctx); + // If the group is busy, we wait, similar to prerequisites. // const target& g (*t.group); @@ -1996,9 +2009,9 @@ namespace build2 target_state gs (execute (a, g)); if (gs == target_state::busy) - sched.wait (t.ctx.count_executed (), - g[a].task_count, - scheduler::work_none); + ctx.sched.wait (ctx.count_executed (), + g[a].task_count, + scheduler::work_none); // Return target_state::group to signal to execute() that this target's // state comes from the group (which, BTW, can be failed). diff --git a/libbuild2/algorithm.ixx b/libbuild2/algorithm.ixx index 13a3323..9593ac0 100644 --- a/libbuild2/algorithm.ixx +++ b/libbuild2/algorithm.ixx @@ -547,9 +547,9 @@ namespace build2 execute_wait (action a, const target& t) { if (execute (a, t) == target_state::busy) - sched.wait (t.ctx.count_executed (), - t[a].task_count, - scheduler::work_none); + t.ctx.sched.wait (t.ctx.count_executed (), + t[a].task_count, + scheduler::work_none); return t.executed_state (a); } diff --git a/libbuild2/context.cxx b/libbuild2/context.cxx index c454123..f4d8a39 100644 --- a/libbuild2/context.cxx +++ b/libbuild2/context.cxx @@ -55,7 +55,7 @@ namespace build2 sched (s), dry_run_option (dr), keep_going (kg), - phase_mutex (phase), + phase_mutex (*this), scopes (data_->scopes), global_scope (create_global_scope (data_->scopes)), targets (data_->targets), @@ -546,8 +546,6 @@ namespace build2 skip_count.store (0, memory_order_relaxed); } - scheduler sched; - bool run_phase_mutex:: lock (run_phase p) { @@ -573,16 +571,16 @@ namespace build2 // if (u) { - phase_ = p; + ctx_.phase = p; r = !fail_; } - else if (phase_ != p) + else if (ctx_.phase != p) { - sched.deactivate (false /* external */); - for (; phase_ != p; v->wait (l)) ; + ctx_.sched.deactivate (false /* external */); + for (; ctx_.phase != p; v->wait (l)) ; r = !fail_; l.unlock (); // Important: activate() can block. - sched.activate (false /* external */); + ctx_.sched.activate (false /* external */); } else r = !fail_; @@ -628,10 +626,10 @@ namespace build2 { condition_variable* v; - if (lc_ != 0) {phase_ = run_phase::load; v = &lv_;} - else if (mc_ != 0) {phase_ = run_phase::match; v = &mv_;} - else if (ec_ != 0) {phase_ = run_phase::execute; v = &ev_;} - else {phase_ = run_phase::load; v = nullptr;} + if (lc_ != 0) {ctx_.phase = run_phase::load; v = &lv_;} + else if (mc_ != 0) {ctx_.phase = run_phase::match; v = &mv_;} + else if (ec_ != 0) {ctx_.phase = run_phase::execute; v = &ev_;} + else {ctx_.phase = run_phase::load; v = nullptr;} if (v != nullptr) { @@ -678,7 +676,7 @@ namespace build2 if (u) { - phase_ = n; + ctx_.phase = n; r = !fail_; // Notify others that could be waiting for this phase. @@ -691,11 +689,11 @@ namespace build2 } else // phase != n { - sched.deactivate (false /* external */); - for (; phase_ != n; v->wait (l)) ; + ctx_.sched.deactivate (false /* external */); + for (; ctx_.phase != n; v->wait (l)) ; r = !fail_; l.unlock (); // Important: activate() can block. - sched.activate (false /* external */); + ctx_.sched.activate (false /* external */); } } diff --git a/libbuild2/context.hxx b/libbuild2/context.hxx index 5060773..23cbdb9 100644 --- a/libbuild2/context.hxx +++ b/libbuild2/context.hxx @@ -19,6 +19,8 @@ namespace build2 { + class context; + class scope; class scope_map; class target_set; @@ -35,12 +37,6 @@ namespace build2 struct opspec; - // Main scheduler. Started up and shut down in main(). - // - // @@ CTX: move to main(). - // - LIBBUILD2_SYMEXPORT extern scheduler sched; - class LIBBUILD2_SYMEXPORT run_phase_mutex { public: @@ -65,8 +61,8 @@ namespace build2 private: friend class context; - run_phase_mutex (run_phase& p) - : phase_ (p), fail_ (false), lc_ (0), mc_ (0), ec_ (0) {} + run_phase_mutex (context& c) + : ctx_ (c), fail_ (false), lc_ (0), mc_ (0), ec_ (0) {} private: friend struct phase_lock; @@ -83,7 +79,7 @@ namespace build2 // When the mutex is unlocked (all three counters become zero, the phase // is always changed to load (this is also the initial state). // - run_phase& phase_; + context& ctx_; mutex m_; diff --git a/libbuild2/context.ixx b/libbuild2/context.ixx index 4750de0..7fb85ad 100644 --- a/libbuild2/context.ixx +++ b/libbuild2/context.ixx @@ -58,7 +58,7 @@ namespace build2 wait () { phase_unlock u (*ctx, phase); - sched.wait (start_count, *task_count); + ctx->sched.wait (start_count, *task_count); task_count = nullptr; } } diff --git a/libbuild2/function.test.cxx b/libbuild2/function.test.cxx index 670114e..016d846 100644 --- a/libbuild2/function.test.cxx +++ b/libbuild2/function.test.cxx @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -41,7 +42,7 @@ namespace build2 // init_diag (1); init (nullptr, argv[0]); - sched.startup (1); // Serial execution. + scheduler sched (1); // Serial execution. context ctx (sched); function_family f ("dummy"); diff --git a/libbuild2/operation.cxx b/libbuild2/operation.cxx index 080123e..c07d359 100644 --- a/libbuild2/operation.cxx +++ b/libbuild2/operation.cxx @@ -151,7 +151,7 @@ namespace build2 what = " targets to " + diag_do (ctx, a); - mg = sched.monitor ( + mg = ctx.sched.monitor ( ctx.target_count, incr, [incr, &what] (size_t c) -> size_t @@ -274,9 +274,9 @@ namespace build2 // switch (ctx.current_inner_oif->concurrency) { - case 0: sched.tune (1); break; // Run serially. - case 1: break; // Run as is. - default: assert (false); // Not yet supported. + case 0: ctx.sched.tune (1); break; // Run serially. + case 1: break; // Run as is. + default: assert (false); // Not yet supported. } phase_lock pl (ctx, run_phase::execute); // Never switched. @@ -299,7 +299,7 @@ namespace build2 { what = "% of targets " + diag_did (ctx, a); - mg = sched.monitor ( + mg = ctx.sched.monitor ( ctx.target_count, init - incr, [init, incr, &what, &ctx] (size_t c) -> size_t @@ -352,7 +352,7 @@ namespace build2 // We are now running serially. // - sched.tune (0); // Restore original scheduler settings. + ctx.sched.tune (0); // Restore original scheduler settings. // Clear the dry-run flag. // diff --git a/libbuild2/test/rule.cxx b/libbuild2/test/rule.cxx index c0ece26..c216e66 100644 --- a/libbuild2/test/rule.cxx +++ b/libbuild2/test/rule.cxx @@ -542,22 +542,22 @@ namespace build2 { scope_state& r (res.back ()); - if (!sched.async (ctx.count_busy (), - t[a].task_count, - [this] (const diag_frame* ds, - scope_state& r, - const target& t, - const testscript& ts, - const dir_path& wd) - { - diag_frame::stack_guard dsg (ds); - r = perform_script_impl (t, ts, wd, *this); - }, - diag_frame::stack (), - ref (r), - cref (t), - cref (ts), - cref (wd))) + if (!ctx.sched.async (ctx.count_busy (), + t[a].task_count, + [this] (const diag_frame* ds, + scope_state& r, + const target& t, + const testscript& ts, + const dir_path& wd) + { + diag_frame::stack_guard dsg (ds); + r = perform_script_impl (t, ts, wd, *this); + }, + diag_frame::stack (), + ref (r), + cref (t), + cref (ts), + cref (wd))) { // Executed synchronously. If failed and we were not asked to // keep going, bail out. diff --git a/libbuild2/test/script/builtin.cxx b/libbuild2/test/script/builtin.cxx index a725e17..06b0cec 100644 --- a/libbuild2/test/script/builtin.cxx +++ b/libbuild2/test/script/builtin.cxx @@ -1583,7 +1583,7 @@ namespace build2 // Note: can be executed synchronously. // static uint8_t - sleep (scope&, + sleep (scope& s, const strings& args, auto_fd in, auto_fd out, auto_fd err) noexcept try @@ -1637,7 +1637,7 @@ namespace build2 // If/when required we could probably support the precise sleep mode // (e.g., via an option). // - sched.sleep (chrono::seconds (n)); + s.root.test_target.ctx.sched.sleep (chrono::seconds (n)); r = 0; } diff --git a/libbuild2/test/script/parser.cxx b/libbuild2/test/script/parser.cxx index 582237a..8b8f705 100644 --- a/libbuild2/test/script/parser.cxx +++ b/libbuild2/test/script/parser.cxx @@ -3016,19 +3016,19 @@ namespace build2 // UBSan workaround. // const diag_frame* df (diag_frame::stack ()); - if (!sched.async (task_count, - [] (const diag_frame* ds, - scope& s, - script& scr, - runner& r) - { - diag_frame::stack_guard dsg (ds); - execute_impl (s, scr, r); - }, - df, - ref (*chain), - ref (*script_), - ref (*runner_))) + if (!ctx.sched.async (task_count, + [] (const diag_frame* ds, + scope& s, + script& scr, + runner& r) + { + diag_frame::stack_guard dsg (ds); + execute_impl (s, scr, r); + }, + df, + ref (*chain), + ref (*script_), + ref (*runner_))) { // Bail out if the scope has failed and we weren't instructed // to keep going. diff --git a/libbuild2/test/script/parser.test.cxx b/libbuild2/test/script/parser.test.cxx index e45674b..56630fe 100644 --- a/libbuild2/test/script/parser.test.cxx +++ b/libbuild2/test/script/parser.test.cxx @@ -155,7 +155,7 @@ namespace build2 // init_diag (1); init (nullptr, argv[0]); - sched.startup (1); // Serial execution. + scheduler sched (1); // Serial execution. context ctx (sched); bool scope (false); diff --git a/tests/libbuild2/driver.cxx b/tests/libbuild2/driver.cxx index 8ae83d5..a70e707 100644 --- a/tests/libbuild2/driver.cxx +++ b/tests/libbuild2/driver.cxx @@ -26,7 +26,7 @@ main (int, char* argv[]) in::build2_in_load (); version::build2_version_load (); - sched.startup (1); // Serial execution. + scheduler sched (1); // Serial execution. context ctx (sched); return 0; -- cgit v1.1