diff options
Diffstat (limited to 'bbot/agent/agent.cxx')
-rw-r--r-- | bbot/agent/agent.cxx | 235 |
1 files changed, 156 insertions, 79 deletions
diff --git a/bbot/agent/agent.cxx b/bbot/agent/agent.cxx index a8b7c77..850f2b1 100644 --- a/bbot/agent/agent.cxx +++ b/bbot/agent/agent.cxx @@ -20,8 +20,10 @@ #include <sys/ioctl.h> #include <sys/socket.h> +#include <map> #include <chrono> #include <random> +#include <iomanip> // setw() #include <iostream> #include <system_error> // generic_category() @@ -1635,20 +1637,57 @@ try offset = (tc_num - 1) * 100 + inst; - // Controller URLs. + // Controller priority to URLs map. // - if (argc < 2 && - !ops.dump_machines () && - !ops.fake_request_specified ()) + std::map<uint64_t, strings> controllers; + + for (int i (1); i != argc; ++i) { - fail << "controller url expected" << - info << "run " << argv[0] << " --help for details"; - } + // [<prio>=]<url> + // + string a (argv[i]); + + // See if we have priority, falling back to priority 0 if absent. + // + uint64_t prio (0); - strings controllers; + // Note that we can also have `=` in <url> (e.g., parameters) so we will + // only consider `=` as ours if prior to it we only have digits. + // + size_t p (a.find ('=')); + if (p != string::npos && a.find_first_not_of ("0123456789") == p) + { + // Require exactly four or five digits in case we later need to extend + // the priority levels beyond the 10 possible values (e.g., DDCCBBAA). + // + if (p != 4 && p != 5) + fail << "four or five-digit controller url priority expected in '" + << a << "'"; - for (int i (1); i != argc; ++i) - controllers.push_back (argv[i]); + char* e; + errno = 0; + prio = strtoull (a.c_str (), &e, 10); + assert (errno != ERANGE && e == a.c_str () + p); + + if (prio > 19999) + fail << "out of bounds controller url priority in '" << a << "'"; + + a.erase (0, p + 1); + } + + controllers[prio].push_back (move (a)); + } + + if (controllers.empty ()) + { + if (ops.dump_machines () || ops.fake_request_specified ()) + { + controllers[0].push_back ("https://example.org"); + } + else + fail << "controller url expected" << + info << "run " << argv[0] << " --help for details"; + } // Handle SIGHUP and SIGTERM. // @@ -1710,8 +1749,16 @@ try if (inst_max != 0) dr << info << "instance max " << inst_max; - for (const string& u: controllers) - dr << info << "controller url " << u; + // Note: keep last since don't restore fill/setw. + // + for (const pair<const uint64_t, strings>& p: controllers) + { + for (const string& u: p.second) + { + dr.os.fill ('0'); + dr << info << "controller url " << std::setw (4) << p.first << '=' << u; + } + } } // The work loop. The steps we go through are: @@ -1815,79 +1862,94 @@ try continue; } - // Prepare task request. + // If we get a task, these contain all the corresponding information. // - task_request_manifest tq { - hname, - tc_name, - tc_ver, - imode, - ilogin, - fingerprint, - machine_header_manifests {} - }; - - // Note: do not assume tq.machines.size () == ms.size (). + task_request_manifest tq; + task_response_manifest tr; + uint64_t prio; + string url; + + // Iterate over controller priorities in reverse, that is, from highest to + // lowest. // - for (const bootstrapped_machine& m: ms) - { - // @@ For now skip machines locked by other processes. - // - // @@ Note: skip machines being bootstrapped. - // - if (m.lock.locked ()) - tq.machines.emplace_back (m.manifest.machine.id, - m.manifest.machine.name, - m.manifest.machine.summary); - } + // @@ Note: doing it in terms of direct iterators in anticipation for + // lower_bound(). + // + auto cb (controllers.begin ()); + auto ce (controllers.end ()); - if (ops.dump_machines ()) + for (; cb != ce; ) { - for (const machine_header_manifest& m: tq.machines) - serialize_manifest (m, cout, "stdout", "machine"); + const pair<const uint64_t, strings>& pu (*--ce); - return 0; - } + prio = pu.first; + const strings& urls (pu.second); - if (tq.machines.empty ()) - { - // Normally this means all the machines are locked so sleep a bit less. + // Prepare task request (it will be the same within a given priority). // - sleep = rand_sleep () / 2; - continue; - } + tq = task_request_manifest { + hname, + tc_name, + tc_ver, + imode, + ilogin, + fingerprint, + machine_header_manifests {}}; + + // Note: do not assume tq.machines.size () == ms.size (). + // + for (const bootstrapped_machine& m: ms) + { + // @@ For now skip machines locked by other processes. + // + // @@ Note: skip machines being bootstrapped. + // + if (m.lock.locked ()) + tq.machines.emplace_back (m.manifest.machine.id, + m.manifest.machine.name, + m.manifest.machine.summary); + } - // Send task requests. - // - // Note that we have to do it while holding the lock on all the machines - // since we don't know which machine we will need. - // - string url; - task_response_manifest tr; + if (ops.dump_machines ()) + { + for (const machine_header_manifest& m: tq.machines) + serialize_manifest (m, cout, "stdout", "machine"); - if (ops.fake_request_specified ()) - { - auto t (parse_manifest<task_manifest> (ops.fake_request (), "task")); + return 0; + } - tr = task_response_manifest { - "fake-session", // Dummy session. - nullopt, // No challenge. - url, // Empty result URL. - agent_checksum, - move (t)}; + if (tq.machines.empty ()) + { + // If we have no machines for this priority then we won't have any + // for any lower priority so bail out. + // + break; + } - url = "http://example.org"; - } - else - { - // Note that after completing each task we always start from the - // beginning of the list. This fact can be used to implement a poor - // man's priority system where we will continue serving the first listed - // controller for as long as it has tasks (and maybe in the future we - // will implement a proper priority system). + // Send task requests. + // + // Note that we have to do it while holding the lock on all the machines + // since we don't know which machine we will need. + // + // @@ TODO: need to iterate in random order somehow. // - for (const string& u: controllers) + for (const string& u: urls) { + if (ops.fake_request_specified ()) + { + auto t (parse_manifest<task_manifest> (ops.fake_request (), "task")); + + tr = task_response_manifest { + "fake-session", // Dummy session. + nullopt, // No challenge. + string (), // Empty result URL. + agent_checksum, + move (t)}; + + url = u; + break; + } + task_response_manifest r; try @@ -1903,8 +1965,9 @@ try "--max-time", ops.request_timeout (), "--connect-timeout", ops.connect_timeout ()); - // This is tricky/hairy: we may fail hard parsing the output before - // seeing that curl exited with an error and failing softly. + // This is tricky/hairy: we may fail hard parsing the output + // before seeing that curl exited with an error and failing + // softly. // bool f (false); @@ -1949,8 +2012,8 @@ try { const task_manifest& t (*r.task); - // For security reasons let's require the repository location to be - // remote. + // For security reasons let's require the repository location to + // be remote. // if (t.repository.local ()) { @@ -1973,13 +2036,27 @@ try l2 ([&]{trace << "task for " << t.name << '/' << t.version << " " << "on " << t.machine << " " - << "from " << u;}); + << "from " << u << " " + << "priority " << prio;}); tr = move (r); url = u; break; } - } + } // url loop. + + if (!tr.session.empty ()) // Got a task. + break; + + } // prio loop. + + if (tq.machines.empty ()) // No machines. + { + // Normally this means all the machines are busy so sleep a bit less. + // + l2 ([&]{trace << "all machines are busy, sleeping";}); + sleep = rand_sleep () / 2; + continue; } if (tr.session.empty ()) // No task from any of the controllers. @@ -2006,7 +2083,7 @@ try { if (mh.name == m.manifest.machine.name) { - m.lock.write (tl, 1234 /* prio */); + m.lock.write (tl, prio); pm = &m; } else |