aboutsummaryrefslogtreecommitdiff
path: root/build2/scheduler
diff options
context:
space:
mode:
Diffstat (limited to 'build2/scheduler')
-rw-r--r--build2/scheduler252
1 files changed, 139 insertions, 113 deletions
diff --git a/build2/scheduler b/build2/scheduler
index 80b9ec9..ffb8acd 100644
--- a/build2/scheduler
+++ b/build2/scheduler
@@ -8,8 +8,6 @@
#include <mutex>
#include <tuple>
#include <atomic>
-#include <cerrno>
-#include <cassert>
#include <type_traits> // aligned_storage, etc
#include <condition_variable>
@@ -49,15 +47,7 @@ namespace build2
// allow a ready master to continue as soon as possible. If it were reused
// as a helper, then it could be blocked on a nested wait() further down the
// stack. This means that the number of threads created by the scheduler
- // will normally exceed the maximum active allowed. It should not, however,
- // get excessive provided the tasks are not too deeply nested.
- //
- // @@ Actually, this is not quite correct: this seems to be a function of
- // both depth and width of the task tree. Perhaps we should use a hybrid
- // approach: when the number of helper threads reaches a certain limit
- // we switch to reusing suspended threads? Also, a thread can rake its
- // own and its subtask's queues directly in wait() since wait() won't
- // return until they are done.
+ // will normally exceed the maximum active allowed.
//
class scheduler
{
@@ -77,68 +67,7 @@ namespace build2
//
template <typename F, typename... A>
void
- async (atomic_count& task_count, F&& f, A&&... a)
- {
- using task = task_type<F, A...>;
-
- static_assert (sizeof (task) <= sizeof (task_data),
- "insufficient space");
-
- static_assert (std::is_trivially_destructible<task>::value,
- "not trivially destructible");
-
- // Push into the queue unless we are running serially or the queue is
- // full.
- //
- task_data* td (nullptr);
-
- if (max_active_ != 1)
- {
- task_queue& tq (task_queue_ != nullptr
- ? *task_queue_
- : create_queue ());
-
- lock ql (tq.mutex);
-
- if (tq.shutdown)
- throw system_error (ECANCELED, std::system_category ());
-
- if ((td = push (tq)) != nullptr)
- {
- // Package the task.
- //
- new (&td->data) task {
- &task_count,
- decay_copy (forward <F> (f)),
- typename task::args_type (decay_copy (forward <A> (a))...)};
-
- td->thunk = &task_thunk<F, A...>;
- }
- else
- tq.stat_full++;
- }
-
- // If serial/full, then run the task synchronously. In this case
- // there is no need to mess with task count.
- //
- if (td == nullptr)
- {
- forward<F> (f) (forward<A> (a)...);
- return;
- }
-
- // Increment the task count.
- //
- task_count.fetch_add (1, std::memory_order_release);
-
- lock l (mutex_);
- task_ = true;
-
- // If there is a spare active thread, wake up (or create) the helper.
- //
- if (active_ < max_active_)
- activate_helper (l);
- }
+ async (atomic_count& task_count, F&&, A&&...);
// Wait until the task count reaches 0. If the scheduler is shutdown
// while waiting, throw system_error(ECANCELED).
@@ -163,22 +92,24 @@ namespace build2
// already active (e.g., the calling thread). It must not be 0 (since
// someone has to schedule the first task).
//
- // If maximum threads is unspecified, then a generally appropriate default
- // limit it used.
+ // If the maximum threads or task queue depth arguments are unspecified,
+ // then appropriate defaults are used.
//
scheduler (size_t max_active,
size_t init_active = 1,
- size_t max_threads = 0)
+ size_t max_threads = 0,
+ size_t queue_depth = 0)
{
- startup (max_active, init_active, max_threads);
+ startup (max_active, init_active, max_threads, queue_depth);
}
- // Note: naturally not thread-safe.
+ // Start the scheduler.
//
void
startup (size_t max_active,
size_t init_active = 1,
- size_t max_threads = 0);
+ size_t max_threads = 0,
+ size_t queue_depth = 0);
// Wait for all the helper threads to terminate. Throw system_error on
// failure. Note that the initially active threads are not waited for.
@@ -201,6 +132,35 @@ namespace build2
stat
shutdown ();
+ // If initially active thread(s) (besides the one that calls startup())
+ // exist before the call to startup(), then they must call join() before
+ // executing any tasks. The two common cases where you don't have to call
+ // join are a single active thread that calls startup()/shutdown() or
+ // active thread(s) that are created after startup().
+ //
+ void
+ join ()
+ {
+ assert (task_queue_ == nullptr);
+
+ // Lock the mutex to make sure the values set in startup() are visible
+ // in this thread.
+ //
+ lock l (mutex_);
+ }
+
+ // If initially active thread(s) participate in multiple schedulers and/or
+ // sessions (intervals between startup() and shutdown()), then they must
+ // call leave() before joining another scheduler/session. Note that this
+ // applies to the active thread that calls shutdown(). Note that a thread
+ // can only participate in one scheduler at a time.
+ //
+ void
+ leave ()
+ {
+ task_queue_ = nullptr;
+ }
+
// Return the number of hardware threads or 0 if unable to determine.
//
static size_t
@@ -252,21 +212,7 @@ namespace build2
template <typename F, typename... A>
static void
- task_thunk (scheduler& s, lock& ql, void* td)
- {
- using task = task_type<F, A...>;
-
- // Move the data and release the lock.
- //
- task t (move (*static_cast<task*> (td)));
- ql.unlock ();
-
- t.thunk (std::index_sequence_for<A...> ());
-
- atomic_count& tc (*t.task_count);
- if (--tc == 0)
- s.resume (tc); // Resume a waiter, if any.
- }
+ task_thunk (scheduler&, lock&, void*);
template <typename T>
static std::decay_t<T>
@@ -284,8 +230,7 @@ namespace build2
// (init_active + helpers) <= max_threads
//
// Note that the first three are immutable between startup() and
- // shutdown() so can be accessed without a lock (@@ What about
- // the case of multiple initially active?).
+ // shutdown() so can be accessed without a lock (but see join()).
//
size_t init_active_ = 0; // Initially active threads.
size_t max_active_ = 0; // Maximum number of active threads.
@@ -378,11 +323,18 @@ namespace build2
size_t stat_full = 0; // Number of times pop() returned NULL.
- // head <= stop <= tail
+ // Our task queue is circular with head being the index of the first
+ // element and tail -- of the last. Since this makes the empty and one
+ // element cases indistinguishable, we also keep the size.
//
- size_t head = 0; // Index of the first element.
- size_t stop = 0; // Index of the element to stop at in pop_back().
- size_t tail = 0; // Index of the one past last element.
+ // The mark is an index somewhere between (figuratively speaking) head
+ // and tail, if enabled. If the mark is hit, then it is disabled until
+ // the queue becomes empty.
+ //
+ size_t head = 0;
+ size_t mark = 0;
+ size_t tail = 0;
+ size_t size = 0;
unique_ptr<task_data[]> data;
@@ -398,40 +350,102 @@ namespace build2
task_data*
push (task_queue& tq)
{
- return tq.tail != task_queue_depth_ ? &tq.data[tq.tail++] : nullptr;
+ size_t& s (tq.size);
+ size_t& t (tq.tail);
+
+ if (s != task_queue_depth_)
+ {
+ // normal wrap empty
+ // | | |
+ t = s != 0 ? (t != task_queue_depth_ - 1 ? t + 1 : 0) : t;
+ s++;
+ return &tq.data[t];
+ }
+
+ return nullptr;
}
bool
- empty_front (task_queue& tq) const {return tq.head == tq.tail;}
+ empty_front (task_queue& tq) const {return tq.size == 0;}
void
pop_front (task_queue& tq, lock& ql)
{
- task_data& td (tq.data[tq.head++]);
+ size_t& s (tq.size);
+ size_t& h (tq.head);
+ size_t& m (tq.mark);
+
+ bool a (h == m); // Adjust mark?
+ task_data& td (tq.data[h]);
+
+ // normal wrap empty
+ // | | |
+ h = s != 1 ? (h != task_queue_depth_ - 1 ? h + 1 : 0) : h;
- if (tq.head == tq.tail)
- tq.stop = tq.head = tq.tail = 0; // Reset.
- else if (tq.head > tq.stop)
- tq.stop = tq.head;
+ if (--s == 0 || a)
+ m = h; // Reset or adjust the mark.
// The thunk moves the task data to its stack, releases the lock,
// and continues to execute the task.
//
td.thunk (*this, ql, &td.data);
+ ql.lock ();
}
bool
- empty_back (task_queue& tq) const {return tq.stop == tq.tail;}
+ empty_back (task_queue& tq) const
+ {
+ return tq.size == 0 || tq.mark == task_queue_depth_;
+ }
void
pop_back (task_queue& tq, lock& ql)
{
- task_data& td (tq.data[--tq.tail]);
+ size_t& s (tq.size);
+ size_t& t (tq.tail);
+ size_t& m (tq.mark);
+
+ bool a (t == m); // Adjust mark?
- if (tq.head == tq.tail)
- tq.stop = tq.head = tq.tail = 0;
+ task_data& td (tq.data[t]);
+
+ // Save the old queue mark and set the new one in case the task we are
+ // about to run adds sub-tasks.
+ //
+ size_t om (m);
+ m = t; // Where next push() will go.
+
+ // normal wrap empty
+ // | | |
+ t = s != 1 ? (t != 0 ? t - 1 : task_queue_depth_ - 1) : t;
+ --s;
td.thunk (*this, ql, &td.data);
+ ql.lock ();
+
+ // Restore the old mark (which we might have to adjust).
+ //
+ if (s == 0)
+ m = t; // Reset the mark.
+ else if (a)
+ m = task_queue_depth_; // Disable the mark.
+ else
+ // What happens if head goes past the old mark? In this case we will
+ // get into the empty queue state before we end up making any (wrong)
+ // decisions based on this value. Unfortunately there is no way to
+ // detect this (and do some sanity asserts) since things can wrap
+ // around.
+ //
+ // To put it another way, the understanding here is that after the
+ // task returns we will either have an empty queue or there will still
+ // be tasks between the old mark and the current tail, something
+ // along these lines:
+ //
+ // OOOOOXXXXOOO
+ // | | |
+ // m h t
+ //
+ m = om;
}
// Each thread has its own queue. Instead of allocating all max_threads of
@@ -440,9 +454,19 @@ namespace build2
//
vector<unique_ptr<task_queue>> task_queues_;
- // TLS cache of each thread's task queue.
+ // TLS cache of thread's task queue.
+ //
+ static
+ // Apparently Apple's Clang "temporarily disabled" C++11 thread_local
+ // until they can implement a "fast" version, which reportedly happened in
+ // XCode 8. So for now we will continue using __thread for this target.
//
- thread_local static task_queue* task_queue_;
+#if defined(__apple_build_version__) && __apple_build_version__ < 8000000
+ __thread
+#else
+ thread_local
+#endif
+ task_queue* task_queue_;
task_queue&
create_queue ();
@@ -453,4 +477,6 @@ namespace build2
extern scheduler sched;
}
+#include <build2/scheduler.txx>
+
#endif // BUILD2_SCHEDULER