aboutsummaryrefslogtreecommitdiff
path: root/build2/scheduler
blob: c6f4f7fecebd550d02a6d7bcd1c7a05ec596db73 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
// file      : build2/scheduler -*- C++ -*-
// copyright : Copyright (c) 2014-2017 Code Synthesis Ltd
// license   : MIT; see accompanying LICENSE file

#ifndef BUILD2_SCHEDULER
#define BUILD2_SCHEDULER

#include <list>
#include <mutex>
#include <tuple>
#include <atomic>
#include <type_traits>        // aligned_storage, etc
#include <condition_variable>

#include <build2/types>
#include <build2/utility>

namespace build2
{
  // Scheduler of tasks and threads. Works best for "substantial" tasks (e.g.,
  // running a process), where in comparison thread synchronization overhead
  // is negligible.
  //
  // A thread (called "master") may need to perform several tasks which can be
  // done in parallel (e.g., update all the prerequisites or run all the
  // tests). To acomplish this, the master, via a call to async(), can ask the
  // scheduler to run a task in another thread (called "helper"). If a helper
  // is available, then the task is executed asynchronously by such helper.
  // Otherwise, the task is (normally) executed synchronously as part of the
  // wait() call below. However, in certain cases (serial execution or full
  // queue), the task may be executed synchronously as part of the async()
  // call itself. Once the master thread has scheduled all the tasks, it calls
  // wait() to await for their completion.
  //
  // The scheduler makes sure that only a certain number of threads (for
  // example, the number of available hardware threads) are "active" at any
  // given time. When a master thread calls wait(), it is "suspended" until
  // all its asynchronous tasks are completed (at which point it becomes
  // "ready"). A suspension of a master results in either another ready master
  // being "resumed" or another helper thread becoming available.
  //
  // On completion of a task a helper thread returns to the scheduler which
  // can again lead either to a ready master being resumed (in which case the
  // helper is suspended) or the helper becoming available to perform another
  // task.
  //
  // Note that suspended threads are not reused as helpers. Rather, a new
  // helper thread is always created if none is available. This is done to
  // allow a ready master to continue as soon as possible. If it were reused
  // as a helper, then it could be blocked on a nested wait() further down the
  // stack. All this means that the number of threads created by the scheduler
  // will normally exceed the maximum active allowed.
  //
  class scheduler
  {
  public:
    using atomic_count = std::atomic<size_t>;

    // F should return void and not throw any exceptions. The way the result
    // of a task is communicated back to the master thread is ad hoc, usually
    // via "out" arguments. Such result(s) can only be retrieved by the master
    // once its task count reaches the start count.
    //
    // The argument passing semantics is the same as for std::thread. In
    // particular, lvalue-references are passed as copies (use ref()/cref()
    // for the by-reference semantics), except the case where the task is
    // executed synchronously and as part of the async() call itself (this
    // subtlety can become important when passing shared locks; you would
    // only want it to be copied if the task is queued).
    //
    // Return true if the task was queued and false if it was executed
    // synchronously.
    //
    // If the scheduler is shutdown, throw system_error(ECANCELED).
    //
    template <typename F, typename... A>
    bool
    async (size_t start_count, atomic_count& task_count, F&&, A&&...);

    template <typename F, typename... A>
    bool
    async (atomic_count& task_count, F&& f, A&&... a)
    {
      return async (0, task_count, forward<F> (f), forward<A> (a)...);
    }

    // Wait until the task count reaches the start count or less. If the
    // scheduler is shutdown while waiting, throw system_error(ECANCELED).
    // Return the value of task count. Note that this is a synchronizaiton
    // point (i.e., the task count is checked with memory_order_acquire).
    //
    // Note that it is valid to wait on another thread's task count (that is,
    // without making any async() calls in this thread). However, if the start
    // count differs from the one passed to async(), then whomever sets the
    // start count to this alternative value must also call resume() below
    // in order to signal waiting threads.
    //
    // Note also that in this case (waiting on someone else's start count),
    // the async() call could execute the tasks synchronously without ever
    // incrementing the task count. Thus if waiting on another thread's start
    // count starts before/during async() calls, then it must be "gated" with
    // an alternative (lower) start count.
    //
    // Finally, if waiting on someone else's start count, it may be unsafe
    // (from the deadlock's point of view) to continue working through our own
    // queue (i.e., we may block waiting on a task that has been queued before
    // us which in turn may end up waiting on "us").
    //
    enum work_queue
    {
      work_none, // Don't work own queue.
      work_one,  // Work own queue rechecking the task count after every task.
      work_all   // Work own queue before rechecking the task count.
    };

    size_t
    wait (size_t start_count,
          const atomic_count& task_count,
          work_queue = work_all);

    size_t
    wait (const atomic_count& task_count, work_queue wq = work_all)
    {
      return wait (0, task_count, wq);
    }

    // Resume threads waiting on this task count.
    //
    void
    resume (const atomic_count& task_count);

    // An active thread that is about to wait for potentially significant time
    // on something other than task_count (e.g., mutex, condition variable)
    // should deactivate itself with the scheduler and then reactivate once
    // done waiting.
    //
    void
    deactivate ();

    void
    activate (bool collision = false);

    // Startup and shutdown.
    //
  public:
    // Unless already shut down, call shutdown() but ignore errors.
    //
    ~scheduler ();

    // Create a shut down scheduler.
    //
    scheduler () = default;

    // Create a started up scheduler.
    //
    // The initial active argument is the number of threads to assume are
    // already active (e.g., the calling thread). It must not be 0 (since
    // someone has to schedule the first task).
    //
    // If the maximum threads or task queue depth arguments are unspecified,
    // then appropriate defaults are used.
    //
    scheduler (size_t max_active,
               size_t init_active = 1,
               size_t max_threads = 0,
               size_t queue_depth = 0)
    {
      startup (max_active, init_active, max_threads, queue_depth);
    }

    // Start the scheduler.
    //
    void
    startup (size_t max_active,
             size_t init_active = 1,
             size_t max_threads = 0,
             size_t queue_depth = 0);

    // Return true if the scheduler was started up.
    //
    // Note: can only be called from threads that have observed creation,
    // startup, or shutdown.
    //
    bool
    started () const {return !shutdown_;}

    // Tune a started up scheduler.
    //
    // Currently one cannot increase the number of max_active. Pass 0 to
    // restore the initial value.
    //
    // Note that tuning can only be done while the scheduler is inactive, that
    // is, no threads are executing a task or are suspended. For example, in a
    // setup with a single initial active thread that would be after a return
    // from the top-level wait() call.
    //
    void
    tune (size_t max_active);

    // Return true if the scheduler is running serial.
    //
    // Note: can only be called from threads that have observed startup.
    //
    bool
    serial () const {return max_active_ == 1;}

    // Wait for all the helper threads to terminate. Throw system_error on
    // failure. Note that the initially active threads are not waited for.
    // Return scheduling statistics.
    //
    struct stat
    {
      size_t thread_max_active     = 0; // max # of active threads allowed.
      size_t thread_max_total      = 0; // max # of total threads allowed.
      size_t thread_helpers        = 0; // # of helper threads created.
      size_t thread_max_waiting    = 0; // max # of waiters at any time.

      size_t task_queue_depth      = 0; // # of entries in a queue (capacity).
      size_t task_queue_full       = 0; // # of times task queue was full.
      size_t task_queue_remain     = 0; // # of tasks remaining in queue.

      size_t wait_queue_slots      = 0; // # of wait slots (buckets).
      size_t wait_queue_collisions = 0; // # of times slot had been occupied.
    };

    stat
    shutdown ();

    // If initially active thread(s) (besides the one that calls startup())
    // exist before the call to startup(), then they must call join() before
    // executing any tasks. The two common cases where you don't have to call
    // join are a single active thread that calls startup()/shutdown() or
    // active thread(s) that are created after startup().
    //
    void
    join ()
    {
      assert (task_queue_ = nullptr);

      // Lock the mutex to make sure the values set in startup() are visible
      // in this thread.
      //
      lock l (mutex_);
    }

    // If initially active thread(s) participate in multiple schedulers and/or
    // sessions (intervals between startup() and shutdown()), then they must
    // call leave() before joining another scheduler/session. Note that this
    // applies to the active thread that calls shutdown(). Note that a thread
    // can only participate in one scheduler at a time.
    //
    void
    leave ()
    {
      task_queue_ = nullptr;
    }

    // Return the number of hardware threads or 0 if unable to determine.
    //
    static size_t
    hardware_concurrency ()
    {
      return std::thread::hardware_concurrency ();
    }

    // Return a prime number that can be used as a lock shard size that's
    // appropriate for the scheduler's concurrency. Use power of two values
    // for mul for higher-contention shards and for div for lower-contention
    // ones. Always return 1 for serial execution.
    //
    // Note: can only be called from threads that have observed startup.
    //
    size_t
    shard_size (size_t mul = 1, size_t div = 1) const;

  private:
    using lock = std::unique_lock<std::mutex>;

    void
    activate_helper (lock&);

    void
    create_helper (lock&);

    // We restrict ourselves to a single pointer as an argument in hope of
    // a small object optimization.
    //
    static void
    helper (void*);

    size_t
    suspend (size_t start_count, const atomic_count& task_count);

    // Task encapsulation.
    //
    template <typename F, typename... A>
    struct task_type
    {
      using func_type = std::decay_t<F>;
      using args_type = std::tuple<std::decay_t<A>...>;

      atomic_count* task_count;
      size_t start_count;
      func_type func;
      args_type args;

      template <size_t... i>
      void
      thunk (std::index_sequence<i...>) noexcept
      {
        move (func) (std::get<i> (move (args))...);
      }
    };

    template <typename F, typename... A>
    static void
    task_thunk (scheduler&, lock&, void*);

    template <typename T>
    static std::decay_t<T>
    decay_copy (T&& x) {return forward<T> (x);}

  private:
    std::mutex mutex_;

    bool shutdown_ = true;  // Shutdown flag.

    // The constraints that we must maintain:
    //
    //                  active <= max_active
    // (init_active + helpers) <= max_threads (soft; see activate_helper())
    //
    // Note that the first three are immutable between startup() and
    // shutdown() so can be accessed without a lock (but see join()).
    //
    size_t init_active_ = 0; // Initially active threads.
    size_t max_active_  = 0; // Maximum number of active threads.
    size_t max_threads_ = 0; // Maximum number of total threads.

    size_t helpers_     = 0; // Number of helper threads created so far.

    // Every thread that we manage must be accounted for in one of these
    // counters. And their sum should equal (init_active + helpers).
    //
    size_t active_   = 0;  // Active master threads executing a task.
    size_t idle_     = 0;  // Idle helper threads waiting for a task.
    size_t waiting_  = 0;  // Suspended master threads waiting for their tasks.
    size_t ready_    = 0;  // Ready master thread waiting to become active.
    size_t starting_ = 0;  // Helper threads starting up.

    // Original values (as specified during startup) that can be altered via
    // tuning.
    //
    size_t orig_max_active_ = 0;

    std::condition_variable idle_condv_;  // Idle helpers queue.
    std::condition_variable ready_condv_; // Ready masters queue.

    // Statistics counters.
    //
    size_t stat_max_waiters_;
    size_t stat_wait_collisions_;

    // Wait queue.
    //
    // A wait slot blocks a bunch of threads. When they are (all) unblocked,
    // they re-examine their respective conditions and either carry on or
    // block again.
    //
    // The wait queue is a shard of slots. A thread picks a slot based on the
    // address of its task count variable. How many slots do we need? This
    // depends on the number of waiters that we can have which cannot be
    // greater than the total number of threads.
    //
    // The pointer to the task count is used to identify the already waiting
    // group of threads for collision statistics.
    //
    struct wait_slot
    {
      std::mutex mutex;
      std::condition_variable condv;
      size_t waiters = 0;
      const atomic_count* task_count;
      bool shutdown = true;
    };

    size_t wait_queue_size_; // Proportional to max_threads.
    unique_ptr<wait_slot[]> wait_queue_;

    // Task queue.
    //
    // Each queue has its own mutex plus we have an atomic total count of the
    // queued tasks. Note that it should only be modified while holding one
    // of the queue lock.
    //
    atomic_count queued_task_count_;

    // For now we only support trivially-destructible tasks.
    //
    struct task_data
    {
      std::aligned_storage<sizeof (void*) * 8>::type data;
      void (*thunk) (scheduler&, lock&, void*);
    };

    // We have two requirements: Firstly, we want to keep the master thread
    // (the one that called wait()) busy working though its own queue for as
    // long as possible before (if at all) it "reincarnates" as a helper. The
    // main reason for this is the limited number of helpers we can create.
    //
    // Secondly, we don't want to block wait() longer than necessary since the
    // master thread can do some work with the result. Plus, overall, we want
    // to "unwind" task hierarchies as soon as possible since they hold up
    // resources such as thread's stack. All this means that the master thread
    // can only work through tasks that it has queued at this "level" of the
    // async()/wait() calls since we know that wait() cannot return until
    // they are done.
    //
    // To satisfy the first requirement, the master and helper threads get the
    // tasks from different ends of the queue: master from the back while
    // helpers from the front. And the master always adds new tasks to the
    // back.
    //
    // To satisfy the second requirement, the master thread stores the index
    // of the first task it has queued at this "level" and makes sure it
    // doesn't try to deque any task beyond that.
    //
    size_t task_queue_depth_; // Multiple of max_active.

    struct task_queue
    {
      std::mutex mutex;
      bool shutdown = false;

      size_t stat_full = 0; // Number of times push() returned NULL.

      // Our task queue is circular with head being the index of the first
      // element and tail -- of the last. Since this makes the empty and one
      // element cases indistinguishable, we also keep the size.
      //
      // The mark is an index somewhere between (figuratively speaking) head
      // and tail, if enabled. If the mark is hit, then it is disabled until
      // the queue becomes empty or it is reset by a push.
      //
      size_t head = 0;
      size_t mark = 0;
      size_t tail = 0;
      size_t size = 0;

      unique_ptr<task_data[]> data;

      task_queue (size_t depth): data (new task_data[depth]) {}
    };

    // Task queue API. Expects the queue mutex to be locked.
    //

    // Push a new task to the queue returning a pointer to the task data to be
    // filled or NULL if the queue is full.
    //
    task_data*
    push (task_queue& tq)
    {
      size_t& s (tq.size);
      size_t& t (tq.tail);
      size_t& m (tq.mark);

      if (s != task_queue_depth_)
      {
        //                                         normal  wrap empty
        //                                         |       |    |
        t = s != 0 ? (t != task_queue_depth_ - 1 ? t + 1 : 0) : t;
        s++;

        if (m == task_queue_depth_) // Enable the mark if first push.
          m = t;

        queued_task_count_.fetch_add (1, std::memory_order_release);
        return &tq.data[t];
      }

      return nullptr;
    }

    bool
    empty_front (task_queue& tq) const {return tq.size == 0;}

    void
    pop_front (task_queue& tq, lock& ql)
    {
      size_t& s (tq.size);
      size_t& h (tq.head);
      size_t& m (tq.mark);

      bool a (h == m); // Adjust mark?
      task_data& td (tq.data[h]);

      //                                         normal  wrap empty
      //                                         |       |    |
      h = s != 1 ? (h != task_queue_depth_ - 1 ? h + 1 : 0) : h;

      if (--s == 0 || a)
        m = h; // Reset or adjust the mark.

      queued_task_count_.fetch_sub (1, std::memory_order_release);

      // The thunk moves the task data to its stack, releases the lock,
      // and continues to execute the task.
      //
      td.thunk (*this, ql, &td.data);
      ql.lock ();
    }

    bool
    empty_back (task_queue& tq) const
    {
      return tq.size == 0 || tq.mark == task_queue_depth_;
    }

    void
    pop_back (task_queue& tq, lock& ql)
    {
      size_t& s (tq.size);
      size_t& t (tq.tail);
      size_t& m (tq.mark);

      bool a (t == m); // Adjust mark?

      task_data& td (tq.data[t]);

      // Save the old queue mark and disable it in case the task we are about
      // to run adds sub-tasks. The first push(), if any, will reset it.
      //
      size_t om (m);
      m = task_queue_depth_;

      //                     normal  wrap                     empty
      //                     |       |                        |
      t = s != 1 ? (t != 0 ? t - 1 : task_queue_depth_ - 1) : t;
      --s;

      queued_task_count_.fetch_sub (1, std::memory_order_release);

      td.thunk (*this, ql, &td.data);
      ql.lock ();

      // Restore the old mark (which we might have to adjust).
      //
      if (s == 0)
        m = t;                 // Reset the mark.
      else if (a)
        m = task_queue_depth_; // Disable the mark.
      else
        // What happens if head goes past the old mark? In this case we will
        // get into the empty queue state before we end up making any (wrong)
        // decisions based on this value. Unfortunately there is no way to
        // detect this (and do some sanity asserts) since things can wrap
        // around.
        //
        // To put it another way, the understanding here is that after the
        // task returns we will either have an empty queue or there will still
        // be tasks between the old mark and the current tail, something along
        // these lines:
        //
        // OOOOOXXXXOOO
        //   |  |  |
        //   m  h  t
        //
        m = om;
    }

    // Each thread has its own queue which are stored in this list.
    //
    std::list<task_queue> task_queues_;

    // TLS cache of thread's task queue.
    //
    static
#ifdef __cpp_thread_local
    thread_local
#else
    __thread
#endif
    task_queue* task_queue_;

    task_queue&
    create_queue ();
  };
}

#include <build2/scheduler.txx>

#endif // BUILD2_SCHEDULER