aboutsummaryrefslogtreecommitdiff
path: root/build2/scheduler.txx
blob: c7e6ef2837b4fec6b4468550ec0c21b3d5e79d82 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// file      : build2/scheduler.txx -*- C++ -*-
// copyright : Copyright (c) 2014-2017 Code Synthesis Ltd
// license   : MIT; see accompanying LICENSE file

#include <cerrno>

namespace build2
{
  template <typename F, typename... A>
  bool scheduler::
  async (size_t start_count, atomic_count& task_count, F&& f, A&&... a)
  {
    using task = task_type<F, A...>;

    static_assert (sizeof (task) <= sizeof (task_data::data),
                   "insufficient space");

    static_assert (std::is_trivially_destructible<task>::value,
                   "not trivially destructible");

    // If running serially, then run the task synchronously. In this case
    // there is no need to mess with task count.
    //
    if (max_active_ == 1)
    {
      forward<F> (f) (forward<A> (a)...);

      // See if we need to call the monitor (see the concurrent version in
      // execute() for details).
      //
      if (monitor_count_ != nullptr)
      {
        size_t v (monitor_count_->load (memory_order_relaxed));
        if (v != monitor_init_)
        {
          size_t t (monitor_tshold_.load (memory_order_relaxed));
          if (v > monitor_init_ ? (v >= t) : (v <= t))
            monitor_tshold_.store (monitor_func_ (v), memory_order_relaxed);
        }
      }

      return false;
    }

    // Try to push the task into the queue falling back to running serially
    // if the queue is full.
    //
    task_queue* tq (task_queue_); // Single load.
    if (tq == nullptr)
      tq = &create_queue ();

    {
      lock ql (tq->mutex);

      if (tq->shutdown)
        throw_generic_error (ECANCELED);

      if (task_data* td = push (*tq))
      {
        // Package the task (under lock).
        //
        new (&td->data) task {
          &task_count,
          start_count,
          decay_copy (forward<F> (f)),
          typename task::args_type (decay_copy (forward<A> (a))...)};

        td->thunk = &task_thunk<F, A...>;
      }
      else
      {
        tq->stat_full++;

        // We have to perform the same mark adjust/restore as in pop_back()
        // since the task we are about to execute synchronously may try to
        // work the queue.
        //
        // It would have been cleaner to package all this logic into push()
        // but that would require dragging function/argument types into it.
        //
        size_t& s (tq->size);
        size_t& t (tq->tail);
        size_t& m (tq->mark);

        size_t om (m);
        m = task_queue_depth_;

        ql.unlock ();
        forward<F> (f) (forward<A> (a)...); // Should not throw.

        if (om != task_queue_depth_)
        {
          ql.lock ();
          m = s == 0 ? t : om;
        }

        return false;
      }
    }

    // Increment the task count.
    //
    task_count.fetch_add (1, std::memory_order_release);

    // If there is a spare active thread, wake up (or create) the helper
    // (unless someone already snatched it).
    //
    if (queued_task_count_.load (std::memory_order_consume) != 0)
    {
      lock l (mutex_);

      if (active_ < max_active_)
        activate_helper (l);
    }

    return true;
  }

  // Execute a task that was packaged by async(). The td argument points to
  // the task object stored in the queue and ql is the lock that is held on
  // entry and released here before the task is run.
  //
  template <typename F, typename... A>
  void scheduler::
  task_thunk (scheduler& s, lock& ql, void* td)
  {
    using task = task_type<F, A...>;

    // Take our own copy of the task so that the queue storage is no longer
    // needed once the lock is released.
    //
    task* queued (static_cast<task*> (td));
    task local (move (*queued));
    ql.unlock ();

    // Call the function with the stored arguments.
    //
    local.thunk (std::index_sequence_for<A...> ());

    // Decrement the task count and, if that brought it down to (or below)
    // the start count, resume waiters, if any.
    //
    atomic_count& count (*local.task_count);
    const auto remaining (count.fetch_sub (1, memory_order_release) - 1);

    if (remaining <= local.start_count)
      s.resume (count);
  }
}