// file      : build2/scheduler.txx -*- C++ -*-
// copyright : Copyright (c) 2014-2017 Code Synthesis Ltd
// license   : MIT; see accompanying LICENSE file

#include <cerrno>

namespace build2
{
  template <typename F, typename... A>
  bool scheduler::
  async (size_t start_count, atomic_count& task_count, F&& f, A&&... a)
  {
    using task = task_type<F, A...>;

    static_assert (sizeof (task) <= sizeof (task_data::data),
                   "insufficient space");

    static_assert (std::is_trivially_destructible<task>::value,
                   "not trivially destructible");

    // Push into the queue unless we are running serially or the queue is
    // full.
    //
    task_data* td (nullptr);

    if (max_active_ != 1)
    {
      task_queue* tq (task_queue_); // Single load.
      if (tq == nullptr)
        tq = &create_queue ();

      lock ql (tq->mutex);

      if (tq->shutdown)
        throw system_error (ECANCELED, std::system_category ());

      if ((td = push (*tq)) != nullptr)
      {
        // Package the task (under lock).
        //
        new (&td->data) task {
          &task_count,
          start_count,
          decay_copy (forward<F> (f)),
          typename task::args_type (decay_copy (forward<A> (a))...)};

        td->thunk = &task_thunk<F, A...>;
      }
      else
        tq->stat_full++; // Queue is full; fall through and run synchronously.
    }

    // If serial/full, then run the task synchronously. In this case there is
    // no need to mess with task count.
    //
    if (td == nullptr)
    {
      forward<F> (f) (forward<A> (a)...);
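      // Returning false tells the caller that the task was executed
      // synchronously in the caller's thread (it was never queued).
      //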
      return false;
    }

    // Increment the task count so the caller can wait for the task to
    // complete (it is decremented in task_thunk() below once the task has
    // run).
    //
    task_count.fetch_add (1, std::memory_order_release);

    // If there is a spare active thread, wake up (or create) the helper
    // (unless someone already snatched it).
    //
    if (queued_task_count_.load (std::memory_order_consume) != 0)
    {
      lock l (mutex_);

      if (active_ < max_active_)
        activate_helper (l);
    }

    return true;
  }
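
  // A minimal usage sketch (illustrative only; the scheduler interface is
  // declared in scheduler.hxx and the wait() call shown below is an
  // assumption of this sketch, not necessarily the exact signature):
  //
  //   scheduler s (/* max_active, ... */);
  //   atomic_count tc (0);
  //
  //   // Queue (or, if serial/full, run synchronously) a task, then wait
  //   // for its completion.
  //   //
  //   s.async (0 /* start_count */, tc, [] (int n) { /* work */ }, 42);
  //   s.wait (tc); // Assumed wait-on-count interface.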

  template <typename F, typename... A>
  void scheduler::
  task_thunk (scheduler& s, lock& ql, void* td)
  {
    using task = task_type<F, A...>;

    // Move the data and release the lock.
    //
    task t (move (*static_cast<task*> (td)));
    ql.unlock ();

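    // Unpack the bound arguments and invoke the task function.
    //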
    t.thunk (std::index_sequence_for<A...> ());

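    // Decrement the task count. fetch_sub() returns the previous value, so
    // subtracting one yields the new count; once it drops to (or below) the
    // start count, this batch of tasks is complete.
    //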
    atomic_count& tc (*t.task_count);
    if (tc.fetch_sub (1, std::memory_order_release) - 1 <= t.start_count)
      s.resume (tc); // Resume waiters, if any.
  }
}