// file      : build2/scheduler -*- C++ -*-
// copyright : Copyright (c) 2014-2016 Code Synthesis Ltd
// license   : MIT; see accompanying LICENSE file

#ifndef BUILD2_SCHEDULER
#define BUILD2_SCHEDULER

#include <mutex>
#include <tuple>
#include <atomic>
#include <thread>
#include <cerrno>             // ECANCELED
#include <type_traits>        // aligned_storage, etc
#include <condition_variable>

#include <build2/types>
#include <build2/utility>

namespace build2
{
  // Scheduler of tasks and threads. Works best for "substantial" tasks
  // (e.g., running a process), where in comparison the thread
  // synchronization overhead is negligible.
  //
  // A thread (called "master") may need to perform several tasks which can
  // be done in parallel (e.g., update all the prerequisites or run all the
  // tests). To accomplish this, the master, via a call to async(), can ask
  // the scheduler to run a task in another thread (called "helper"). If a
  // helper is available, then the task is executed asynchronously by such
  // a helper. Otherwise, the task is executed synchronously as part of the
  // async() call. Once the master thread has scheduled all the tasks, it
  // calls wait() to await their completion.
  //
  // The scheduler makes sure that only a certain number of threads (for
  // example, the number of available hardware threads) are "active" at any
  // given time (thus the reason why async() may choose to perform the task
  // synchronously). When a master thread calls wait(), it is "suspended"
  // until all its asynchronous tasks are completed (at which point it
  // becomes "ready"). A suspension of a master results in either another
  // ready master being "resumed" or another helper thread becoming
  // available.
  //
  // On completion of a task a helper thread returns to the scheduler which
  // can again lead either to a ready master being resumed (in which case
  // the helper is suspended) or to the helper becoming available to
  // perform another task.
  //
  // Note that suspended threads are not reused as helpers. Rather, a new
  // helper thread is always created if none is available. This is done to
  // allow a ready master to continue as soon as possible. If it were
  // reused as a helper, then it could be blocked on a nested wait()
  // further down the stack. This means that the number of threads created
  // by the scheduler will normally exceed the maximum active allowed. It
  // should not, however, get excessive provided the tasks are not too
  // deeply nested.
  //
  // @@ Actually, this is not quite correct: this seems to be a function of
  //    both the depth and width of the task tree. Perhaps we should use a
  //    hybrid approach: when the number of helper threads reaches a
  //    certain limit, we switch to reusing suspended threads? Also, a
  //    thread can rake its own and its subtasks' queues directly in wait()
  //    since wait() won't return until they are done.
  //
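  // For illustration only, a master could use the scheduler along these
  // lines (a sketch, assuming a started up scheduler; update() and
  // targets are hypothetical, not part of this interface):
  //
  //   scheduler::atomic_count task_count (0);
  //
  //   for (target& t: targets)
  //     sched.async (task_count, update, ref (t));
  //
  //   sched.wait (task_count); // Returns once all the tasks are done.
  //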
  class scheduler
  {
  public:
    using atomic_count = std::atomic<size_t>;

    // F should return void and not throw any exceptions. The way the
    // result of a task is communicated back to the master thread is ad
    // hoc, usually via "out" arguments. Such result(s) can only be
    // retrieved by the master once its task count reaches 0.
    //
    // The argument passing semantics is the same as for std::thread. In
    // particular, lvalue-references are passed as copies (use ref()/cref()
    // for the by-reference semantics).
    //
    // If the scheduler is shut down, throw system_error(ECANCELED).
    //
    template <typename F, typename... A>
    void
    async (atomic_count& task_count, F&& f, A&&... a)
    {
      using task = task_type<F, A...>;

      static_assert (sizeof (task) <= sizeof (task_data::data),
                     "insufficient space");

      static_assert (std::is_trivially_destructible<task>::value,
                     "not trivially destructible");

      // Push into the queue unless we are running serially or the queue
      // is full.
      //
      task_data* td (nullptr);

      if (max_active_ != 1)
      {
        task_queue& tq (task_queue_ != nullptr
                        ? *task_queue_
                        : create_queue ());

        lock ql (tq.mutex);

        if (tq.shutdown)
          throw system_error (ECANCELED, std::system_category ());

        if ((td = push (tq)) != nullptr)
        {
          // Package the task.
          //
          new (&td->data) task {
            &task_count,
            decay_copy (forward<F> (f)),
            {decay_copy (forward<A> (a))...}};

          td->thunk = &task_thunk<F, A...>;
        }
        else
          tq.stat_full++;
      }

      // If serial/full, then run the task synchronously. In this case
      // there is no need to mess with the task count.
      //
      if (td == nullptr)
      {
        forward<F> (f) (forward<A> (a)...);
        return;
      }

      // Increment the task count.
      //
      task_count.fetch_add (1, std::memory_order_release);

      lock l (mutex_);
      task_ = true;

      // If there is a spare active thread, wake up (or create) a helper.
      //
      if (active_ < max_active_)
        activate_helper (l);
    }
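
    // For example, assuming a hypothetical task that stores its result
    // into an "out" argument (a sketch, not part of this interface):
    //
    //   bool ok (true);
    //   sched.async (task_count, update, ref (t), ref (ok));
    //   ...
    //   sched.wait (task_count); // Only now is it safe to examine ok.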

    // Wait until the task count reaches 0. If the scheduler is shut down
    // while waiting, throw system_error(ECANCELED).
    //
    void
    wait (atomic_count& task_count);

    // Startup and shutdown.
    //
  public:
    // Unless already shut down, call shutdown() but ignore errors.
    //
    ~scheduler ();

    // Create a shut down scheduler.
    //
    scheduler () = default;

    // Create a started up scheduler.
    //
    // The initial active argument is the number of threads to assume are
    // already active (e.g., the calling thread). It must not be 0 (since
    // someone has to schedule the first task).
    //
    // If maximum threads is unspecified, then a generally appropriate
    // default limit is used.
    //
    scheduler (size_t max_active,
               size_t init_active = 1,
               size_t max_threads = 0)
    {
      startup (max_active, init_active, max_threads);
    }

    // Note: naturally not thread-safe.
    //
    void
    startup (size_t max_active,
             size_t init_active = 1,
             size_t max_threads = 0);

    // Wait for all the helper threads to terminate. Throw system_error on
    // failure. Note that the initially active threads are not waited for.
    // Return scheduling statistics.
    //
    struct stat
    {
      size_t thread_max_active     = 0; // max # of active threads allowed.
      size_t thread_max_total      = 0; // max # of total threads allowed.
      size_t thread_helpers        = 0; // # of helper threads created.
      size_t thread_max_waiting    = 0; // max # of waiters at any time.

      size_t task_queue_depth      = 0; // # of entries in a queue (capacity).
      size_t task_queue_full       = 0; // # of times task queue was full.

      size_t wait_queue_slots      = 0; // # of wait slots (buckets).
      size_t wait_queue_collisions = 0; // # of times slot had been occupied.
    };

    stat
    shutdown ();

    // Return the number of hardware threads or 0 if unable to determine.
    //
    static size_t
    hardware_concurrency ()
    {
      return std::thread::hardware_concurrency ();
    }

  private:
    using lock = std::unique_lock<std::mutex>;

    void
    activate_helper (lock&);

    void
    create_helper (lock&);

    // We restrict ourselves to a single pointer as an argument in the
    // hope of a small object optimization.
    //
    static void
    helper (void*);

    void
    suspend (atomic_count& task_count);

    void
    resume (atomic_count& task_count);

    // Task encapsulation.
    //
    template <typename F, typename... A>
    struct task_type
    {
      atomic_count* task_count;
      std::decay_t<F> func;
      std::tuple<std::decay_t<A>...> args;

      template <size_t... i>
      void
      thunk (std::index_sequence<i...>)
      {
        move (func) (std::get<i> (move (args))...);
      }
    };

    template <typename F, typename... A>
    static void
    task_thunk (scheduler& s, lock& ql, void* td)
    {
      using task = task_type<F, A...>;

      // Move the data and release the lock.
      //
      task t (move (*static_cast<task*> (td)));
      ql.unlock ();

      t.thunk (std::index_sequence_for<A...> ());

      atomic_count& tc (*t.task_count);

      if (--tc == 0)
        s.resume (tc); // Resume a waiter, if any.
    }

    template <typename T>
    static std::decay_t<T>
    decay_copy (T&& x) {return forward<T> (x);}

  private:
    std::mutex mutex_;

    bool shutdown_ = true; // Shutdown flag.
    bool task_ = false;    // Task queued flag (see below).

    // The constraints that we must maintain:
    //
    // active <= max_active
    // (init_active + helpers) <= max_threads
    //
    // Note that the first three are immutable between startup() and
    // shutdown() so they can be accessed without a lock (@@ What about the
    // case of multiple initially active?).
    //
    size_t init_active_ = 0; // Initially active threads.
    size_t max_active_  = 0; // Maximum number of active threads.
    size_t max_threads_ = 0; // Maximum number of total threads.

    size_t helpers_ = 0; // Number of helper threads created so far.

    // Every thread that we manage must be accounted for in one of these
    // counters. And their sum should equal (init_active + helpers).
    //
    size_t active_   = 0; // Active master threads executing a task.
    size_t idle_     = 0; // Idle helper threads waiting for a task.
    size_t waiting_  = 0; // Suspended master threads waiting for their tasks.
    size_t ready_    = 0; // Ready master threads waiting to become active.
    size_t starting_ = 0; // Helper threads starting up.

    std::condition_variable idle_condv_;  // Idle helpers queue.
    std::condition_variable ready_condv_; // Ready masters queue.

    // Statistics counters.
    //
    size_t stat_max_waiters_;
    size_t stat_wait_collisions_;

    // Wait queue.
    //
    // A wait slot blocks a bunch of threads. When they are (all)
    // unblocked, they re-examine their respective conditions and either
    // carry on or block again.
    //
    // The wait queue is sharded into slots. A thread picks a slot based on
    // the address of its task count variable. How many slots do we need?
    // This depends on the number of waiters that we can have, which cannot
    // be greater than the total number of threads.
    //
    struct wait_slot
    {
      std::mutex mutex;
      std::condition_variable condv;
      size_t waiters = 0;
      bool shutdown = true;
    };

    size_t wait_queue_size_; // Multiple of max_threads.
    unique_ptr<wait_slot[]> wait_queue_;
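
    // For illustration, the slot for a given task count could be picked
    // along these lines (a sketch, not necessarily the exact scheme used
    // by wait()):
    //
    //   size_t i (std::hash<atomic_count*> () (&task_count));
    //   wait_slot& ws (wait_queue_[i % wait_queue_size_]);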

    // Task queue.
    //
    // Each queue has its own mutex. If the task_ flag above is true, then
    // there *might* be a task in one of the queues. If it is false, then
    // it means there are either no tasks or someone is busy working on
    // them.
    //
    // For now we only support trivially-destructible tasks.
    //
    struct task_data
    {
      std::aligned_storage<sizeof (void*) * 8>::type data;
      void (*thunk) (scheduler&, lock&, void*);
    };

    // We have two requirements: Firstly, we want to keep the master
    // thread (the one that called wait()) busy working through its own
    // queue for as long as possible before (if at all) it "reincarnates"
    // as a helper. The main reason for this is the limited number of
    // helpers we can create.
    //
    // Secondly, we don't want to block wait() longer than necessary since
    // the master thread can do some work with the result. Plus, overall,
    // we want to "unwind" task hierarchies as soon as possible since they
    // hold up resources such as threads' stacks. All this means that the
    // master thread can only work through tasks that it has queued at
    // this "level" of the async()/wait() calls since we know that wait()
    // cannot return until they are done.
    //
    // To satisfy the first requirement, the master and helper threads get
    // the tasks from different ends of the queue: the master from the
    // back while the helpers from the front. And the master always adds
    // new tasks to the back.
    //
    // To satisfy the second requirement, the master thread stores the
    // index of the first task it has queued at this "level" and makes
    // sure it doesn't try to dequeue any task beyond that.
    //
    size_t task_queue_depth_; // Multiple of max_active.

    struct task_queue
    {
      std::mutex mutex;
      bool shutdown = false;

      size_t stat_full = 0; // Number of times push() returned NULL.

      // head <= stop <= tail
      //
      size_t head = 0; // Index of the first element.
      size_t stop = 0; // Index of the element to stop at in pop_back().
      size_t tail = 0; // Index of the one past last element.

      unique_ptr<task_data[]> data;

      task_queue (size_t depth): data (new task_data[depth]) {}
    };

    // Task queue API. Expects the queue mutex to be locked.
    //
    // Push a new task to the queue returning a pointer to the task data
    // to be filled or NULL if the queue is full.
    //
    task_data*
    push (task_queue& tq)
    {
      return tq.tail != task_queue_depth_ ? &tq.data[tq.tail++] : nullptr;
    }

    bool
    empty_front (task_queue& tq) const {return tq.head == tq.tail;}

    void
    pop_front (task_queue& tq, lock& ql)
    {
      task_data& td (tq.data[tq.head++]);

      if (tq.head == tq.tail)
        tq.stop = tq.head = tq.tail = 0; // Reset.
      else if (tq.head > tq.stop)
        tq.stop = tq.head;

      // The thunk moves the task data to its stack, releases the lock,
      // and continues to execute the task.
      //
      td.thunk (*this, ql, &td.data);
    }

    bool
    empty_back (task_queue& tq) const {return tq.stop == tq.tail;}

    void
    pop_back (task_queue& tq, lock& ql)
    {
      task_data& td (tq.data[--tq.tail]);

      if (tq.head == tq.tail)
        tq.stop = tq.head = tq.tail = 0;

      td.thunk (*this, ql, &td.data);
    }

    // Each thread has its own queue. Instead of allocating all
    // max_threads of them up front, we will reserve the space but only
    // construct queues on demand.
    //
    vector<unique_ptr<task_queue>> task_queues_;

    // TLS cache of each thread's task queue.
    //
    thread_local static task_queue* task_queue_;

    task_queue&
    create_queue ();
  };

  // Main (and only) scheduler. Started up and shut down in main().
  //
  extern scheduler sched;
}

#endif // BUILD2_SCHEDULER