aboutsummaryrefslogtreecommitdiff
path: root/build2/scheduler
blob: ffb8acd6fc9507cedd2aca22ac363966940d945c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
// file      : build2/scheduler -*- C++ -*-
// copyright : Copyright (c) 2014-2016 Code Synthesis Ltd
// license   : MIT; see accompanying LICENSE file

#ifndef BUILD2_SCHEDULER
#define BUILD2_SCHEDULER

#include <mutex>
#include <tuple>
#include <atomic>
#include <thread>             // thread::hardware_concurrency()
#include <type_traits>        // aligned_storage, etc
#include <condition_variable>

#include <build2/types>
#include <build2/utility>

namespace build2
{
  // Scheduler of tasks and threads. Works best for "substantial" tasks (e.g.,
  // running a process), where in comparison thread synchronization overhead
  // is negligible.
  //
  // A thread (called "master") may need to perform several tasks which can be
  // done in parallel (e.g., update all the prerequisites or run all the
  // tests). To accomplish this, the master, via a call to async(), can ask the
  // scheduler to run a task in another thread (called "helper"). If a helper
  // is available, then the task is executed asynchronously by such helper.
  // Otherwise, the task is executed synchronously as part of the async()
  // call. Once the master thread has scheduled all the tasks, it calls wait()
  // to await their completion.
  //
  // The scheduler makes sure that only a certain number of threads (for
  // example, the number of available hardware threads) are "active" at any
  // given time (thus the reason why async() may choose to perform the task
  // synchronously). When a master thread calls wait(), it is "suspended"
  // until all its asynchronous tasks are completed (at which point it becomes
  // "ready"). A suspension of a master results in either another ready master
  // being "resumed" or another helper thread becoming available.
  //
  // On completion of a task a helper thread returns to the scheduler which
  // can again lead either to a ready master being resumed (in which case the
  // helper is suspended) or the helper becoming available to perform another
  // task.
  //
  // Note that suspended threads are not reused as helpers. Rather, a new
  // helper thread is always created if none is available. This is done to
  // allow a ready master to continue as soon as possible. If it were reused
  // as a helper, then it could be blocked on a nested wait() further down the
  // stack. This means that the number of threads created by the scheduler
  // will normally exceed the maximum active allowed.
  //
  class scheduler
  {
  public:
    using atomic_count = std::atomic<size_t>;

    // F should return void and not throw any exceptions. The way the result
    // of a task is communicated back to the master thread is ad hoc, usually
    // via "out" arguments. Such result(s) can only be retrieved by the master
    // once its task count reaches 0.
    //
    // The argument passing semantics is the same as for std::thread. In
    // particular, lvalue-references are passed as copies (use ref()/cref()
    // for the by-reference semantics).
    //
    // If the scheduler is shutdown, throw system_error(ECANCELED).
    //
    template <typename F, typename... A>
    void
    async (atomic_count& task_count, F&&, A&&...);

    // Wait until the task count reaches 0. If the scheduler is shutdown
    // while waiting, throw system_error(ECANCELED).
    //
    void
    wait (atomic_count& task_count);

    // Startup and shutdown.
    //
  public:
    // Unless already shut down, call shutdown() but ignore errors.
    //
    ~scheduler ();

    // Create a shut down scheduler.
    //
    scheduler () = default;

    // Create a started up scheduler.
    //
    // The initial active argument is the number of threads to assume are
    // already active (e.g., the calling thread). It must not be 0 (since
    // someone has to schedule the first task).
    //
    // If the maximum threads or task queue depth arguments are unspecified,
    // then appropriate defaults are used.
    //
    scheduler (size_t max_active,
               size_t init_active = 1,
               size_t max_threads = 0,
               size_t queue_depth = 0)
    {
      startup (max_active, init_active, max_threads, queue_depth);
    }

    // Start the scheduler.
    //
    void
    startup (size_t max_active,
             size_t init_active = 1,
             size_t max_threads = 0,
             size_t queue_depth = 0);

    // Wait for all the helper threads to terminate. Throw system_error on
    // failure. Note that the initially active threads are not waited for.
    // Return scheduling statistics.
    //
    struct stat
    {
      size_t thread_max_active     = 0; // max # of active threads allowed.
      size_t thread_max_total      = 0; // max # of total threads allowed.
      size_t thread_helpers        = 0; // # of helper threads created.
      size_t thread_max_waiting    = 0; // max # of waiters at any time.

      size_t task_queue_depth      = 0; // # of entries in a queue (capacity).
      size_t task_queue_full       = 0; // # of times task queue was full.

      size_t wait_queue_slots      = 0; // # of wait slots (buckets).
      size_t wait_queue_collisions = 0; // # of times slot had been occupied.
    };

    stat
    shutdown ();

    // If initially active thread(s) (besides the one that calls startup())
    // exist before the call to startup(), then they must call join() before
    // executing any tasks. The two common cases where you don't have to call
    // join are a single active thread that calls startup()/shutdown() or
    // active thread(s) that are created after startup().
    //
    void
    join ()
    {
      // A joining thread must not have a cached task queue yet (leave()
      // resets the cache). Note: this has to be a comparison; an assignment
      // here would both clobber the cache and make the assertion always
      // fail (the result of the assignment is a null pointer).
      //
      assert (task_queue_ == nullptr);

      // Lock the mutex to make sure the values set in startup() are visible
      // in this thread. The lock is released right away when l goes out of
      // scope; we are only interested in the synchronization.
      //
      lock l (mutex_);
    }

    // If initially active thread(s) participate in multiple schedulers and/or
    // sessions (intervals between startup() and shutdown()), then they must
    // call leave() before joining another scheduler/session. Note that this
    // applies to the active thread that calls shutdown(). Note that a thread
    // can only participate in one scheduler at a time.
    //
    void
    leave ()
    {
      task_queue_ = nullptr;
    }

    // Return the number of hardware threads or 0 if unable to determine.
    //
    static size_t
    hardware_concurrency ()
    {
      return std::thread::hardware_concurrency ();
    }

  private:
    using lock = std::unique_lock<std::mutex>;

    void
    activate_helper (lock&);

    void
    create_helper (lock&);

    // We restrict ourselves to a single pointer as an argument in hope of
    // a small object optimization.
    //
    static void
    helper (void*);

    void
    suspend (atomic_count& task_count);

    void
    resume (atomic_count& task_count);

    // Task encapsulation.
    //
    template <typename F, typename... A>
    struct task_type
    {
      using func_type = std::decay_t<F>;
      using args_type = std::tuple<std::decay_t<A>...>;

      atomic_count* task_count;
      func_type func;
      args_type args;

      template <size_t... i>
      void
      thunk (std::index_sequence<i...>)
      {
        move (func) (std::get<i> (move (args))...);
      }
    };

    template <typename F, typename... A>
    static void
    task_thunk (scheduler&, lock&, void*);

    template <typename T>
    static std::decay_t<T>
    decay_copy (T&& x) {return forward<T> (x);}

  private:
    std::mutex mutex_;

    bool shutdown_ = true;   // Shutdown flag.
    bool task_ = false;      // Task queued flag (see below).

    // The constraints that we must maintain:
    //
    //                  active <= max_active
    // (init_active + helpers) <= max_threads
    //
    // Note that the first three are immutable between startup() and
    // shutdown() so can be accessed without a lock (but see join()).
    //
    size_t init_active_ = 0; // Initially active threads.
    size_t max_active_  = 0; // Maximum number of active threads.
    size_t max_threads_ = 0; // Maximum number of total threads.

    size_t helpers_     = 0; // Number of helper threads created so far.

    // Every thread that we manage must be accounted for in one of these
    // counters. And their sum should equal (init_active + helpers).
    //
    size_t active_   = 0;  // Active master threads executing a task.
    size_t idle_     = 0;  // Idle helper threads waiting for a task.
    size_t waiting_  = 0;  // Suspended master threads waiting for their tasks.
    size_t ready_    = 0;  // Ready master thread waiting to become active.
    size_t starting_ = 0;  // Helper threads starting up.

    std::condition_variable idle_condv_;  // Idle helpers queue.
    std::condition_variable ready_condv_; // Ready masters queue.

    // Statistics counters.
    //
    size_t stat_max_waiters_;
    size_t stat_wait_collisions_;

    // Wait queue.
    //
    // A wait slot blocks a bunch of threads. When they are (all) unblocked,
    // they re-examine their respective conditions and either carry on or
    // block again.
    //
    // The wait queue is a shard of slots. A thread picks a slot based on the
    // address of its task count variable. How many slots do we need? This
    // depends on the number of waiters that we can have which cannot be
    // greater than the total number of threads.
    //
    struct wait_slot
    {
      std::mutex mutex;
      std::condition_variable condv;
      size_t waiters = 0;
      bool shutdown = true;
    };

    size_t wait_queue_size_; // Multiple of max_threads.
    unique_ptr<wait_slot[]> wait_queue_;

    // Task queue.
    //
    // Each queue has its own mutex. If the task_ flag above is true then
    // there *might* be a task in one of the queues. If it is false, then
    // it means there are either no tasks or someone is busy working on
    // them.
    //
    // For now we only support trivially-destructible tasks.
    //
    struct task_data
    {
      std::aligned_storage<sizeof (void*) * 7>::type data;
      void (*thunk) (scheduler&, lock&, void*);
    };

    // We have two requirements: Firstly, we want to keep the master thread
    // (the one that called wait()) busy working through its own queue for as
    // long as possible before (if at all) it "reincarnates" as a helper. The
    // main reason for this is the limited number of helpers we can create.
    //
    // Secondly, we don't want to block wait() longer than necessary since the
    // master thread can do some work with the result. Plus, overall, we want
    // to "unwind" task hierarchies as soon as possible since they hold up
    // resources such as thread's stack. All this means that the master thread
    // can only work through tasks that it has queued at this "level" of the
    // async()/wait() calls since we know that wait() cannot return until
    // they are done.
    //
    // To satisfy the first requirement, the master and helper threads get the
    // tasks from different ends of the queue: master from the back while
    // helpers from the front. And the master always adds new tasks to the
    // back.
    //
    // To satisfy the second requirement, the master thread stores the index
    // of the first task it has queued at this "level" and makes sure it
    // doesn't try to dequeue any task beyond that.
    //
    size_t task_queue_depth_; // Multiple of max_active.

    // A single task queue: a fixed-capacity circular buffer of task_data
    // entries protected by its own mutex. The indexes below are manipulated
    // by the push()/pop_*() functions of the scheduler.
    //
    struct task_queue
    {
      std::mutex mutex;
      bool shutdown = false;

      size_t stat_full = 0; // Number of times push() returned NULL.

      // Our task queue is circular with head being the index of the first
      // element and tail -- of the last. Since this makes the empty and one
      // element cases indistinguishable, we also keep the size.
      //
      // The mark is an index somewhere between (figuratively speaking) head
      // and tail, if enabled. If the mark is hit, then it is disabled until
      // the queue becomes empty. A disabled mark is represented by a value
      // equal to the queue depth (see empty_back() and pop_back()).
      //
      size_t head = 0;
      size_t mark = 0;
      size_t tail = 0;
      size_t size = 0;

      unique_ptr<task_data[]> data;

      // Allocate the storage for depth entries.
      //
      task_queue (size_t depth): data (new task_data[depth]) {}
    };

    // Task queue API. Expects the queue mutex to be locked.
    //

    // Push a new task to the queue returning a pointer to the task data to be
    // filled or NULL if the queue is full.
    //
    task_data*
    push (task_queue& tq)
    {
      // No vacant entries left: let the caller deal with it.
      //
      if (tq.size == task_queue_depth_)
        return nullptr;

      // The tail only advances if the queue already holds something. If it
      // is empty, then the entry the tail currently points to is the one to
      // be used.
      //
      if (tq.size != 0)
      {
        if (++tq.tail == task_queue_depth_)
          tq.tail = 0; // Wrap around.
      }

      ++tq.size;
      return &tq.data[tq.tail];
    }

    bool
    empty_front (task_queue& tq) const {return tq.size == 0;}

    // Remove the task at the head (front) of the queue and run it.
    //
    // The queue lock is passed in ql: the task's thunk releases it (see
    // below) and it is re-acquired here before returning, so the caller gets
    // the lock back in the locked state. NOTE(review): presumably only called
    // on a non-empty queue (i.e., after empty_front() returned false).
    //
    void
    pop_front (task_queue& tq, lock& ql)
    {
      size_t& s (tq.size);
      size_t& h (tq.head);
      size_t& m (tq.mark);

      bool a (h == m); // Adjust mark?
      task_data& td (tq.data[h]); // Grab the entry before moving the head.

      // Advance the head unless this is the last element, in which case it
      // stays put (the empty and one element states share the indexes).
      //
      //                                         normal  wrap empty
      //                                         |       |    |
      h = s != 1 ? (h != task_queue_depth_ - 1 ? h + 1 : 0) : h;

      if (--s == 0 || a)
        m = h; // Reset or adjust the mark.

      // The thunk moves the task data to its stack, releases the lock,
      // and continues to execute the task.
      //
      td.thunk (*this, ql, &td.data);
      ql.lock ();
    }

    bool
    empty_back (task_queue& tq) const
    {
      return tq.size == 0 || tq.mark == task_queue_depth_;
    }

    // Remove the task at the tail (back) of the queue and run it.
    //
    // While the task runs, the mark is set to the slot that was just vacated
    // so that any sub-tasks it pushes sit at or after the mark. Once the task
    // returns, the previous mark is restored, reset, or disabled (see below).
    //
    // The queue lock is passed in ql: the task's thunk is called with it and
    // it is re-acquired here before the queue state is touched again.
    // NOTE(review): presumably only called after empty_back() returned false.
    //
    void
    pop_back (task_queue& tq, lock& ql)
    {
      size_t& s (tq.size);
      size_t& t (tq.tail);
      size_t& m (tq.mark);

      bool a (t == m); // Adjust mark?

      task_data& td (tq.data[t]); // Grab the entry before moving the tail.

      // Save the old queue mark and set the new one in case the task we are
      // about to run adds sub-tasks.
      //
      size_t om (m);
      m = t; // Where next push() will go.

      // Retreat the tail unless this is the last element, in which case it
      // stays put.
      //
      //                     normal  wrap                     empty
      //                     |       |                        |
      t = s != 1 ? (t != 0 ? t - 1 : task_queue_depth_ - 1) : t;
      --s;

      td.thunk (*this, ql, &td.data);
      ql.lock ();

      // Restore the old mark (which we might have to adjust).
      //
      if (s == 0)
        m = t;                 // Reset the mark.
      else if (a)
        m = task_queue_depth_; // Disable the mark.
      else
        // What happens if head goes past the old mark? In this case we will
        // get into the empty queue state before we end up making any (wrong)
        // decisions based on this value. Unfortunately there is no way to
        // detect this (and do some sanity asserts) since things can wrap
        // around.
        //
        // To put it another way, the understanding here is that after the
        // task returns we will either have an empty queue or there will still
        // be tasks between the old mark and the current tail, something
        // along these lines:
        //
        // OOOOOXXXXOOO
        //   |  |  |
        //   m  h  t
        //
        m = om;
    }

    // Each thread has its own queue. Instead of allocating all max_threads of
    // them up front, we will reserve the space but will only construct queues
    // on demand.
    //
    vector<unique_ptr<task_queue>> task_queues_;

    // TLS cache of thread's task queue.
    //
    static
    // Apparently Apple's Clang "temporarily disabled" C++11 thread_local
    // until they can implement a "fast" version, which reportedly happened in
    // XCode 8. So for now we will continue using __thread for this target.
    //
#if defined(__apple_build_version__) && __apple_build_version__ < 8000000
    __thread
#else
    thread_local
#endif
    task_queue* task_queue_;

    task_queue&
    create_queue ();
  };

  // Main (and only) scheduler. Started up and shut down in main().
  //
  extern scheduler sched;
}

#include <build2/scheduler.txx>

#endif // BUILD2_SCHEDULER