author     Boris Kolpackov <boris@codesynthesis.com>  2016-12-13 12:30:14 +0200
committer  Boris Kolpackov <boris@codesynthesis.com>  2016-12-13 12:30:14 +0200
commit     052dc48939a063b19a13c10cb2c735b4b06a4c4b (patch)
tree       1ea8bd67374ee13cc3640d237d33ba165c495e25 /build2
parent     2d2cbcaf1c2afd1565502f8f0c83fb1cd56f6cec (diff)
Various scheduler improvements and fixes
Diffstat (limited to 'build2')
-rw-r--r--  build2/scheduler      252
-rw-r--r--  build2/scheduler.cxx  129
-rw-r--r--  build2/scheduler.txx   91
3 files changed, 313 insertions(+), 159 deletions(-)
diff --git a/build2/scheduler b/build2/scheduler
index 80b9ec9..ffb8acd 100644
--- a/build2/scheduler
+++ b/build2/scheduler
@@ -8,8 +8,6 @@
#include <mutex>
#include <tuple>
#include <atomic>
-#include <cerrno>
-#include <cassert>
#include <type_traits> // aligned_storage, etc
#include <condition_variable>
@@ -49,15 +47,7 @@ namespace build2
// allow a ready master to continue as soon as possible. If it were reused
// as a helper, then it could be blocked on a nested wait() further down the
// stack. This means that the number of threads created by the scheduler
- // will normally exceed the maximum active allowed. It should not, however,
- // get excessive provided the tasks are not too deeply nested.
- //
- // @@ Actually, this is not quite correct: this seems to be a function of
- // both depth and width of the task tree. Perhaps we should use a hybrid
- // approach: when the number of helper threads reaches a certain limit
- // we switch to reusing suspended threads? Also, a thread can rake its
- // own and its subtask's queues directly in wait() since wait() won't
- // return until they are done.
+ // will normally exceed the maximum active allowed.
//
class scheduler
{
@@ -77,68 +67,7 @@ namespace build2
//
template <typename F, typename... A>
void
- async (atomic_count& task_count, F&& f, A&&... a)
- {
- using task = task_type<F, A...>;
-
- static_assert (sizeof (task) <= sizeof (task_data),
- "insufficient space");
-
- static_assert (std::is_trivially_destructible<task>::value,
- "not trivially destructible");
-
- // Push into the queue unless we are running serially or the queue is
- // full.
- //
- task_data* td (nullptr);
-
- if (max_active_ != 1)
- {
- task_queue& tq (task_queue_ != nullptr
- ? *task_queue_
- : create_queue ());
-
- lock ql (tq.mutex);
-
- if (tq.shutdown)
- throw system_error (ECANCELED, std::system_category ());
-
- if ((td = push (tq)) != nullptr)
- {
- // Package the task.
- //
- new (&td->data) task {
- &task_count,
- decay_copy (forward <F> (f)),
- typename task::args_type (decay_copy (forward <A> (a))...)};
-
- td->thunk = &task_thunk<F, A...>;
- }
- else
- tq.stat_full++;
- }
-
- // If serial/full, then run the task synchronously. In this case
- // there is no need to mess with task count.
- //
- if (td == nullptr)
- {
- forward<F> (f) (forward<A> (a)...);
- return;
- }
-
- // Increment the task count.
- //
- task_count.fetch_add (1, std::memory_order_release);
-
- lock l (mutex_);
- task_ = true;
-
- // If there is a spare active thread, wake up (or create) the helper.
- //
- if (active_ < max_active_)
- activate_helper (l);
- }
+ async (atomic_count& task_count, F&&, A&&...);
// Wait until the task count reaches 0. If the scheduler is shutdown
// while waiting, throw system_error(ECANCELED).
@@ -163,22 +92,24 @@ namespace build2
// already active (e.g., the calling thread). It must not be 0 (since
// someone has to schedule the first task).
//
- // If maximum threads is unspecified, then a generally appropriate default
- // limit it used.
+ // If the maximum threads or task queue depth arguments are unspecified,
+ // then appropriate defaults are used.
//
scheduler (size_t max_active,
size_t init_active = 1,
- size_t max_threads = 0)
+ size_t max_threads = 0,
+ size_t queue_depth = 0)
{
- startup (max_active, init_active, max_threads);
+ startup (max_active, init_active, max_threads, queue_depth);
}
- // Note: naturally not thread-safe.
+ // Start the scheduler.
//
void
startup (size_t max_active,
size_t init_active = 1,
- size_t max_threads = 0);
+ size_t max_threads = 0,
+ size_t queue_depth = 0);
// Wait for all the helper threads to terminate. Throw system_error on
// failure. Note that the initially active threads are not waited for.
@@ -201,6 +132,35 @@ namespace build2
stat
shutdown ();
+ // If initially active thread(s) (besides the one that calls startup())
+ // exist before the call to startup(), then they must call join() before
+ // executing any tasks. The two common cases where you don't have to call
+ // join are a single active thread that calls startup()/shutdown() or
+ // active thread(s) that are created after startup().
+ //
+ void
+ join ()
+ {
+    assert (task_queue_ == nullptr);
+
+ // Lock the mutex to make sure the values set in startup() are visible
+ // in this thread.
+ //
+ lock l (mutex_);
+ }
+
+ // If initially active thread(s) participate in multiple schedulers and/or
+ // sessions (intervals between startup() and shutdown()), then they must
+ // call leave() before joining another scheduler/session. Note that this
+ // applies to the active thread that calls shutdown(). Note that a thread
+ // can only participate in one scheduler at a time.
+ //
+ void
+ leave ()
+ {
+ task_queue_ = nullptr;
+ }
+
// Return the number of hardware threads or 0 if unable to determine.
//
static size_t
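
To illustrate the join()/leave() protocol described above, here is a minimal usage sketch (my illustration, not part of this commit). The promise/future handshake that ensures startup() has completed before the second thread calls join() is an assumption of the example, as is the use of the global sched instance:

#include <future>
#include <thread>

#include <build2/scheduler>

using namespace build2;

int
main ()
{
  std::promise<void> started;
  std::shared_future<void> ready (started.get_future ());

  // An initially active thread that exists before startup() is called.
  //
  std::thread t (
    [ready] ()
    {
      ready.wait ();  // Make sure startup() has completed.
      sched.join ();  // Make the values set in startup() visible here.

      // ... execute tasks ...

      sched.leave (); // Required before joining another scheduler/session.
    });

  sched.startup (4 /* max_active */, 2 /* init_active */);
  started.set_value ();

  // ... this (first initially active) thread executes tasks ...

  t.join ();
  sched.shutdown ();
}
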
@@ -252,21 +212,7 @@ namespace build2
template <typename F, typename... A>
static void
- task_thunk (scheduler& s, lock& ql, void* td)
- {
- using task = task_type<F, A...>;
-
- // Move the data and release the lock.
- //
- task t (move (*static_cast<task*> (td)));
- ql.unlock ();
-
- t.thunk (std::index_sequence_for<A...> ());
-
- atomic_count& tc (*t.task_count);
- if (--tc == 0)
- s.resume (tc); // Resume a waiter, if any.
- }
+ task_thunk (scheduler&, lock&, void*);
template <typename T>
static std::decay_t<T>
@@ -284,8 +230,7 @@ namespace build2
// (init_active + helpers) <= max_threads
//
// Note that the first three are immutable between startup() and
- // shutdown() so can be accessed without a lock (@@ What about
- // the case of multiple initially active?).
+ // shutdown() so can be accessed without a lock (but see join()).
//
size_t init_active_ = 0; // Initially active threads.
size_t max_active_ = 0; // Maximum number of active threads.
@@ -378,11 +323,18 @@ namespace build2
size_t stat_full = 0; // Number of times pop() returned NULL.
- // head <= stop <= tail
+ // Our task queue is circular with head being the index of the first
+ // element and tail -- of the last. Since this makes the empty and one
+ // element cases indistinguishable, we also keep the size.
//
- size_t head = 0; // Index of the first element.
- size_t stop = 0; // Index of the element to stop at in pop_back().
- size_t tail = 0; // Index of the one past last element.
+ // The mark is an index somewhere between (figuratively speaking) head
+ // and tail, if enabled. If the mark is hit, then it is disabled until
+ // the queue becomes empty.
+ //
+ size_t head = 0;
+ size_t mark = 0;
+ size_t tail = 0;
+ size_t size = 0;
unique_ptr<task_data[]> data;
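
To make the circular queue arithmetic concrete, here is a small standalone sketch (not from this commit) that traces the head/tail/size transitions performed by push() and pop_front() below on a toy ring of depth 4; the mark is omitted and all names are illustrative:

#include <cassert>
#include <cstddef>

int
main ()
{
  const std::size_t depth (4);
  std::size_t head (0), tail (0), size (0);

  auto push = [&] () -> std::size_t
  {
    assert (size != depth);
    tail = size != 0 ? (tail != depth - 1 ? tail + 1 : 0) : tail; // Wrap.
    ++size;
    return tail;
  };

  auto pop_front = [&] () -> std::size_t
  {
    assert (size != 0);
    std::size_t i (head);
    head = size != 1 ? (head != depth - 1 ? head + 1 : 0) : head; // Wrap.
    --size;
    return i;
  };

  // Fill the ring, pop one from the front, then push again to observe the
  // tail wrapping around to slot 0.
  //
  assert (push () == 0 && push () == 1 && push () == 2 && push () == 3);
  assert (pop_front () == 0);
  assert (size == 3 && head == 1 && tail == 3);
  assert (push () == 0); // Wrapped.
}
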
@@ -398,40 +350,102 @@ namespace build2
task_data*
push (task_queue& tq)
{
- return tq.tail != task_queue_depth_ ? &tq.data[tq.tail++] : nullptr;
+ size_t& s (tq.size);
+ size_t& t (tq.tail);
+
+ if (s != task_queue_depth_)
+ {
+ // normal wrap empty
+ // | | |
+ t = s != 0 ? (t != task_queue_depth_ - 1 ? t + 1 : 0) : t;
+ s++;
+ return &tq.data[t];
+ }
+
+ return nullptr;
}
bool
- empty_front (task_queue& tq) const {return tq.head == tq.tail;}
+ empty_front (task_queue& tq) const {return tq.size == 0;}
void
pop_front (task_queue& tq, lock& ql)
{
- task_data& td (tq.data[tq.head++]);
+ size_t& s (tq.size);
+ size_t& h (tq.head);
+ size_t& m (tq.mark);
+
+ bool a (h == m); // Adjust mark?
+ task_data& td (tq.data[h]);
+
+ // normal wrap empty
+ // | | |
+ h = s != 1 ? (h != task_queue_depth_ - 1 ? h + 1 : 0) : h;
- if (tq.head == tq.tail)
- tq.stop = tq.head = tq.tail = 0; // Reset.
- else if (tq.head > tq.stop)
- tq.stop = tq.head;
+ if (--s == 0 || a)
+ m = h; // Reset or adjust the mark.
// The thunk moves the task data to its stack, releases the lock,
// and continues to execute the task.
//
td.thunk (*this, ql, &td.data);
+ ql.lock ();
}
bool
- empty_back (task_queue& tq) const {return tq.stop == tq.tail;}
+ empty_back (task_queue& tq) const
+ {
+ return tq.size == 0 || tq.mark == task_queue_depth_;
+ }
void
pop_back (task_queue& tq, lock& ql)
{
- task_data& td (tq.data[--tq.tail]);
+ size_t& s (tq.size);
+ size_t& t (tq.tail);
+ size_t& m (tq.mark);
+
+ bool a (t == m); // Adjust mark?
- if (tq.head == tq.tail)
- tq.stop = tq.head = tq.tail = 0;
+ task_data& td (tq.data[t]);
+
+ // Save the old queue mark and set the new one in case the task we are
+ // about to run adds sub-tasks.
+ //
+ size_t om (m);
+ m = t; // Where next push() will go.
+
+ // normal wrap empty
+ // | | |
+ t = s != 1 ? (t != 0 ? t - 1 : task_queue_depth_ - 1) : t;
+ --s;
td.thunk (*this, ql, &td.data);
+ ql.lock ();
+
+ // Restore the old mark (which we might have to adjust).
+ //
+ if (s == 0)
+ m = t; // Reset the mark.
+ else if (a)
+ m = task_queue_depth_; // Disable the mark.
+ else
+ // What happens if head goes past the old mark? In this case we will
+ // get into the empty queue state before we end up making any (wrong)
+ // decisions based on this value. Unfortunately there is no way to
+ // detect this (and do some sanity asserts) since things can wrap
+ // around.
+ //
+ // To put it another way, the understanding here is that after the
+ // task returns we will either have an empty queue or there will still
+      // be tasks between the old mark and the current tail, something
+ // along these lines:
+ //
+      // OOOOOXXXXOOO
+      //   |  |  |
+      //   m  h  t
+ //
+ m = om;
}
// Each thread has its own queue. Instead of allocating all max_threads of
@@ -440,9 +454,19 @@ namespace build2
//
vector<unique_ptr<task_queue>> task_queues_;
- // TLS cache of each thread's task queue.
+ // TLS cache of thread's task queue.
+ //
+ static
+ // Apparently Apple's Clang "temporarily disabled" C++11 thread_local
+ // until they can implement a "fast" version, which reportedly happened in
+ // XCode 8. So for now we will continue using __thread for this target.
//
- thread_local static task_queue* task_queue_;
+#if defined(__apple_build_version__) && __apple_build_version__ < 8000000
+ __thread
+#else
+ thread_local
+#endif
+ task_queue* task_queue_;
task_queue&
create_queue ();
@@ -453,4 +477,6 @@ namespace build2
extern scheduler sched;
}
+#include <build2/scheduler.txx>
+
#endif // BUILD2_SCHEDULER
diff --git a/build2/scheduler.cxx b/build2/scheduler.cxx
index b62d2d4..4619e85 100644
--- a/build2/scheduler.cxx
+++ b/build2/scheduler.cxx
@@ -4,6 +4,8 @@
#include <build2/scheduler>
+#include <cerrno>
+
using namespace std;
namespace build2
@@ -16,27 +18,14 @@ namespace build2
// See if we can run some of our own tasks.
//
- task_queue& tq (*task_queue_); // Must have been initializied by async().
+ task_queue& tq (*task_queue_); // Must have been set by async() or task
+                                   // count would have been 0.
for (lock ql (tq.mutex); !tq.shutdown && !empty_back (tq); )
- {
- // Save the old stop point and set the new one in case the task we are
- // about to run adds sub-tasks.
- //
- size_t stop (tq.stop);
- tq.stop = tq.tail - 1; // Index of the first sub-task to be added (-1
- // is for the following pop_back()).
-
- pop_back (tq, ql); // Releases the lock.
- ql.lock ();
-
- // Restore the old stop point which we might have to adjust.
- //
- tq.stop = tq.head > stop ? tq.head : tq.tail < stop ? tq.tail : stop;
- }
+ pop_back (tq, ql);
- // Note that empty task queue doesn't automatically mean the task count is
- // zero (some might still be executing asynchronously).
+ // Note that empty task queue doesn't automatically mean the task count
+ // is zero (some might still be executing asynchronously).
//
if (task_count == 0)
return;
@@ -135,8 +124,16 @@ namespace build2
}
void scheduler::
- startup (size_t max_active, size_t init_active, size_t max_threads)
+ startup (size_t max_active,
+ size_t init_active,
+ size_t max_threads,
+ size_t queue_depth)
{
+ // Lock the mutex to make sure our changes are visible in (other) active
+ // threads.
+ //
+ lock l (mutex_);
+
// Use 4x max_active on 32-bit and 8x max_active on 64-bit. Unless we were
// asked to run serially.
//
@@ -153,30 +150,53 @@ namespace build2
max_threads_ = max_threads;
// This value should be proportional to the amount of hardware concurrency
- // we have. Note that the queue entry is quite sizable.
+  // we have (no use queuing things if helpers cannot keep up). Note that the
+ // queue entry is quite sizable.
//
- task_queue_depth_ = max_active * sizeof (void*) * 4;
+ task_queue_depth_ = queue_depth != 0
+ ? queue_depth
+ : max_active * sizeof (void*) * 2;
- task_queues_.clear ();
task_queues_.reserve (max_threads_);
- // Pick a nice prime for common max_threads numbers. Though Intel Xeons
- // are all over the map when it comes to cores (6, 8, 10, 12, 14, 16,
- // 18, 20, 22).
+ // Pick a nice prime for common max_threads numbers. Experience shows that
+ // we want something close to 2x for small numbers, then reduce to 1.5x
+ // in-between, and 1x for large ones.
+ //
+ // Note that Intel Xeons are all over the map when it comes to cores (6,
+ // 8, 10, 12, 14, 16, 18, 20, 22).
//
wait_queue_size_ = // HW threads x bits
+ //
+ // 2x
+ //
max_threads == 8 ? 17 : // 2 x 4
max_threads == 16 ? 31 : // 4 x 4, 2 x 8
- max_threads == 32 ? 63 : // 4 x 8
- max_threads == 48 ? 97 : // 6 x 8
- max_threads == 64 ? 127 : // 8 x 8
- max_threads == 96 ? 191 : // 12 x 8
- max_threads == 128 ? 257 : // 16 x 8
- max_threads == 192 ? 383 : // 24 x 8
- max_threads == 256 ? 509 : // 32 x 8
- max_threads == 384 ? 769 : // 48 x 8
- max_threads == 512 ? 1021 : // 64 x 8
- 2 * max_threads - 1;
+ //
+ // 1.5x
+ //
+ max_threads == 32 ? 47 : // 4 x 8
+ max_threads == 48 ? 53 : // 6 x 8
+ max_threads == 64 ? 67 : // 8 x 8
+ max_threads == 80 ? 89 : // 10 x 8
+ //
+ // 1x
+ //
+ max_threads == 96 ? 101 : // 12 x 8
+ max_threads == 112 ? 127 : // 14 x 8
+ max_threads == 128 ? 131 : // 16 x 8
+ max_threads == 144 ? 139 : // 18 x 8
+ max_threads == 160 ? 157 : // 20 x 8
+ max_threads == 176 ? 173 : // 22 x 8
+ max_threads == 192 ? 191 : // 24 x 8
+ max_threads == 224 ? 223 : // 28 x 8
+ max_threads == 256 ? 251 : // 32 x 8
+ max_threads == 288 ? 271 : // 36 x 8
+ max_threads == 320 ? 313 : // 40 x 8
+ max_threads == 352 ? 331 : // 44 x 8
+ max_threads == 384 ? 367 : // 48 x 8
+ max_threads == 512 ? 499 : // 64 x 8
+ max_threads - 1; // Assume max_threads is even.
wait_queue_.reset (new wait_slot[wait_queue_size_]);
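
As a worked example of these defaults (my arithmetic, not part of the commit): on a 64-bit build with max_active = 8 and the remaining startup() arguments left at 0, max_threads defaults to 8 * 8 = 64, task_queue_depth_ to max_active * sizeof (void*) * 2 = 8 * 8 * 2 = 128 entries per queue, and wait_queue_size_ to 67, the prime chosen above for max_threads == 64.
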
@@ -249,6 +269,11 @@ namespace build2
l.lock ();
}
+ // Free the memory.
+ //
+ wait_queue_.reset ();
+ task_queues_.clear ();
+
r.thread_max_active = max_active_;
r.thread_max_total = max_threads_;
r.thread_max_waiting = stat_max_waiters_;
@@ -281,7 +306,7 @@ namespace build2
starting_++;
l.unlock ();
- // Restore the counter if the thread creation fails.
+ // Restore the counters if the thread creation fails.
//
struct guard
{
@@ -338,10 +363,8 @@ namespace build2
{
task_queue& tq (*s.task_queues_[i]);
- for (lock ql (tq.mutex);
- !tq.shutdown && !s.empty_front (tq);
- ql.lock ())
- s.pop_front (tq, ql); // Releases the lock.
+ for (lock ql (tq.mutex); !tq.shutdown && !s.empty_front (tq); )
+ s.pop_front (tq, ql);
}
l.lock ();
@@ -367,16 +390,30 @@ namespace build2
s.helpers_--;
}
- thread_local scheduler::task_queue* scheduler::task_queue_ = nullptr;
+#if defined(__apple_build_version__) && __apple_build_version__ < 8000000
+ __thread
+#else
+ thread_local
+#endif
+ scheduler::task_queue* scheduler::task_queue_ = nullptr;
auto scheduler::
create_queue () -> task_queue&
{
- lock l (mutex_);
- task_queues_.push_back (make_unique<task_queue> (task_queue_depth_));
- task_queue_ = task_queues_.back ().get ();
- task_queue_->shutdown = shutdown_;
- return *task_queue_;
+ // Note that task_queue_depth is immutable between startup() and
+ // shutdown() (but see join()).
+ //
+ unique_ptr<task_queue> tqp (new task_queue (task_queue_depth_));
+ task_queue& tq (*tqp);
+
+ {
+ lock l (mutex_);
+ tq.shutdown = shutdown_;
+ task_queues_.push_back (move (tqp));
+ }
+
+ task_queue_ = &tq;
+ return tq;
}
scheduler sched;
diff --git a/build2/scheduler.txx b/build2/scheduler.txx
new file mode 100644
index 0000000..9202f0b
--- /dev/null
+++ b/build2/scheduler.txx
@@ -0,0 +1,91 @@
+// file : build2/scheduler.txx -*- C++ -*-
+// copyright : Copyright (c) 2014-2016 Code Synthesis Ltd
+// license : MIT; see accompanying LICENSE file
+
+#include <cerrno>
+
+namespace build2
+{
+ template <typename F, typename... A>
+ void scheduler::
+ async (atomic_count& task_count, F&& f, A&&... a)
+ {
+ using task = task_type<F, A...>;
+
+ static_assert (sizeof (task) <= sizeof (task_data::data),
+ "insufficient space");
+
+ static_assert (std::is_trivially_destructible<task>::value,
+ "not trivially destructible");
+
+ // Push into the queue unless we are running serially or the queue is
+ // full.
+ //
+ task_data* td (nullptr);
+
+ if (max_active_ != 1)
+ {
+ task_queue* tq (task_queue_); // Single load.
+ if (tq == nullptr)
+ tq = &create_queue ();
+
+ lock ql (tq->mutex);
+
+ if (tq->shutdown)
+ throw system_error (ECANCELED, std::system_category ());
+
+ if ((td = push (*tq)) != nullptr)
+ {
+ // Package the task.
+ //
+ new (&td->data) task {
+ &task_count,
+ decay_copy (forward <F> (f)),
+ typename task::args_type (decay_copy (forward <A> (a))...)};
+
+ td->thunk = &task_thunk<F, A...>;
+ }
+ else
+ tq->stat_full++;
+ }
+
+ // If serial/full, then run the task synchronously. In this case
+ // there is no need to mess with task count.
+ //
+ if (td == nullptr)
+ {
+ forward<F> (f) (forward<A> (a)...);
+ return;
+ }
+
+ // Increment the task count.
+ //
+ task_count.fetch_add (1, std::memory_order_release);
+
+ lock l (mutex_);
+ task_ = true;
+
+ // If there is a spare active thread, wake up (or create) the helper.
+ //
+ if (active_ < max_active_)
+ activate_helper (l);
+ }
+
+ template <typename F, typename... A>
+ void scheduler::
+ task_thunk (scheduler& s, lock& ql, void* td)
+ {
+ using task = task_type<F, A...>;
+
+ // Move the data and release the lock.
+ //
+ task t (move (*static_cast<task*> (td)));
+ ql.unlock ();
+
+ t.thunk (std::index_sequence_for<A...> ());
+
+ atomic_count& tc (*t.task_count);
+ if (--tc == 0)
+ s.resume (tc); // Resume a waiter, if any.
+ }
+}
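
Finally, a minimal async()/wait() usage sketch (my illustration, not part of this commit). It assumes atomic_count comes from <build2/types> and that wait() has the wait(atomic_count&) form documented in the header; process() and the counts are made up:

#include <cstddef>

#include <build2/types>     // atomic_count (assumed to live here).
#include <build2/scheduler>

using namespace build2;

static void
process (std::size_t i)
{
  (void) i; // ... work on element i ...
}

int
main ()
{
  sched.startup (4 /* max_active */);

  atomic_count task_count (0);

  for (std::size_t i (0); i != 100; ++i)
    sched.async (task_count, &process, i); // Runs synchronously if serial/full.

  sched.wait (task_count); // Throws system_error(ECANCELED) if shut down.
  sched.shutdown ();
}
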