From 052dc48939a063b19a13c10cb2c735b4b06a4c4b Mon Sep 17 00:00:00 2001
From: Boris Kolpackov
Date: Tue, 13 Dec 2016 12:30:14 +0200
Subject: Various scheduler improvements and fixes

---
 build2/scheduler | 252 ++++++++++++++++++++++++++++++-------------------------
 1 file changed, 139 insertions(+), 113 deletions(-)

(limited to 'build2/scheduler')

diff --git a/build2/scheduler b/build2/scheduler
index 80b9ec9..ffb8acd 100644
--- a/build2/scheduler
+++ b/build2/scheduler
@@ -8,8 +8,6 @@
 #include
 #include
 #include
-#include
-#include
 #include <type_traits> // aligned_storage, etc
 #include

@@ -49,15 +47,7 @@ namespace build2
   // allow a ready master to continue as soon as possible. If it were reused
   // as a helper, then it could be blocked on a nested wait() further down the
   // stack. This means that the number of threads created by the scheduler
-  // will normally exceed the maximum active allowed. It should not, however,
-  // get excessive provided the tasks are not too deeply nested.
-  //
-  // @@ Actually, this is not quite correct: this seems to be a function of
-  //    both depth and width of the task tree. Perhaps we should use a hybrid
-  //    approach: when the number of helper threads reaches a certain limit
-  //    we switch to reusing suspended threads? Also, a thread can rake its
-  //    own and its subtask's queues directly in wait() since wait() won't
-  //    return until they are done.
+  // will normally exceed the maximum active allowed.
   //
   class scheduler
   {
@@ -77,68 +67,7 @@ namespace build2
     //
     template <typename F, typename... A>
     void
-    async (atomic_count& task_count, F&& f, A&&... a)
-    {
-      using task = task_type<F, A...>;
-
-      static_assert (sizeof (task) <= sizeof (task_data),
-                     "insufficient space");
-
-      static_assert (std::is_trivially_destructible<task>::value,
-                     "not trivially destructible");
-
-      // Push into the queue unless we are running serially or the queue is
-      // full.
-      //
-      task_data* td (nullptr);
-
-      if (max_active_ != 1)
-      {
-        task_queue& tq (task_queue_ != nullptr
-                        ? *task_queue_
-                        : create_queue ());
-
-        lock ql (tq.mutex);
-
-        if (tq.shutdown)
-          throw system_error (ECANCELED, std::system_category ());
-
-        if ((td = push (tq)) != nullptr)
-        {
-          // Package the task.
-          //
-          new (&td->data) task {
-            &task_count,
-            decay_copy (forward<F> (f)),
-            typename task::args_type (decay_copy (forward<A> (a))...)};
-
-          td->thunk = &task_thunk<F, A...>;
-        }
-        else
-          tq.stat_full++;
-      }
-
-      // If serial/full, then run the task synchronously. In this case
-      // there is no need to mess with task count.
-      //
-      if (td == nullptr)
-      {
-        forward<F> (f) (forward<A> (a)...);
-        return;
-      }
-
-      // Increment the task count.
-      //
-      task_count.fetch_add (1, std::memory_order_release);
-
-      lock l (mutex_);
-      task_ = true;
-
-      // If there is a spare active thread, wake up (or create) the helper.
-      //
-      if (active_ < max_active_)
-        activate_helper (l);
-    }
+    async (atomic_count& task_count, F&&, A&&...);

     // Wait until the task count reaches 0. If the scheduler is shutdown
     // while waiting, throw system_error(ECANCELED).
@@ -163,22 +92,24 @@ namespace build2
     // already active (e.g., the calling thread). It must not be 0 (since
     // someone has to schedule the first task).
     //
-    // If maximum threads is unspecified, then a generally appropriate default
-    // limit it used.
+    // If the maximum threads or task queue depth arguments are unspecified,
+    // then appropriate defaults are used.
     //
     scheduler (size_t max_active,
                size_t init_active = 1,
-               size_t max_threads = 0)
+               size_t max_threads = 0,
+               size_t queue_depth = 0)
     {
-      startup (max_active, init_active, max_threads);
+      startup (max_active, init_active, max_threads, queue_depth);
     }

-    // Note: naturally not thread-safe.
+    // Start the scheduler.
     //
     void
     startup (size_t max_active,
              size_t init_active = 1,
-             size_t max_threads = 0);
+             size_t max_threads = 0,
+             size_t queue_depth = 0);

     // Wait for all the helper threads to terminate. Throw system_error on
     // failure. Note that the initially active threads are not waited for.
@@ -201,6 +132,35 @@
     stat
     shutdown ();

+    // If initially active thread(s) (besides the one that calls startup())
+    // exist before the call to startup(), then they must call join() before
+    // executing any tasks. The two common cases where you don't have to call
+    // join are a single active thread that calls startup()/shutdown() or
+    // active thread(s) that are created after startup().
+    //
+    void
+    join ()
+    {
+      assert (task_queue_ == nullptr);
+
+      // Lock the mutex to make sure the values set in startup() are visible
+      // in this thread.
+      //
+      lock l (mutex_);
+    }
+
+    // If initially active thread(s) participate in multiple schedulers and/or
+    // sessions (intervals between startup() and shutdown()), then they must
+    // call leave() before joining another scheduler/session. Note that this
+    // applies to the active thread that calls shutdown(). Note that a thread
+    // can only participate in one scheduler at a time.
+    //
+    void
+    leave ()
+    {
+      task_queue_ = nullptr;
+    }
+
     // Return the number of hardware threads or 0 if unable to determine.
     //
     static size_t
@@ -252,21 +212,7 @@ namespace build2

     template <typename F, typename... A>
     static void
-    task_thunk (scheduler& s, lock& ql, void* td)
-    {
-      using task = task_type<F, A...>;
-
-      // Move the data and release the lock.
-      //
-      task t (move (*static_cast<task*> (td)));
-      ql.unlock ();
-
-      t.thunk (std::index_sequence_for<A...> ());
-
-      atomic_count& tc (*t.task_count);
-      if (--tc == 0)
-        s.resume (tc); // Resume a waiter, if any.
-    }
+    task_thunk (scheduler&, lock&, void*);

     template <typename T>
     static std::decay_t<T>
@@ -284,8 +230,7 @@ namespace build2
     // (init_active + helpers) <= max_threads
     //
     // Note that the first three are immutable between startup() and
-    // shutdown() so can be accessed without a lock (@@ What about
-    // the case of multiple initially active?).
+    // shutdown() so can be accessed without a lock (but see join()).
     //
     size_t init_active_ = 0; // Initially active threads.
     size_t max_active_ = 0;  // Maximum number of active threads.
@@ -378,11 +323,18 @@
       size_t stat_full = 0; // Number of times pop() returned NULL.

-      // head <= stop <= tail
+      // Our task queue is circular with head being the index of the first
+      // element and tail -- of the last. Since this makes the empty and one
+      // element cases indistinguishable, we also keep the size.
+      //
+      // The mark is an index somewhere between (figuratively speaking) head
+      // and tail, if enabled. If the mark is hit, then it is disabled until
+      // the queue becomes empty.
       //
-      size_t head = 0; // Index of the first element.
-      size_t stop = 0; // Index of the element to stop at in pop_back().
-      size_t tail = 0; // Index of the one past last element.
+      size_t head = 0;
+      size_t mark = 0;
+      size_t tail = 0;
+      size_t size = 0;

       unique_ptr<task_data[]> data;

@@ -398,40 +350,102 @@ namespace build2
     task_data*
     push (task_queue& tq)
     {
-      return tq.tail != task_queue_depth_ ? &tq.data[tq.tail++] : nullptr;
+      size_t& s (tq.size);
+      size_t& t (tq.tail);
+
+      if (s != task_queue_depth_)
+      {
+        //                                        normal wrap empty
+        //                                           |     |    |
+        t = s != 0 ? (t != task_queue_depth_ - 1 ? t + 1 : 0) : t;
+        s++;
+        return &tq.data[t];
+      }
+
+      return nullptr;
     }

     bool
-    empty_front (task_queue& tq) const {return tq.head == tq.tail;}
+    empty_front (task_queue& tq) const {return tq.size == 0;}

     void
     pop_front (task_queue& tq, lock& ql)
     {
-      task_data& td (tq.data[tq.head++]);
+      size_t& s (tq.size);
+      size_t& h (tq.head);
+      size_t& m (tq.mark);
+
+      bool a (h == m); // Adjust mark?
+      task_data& td (tq.data[h]);
+
+      //                                        normal wrap empty
+      //                                           |     |    |
+      h = s != 1 ? (h != task_queue_depth_ - 1 ? h + 1 : 0) : h;

-      if (tq.head == tq.tail)
-        tq.stop = tq.head = tq.tail = 0; // Reset.
-      else if (tq.head > tq.stop)
-        tq.stop = tq.head;
+      if (--s == 0 || a)
+        m = h; // Reset or adjust the mark.

       // The thunk moves the task data to its stack, releases the lock,
       // and continues to execute the task.
       //
       td.thunk (*this, ql, &td.data);
+      ql.lock ();
     }

     bool
-    empty_back (task_queue& tq) const {return tq.stop == tq.tail;}
+    empty_back (task_queue& tq) const
+    {
+      return tq.size == 0 || tq.mark == task_queue_depth_;
+    }

     void
     pop_back (task_queue& tq, lock& ql)
     {
-      task_data& td (tq.data[--tq.tail]);
+      size_t& s (tq.size);
+      size_t& t (tq.tail);
+      size_t& m (tq.mark);
+
+      bool a (t == m); // Adjust mark?

-      if (tq.head == tq.tail)
-        tq.stop = tq.head = tq.tail = 0;
+      task_data& td (tq.data[t]);
+
+      // Save the old queue mark and set the new one in case the task we are
+      // about to run adds sub-tasks.
+      //
+      size_t om (m);
+      m = t; // Where next push() will go.
+
+      //                    normal           wrap           empty
+      //                       |               |              |
+      t = s != 1 ? (t != 0 ? t - 1 : task_queue_depth_ - 1) : t;
+      --s;

       td.thunk (*this, ql, &td.data);
+      ql.lock ();
+
+      // Restore the old mark (which we might have to adjust).
+      //
+      if (s == 0)
+        m = t;                 // Reset the mark.
+      else if (a)
+        m = task_queue_depth_; // Disable the mark.
+      else
+        // What happens if head goes past the old mark? In this case we will
+        // get into the empty queue state before we end up making any (wrong)
+        // decisions based on this value. Unfortunately there is no way to
+        // detect this (and do some sanity asserts) since things can wrap
+        // around.
+        //
+        // To put it another way, the understanding here is that after the
+        // task returns we will either have an empty queue or there will still
+        // be tasks between the old mark and the current tail, something
+        // along these lines:
+        //
+        // OOOOOXXXXOOO
+        //      |  |  |
+        //      m  h  t
+        //
+        m = om;
     }

     // Each thread has its own queue. Instead of allocating all max_threads of
     // them up front, we will reserve the space and will then create the
     // queues on demand.
     //
     vector<unique_ptr<task_queue>> task_queues_;

-    // TLS cache of each thread's task queue.
+    // TLS cache of thread's task queue.
+    //
+    static
+    // Apparently Apple's Clang "temporarily disabled" C++11 thread_local
+    // until they can implement a "fast" version, which reportedly happened in
+    // XCode 8. So for now we will continue using __thread for this target.
     //
-    thread_local static task_queue* task_queue_;
+#if defined(__apple_build_version__) && __apple_build_version__ < 8000000
+    __thread
+#else
+    thread_local
+#endif
+    task_queue* task_queue_;

     task_queue&
     create_queue ();
   };

   extern scheduler sched;
 }

+#include <build2/scheduler.txx>
+
 #endif // BUILD2_SCHEDULER
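
The rewritten queue above is compact but subtle, so here is a minimal standalone model of just the index arithmetic: head/tail/size plus the mark, with the same normal/wrap/empty cases as push(), pop_front(), and pop_back() in the diff. This is a sketch under assumptions: the queue type, the fixed depth, and the int payloads are illustrative stand-ins (the real queue stores task_data and runs thunks under a lock), and the save/restore of the mark around the executing task is omitted since nothing re-enters this queue while popping.

#include <cassert>
#include <cstddef>
#include <iostream>

struct queue
{
  static constexpr std::size_t depth = 4; // Stands in for task_queue_depth_.

  std::size_t head = 0;
  std::size_t mark = 0;
  std::size_t tail = 0;
  std::size_t size = 0;

  int data[depth];

  bool
  push (int v)
  {
    if (size == depth)
      return false; // Full (the scheduler would run the task inline).

    // Same normal/wrap/empty arithmetic as push() in the diff.
    //
    tail = size != 0 ? (tail != depth - 1 ? tail + 1 : 0) : tail;
    size++;
    data[tail] = v;
    return true;
  }

  bool
  pop_front (int& v)
  {
    if (size == 0)
      return false;

    bool a (head == mark); // Popping the marked element?
    v = data[head];

    head = size != 1 ? (head != depth - 1 ? head + 1 : 0) : head;

    if (--size == 0 || a)
      mark = head; // Reset or adjust the mark.

    return true;
  }

  bool
  pop_back (int& v)
  {
    if (size == 0 || mark == depth) // Empty or the mark is disabled.
      return false;

    bool a (tail == mark);
    v = data[tail];

    tail = size != 1 ? (tail != 0 ? tail - 1 : depth - 1) : tail;
    --size;

    if (size == 0)
      mark = tail;  // Reset the mark.
    else if (a)
      mark = depth; // Disable the mark until the queue empties.

    return true;
  }
};

int
main ()
{
  queue q;
  int v (0);

  q.push (1);
  q.push (2);
  q.push (3);

  q.pop_back (v);  std::cout << v << '\n'; // 3 (LIFO end)
  q.pop_front (v); std::cout << v << '\n'; // 1 (FIFO end)
  q.pop_front (v); std::cout << v << '\n'; // 2

  assert (q.size == 0 && q.mark == q.head); // Mark was reset on empty.
}

Popping from the back yields the most recently pushed element, which appears to be the point of the mark: the real pop_back() moves the mark to where the next push() will go before running the task, so back-popping only descends into sub-tasks queued by the currently executing task.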
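The interface comments also pin down the task_count protocol: async() increments the caller's count before scheduling, the task decrements it on completion, and wait() blocks until it reaches zero. The following is a hedged sketch of that protocol only, using one plain std::thread per task; the async() helper, the helpers vector, and the yield-based wait loop are illustrative assumptions, not the build2 implementation (whose wait() also executes queued tasks and throws system_error(ECANCELED) if the scheduler is shut down).

#include <atomic>
#include <cstddef>
#include <iostream>
#include <thread>
#include <vector>

using atomic_count = std::atomic<std::size_t>;

// Increment the count, then schedule the task; the task decrements it when
// done. Here every task gets its own thread; the real scheduler would queue
// it for a helper thread instead.
//
template <typename F>
void
async (atomic_count& task_count, std::vector<std::thread>& helpers, F f)
{
  task_count.fetch_add (1, std::memory_order_release);

  helpers.emplace_back (
    [&task_count, f] ()
    {
      f ();
      task_count.fetch_sub (1, std::memory_order_release);
    });
}

int
main ()
{
  atomic_count task_count (0);
  std::vector<std::thread> helpers;
  std::atomic<int> sum (0);

  for (int i (1); i <= 10; ++i)
    async (task_count, helpers, [&sum, i] () {sum += i;});

  // wait(): block until the count reaches zero.
  //
  while (task_count.load (std::memory_order_acquire) != 0)
    std::this_thread::yield ();

  for (std::thread& t: helpers)
    t.join ();

  std::cout << sum << '\n'; // Prints 55.
}

The release/acquire pairing mirrors the fetch_add in the removed async() body: the releasing decrement in each task synchronizes with the acquiring load in the waiter.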