From ea22643b2217921df74ea14df47d7c83987d5761 Mon Sep 17 00:00:00 2001 From: Boris Kolpackov Date: Fri, 9 Dec 2016 17:29:27 +0200 Subject: Initial parallel scheduler implementation, use to run testscrips --- build2/b-options | 12 +- build2/b-options.cxx | 24 ++- build2/b-options.ixx | 16 +- build2/b.cli | 12 +- build2/b.cxx | 59 +++++- build2/buildfile | 1 + build2/scheduler | 453 ++++++++++++++++++++++++++++++++++++++++++ build2/scheduler.cxx | 383 +++++++++++++++++++++++++++++++++++ build2/test/script/lexer.cxx | 17 +- build2/test/script/parser.cxx | 187 ++++++----------- build2/test/script/runner.cxx | 5 +- build2/test/script/script | 17 +- build2/test/script/script.cxx | 67 ++++++- build2/types | 2 + 14 files changed, 1089 insertions(+), 166 deletions(-) create mode 100644 build2/scheduler create mode 100644 build2/scheduler.cxx (limited to 'build2') diff --git a/build2/b-options b/build2/b-options index 6a2088d..f05f483 100644 --- a/build2/b-options +++ b/build2/b-options @@ -402,7 +402,7 @@ namespace build2 V () const; const bool& - q () const; + quiet () const; const uint16_t& verbose () const; @@ -410,6 +410,12 @@ namespace build2 bool verbose_specified () const; + const size_t& + jobs () const; + + bool + jobs_specified () const; + const bool& no_column () const; @@ -473,9 +479,11 @@ namespace build2 public: bool v_; bool V_; - bool q_; + bool quiet_; uint16_t verbose_; bool verbose_specified_; + size_t jobs_; + bool jobs_specified_; bool no_column_; bool no_line_; path buildfile_; diff --git a/build2/b-options.cxx b/build2/b-options.cxx index 7e7a395..37daab6 100644 --- a/build2/b-options.cxx +++ b/build2/b-options.cxx @@ -571,9 +571,11 @@ namespace build2 options () : v_ (), V_ (), - q_ (), + quiet_ (), verbose_ (1), verbose_specified_ (false), + jobs_ (), + jobs_specified_ (false), no_column_ (), no_line_ (), buildfile_ ("buildfile"), @@ -668,7 +670,7 @@ namespace build2 << " equivalent to \033[1m--verbose 3\033[0m." << ::std::endl; os << std::endl - << "\033[1m-q\033[0m Run quietly, only printing error messages. This is" << ::std::endl + << "\033[1m--quiet\033[0m|\033[1m-q\033[0m Run quietly, only printing error messages. This is" << ::std::endl << " equivalent to \033[1m--verbose 0\033[0m." << ::std::endl; os << std::endl @@ -686,6 +688,14 @@ namespace build2 << " 6. Even more detailed information, including state dumps." << ::std::endl; os << std::endl + << "\033[1m--jobs\033[0m|\033[1m-j\033[0m \033[4mnum\033[0m Number of jobs to perform in parallel. This includes both" << ::std::endl + << " the number of active threads inside the build system as" << ::std::endl + << " well as the number of external commands (compilers," << ::std::endl + << " linkers, etc) started but not yet finished. If this option" << ::std::endl + << " is not specified, then the number of available hardware" << ::std::endl + << " threads is used." << ::std::endl; + + os << std::endl << "\033[1m--no-column\033[0m Don't print column numbers in diagnostics." 
<< ::std::endl; os << std::endl @@ -754,11 +764,19 @@ namespace build2 &::build2::cl::thunk< options, bool, &options::v_ >; _cli_options_map_["-V"] = &::build2::cl::thunk< options, bool, &options::V_ >; + _cli_options_map_["--quiet"] = + &::build2::cl::thunk< options, bool, &options::quiet_ >; _cli_options_map_["-q"] = - &::build2::cl::thunk< options, bool, &options::q_ >; + &::build2::cl::thunk< options, bool, &options::quiet_ >; _cli_options_map_["--verbose"] = &::build2::cl::thunk< options, uint16_t, &options::verbose_, &options::verbose_specified_ >; + _cli_options_map_["--jobs"] = + &::build2::cl::thunk< options, size_t, &options::jobs_, + &options::jobs_specified_ >; + _cli_options_map_["-j"] = + &::build2::cl::thunk< options, size_t, &options::jobs_, + &options::jobs_specified_ >; _cli_options_map_["--no-column"] = &::build2::cl::thunk< options, bool, &options::no_column_ >; _cli_options_map_["--no-line"] = diff --git a/build2/b-options.ixx b/build2/b-options.ixx index fb9b10e..190c960 100644 --- a/build2/b-options.ixx +++ b/build2/b-options.ixx @@ -229,9 +229,9 @@ namespace build2 } inline const bool& options:: - q () const + quiet () const { - return this->q_; + return this->quiet_; } inline const uint16_t& options:: @@ -246,6 +246,18 @@ namespace build2 return this->verbose_specified_; } + inline const size_t& options:: + jobs () const + { + return this->jobs_; + } + + inline bool options:: + jobs_specified () const + { + return this->jobs_specified_; + } + inline const bool& options:: no_column () const { diff --git a/build2/b.cli b/build2/b.cli index 86211bb..dab2baa 100644 --- a/build2/b.cli +++ b/build2/b.cli @@ -212,7 +212,7 @@ namespace build2 \cb{--verbose 3}." } - bool -q + bool --quiet|-q { "Run quietly, only printing error messages. This is equivalent to \cb{--verbose 0}." @@ -241,6 +241,16 @@ namespace build2 \li|Even more detailed information, including state dumps.||" } + size_t --jobs|-j + { + "", + "Number of jobs to perform in parallel. This includes both the number of + active threads inside the build system as well as the number of external + commands (compilers, linkers, etc) started but not yet finished. If this + option is not specified, then the number of available hardware threads + is used." + } + bool --no-column { "Don't print column numbers in diagnostics." diff --git a/build2/b.cxx b/build2/b.cxx index 8d06a72..f3502b0 100644 --- a/build2/b.cxx +++ b/build2/b.cxx @@ -29,6 +29,7 @@ #include #include #include +#include #include @@ -164,7 +165,7 @@ main (int argc, char* argv[]) init (argv[0], ops.verbose_specified () ? ops.verbose () - : ops.V () ? 3 : ops.v () ? 2 : ops.q () ? 0 : 1); + : ops.V () ? 3 : ops.v () ? 2 : ops.quiet () ? 0 : 1); // Version. // @@ -244,6 +245,29 @@ main (int argc, char* argv[]) bm["cli"] = mf {nullptr, &cli::init}; } + // Start up the scheduler. + // + size_t jobs (0); + + if (ops.jobs_specified ()) + jobs = ops.jobs (); + + if (jobs == 0) + jobs = scheduler::hardware_concurrency (); + + if (jobs == 0) + { + warn << "unable to determine the number of hardware threads" << + info << "falling back to serial execution" << + info << "use --jobs|-j to override"; + + jobs = 1; + } + + sched.startup (jobs); + + // Trace some overall environment information. + // if (verb >= 5) { const char* p (getenv ("PATH")); @@ -251,6 +275,7 @@ main (int argc, char* argv[]) trace << "work: " << work; trace << "home: " << home; trace << "path: " << (p != nullptr ? p : ""); + trace << "jobs: " << jobs; } // Parse the buildspec. 
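// The global scheduler started above is meant to be driven with the
// async()/wait() pair declared in build2/scheduler further down. A
// minimal, hedged sketch of that calling pattern (do_task() and the task
// count are illustrative placeholders, not part of this commit; the
// includes assume the build2 header layout used elsewhere in this patch):
//
#include <build2/types>
#include <build2/scheduler>

namespace build2
{
  static void
  do_task (size_t i) // Hypothetical unit of work (compile, test, etc).
  {
    // ...
  }

  void
  do_all (size_t n)
  {
    scheduler::atomic_count task_count (0);

    for (size_t i (0); i != n; ++i)
      sched.async (task_count, &do_task, i); // May run synchronously.

    // Does not return until every task queued above has completed (or the
    // scheduler is shut down, in which case system_error is thrown).
    //
    sched.wait (task_count);
  }
}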
@@ -1045,16 +1070,36 @@ main (int argc, char* argv[]) if (lifted == nullptr && skip == 0) ++mit; } + + // Shutdown the scheduler. + // + scheduler::stat st (sched.shutdown ()); + + if (verb >= (jobs > 1 ? 3 : 4)) + { + info << "scheduler statistics:" << '\n' + << " thread_max_active " << st.thread_max_active << '\n' + << " thread_max_total " << st.thread_max_total << '\n' + << " thread_helpers " << st.thread_helpers << '\n' + << " thread_max_waiting " << st.thread_max_waiting << '\n' + << '\n' + << " task_queue_depth " << st.task_queue_depth << '\n' + << " task_queue_full " << st.task_queue_full << '\n' + << '\n' + << " wait_queue_slots " << st.wait_queue_slots << '\n' + << " wait_queue_collisions " << st.wait_queue_collisions << '\n'; + } + + return 0; } catch (const failed&) { - return 1; // Diagnostics has already been issued. + // Diagnostics has already been issued. } - /* - catch (const std::exception& e) + catch (const system_error& e) { - error << e.what (); - return 1; + error << "unhandled system error: " << e.what (); } - */ + + return 1; } diff --git a/build2/buildfile b/build2/buildfile index fabbe71..bea1dee 100644 --- a/build2/buildfile +++ b/build2/buildfile @@ -27,6 +27,7 @@ exe{b}: \ {hxx cxx}{ prerequisite } \ {hxx cxx}{ rule } \ {hxx }{ rule-map } \ + {hxx cxx}{ scheduler } \ {hxx cxx}{ scope } \ {hxx cxx}{ search } \ {hxx cxx}{ spec } \ diff --git a/build2/scheduler b/build2/scheduler new file mode 100644 index 0000000..99e9524 --- /dev/null +++ b/build2/scheduler @@ -0,0 +1,453 @@ +// file : build2/scheduler -*- C++ -*- +// copyright : Copyright (c) 2014-2016 Code Synthesis Ltd +// license : MIT; see accompanying LICENSE file + +#ifndef BUILD2_SCHEDULER +#define BUILD2_SCHEDULER + +#include +#include +#include +#include +#include +#include // aligned_storage, etc +#include + +#include +#include + +namespace build2 +{ + // Scheduler of tasks and threads. Works best for "substantial" tasks (e.g., + // running a process), where in comparison thread synchronization overhead + // is negligible. + // + // A thread (called "master") may need to perform several tasks which can be + // done in parallel (e.g., update all the prerequisites or run all the + // tests). To acomplish this, the master, via a call to async(), can ask the + // scheduler to run a task in another thread (called "helper"). If a helper + // is available, then the task is executed asynchronously by such helper. + // Otherwise, the task is executed synchronously as part of the async() + // call. Once the master thread has scheduled all the tasks, it calls wait() + // to await for their completion. + // + // The scheduler makes sure that only a certain number of threads (for + // example, the number of available hardware threads) are "active" at any + // given time (thus the reason why async() may choose to perform the task + // synchronously). When a master thread calls wait(), it is "suspended" + // until all its asynchronous tasks are completed (at which point it becomes + // "ready"). A suspension of a master results in either another ready master + // being "resumed" or another helper thread becoming available. + // + // On completion of a task a helper thread returns to the scheduler which + // can again lead either to a ready master being resumed (in which case the + // helper is suspended) or the helper becoming available to perform another + // task. + // + // Note that suspended threads are not reused as helpers. Rather, a new + // helper thread is always created if none is available. 
This is done to + // allow a ready master to continue as soon as possible. If it were reused + // as a helper, then it could be blocked on a nested wait() further down the + // stack. This means that the number of threads created by the scheduler + // will normally exceed the maximum active allowed. It should not, however, + // get excessive provided the tasks are not too deeply nested. + // + // @@ Actually, this is not quite correct: this seems to be a function of + // both depth and width of the task tree. Perhaps we should use a hybrid + // approach: when the number of helper threads reaches a certain limit + // we switch to reusing suspended threads? Also, a thread can rake its + // own and its subtask's queues directly in wait() since wait() won't + // return until they are done. + // + class scheduler + { + public: + using atomic_count = std::atomic; + + // F should return void and not throw any exceptions. The way the result + // of a task is communicated back to the master thread is ad hoc, usually + // via "out" arguments. Such result(s) can only be retrieved by the master + // once its task count reaches 0. + // + // The argument passing semantics is the same as for std::thread. In + // particular, lvalue-references are passed as copies (use ref()/cref() + // for the by-reference semantics). + // + // If the scheduler is shutdown, throw system_error(ECANCELED). + // + template + void + async (atomic_count& task_count, F&& f, A&&... a) + { + using task = task_type; + + static_assert (sizeof (task) <= sizeof (task_data), + "insufficient space"); + + static_assert (std::is_trivially_destructible::value, + "not trivially destructible"); + + // Push into the queue unless we are running serially or the queue is + // full. + // + task_data* td (nullptr); + + if (max_active_ != 1) + { + task_queue& tq (task_queue_ != nullptr + ? *task_queue_ + : create_queue ()); + + lock ql (tq.mutex); + + if (tq.shutdown) + throw system_error (ECANCELED, std::system_category ()); + + if ((td = push (tq)) != nullptr) + { + // Package the task. + // + new (&td->data) task { + &task_count, + decay_copy (forward (f)), + {decay_copy (forward (a))...}}; + + td->thunk = &task_thunk; + } + else + tq.stat_full++; + } + + // If serial/full, then run the task synchronously. In this case + // there is no need to mess with task count. + // + if (td == nullptr) + { + forward (f) (forward (a)...); + return; + } + + // Increment the task count. + // + task_count.fetch_add (1, std::memory_order_release); + + lock l (mutex_); + task_ = true; + + // If there is a spare active thread, wake up (or create) the helper. + // + if (active_ < max_active_) + activate_helper (l); + } + + // Wait until the task count reaches 0. If the scheduler is shutdown + // while waiting, throw system_error(ECANCELED). + // + void + wait (atomic_count& task_count); + + // Startup and shutdown. + // + public: + // Unless already shut down, call shutdown() but ignore errors. + // + ~scheduler (); + + // Create a shut down scheduler. + // + scheduler () = default; + + // Create a started up scheduler. + // + // The initial active argument is the number of threads to assume are + // already active (e.g., the calling thread). It must not be 0 (since + // someone has to schedule the first task). + // + // If maximum threads is unspecified, then a generally appropriate default + // limit it used. 
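// Since async() decay-copies its arguments (std::thread semantics), a
// task that needs to hand a result back does so through an "out" argument
// wrapped in ref()/cref(), and the caller may only read it after wait()
// brings the task count back to zero. A hedged sketch of that convention;
// checksum_one()/checksum_all() are made-up names, and the build2
// namespace and types are assumed as elsewhere in this patch:
//
static void
checksum_one (const path& f, string& out) // Must not throw.
{
  out = "..."; // Compute the checksum of f into out.
}

static void
checksum_all (const paths& fs, vector<string>& out)
{
  scheduler::atomic_count task_count (0);
  out.resize (fs.size ()); // No reallocation once tasks are in flight.

  for (size_t i (0); i != fs.size (); ++i)
    sched.async (task_count,
                 &checksum_one,
                 cref (fs[i]),  // By const reference rather than by copy.
                 ref (out[i])); // Out-argument written by the task.

  sched.wait (task_count); // Only now is out[] safe to examine.
}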
+ // + scheduler (size_t max_active, + size_t init_active = 1, + size_t max_threads = 0) + { + startup (max_active, init_active, max_threads); + } + + // Note: naturally not thread-safe. + // + void + startup (size_t max_active, + size_t init_active = 1, + size_t max_threads = 0); + + // Wait for all the helper threads to terminate. Throw system_error on + // failure. Note that the initially active threads are not waited for. + // Return scheduling statistics. + // + struct stat + { + size_t thread_max_active = 0; // max # of active threads allowed. + size_t thread_max_total = 0; // max # of total threads allowed. + size_t thread_helpers = 0; // # of helper threads created. + size_t thread_max_waiting = 0; // max # of waiters at any time. + + size_t task_queue_depth = 0; // # of entries in a queue (capacity). + size_t task_queue_full = 0; // # of times task queue was full. + + size_t wait_queue_slots = 0; // # of wait slots (buckets). + size_t wait_queue_collisions = 0; // # of times slot had been occupied. + }; + + stat + shutdown (); + + // Return the number of hardware threads or 0 if unable to determine. + // + static size_t + hardware_concurrency () + { + return std::thread::hardware_concurrency (); + } + + private: + using lock = std::unique_lock; + + void + activate_helper (lock&); + + void + create_helper (lock&); + + // We restrict ourselves to a single pointer as an argument in hope of + // a small object optimization. + // + static void + helper (void*); + + void + suspend (atomic_count& task_count); + + void + resume (atomic_count& task_count); + + // Task encapsulation. + // + template + struct task_type + { + atomic_count* task_count; + std::decay_t func; + std::tuple...> args; + + template + void + thunk (std::index_sequence) + { + move (func) (std::get (move (args))...); + } + }; + + template + static void + task_thunk (scheduler& s, lock& ql, void* td) + { + using task = task_type; + + // Move the data and release the lock. + // + task t (move (*static_cast (td))); + ql.unlock (); + + t.thunk (std::index_sequence_for ()); + + atomic_count& tc (*t.task_count); + if (--tc == 0) + s.resume (tc); // Resume a waiter, if any. + } + + template + static std::decay_t + decay_copy (T&& x) {return forward (x);} + + private: + std::mutex mutex_; + + bool shutdown_ = true; // Shutdown flag. + bool task_ = false; // Task queued flag (see below). + + // The constraints that we must maintain: + // + // active <= max_active + // (init_active + helpers) <= max_threads + // + // Note that the first three are immutable between startup() and + // shutdown() so can be accessed without a lock (@@ What about + // the case of multiple initially active?). + // + size_t init_active_ = 0; // Initially active threads. + size_t max_active_ = 0; // Maximum number of active threads. + size_t max_threads_ = 0; // Maximum number of total threads. + + size_t helpers_ = 0; // Number of helper threads created so far. + + // Every thread that we manage must be accounted for in one of these + // counters. And their sum should equal (init_active + helpers). + // + size_t active_ = 0; // Active master threads executing a task. + size_t idle_ = 0; // Idle helper threads waiting for a task. + size_t waiting_ = 0; // Suspended master threads waiting for their tasks. + size_t ready_ = 0; // Ready master thread waiting to become active. + size_t starting_ = 0; // Helper threads starting up. + + std::condition_variable idle_condv_; // Idle helpers queue. + std::condition_variable ready_condv_; // Ready masters queue. 
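// A standalone illustration (compilable on its own, generic names) of the
// C++14 index_sequence trick that task_type::thunk() above uses to unpack
// the stored argument tuple back into a call:
//
#include <tuple>
#include <utility>
#include <iostream>

template <typename F, typename... A>
struct packaged
{
  F func;
  std::tuple<A...> args;

  template <std::size_t... i>
  void
  thunk (std::index_sequence<i...>)
  {
    std::move (func) (std::get<i> (std::move (args))...);
  }

  void
  run () {thunk (std::index_sequence_for<A...> ());}
};

static void
print (int n, const char* s)
{
  std::cout << s << n << std::endl;
}

int
main ()
{
  packaged<void (*) (int, const char*), int, const char*> p {
    &print, std::make_tuple (1, "task #")};

  p.run (); // Prints 'task #1'.
}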
+ + // Statistics counters. + // + size_t stat_max_waiters_; + size_t stat_wait_collisions_; + + // Wait queue. + // + // A wait slot blocks a bunch of threads. When they are (all) unblocked, + // they re-examine their respective conditions and either carry on or + // block again. + // + // The wait queue is a shard of slots. A thread picks a slot based on the + // address of its task count variable. How many slots do we need? This + // depends on the number of waiters that we can have which cannot be + // greater than the total number of threads. + // + struct wait_slot + { + std::mutex mutex; + std::condition_variable condv; + size_t waiters = 0; + bool shutdown = true; + }; + + size_t wait_queue_size_; // Multiple of max_threads. + unique_ptr wait_queue_; + + // Task queue. + // + // Each queue has its own mutex. If the task_ flag above is true then + // there *might* be a task in one of the queues. If it is false, then + // it means there are either no tasks or someone is busy working on + // them. + // + // For now we only support trivially-destructible tasks. + // + struct task_data + { + std::aligned_storage::type data; + void (*thunk) (scheduler&, lock&, void*); + }; + + // We have two requirements: Firstly, we want to keep the master thread + // (the one that called wait()) busy working though its own queue for as + // long as possible before (if at all) it "reincarnates" as a helper. The + // main reason for this is the limited number of helpers we can create. + // + // Secondly, we don't want to block wait() longer than necessary since the + // master thread can do some work with the result. Plus, overall, we want + // to "unwind" task hierarchies as soon as possible since they hold up + // resources such as thread's stack. All this means that the master thread + // can only work through tasks that it has queued at this "level" of the + // async()/wait() calls since we know that wait() cannot return until + // they are done. + // + // To satisfy the first requirement, the master and helper threads get the + // tasks from different ends of the queue: master from the back while + // helpers from the front. And the master always adds new tasks to the + // back. + // + // To satisfy the second requirement, the master thread stores the index + // of the first task it has queued at this "level" and makes sure it + // doesn't try to deque any task beyond that. + // + size_t task_queue_depth_; // Multiple of max_active. + + struct task_queue + { + std::mutex mutex; + bool shutdown = false; + + size_t stat_full = 0; // Number of times pop() returned NULL. + + // head <= stop <= tail + // + size_t head = 0; // Index of the first element. + size_t stop = 0; // Index of the element to stop at in pop_back(). + size_t tail = 0; // Index of the one past last element. + + unique_ptr data; + + task_queue (size_t depth): data (new task_data[depth]) {} + }; + + // Task queue API. Expects the queue mutex to be locked. + // + + // Push a new task to the queue returning a pointer to the task data to be + // filled or NULL if the queue is full. + // + task_data* + push (task_queue& tq) + { + return tq.tail != task_queue_depth_ ? &tq.data[tq.tail++] : nullptr; + } + + bool + empty_front (task_queue& tq) const {return tq.head == tq.tail;} + + void + pop_front (task_queue& tq, lock& ql) + { + task_data& td (tq.data[tq.head++]); + + if (tq.head == tq.tail) + tq.stop = tq.head = tq.tail = 0; // Reset. 
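// A standalone sketch of the type-erasure scheme behind task_data above:
// the concrete task object is placement-new'd into raw aligned storage
// and paired with a plain function pointer (the "thunk") that knows how
// to cast the storage back and run it. As in the real queue, only
// trivially-destructible tasks are supported. Generic names, not build2
// API:
//
#include <new>
#include <utility>
#include <iostream>
#include <type_traits>

struct slot
{
  std::aligned_storage<64>::type data; // Room for a small task object.
  void (*thunk) (void*);               // Knows the real type of data.
};

template <typename T>
static void
thunk_impl (void* d)
{
  T t (std::move (*static_cast<T*> (d))); // Move out of the slot, then run.
  t ();
}

template <typename T>
static void
store (slot& s, T t)
{
  static_assert (sizeof (T) <= sizeof (s.data), "insufficient space");
  static_assert (std::is_trivially_destructible<T>::value,
                 "not trivially destructible");

  new (&s.data) T (std::move (t));
  s.thunk = &thunk_impl<T>;
}

int
main ()
{
  slot s;
  store (s, [] {std::cout << "task ran" << std::endl;});
  s.thunk (&s.data); // Potentially later and on another thread.
}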
+ else if (tq.head > tq.stop) + tq.stop = tq.head; + + // The thunk moves the task data to its stack, releases the lock, + // and continues to execute the task. + // + td.thunk (*this, ql, &td.data); + } + + bool + empty_back (task_queue& tq) const {return tq.stop == tq.tail;} + + void + pop_back (task_queue& tq, lock& ql) + { + task_data& td (tq.data[--tq.tail]); + + if (tq.head == tq.tail) + tq.stop = tq.head = tq.tail = 0; + + td.thunk (*this, ql, &td.data); + } + + // Each thread has its own queue. Instead of allocating all max_threads of + // them up front, we will reserve the space but will only construct queues + // on demand. + // + vector> task_queues_; + + // TLS cache of each thread's task queue. + // + thread_local static task_queue* task_queue_; + + task_queue& + create_queue (); + }; + + // Main (and only) scheduler. Started up and shut down in main(). + // + extern scheduler sched; +} + +#endif // BUILD2_SCHEDULER diff --git a/build2/scheduler.cxx b/build2/scheduler.cxx new file mode 100644 index 0000000..b62d2d4 --- /dev/null +++ b/build2/scheduler.cxx @@ -0,0 +1,383 @@ +// file : build2/scheduler.cxx -*- C++ -*- +// copyright : Copyright (c) 2014-2016 Code Synthesis Ltd +// license : MIT; see accompanying LICENSE file + +#include + +using namespace std; + +namespace build2 +{ + void scheduler:: + wait (atomic_count& task_count) + { + if (task_count == 0) + return; + + // See if we can run some of our own tasks. + // + task_queue& tq (*task_queue_); // Must have been initializied by async(). + + for (lock ql (tq.mutex); !tq.shutdown && !empty_back (tq); ) + { + // Save the old stop point and set the new one in case the task we are + // about to run adds sub-tasks. + // + size_t stop (tq.stop); + tq.stop = tq.tail - 1; // Index of the first sub-task to be added (-1 + // is for the following pop_back()). + + pop_back (tq, ql); // Releases the lock. + ql.lock (); + + // Restore the old stop point which we might have to adjust. + // + tq.stop = tq.head > stop ? tq.head : tq.tail < stop ? tq.tail : stop; + } + + // Note that empty task queue doesn't automatically mean the task count is + // zero (some might still be executing asynchronously). + // + if (task_count == 0) + return; + + suspend (task_count); + } + + void scheduler:: + suspend (atomic_count& tc) + { + wait_slot& s ( + wait_queue_[std::hash () (&tc) % wait_queue_size_]); + + // This thread is no longer active. + // + { + lock l (mutex_); + active_--; + waiting_++; + + if (waiting_ > stat_max_waiters_) + stat_max_waiters_ = waiting_; + + // A spare active thread has become available. If there are ready + // masters or eager helpers, wake someone up. + // + if (ready_ != 0) + ready_condv_.notify_one (); + else if (task_) + activate_helper (l); + } + + // Note that the task count is checked while holding the lock. We also + // have to notify while holding the lock (see resume()). The aim here + // is not to end up with a notification that happens between the check + // and the wait. + // + bool collision; + { + lock l (s.mutex); + collision = (s.waiters++ != 0); + + // Since we use a mutex for synchronization, we can relax the atomic + // access. + // + while (!s.shutdown && tc.load (std::memory_order_relaxed) != 0) + s.condv.wait (l); + + s.waiters--; + } + + // This thread is no longer waiting. + // + { + lock l (mutex_); + waiting_--; + + if (collision) + stat_wait_collisions_++; + + // If we have spare active threads, then become active. Otherwise it + // enters the ready queue. 
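// suspend()/resume(), split across this point, rest on two ideas: a
// waiter parks in one of a fixed number of slots picked by hashing the
// address of its task counter (so unrelated waiters may share a slot and
// simply re-check their own condition on wake-up), and both the check and
// the notification happen while holding the slot mutex, so a wake-up
// cannot slip in between the check and the wait. A standalone, hedged
// sketch of that pattern with generic names and a fixed shard count:
//
#include <mutex>
#include <atomic>
#include <cstddef>
#include <functional>          // std::hash
#include <condition_variable>

struct wait_slot
{
  std::mutex mutex;
  std::condition_variable condv;
};

static wait_slot slots[17]; // A prime shard count, as in startup() below.

static wait_slot&
slot_for (const std::atomic<std::size_t>& c)
{
  return slots[std::hash<const void*> () (&c) % 17];
}

static void
wait_for_zero (const std::atomic<std::size_t>& c)
{
  wait_slot& s (slot_for (c));
  std::unique_lock<std::mutex> l (s.mutex);

  while (c.load (std::memory_order_relaxed) != 0) // Re-check on each wake.
    s.condv.wait (l);
}

static void
finish_one (std::atomic<std::size_t>& c)
{
  if (--c == 0)
  {
    wait_slot& s (slot_for (c));
    std::lock_guard<std::mutex> ql (s.mutex); // Pairs with the check above.
    s.condv.notify_all ();
  }
}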
+ // + ready_++; + + while (!shutdown_ && active_ >= max_active_) + ready_condv_.wait (l); + + ready_--; + + if (shutdown_) + throw system_error (ECANCELED, system_category ()); + + active_++; + } + } + + void scheduler:: + resume (atomic_count& tc) + { + wait_slot& s ( + wait_queue_[std::hash () (&tc) % wait_queue_size_]); + + // See suspend() for why we must hold the lock. + // + lock l (s.mutex); + + if (s.waiters != 0) + s.condv.notify_all (); + } + + scheduler:: + ~scheduler () + { + try { shutdown (); } catch (system_error&) {} + } + + void scheduler:: + startup (size_t max_active, size_t init_active, size_t max_threads) + { + // Use 4x max_active on 32-bit and 8x max_active on 64-bit. Unless we were + // asked to run serially. + // + if (max_threads == 0) + max_threads = max_active * (max_active == 1 ? 1 : sizeof (void*)); + + assert (shutdown_ && + init_active != 0 && + init_active <= max_active && + max_active <= max_threads); + + active_ = init_active_ = init_active; + max_active_ = max_active; + max_threads_ = max_threads; + + // This value should be proportional to the amount of hardware concurrency + // we have. Note that the queue entry is quite sizable. + // + task_queue_depth_ = max_active * sizeof (void*) * 4; + + task_queues_.clear (); + task_queues_.reserve (max_threads_); + + // Pick a nice prime for common max_threads numbers. Though Intel Xeons + // are all over the map when it comes to cores (6, 8, 10, 12, 14, 16, + // 18, 20, 22). + // + wait_queue_size_ = // HW threads x bits + max_threads == 8 ? 17 : // 2 x 4 + max_threads == 16 ? 31 : // 4 x 4, 2 x 8 + max_threads == 32 ? 63 : // 4 x 8 + max_threads == 48 ? 97 : // 6 x 8 + max_threads == 64 ? 127 : // 8 x 8 + max_threads == 96 ? 191 : // 12 x 8 + max_threads == 128 ? 257 : // 16 x 8 + max_threads == 192 ? 383 : // 24 x 8 + max_threads == 256 ? 509 : // 32 x 8 + max_threads == 384 ? 769 : // 48 x 8 + max_threads == 512 ? 1021 : // 64 x 8 + 2 * max_threads - 1; + + wait_queue_.reset (new wait_slot[wait_queue_size_]); + + // Reset stats counters. + // + stat_max_waiters_ = 0; + stat_wait_collisions_ = 0; + + task_ = false; + shutdown_ = false; + + for (size_t i (0); i != wait_queue_size_; ++i) + wait_queue_[i].shutdown = false; + } + + auto scheduler:: + shutdown () -> stat + { + // Our overall approach to shutdown is not to try and stop everything as + // quickly as possible but rather to avoid performing any tasks. This + // avoids having code littered with if(shutdown) on every second line. + + stat r; + lock l (mutex_); + + if (!shutdown_) + { + // Signal shutdown and collect statistics. + // + shutdown_ = true; + + for (size_t i (0); i != wait_queue_size_; ++i) + { + wait_slot& ws (wait_queue_[i]); + lock l (ws.mutex); + ws.shutdown = true; + } + + for (unique_ptr& tq: task_queues_) + { + lock l (tq->mutex); + r.task_queue_full += tq->stat_full; + tq->shutdown = true; + } + + // Wait for all the helpers to terminate waking up any thread that + // sleeps. 
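// The remainder of shutdown() below follows a common cooperative pattern:
// raise a flag under the lock, wake every thread that might be sleeping
// on any condition variable, then yield in a loop until the worker count
// drops to zero. A standalone, hedged restatement of that shape with
// generic names:
//
#include <mutex>
#include <thread>
#include <cstddef>
#include <condition_variable>

struct worker_pool
{
  std::mutex mutex;
  std::condition_variable condv; // Stand-in for the idle/ready/wait queues.
  bool shutdown = false;
  std::size_t workers = 0;

  void
  stop ()
  {
    std::unique_lock<std::mutex> l (mutex);
    shutdown = true;

    while (workers != 0)
    {
      l.unlock ();
      condv.notify_all ();        // Wake anyone blocked in a wait.
      std::this_thread::yield ();
      l.lock ();
    }
  }

  void
  run () // Executed by each worker thread.
  {
    std::unique_lock<std::mutex> l (mutex);
    ++workers;

    while (!shutdown)
      condv.wait (l);             // Re-checks shutdown on every wake-up.

    --workers;
  }
};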
+ // + r.thread_helpers = helpers_; + + while (helpers_ != 0) + { + bool i (idle_ != 0); + bool r (ready_ != 0); + bool w (waiting_ != 0); + + l.unlock (); + + if (i) + idle_condv_.notify_all (); + + if (r) + ready_condv_.notify_all (); + + if (w) + for (size_t i (0); i != wait_queue_size_; ++i) + wait_queue_[i].condv.notify_all (); + + this_thread::yield (); + l.lock (); + } + + r.thread_max_active = max_active_; + r.thread_max_total = max_threads_; + r.thread_max_waiting = stat_max_waiters_; + + r.task_queue_depth = task_queue_depth_; + + r.wait_queue_slots = wait_queue_size_; + r.wait_queue_collisions = stat_wait_collisions_; + } + + return r; + } + + void scheduler:: + activate_helper (lock& l) + { + if (!shutdown_) + { + if (idle_ != 0) + idle_condv_.notify_one (); + else if (init_active_ + helpers_ < max_threads_) + create_helper (l); + } + } + + void scheduler:: + create_helper (lock& l) + { + helpers_++; + starting_++; + l.unlock (); + + // Restore the counter if the thread creation fails. + // + struct guard + { + lock* l; + size_t& h; + size_t& s; + + ~guard () {if (l != nullptr) {l->lock (); h--; s--;}} + + } g {&l, helpers_, starting_}; + + thread t (helper, this); + g.l = nullptr; // Disarm. + + t.detach (); + } + + void scheduler:: + helper (void* d) + { + scheduler& s (*static_cast (d)); + + // Note that this thread can be in an in-between state (not active or + // idle) but only while holding the lock. Which means that if we have the + // lock then we can account for all of them (this is important during + // shutdown). Except when the thread is just starting, before acquiring + // the lock for the first time, which we handle with the starting count. + // + lock l (s.mutex_); + s.starting_--; + + while (!s.shutdown_) + { + // If there is a spare active thread, become active and go looking for + // some work. + // + if (s.active_ < s.max_active_) + { + s.active_++; + + while (s.task_) // There might be a task. + { + s.task_ = false; // We will process all that are currently there. + + // Queues are never removed and there shouldn't be any reallocations + // since we reserve maximum possible size upfront. Which means we + // can get the current number of queues and release the main lock + // while examining each of them. + // + size_t n (s.task_queues_.size ()); + l.unlock (); + + for (size_t i (0); i != n; ++i) + { + task_queue& tq (*s.task_queues_[i]); + + for (lock ql (tq.mutex); + !tq.shutdown && !s.empty_front (tq); + ql.lock ()) + s.pop_front (tq, ql); // Releases the lock. + } + + l.lock (); + // If task_ became true, then there might be new tasks. + } + + s.active_--; + + // While executing the tasks a thread might have become ready. + // + if (s.ready_ != 0) + s.ready_condv_.notify_one (); + } + + // Become idle and wait for a notification (note that task_ is false + // here). 
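// create_helper() above uses a small "disarmable" guard so that the
// helpers_/starting_ counters are rolled back if the std::thread
// constructor throws (it throws std::system_error when a new thread
// cannot be started). A standalone restatement of the idiom with generic
// names:
//
#include <mutex>
#include <thread>
#include <cstddef>

static std::mutex m;
static std::size_t started = 0;

static void
work ()
{
  // ...
}

static void
launch ()
{
  {
    std::lock_guard<std::mutex> l (m);
    ++started;                  // Account for the thread optimistically.
  }

  struct guard
  {
    bool armed = true;
    ~guard ()
    {
      if (armed)
      {
        std::lock_guard<std::mutex> l (m);
        --started;              // Creation failed: roll the counter back.
      }
    }
  } g;

  std::thread t (work);         // May throw.
  g.armed = false;              // Disarm: the thread now owns the count.
  t.detach ();
}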
+ // + s.idle_++; + s.idle_condv_.wait (l); + s.idle_--; + } + + s.helpers_--; + } + + thread_local scheduler::task_queue* scheduler::task_queue_ = nullptr; + + auto scheduler:: + create_queue () -> task_queue& + { + lock l (mutex_); + task_queues_.push_back (make_unique (task_queue_depth_)); + task_queue_ = task_queues_.back ().get (); + task_queue_->shutdown = shutdown_; + return *task_queue_; + } + + scheduler sched; +} diff --git a/build2/test/script/lexer.cxx b/build2/test/script/lexer.cxx index 74aa02e..8bd3484 100644 --- a/build2/test/script/lexer.cxx +++ b/build2/test/script/lexer.cxx @@ -516,7 +516,7 @@ namespace build2 lexer_mode m (st.mode); // Customized implementation that handles special variable names ($*, - // $NN, $~, $@). + // $N, $~, $@). // if (m != lexer_mode::variable) return base_lexer::word (st, sep); @@ -526,23 +526,16 @@ namespace build2 if (c != '*' && c != '~' && c != '@' && !digit (c)) return base_lexer::word (st, sep); - uint64_t ln (c.line), cn (c.column); - string lexeme; - get (); - lexeme += c; - if (digit (c)) - { - for (; digit (c = peek ()); get ()) - lexeme += c; - } + if (digit (c) && digit (peek ())) + fail (c) << "multi-digit special variable name"; state_.pop (); // Expire the variable mode. - return token (move (lexeme), + return token (string (1, c), sep, quote_type::unquoted, false, - ln, cn); + c.line, c.column); } } } diff --git a/build2/test/script/parser.cxx b/build2/test/script/parser.cxx index 059cb93..874e0d7 100644 --- a/build2/test/script/parser.cxx +++ b/build2/test/script/parser.cxx @@ -4,6 +4,8 @@ #include +#include + #include #include @@ -17,17 +19,13 @@ namespace build2 { using type = token_type; - // Return true if the string contains only digit characters (used to - // detect the special $NN variables). + // Return true if the string contains only a single digit characters + // (used to detect the special $N variables). // static inline bool - digits (const string& s) + digit (const string& s) { - for (char c: s) - if (!digit (c)) - return false; - - return !s.empty (); + return s.size () == 1 && butl::digit (s[0]); } // @@ -403,18 +401,24 @@ namespace build2 // bool semi (false); + line ln; switch (lt) { case line_type::var: { // Check if we are trying to modify any of the special aliases - // ($*, $~, $N). + // ($*, $N, $~, $@). // - const string& n (t.value); + string& n (t.value); - if (n == "*" || n == "~" || digits (n)) + if (n == "*" || n == "~" || n == "@" || digit (n)) fail (t) << "attempt to set '" << n << "' variable directly"; + // Pre-enter the variables now while we are executing serially. + // Once parallel, it becomes a lot harder to do. + // + ln.var = &script_->var_pool.insert (move (n)); + next (t, tt); // Assignment kind. parse_variable_line (t, tt); @@ -496,7 +500,9 @@ namespace build2 if (ls == nullptr) ls = &ls_data; - ls->push_back (line {lt, replay_data ()}); + ln.type = lt; + ln.tokens = replay_data (); + ls->push_back (move (ln)); if (lt == line_type::cmd_if || lt == line_type::cmd_ifn) { @@ -1163,14 +1169,14 @@ namespace build2 if (p != string::npos && ++p != r.details.size ()) r.details.resize (p); + if (r.empty ()) + fail (loc) << "empty description"; + // Insert id into the id map if we have one. // if (!r.id.empty ()) insert_id (r.id, loc); - if (r.empty ()) - fail (loc) << "empty description"; - return r; } @@ -1211,6 +1217,11 @@ namespace build2 if (r.empty ()) fail (loc) << "empty description"; + // Insert id into the id map if we have one. 
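// create_queue() above pairs a shared registry with a thread_local cache
// so that each thread lazily creates exactly one queue while helpers can
// still scan them all. A standalone, hedged restatement of the pattern
// (generic names, a single registry instance assumed, and an empty
// payload standing in for task_queue):
//
#include <mutex>
#include <memory>
#include <vector>
#include <cstddef>

struct queue
{
  // ...
};

class registry
{
public:
  queue&
  mine ()
  {
    if (mine_ == nullptr)
    {
      std::lock_guard<std::mutex> l (mutex_);
      all_.push_back (std::unique_ptr<queue> (new queue));
      mine_ = all_.back ().get ();
    }
    return *mine_;
  }

  // The scheduler snapshots the queue count under its main lock before
  // scanning; without that, size() would race with concurrent mine()
  // calls.
  //
  std::size_t
  size () const {return all_.size ();}

private:
  std::mutex mutex_;
  std::vector<std::unique_ptr<queue>> all_; // Grows, never shrinks.

  static thread_local queue* mine_; // This thread's entry in all_.
};

thread_local queue* registry::mine_ = nullptr;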
+ // + if (pre_parse_ && !r.id.empty ()) + insert_id (r.id, loc); + return r; } @@ -2318,6 +2329,8 @@ namespace build2 { exec_lines (g->setup_.begin (), g->setup_.end (), li, false); + scheduler::atomic_count task_count (0); + for (const unique_ptr& chain: g->scopes) { // Pick a scope from the if-else chain. @@ -2388,11 +2401,25 @@ namespace build2 // exec_scope_body (); // scope_ = os; // - parser p; - p.execute (*s, *script_, *runner_); + + // @@ Exceptions. + // + sched.async (task_count, + [] (scope& scp, script& scr, runner& r) + { + parser p; + p.execute (scp, scr, r); + }, + ref (*s), + ref (*script_), + ref (*runner_)); } } + sched.wait (task_count); + + //@@ Check if failed. + exec_lines (g->tdown_.begin (), g->tdown_.end (), li, false); } else @@ -2409,11 +2436,11 @@ namespace build2 for (; i != e; ++i) { - line& l (*i); - line_type lt (l.type); + line& ln (*i); + line_type lt (ln.type); assert (path_ == nullptr); - replay_data (move (l.tokens)); // Set the tokens and start playing. + replay_data (move (ln.tokens)); // Set the tokens and start playing. // We don't really need to change the mode since we already know // the line type. @@ -2441,32 +2468,22 @@ namespace build2 // Assign. // - const variable& var (script_->var_pool.insert (move (name))); + const variable& var (*ln.var); value& lhs (kind == type::assign ? scope_->assign (var) : scope_->append (var)); - // @@ Need to adjust to make strings the default type. - // apply_value_attributes (&var, lhs, move (rhs), kind); - // Handle the $*, $NN special aliases. - // - // The plan is as follows: here we detect modification of the - // source variables (test*), and (re)set $* to NULL on this - // scope (this is important to both invalidate any old values - // but also to "stake" the lookup position). This signals to - // the variable lookup function below that the $* and $NN - // values need to be recalculated from their sources. Note - // that we don't need to invalidate $NN since their lookup - // always checks $* first. + // If we changes any of the test.* values, then reset the $*, + // $N special aliases. // if (var.name == script_->test_var.name || var.name == script_->opts_var.name || var.name == script_->args_var.name) { - scope_->assign (script_->cmd_var) = nullptr; + scope_->reset_special (); } replay_stop (); @@ -2623,100 +2640,18 @@ namespace build2 // If we have no scope (happens when pre-parsing directives), then we // only look for buildfile variables. // - if (scope_ == nullptr) - return script_->find_in_buildfile (name); - - // @@ MT: will need RW mutex on var_pool. Or maybe if it's not there - // then it can't possibly be found? Still will be setting variables. - // - if (name != "*" && !digits (name)) - return scope_->find (script_->var_pool.insert (move (name))); - - // Handle the $*, $NN special aliases. - // - // See the exec_lines() for the overall plan. - // - // @@ MT: we are potentially changing outer scopes. Could force - // lookup before executing tests in each group scope. Poblem is - // we don't know which $NN vars will be looked up from inside. - // Could we collect all the variable names during the pre-parse - // stage? They could be computed. - // - // Or we could set all the non-NULL $NN (i.e., based on the number - // of elements in $*). - // - - // In both cases first thing we do is lookup $*. It should always be - // defined since we set it on the script's root scope. 
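// The two "@@" notes above (exceptions escaping the async task, and
// checking whether any scope failed) are left open by this commit. One
// possible way to close them, shown here purely as a hedged sketch that
// reuses the surrounding exec_scope() context (s, script_, runner_) and
// is not what this commit actually does, is to trap the diagnostics
// exception inside the task and record it in a shared flag examined after
// sched.wait():
//
//   scheduler::atomic_count task_count (0);
//   std::atomic<bool> failed_flag (false);
//
//   sched.async (task_count,
//                [] (scope& scp, script& scr, runner& r,
//                    std::atomic<bool>& flag)
//                {
//                  try
//                  {
//                    parser p;
//                    p.execute (scp, scr, r);
//                  }
//                  catch (const failed&) {flag = true;}
//                },
//                ref (*s), ref (*script_), ref (*runner_),
//                ref (failed_flag));
//
//   sched.wait (task_count);
//
//   if (failed_flag)
//     throw failed (); // Diagnostics already issued by the task.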
- // - lookup l (scope_->find (script_->cmd_var)); - assert (l.defined ()); - - // $* NULL value means it needs to be (re)calculated. - // - value& v (const_cast (*l)); - bool recalc (v.null); - - if (recalc) - { - strings s; - - auto append = [&s] (const strings& v) - { - s.insert (s.end (), v.begin (), v.end ()); - }; - - if (lookup l = scope_->find (script_->test_var)) - s.push_back (cast (l).string ()); - - if (lookup l = scope_->find (script_->opts_var)) - append (cast (l)); - - if (lookup l = scope_->find (script_->args_var)) - append (cast (l)); - - v = move (s); - } - - if (name == "*") - return l; - - // Use the string type for the $NN variables. - // - const variable& var (script_->var_pool.insert (move (name))); - - // We need to look for $NN in the same scope as where we found $*. - // - variable_map& vars (const_cast (*l.vars)); - - // If there is already a value and no need to recalculate it, then we - // are done. - // - if (!recalc && (l = vars[var]).defined ()) - return l; - - // Convert the variable name to index we can use on $*. + // Otherwise, every variable that is ever set in a script has been + // pre-entered during pre-parse. Which means that if one is not found + // in the script pool then it can only possibly be set in the + // buildfile. // - unsigned long i; - - try - { - i = stoul (var.name); - } - catch (const exception&) - { - fail (loc) << "invalid $* index " << var.name << endf; - } - - const strings& s (cast (v)); - value& nv (vars.assign (var)); - - if (i < s.size ()) - nv = s[i]; - else - nv = nullptr; + const variable* pvar (scope_ != nullptr + ? script_->var_pool.find (name) + : nullptr); - return lookup (nv, vars); + return pvar != nullptr + ? scope_->find (*pvar) + : script_->find_in_buildfile (name); } size_t parser:: diff --git a/build2/test/script/runner.cxx b/build2/test/script/runner.cxx index ee059f4..3c646c7 100644 --- a/build2/test/script/runner.cxx +++ b/build2/test/script/runner.cxx @@ -188,8 +188,9 @@ namespace build2 // mkdir (sp.wd_path, 2); else - // The working directory is cleaned up by the test rule prior the - // script execution. + // Scope working directory shall be empty (the script working + // directory is cleaned up by the test rule prior the script + // execution). // assert (empty (sp.wd_path)); diff --git a/build2/test/script/script b/build2/test/script/script index 1be33bb..4f39c58 100644 --- a/build2/test/script/script +++ b/build2/test/script/script @@ -48,6 +48,11 @@ namespace build2 { line_type type; replay_tokens tokens; + + union + { + const variable* var; // Pre-entered for line_type::var. + }; }; // Most of the time we will have just one line (test command). @@ -296,6 +301,11 @@ namespace build2 value& append (const variable&); + // Reset special $*, $N variables based on the test.* values. + // + void + reset_special (); + // Cleanup. 
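// The new lookup logic above and the script changes below rest on one
// invariant: every variable a testscript can ever set is entered into the
// pool during the serial pre-parse phase, so the parallel execution phase
// only performs lookups and the pool needs no locking. A standalone,
// hedged sketch of that "insert serially, find concurrently" pattern with
// generic names:
//
#include <map>
#include <string>
#include <utility>

class name_pool
{
public:
  // Serial (pre-parse) phase only: may grow the map.
  //
  const int&
  insert (std::string name)
  {
    return map_[std::move (name)]; // Value-initialized if newly inserted.
  }

  // Parallel (execution) phase: read-only and therefore safe to call from
  // multiple threads, provided no insert() runs concurrently.
  //
  const int*
  find (const std::string& name) const
  {
    auto i (map_.find (name));
    return i != map_.end () ? &i->second : nullptr;
  }

private:
  std::map<std::string, int> map_; // Stand-in for variable_pool.
};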
// public: @@ -400,9 +410,10 @@ namespace build2 const variable& opts_var; // test.options const variable& args_var; // test.arguments - const variable& cmd_var; // $* - const variable& wd_var; // $~ - const variable& id_var; // $@ + const variable& wd_var; // $~ + const variable& id_var; // $@ + const variable& cmd_var; // $* + const variable* cmdN_var[10]; // $N }; class script: public script_base, public group diff --git a/build2/test/script/script.cxx b/build2/test/script/script.cxx index 8fb8115..f1e1bd4 100644 --- a/build2/test/script/script.cxx +++ b/build2/test/script/script.cxx @@ -397,9 +397,20 @@ namespace build2 opts_var (var_pool.insert ("test.options")), args_var (var_pool.insert ("test.arguments")), - cmd_var (var_pool.insert ("*")), wd_var (var_pool.insert ("~")), - id_var (var_pool.insert ("@")) {} + id_var (var_pool.insert ("@")), + cmd_var (var_pool.insert ("*")), + cmdN_var { + &var_pool.insert ("0"), + &var_pool.insert ("1"), + &var_pool.insert ("2"), + &var_pool.insert ("3"), + &var_pool.insert ("4"), + &var_pool.insert ("5"), + &var_pool.insert ("6"), + &var_pool.insert ("7"), + &var_pool.insert ("8"), + &var_pool.insert ("9")} {} // script // @@ -445,10 +456,9 @@ namespace build2 v = path (tt.dir.string ()); // Strip trailing slash. } - // Also add the NULL $* value that signals it needs to be recalculated - // on first access. + // Set the special $*, $N variables. // - assign (cmd_var) = nullptr; + reset_special (); } lookup scope:: @@ -474,7 +484,7 @@ namespace build2 { // Switch to the corresponding buildfile variable. Note that we don't // want to insert a new variable into the pool (we might be running - // concurrently). Plus, if there is no such variable, then we cannot + // in parallel). Plus, if there is no such variable, then we cannot // possibly find any value. // const variable* pvar (build2::var_pool.find (n)); @@ -522,13 +532,54 @@ namespace build2 value& r (assign (var)); // NULL. - //@@ I guess this is where we convert untyped value to strings? - // if (l.defined ()) r = *l; // Copy value (and type) from the outer scope. return r; } + + void scope:: + reset_special () + { + // First assemble the $* value. + // + strings s; + + auto append = [&s] (const strings& v) + { + s.insert (s.end (), v.begin (), v.end ()); + }; + + if (lookup l = find (root->test_var)) + s.push_back (cast (l).representation ()); + + if (lookup l = find (root->opts_var)) + append (cast (l)); + + if (lookup l = find (root->args_var)) + append (cast (l)); + + // Set the $N values if present. + // + for (size_t i (0); i <= 9; ++i) + { + value& v (assign (*root->cmdN_var[i])); + + if (i < s.size ()) + { + if (i == 0) + v = path (s[i]); + else + v = s[i]; + } + else + v = nullptr; // Clear any old values. + } + + // Set $*. + // + assign (root->cmd_var) = move (s); + } } } } diff --git a/build2/types b/build2/types index c73da73..fb4cfbb 100644 --- a/build2/types +++ b/build2/types @@ -5,6 +5,7 @@ #ifndef BUILD2_TYPES #define BUILD2_TYPES +#include #include #include #include @@ -56,6 +57,7 @@ namespace build2 using std::shared_ptr; using std::weak_ptr; + using std::array; using std::vector; using butl::vector_view; // using butl::small_vector; // -- cgit v1.1