diff options
Diffstat (limited to 'bbot/agent/agent.cxx')
-rw-r--r-- | bbot/agent/agent.cxx | 1248 |
1 files changed, 1248 insertions, 0 deletions
diff --git a/bbot/agent/agent.cxx b/bbot/agent/agent.cxx new file mode 100644 index 0000000..ff697db --- /dev/null +++ b/bbot/agent/agent.cxx @@ -0,0 +1,1248 @@ +// file : bbot/agent/agent.cxx -*- C++ -*- +// copyright : Copyright (c) 2014-2017 Code Synthesis Ltd +// license : TBC; see accompanying LICENSE file + +#include <bbot/agent/agent.hxx> + +#include <pwd.h> // getpwuid() +#include <limits.h> // PATH_MAX +#include <signal.h> // signal() +#include <stdlib.h> // rand_r() +#include <unistd.h> // sleep(), realink(), getuid(), fsync() + +#include <net/if.h> // ifreq +#include <netinet/in.h> // sockaddr_in +#include <arpa/inet.h> // inet_ntop() +#include <sys/ioctl.h> +#include <sys/socket.h> + +#include <chrono> +#include <iostream> + +#include <libbutl/pager.hxx> +#include <libbutl/sha256.hxx> +#include <libbutl/openssl.hxx> +#include <libbutl/filesystem.hxx> // dir_iterator + +#include <libbbot/manifest.hxx> + +#include <bbot/types.hxx> +#include <bbot/utility.hxx> +#include <bbot/diagnostics.hxx> + +#include <bbot/bootstrap-manifest.hxx> + +#include <bbot/agent/tftp.hxx> +#include <bbot/agent/machine.hxx> +#include <bbot/agent/machine-manifest.hxx> + +using namespace std; +using namespace butl; +using namespace bbot; + +namespace bbot +{ + agent_options ops; + + const string bs_prot ("1"); + + string tc_name; + uint16_t tc_num; + standard_version tc_ver; + string tc_id; + + string hname; + uid_t uid; + string uname; +} + +static void +file_sync (const path& f) +{ + auto_fd fd (fdopen (f, fdopen_mode::in)); + if (fsync (fd.get ()) != 0) + throw_system_error (errno); +} + +// The btrfs tool likes to print informational messages, like "Created +// snapshot such and such". Luckily, it writes them to stdout while proper +// diagnostics to stderr. +// +template <typename... A> +inline void +run_btrfs (tracer& t, A&&... a) +{ + if (verb >= 4) + run_io (t, fdnull (), 2, 2, "btrfs", forward<A> (a)...); + else + run_io (t, fdnull (), fdnull (), 2, "btrfs", forward<A> (a)...); +} + +template <typename... A> +inline butl::process_exit::code_type +btrfs_exit (tracer& t, A&&... a) +{ + return verb >= 4 + ? run_io_exit (t, fdnull (), 2, 2, "btrfs", forward<A> (a)...) + : run_io_exit (t, fdnull (), fdnull (), 2, "btrfs", forward<A> (a)...); +} + +// Bootstrap the machine. Return the bootstrapped machine manifest if +// successful and nullopt otherwise (in which case the machine directory +// should be cleaned and the machine ignored for now). +// +static optional<bootstrapped_machine_manifest> +bootstrap_machine (const dir_path& md, + const machine_manifest& mm, + optional<bootstrapped_machine_manifest> obmm) +{ + tracer trace ("bootstrap_machine", md.string ().c_str ()); + + bootstrapped_machine_manifest r { + mm, + toolchain_manifest {tc_id.empty () ? "bogus" : tc_id}, + bootstrap_manifest { + bootstrap_manifest::versions_type { + {"bbot", standard_version (BBOT_VERSION_STR)}, + {"libbbot", standard_version (LIBBBOT_VERSION_STR)}, + {"libbpkg", standard_version (LIBBPKG_VERSION_STR)}, + {"libbutl", standard_version (LIBBUTL_VERSION_STR)} + } + } + }; + + if (ops.fake_bootstrap ()) + { + r.machine.mac = "de:ad:be:ef:de:ad"; + } + else + try + { + string br ("br1"); // Using private bridge for now. + + // Start the TFTP server (server chroot is --tftp). Map: + // + // GET requests to .../toolchains/<name>/* + // PUT requests to .../bootstrap/<name>/* + // + auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= tc_name); + try_mkdir_p (arm.path ()); + + // Bootstrap result manifest. + // + path mf (arm.path () / "manifest"); + try_rmfile (mf); + + // Note that unlike build, here we use the same VM snapshot for retries, + // which is not ideal. + // + for (size_t retry (0);; ++retry) + { + tftp_server tftpd ("Gr ^/?(.+)$ /toolchains/" + tc_name + "/\\1\n" + + "Pr ^/?(.+)$ /bootstrap/" + tc_name + "/\\1\n", + ops.tftp_port () + tc_num); + + l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); + + // Start the machine. + // + unique_ptr<machine> m ( + start_machine (md, + mm, + obmm ? obmm->machine.mac : nullopt, + br, + tftpd.port ())); + + { + // If we are terminating with an exception then force the machine down. + // Failed that, the machine's destructor will block waiting for its + // completion. + // + auto mg ( + make_exception_guard ( + [&m, &md] () + { + info << "trying to force machine " << md << " down"; + try {m->forcedown ();} catch (const failed&) {} + })); + + // What happens if the bootstrap process hangs? The simple thing would + // be to force the machine down after some timeout and then fail. But + // that won't be very helpful for investigating the cause. So instead + // the plan is to suspend it after some timeout, issue diagnostics + // (without failing and which Build OS monitor will relay to the + // admin), and wait for the external intervention. + // + auto soft_fail = [&md, &m] (const char* msg) + { + { + diag_record dr (error); + dr << msg << " for machine " << md << ", suspending"; + m->print_info (dr); + } + m->suspend (); + m->wait (); + info << "resuming after machine suspension"; + return nullopt; + }; + + // The first request should be the toolchain download. Wait for up to + // 5 minutes for that to arrive. In a sense we use it as an indication + // that the machine has booted and the bootstrap process has started. + // Why wait so long you may wonder? Well, we may be using a new MAC + // address and operating systems like Windows may need to digest that. + // + size_t to; + const size_t startup_to (5 * 60); + const size_t bootstrap_to (ops.bootstrap_timeout ()); + const size_t shutdown_to (5 * 60); + + // This can mean two things: machine mis-configuration or what we + // euphemistically call a "mis-boot": the VM failed to boot for some + // unknown/random reason. Mac OS is particularly know for suffering + // from this. So the strategy is to retry it a couple of times and + // then suspend for investigation. + // + if (!tftpd.serve ((to = startup_to))) + { + if (retry > ops.bootstrap_retries ()) + return soft_fail ("bootstrap startup timeout"); + + warn << "machine " << mm.name << " appears to have " + << "mis-booted, retrying"; + + try {m->forcedown (false);} catch (const failed&) {} + continue; + } + + l3 ([&]{trace << "completed startup in " << startup_to - to << "s";}); + + // Next the bootstrap process may download additional toolchain + // archives, build things, and then upload the result manifest. So on + // our side we serve TFTP requests while periodically checking for the + // manifest file. To workaround some obscure filesystem races (the + // file's mtime/size is updated several seconds later; maybe tmpfs + // issue?), we periodically re-check. + // + for (to = bootstrap_to; to != 0; tftpd.serve (to, 2)) + { + if (file_exists (mf)) + { + file_sync (mf); + if (!file_empty (mf)) + break; + } + } + + if (to == 0) + return soft_fail ("bootstrap timeout"); + + l3 ([&]{trace << "completed bootstrap in " << bootstrap_to - to << "s";}); + + // Shut the machine down cleanly. + // + if (!m->shutdown ((to = shutdown_to))) + return soft_fail ("bootstrap shutdown timeout"); + + l3 ([&]{trace << "completed shutdown in " << shutdown_to - to << "s";}); + } + + // Parse the result manifest. + // + r.bootstrap = parse_manifest<bootstrap_manifest> (mf, "bootstrap"); + + r.machine.mac = m->mac; // Save the MAC address. + + break; + } + } + catch (const system_error& e) + { + fail << "bootstrap error: " << e; + } + + serialize_manifest (r, md / "manifest", "bootstrapped machine"); + return r; +} + +// Return available machines and their directories as a parallel array. +// +static pair<bootstrapped_machine_manifests, dir_paths> +enumerate_machines (const dir_path& machines) +try +{ + tracer trace ("enumerate_machines", machines.string ().c_str ()); + + bootstrapped_machine_manifests rm; + dir_paths rd; + + if (ops.fake_machine_specified ()) + { + auto mh ( + parse_manifest<machine_header_manifest> ( + ops.fake_machine (), "machine header")); + + rm.push_back ( + bootstrapped_machine_manifest { + machine_manifest { + mh.id, + mh.name, + mh.summary, + machine_type::kvm, + string ("de:ad:be:ef:de:ad"), + nullopt}, + toolchain_manifest {tc_id}, + bootstrap_manifest {} + }); + + rd.push_back (dir_path (ops.machines ()) /= mh.name); // For diagnostics. + + return make_pair (move (rm), move (rd)); + } + + // The first level are machine volumes. + // + for (const dir_entry& ve: dir_iterator (machines)) + { + const string vn (ve.path ().string ()); + + // Ignore hidden directories. + // + if (ve.type () != entry_type::directory || vn[0] == '.') + continue; + + const dir_path vd (dir_path (machines) /= vn); + + // Inside we have machines. + // + try + { + for (const dir_entry& me: dir_iterator (vd)) + { + const string mn (me.path ().string ()); + + if (me.type () != entry_type::directory || mn[0] == '.') + continue; + + const dir_path md (dir_path (vd) /= mn); + + // Our endgoal here is to obtain a bootstrapped snapshot of this + // machine while watching out for potential race conditions (machines + // being added/upgraded/removed; see the manual for details). + // + // So here is our overall plan: + // + // 1. Resolve current subvolume link for our bootstrap protocol. + // + // 2. If there is no link, cleanup and ignore this machine. + // + // 3. Try to create a snapshot of current subvolume (this operation is + // atomic). If failed (e.g., someone changed the link and removed + // the subvolume in the meantime), retry from #1. + // + // 4. Compare the snapshot to the already bootstrapped version (if + // any) and see if we need to re-bootstrap. If so, use the snapshot + // as a starting point. Rename to bootstrapped at the end (atomic). + // + dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -<P> + dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // -<toolchain> + bool te (dir_exists (tp)); + + auto delete_t = [&tp, &trace] () + { + run_btrfs (trace, "property", "set", "-ts", tp, "ro", "false"); + run_btrfs (trace, "subvolume", "delete", tp); + }; + + for (size_t retry (0);; ++retry) + { + if (retry != 0) + sleep (1); + + // Resolve the link to subvolume path. + // + dir_path sp; // <name>-<P>.<R> + try + { + char b [PATH_MAX + 1]; + ssize_t r (readlink (lp.string ().c_str (), b, sizeof (b))); + + if (r == -1) + { + if (errno != ENOENT) + throw_generic_error (errno); + } + else if (static_cast<size_t> (r) >= sizeof (b)) + throw_generic_error (EINVAL); + else + { + b[r] = '\0'; + sp = dir_path (b); + if (sp.relative ()) + sp = md / sp; + } + } + catch (const system_error& e) + { + fail << "unable to read subvolume link " << lp << ": " << e; + } + + // If the resolution fails, then this means there is no current + // machine subvolume (for this bootstrap protocol). In this case we + // clean up our toolchain subvolume (<name>-<toolchain>) and ignore + // this machine. + // + if (sp.empty ()) + { + if (te) + delete_t (); + + l3 ([&]{trace << "skipping " << md << ": no subvolume link";}); + break; + } + + // <name>-<toolchain>-<xxx> + // + const dir_path xp ( + dir_path (md) /= path::traits::temp_name (mn + '-' + tc_name)); + + if (btrfs_exit (trace, "subvolume", "snapshot", sp, xp) != 0) + { + if (retry >= 10) + fail << "unable to snapshot subvolume " << sp; + + continue; + } + + // Load the (original) machine manifest. + // + auto mm ( + parse_manifest<machine_manifest> (sp / "manifest", "machine")); + + // If we already have <name>-<toolchain>, see if it needs to be re- + // bootstrapped. Things that render it obsolete: + // + // 1. New machine revision (compare machine ids). + // 2. New toolchain (compare toolchain ids). + // 3. New bbot/libbbot (compare versions). + // + // The last case has a complication: what should we do if we have + // bootstrapped a newer version of bbot? This would mean that we are + // about to be stopped and upgraded (and the upgraded version will + // probably be able to use the result). So we simply ignore this + // machine for this run. + + // Return -1 if older, 0 if the same, and +1 if newer. + // + auto compare_bbot = [] (const bootstrap_manifest& m) -> int + { + auto cmp = [&m] (const string& n, const char* v) -> int + { + standard_version sv (v); + auto i = m.versions.find (n); + + return (i == m.versions.end () || i->second < sv + ? -1 + : i->second > sv ? 1 : 0); + }; + + // Start from the top assuming a new dependency cannot be added + // without changing the dependent's version. + // + int r; + return + (r = cmp ("bbot", BBOT_VERSION_STR)) != 0 ? r : + (r = cmp ("libbbot", LIBBBOT_VERSION_STR)) != 0 ? r : + (r = cmp ("libbpkg", LIBBPKG_VERSION_STR)) != 0 ? r : + (r = cmp ("libbutl", LIBBUTL_VERSION_STR)) != 0 ? r : 0; + }; + + optional<bootstrapped_machine_manifest> bmm; + if (te) + { + bmm = parse_manifest<bootstrapped_machine_manifest> ( + tp / "manifest", "bootstrapped machine"); + + if (bmm->machine.id != mm.id) + { + l3 ([&]{trace << "re-bootstrapping " << tp << ": new machine";}); + te = false; + } + + if (!tc_id.empty () && bmm->toolchain.id != tc_id) + { + l3 ([&]{trace << "re-bootstrapping " << tp << ": new toolchain";}); + te = false; + } + + if (int i = compare_bbot (bmm->bootstrap)) + { + if (i < 0) + { + l3 ([&]{trace << "re-bootstrapping " << tp << ": new bbot";}); + te = false; + } + else + { + l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); + run_btrfs (trace, "subvolume", "delete", xp); + break; + } + } + + if (!te) + delete_t (); + } + else + l3 ([&]{trace << "bootstrapping " << tp;}); + + if (!te) + { + // Use the <name>-<toolchain>-<xxx> snapshot that we have made to + // bootstrap the new machine. Then atomically rename it to + // <name>-<toolchain>. + // + bmm = bootstrap_machine (xp, mm, move (bmm)); + + if (!bmm) + { + l3 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";}); + run_btrfs (trace, "subvolume", "delete", xp); + break; + } + + try + { + mvdir (xp, tp); + } + catch (const system_error& e) + { + fail << "unable to rename " << xp << " to " << tp; + } + + l2 ([&]{trace << "bootstrapped " << bmm->machine.name;}); + + // Check the bootstrapped bbot version as above and ignore this + // machine if it's newer than us. + // + if (int i = compare_bbot (bmm->bootstrap)) + { + if (i > 0) + { + l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); + break; + } + else + warn << "bootstrapped " << tp << " bbot worker is older " + << "than agent; assuming test setup"; + } + } + else + run_btrfs (trace, "subvolume", "delete", xp); + + // Add the machine to the lists. + // + rm.push_back (move (*bmm)); + rd.push_back (move (tp)); + + break; + } + } + } + catch (const system_error& e) + { + fail << "unable to iterate over " << vd << ": " << e << endf; + } + } + + return make_pair (move (rm), move (rd)); +} +catch (const system_error& e) +{ + fail << "unable to iterate over " << machines << ": " << e << endf; +} + +static result_manifest +perform_task (const dir_path& md, + const bootstrapped_machine_manifest& mm, + const task_manifest& tm) +try +{ + tracer trace ("perform_task", md.string ().c_str ()); + + result_manifest r { + tm.name, + tm.version, + result_status::abort, + operation_results {}}; + + if (ops.fake_build ()) + return r; + + // The overall plan is as follows: + // + // 1. Snapshot the (bootstrapped) machine. + // + // 2. Save the task manifest to the TFTP directory (to be accessed by the + // worker). + // + // 3. Start the TFTP server and the machine. + // + // 4. Serve TFTP requests while watching out for the result manifest. + // + // 5. Clean up (force the machine down and delete the snapshot). + // + + // TFTP server mapping (server chroot is --tftp): + // + // GET requests to .../build/<name>/get/* + // PUT requests to .../build/<name>/put/* + // + auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= tc_name); + + dir_path gd (dir_path (arm.path ()) /= "get"); + dir_path pd (dir_path (arm.path ()) /= "put"); + + try_mkdir_p (gd); + try_mkdir_p (pd); + + path tf (gd / "manifest"); // Task manifest file. + path rf (pd / "manifest"); // Result manifest file. + + serialize_manifest (tm, tf, "task"); + + if (ops.fake_machine_specified ()) + { + // Simply wait for the file to appear. + // + for (size_t i (0); !file_exists (rf); sleep (1)) + if (i++ % 10 == 0) + l3 ([&]{trace << "waiting for result manifest";}); + + r = parse_manifest<result_manifest> (rf, "result"); + } + else + { + try_rmfile (rf); + + // <name>-<toolchain>-<xxx> + // + const dir_path xp ( + md.directory () /= path::traits::temp_name (md.leaf ().string ())); + + string br ("br1"); // Using private bridge for now. + + for (size_t retry (0);; ++retry) + { + if (retry != 0) + run_btrfs (trace, "subvolume", "delete", xp); + + run_btrfs (trace, "subvolume", "snapshot", md, xp); + + // Start the TFTP server. + // + tftp_server tftpd ("Gr ^/?(.+)$ /build/" + tc_name + "/get/\\1\n" + + "Pr ^/?(.+)$ /build/" + tc_name + "/put/\\1\n", + ops.tftp_port () + tc_num); + + l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); + + // Start the machine. + // + unique_ptr<machine> m ( + start_machine (xp, + mm.machine, + mm.machine.mac, + br, + tftpd.port ())); + + // Note: the machine handling logic is similar to bootstrap. + // + { + auto mg ( + make_exception_guard ( + [&m, &xp] () + { + info << "trying to force machine " << xp << " down"; + try {m->forcedown ();} catch (const failed&) {} + })); + + auto soft_fail = [&xp, &m, &r] (const char* msg) + { + { + diag_record dr (error); + dr << msg << " for machine " << xp << ", suspending"; + m->print_info (dr); + } + m->suspend (); + m->wait (); + info << "resuming after machine suspension"; + return r; + }; + + // The first request should be the task manifest download. Wait for up + // to 60 seconds for that to arrive. In a sense we use it as an + // indication that the machine has booted and the worker process has + // started. + // + size_t to; + const size_t startup_to (60); + const size_t build_to (ops.build_timeout ()); + + if (!tftpd.serve ((to = startup_to))) + { + if (retry > ops.build_retries ()) + return soft_fail ("build startup timeout"); + + warn << "machine " << mm.machine.name << " appears to have " + << "mis-booted, retrying"; + + try {m->forcedown (false);} catch (const failed&) {} + continue; + } + + l3 ([&]{trace << "completed startup in " << startup_to - to << "s";}); + + // Next the worker builds things and then uploads the result manifest. + // So on our side we serve TFTP requests while checking for the + // manifest file. To workaround some obscure filesystem races (the + // file's mtime/size is updated several seconds later; maybe tmpfs + // issue?), we periodically re-check. + // + for (to = build_to; to != 0; tftpd.serve (to, 2)) + { + if (file_exists (rf)) + { + file_sync (rf); + if (!file_empty (rf)) + break; + } + } + + if (to == 0) + return soft_fail ("build timeout"); + + l3 ([&]{trace << "completed build in " << build_to - to << "s";}); + + // Parse the result manifest. + // + try + { + r = parse_manifest<result_manifest> (rf, "result", false); + } + catch (const failed&) + { + r.status = result_status::abnormal; // Soft-fail below. + } + + if (r.status == result_status::abnormal) + { + // If the build terminated abnormally, suspend the machine for + // investigation. + // + return soft_fail ("build terminated abnormally"); + } + else + { + // Force the machine down (there is no need wasting time on clean + // shutdown since the next step is to drop the snapshot). Also fail + // softly if things go badly. + // + try {m->forcedown (false);} catch (const failed&) {} + } + } + + run_btrfs (trace, "subvolume", "delete", xp); + break; + } + } + + // Update package name/version if the returned value as "unknown". + // + if (r.version == bpkg::version ("0")) + { + assert (r.status == result_status::abnormal); + + r.name = tm.name; + r.version = tm.version; + } + + return r; +} +catch (const system_error& e) +{ + fail << "build error: " << e << endf; +} + +extern "C" void +handle_signal (int sig) +{ + switch (sig) + { + case SIGHUP: exit (3); // Unimplemented feature. + case SIGTERM: exit (0); + default: assert (false); + } +} + +int +main (int argc, char* argv[]) +try +{ + cli::argv_scanner scan (argc, argv, true); + ops.parse (scan); + + verb = ops.verbose (); + + if (ops.systemd_daemon ()) + systemd_diagnostics (true); // With critical errors. + + tracer trace ("main"); + + uid = getuid (); + uname = getpwuid (uid)->pw_name; + + { + char buf[HOST_NAME_MAX + 1]; + + if (gethostname (buf, sizeof (buf)) == -1) + fail << "unable to obtain hostname: " + << system_error (errno, generic_category ()); // Sanitize. + + hname = buf; + } + + // On POSIX ignore SIGPIPE which is signaled to a pipe-writing process if + // the pipe reading end is closed. Note that by default this signal + // terminates a process. Also note that there is no way to disable this + // behavior on a file descriptor basis or for the write() function call. + // + if (signal (SIGPIPE, SIG_IGN) == SIG_ERR) + fail << "unable to ignore broken pipe (SIGPIPE) signal: " + << system_error (errno, generic_category ()); // Sanitize. + + // Version. + // + if (ops.version ()) + { + cout << "bbot-agent " << BBOT_VERSION_ID << endl + << "libbbot " << LIBBBOT_VERSION_ID << endl + << "libbpkg " << LIBBBOT_VERSION_ID << endl + << "libbutl " << LIBBUTL_VERSION_ID << endl + << "Copyright (c) 2014-2017 Code Synthesis Ltd" << endl + << "TBC; All rights reserved" << endl; + + return 0; + } + + // Help. + // + if (ops.help ()) + { + pager p ("bbot-agent help", false); + print_bbot_agent_usage (p.stream ()); + + // If the pager failed, assume it has issued some diagnostics. + // + return p.wait () ? 0 : 1; + } + + tc_name = ops.toolchain_name (); + tc_num = ops.toolchain_num (); + tc_ver = (ops.toolchain_ver_specified () + ? ops.toolchain_ver () + : standard_version (BBOT_VERSION_STR)); + tc_id = ops.toolchain_id (); + + + // Controller URLs. + // + if (argc < 2 && + !ops.dump_machines () && + !ops.fake_request_specified ()) + { + fail << "controller url expected" << + info << "run " << argv[0] << " --help for details"; + } + + strings controllers; + + for (int i (1); i != argc; ++i) + controllers.push_back (argv[i]); + + // Handle SIGHUP and SIGTERM. + // + if (signal (SIGHUP, &handle_signal) == SIG_ERR || + signal (SIGTERM, &handle_signal) == SIG_ERR) + fail << "unable to set signal handler: " + << system_error (errno, generic_category ()); // Sanitize. + + optional<string> fingerprint; + + if (ops.auth_key_specified ()) + try + { + // Note that the process always prints to STDERR, so we redirect it to the + // null device. We also check for the key file existence to print more + // meaningful error message if that's not the case. + // + if (!file_exists (ops.auth_key ())) + throw_generic_error (ENOENT); + + openssl os (trace, + ops.auth_key (), path ("-"), fdnull (), + ops.openssl (), "rsa", + ops.openssl_option (), "-pubout", "-outform", "DER"); + + vector<char> k (os.in.read_binary ()); + os.in.close (); + + if (!os.wait ()) + throw_generic_error (EIO); + + fingerprint = sha256 (k.data (), k.size ()).string (); + } + catch (const system_error& e) + { + fail << "unable to obtain authentication public key: " << e; + } + + if (ops.systemd_daemon ()) + { + diag_record dr; + + dr << info << "bbot agent " << BBOT_VERSION_ID; + + if (fingerprint) + dr << info << "auth key fp " << *fingerprint; + + dr << + info << "toolchain name " << tc_name << + info << "toolchain num " << tc_num << + info << "toolchain ver " << tc_ver.string () << + info << "toolchain id " << tc_id << + info << "CPU(s) " << ops.cpu () << + info << "RAM(kB) " << ops.ram (); + + for (const string& u: controllers) + dr << info << "controller url " << u; + } + + // The work loop. The steps we go through are: + // + // 1. Enumerate the available machines, (re-)bootstrapping any if necessary. + // + // 2. Poll controller(s) for build tasks. + // + // 3. If no build tasks are available, go to #1 (after sleeping a bit). + // + // 4. If a build task is returned, do it, upload the result, and go to #1 + // (immediately). + // + for (bool sleep (false);; ::sleep (sleep ? 60 : 0), sleep = false) + { + // Enumerate the machines. + // + auto mp (enumerate_machines (ops.machines ())); + bootstrapped_machine_manifests& ms (mp.first); + dir_paths& ds (mp.second); + + // Prepare task request. + // + task_request_manifest tq { + hname, + tc_name, + tc_ver, + fingerprint, + machine_header_manifests {} + }; + + for (const bootstrapped_machine_manifest& m: ms) + tq.machines.emplace_back (m.machine.id, + m.machine.name, + m.machine.summary); + + if (ops.dump_machines ()) + { + for (const machine_header_manifest& m: tq.machines) + serialize_manifest (m, cout, "stdout", "machine"); + + return 0; + } + + if (tq.machines.empty ()) + { + warn << "no build machines for toolchain " << tc_name; + sleep = true; + continue; + } + + // Send task requests. + // + // + string url; + task_response_manifest tr; + + if (ops.fake_request_specified ()) + { + auto t (parse_manifest<task_manifest> (ops.fake_request (), "task")); + + tr = task_response_manifest { + "fake-session", // Dummy session. + nullopt, // No challenge. + url, // Empty result URL. + move (t)}; + + url = "http://example.org"; + } + else + { + for (const string& u: controllers) + { + try + { + http_curl c (trace, + path ("-"), + path ("-"), + curl::post, + u, + "--header", "Content-Type: text/manifest", + "--max-time", ops.request_timeout ()); + + // This is tricky/hairy: we may fail hard parsing the output before + // seeing that curl exited with an error and failing softly. + // + bool f (false); + + try + { + serialize_manifest (tq, c.out, u, "task request", false); + } + catch (const failed&) {f = true;} + + c.out.close (); + + if (!f) + try + { + tr = parse_manifest<task_response_manifest> ( + c.in, u, "task response", false); + } + catch (const failed&) {f = true;} + + c.in.close (); + + if (!c.wait () || f) + throw_generic_error (EIO); + } + catch (const system_error& e) + { + error << "unable to request task from " << u << ": " << e; + continue; + } + + if (tr.challenge && !fingerprint) // Controller misbehaves. + { + error << "unexpected challenge from " << u << ": " << *tr.challenge; + continue; + } + + if (!tr.session.empty ()) // Got a task. + { + url = u; + + task_manifest& t (*tr.task); + l2 ([&]{trace << "task for " << t.name << '/' << t.version << " " + << "on " << t.machine << " " + << "from " << url;}); + break; + } + } + } + + if (tr.session.empty ()) // No task from any of the controllers. + { + l2 ([&]{trace << "no tasks from any controllers, sleeping";}); + sleep = true; + continue; + } + + // We have a build task. + // + // First find the index of the machine we were asked to use (and also + // verify it is one of those we sent). + // + size_t i (0); + for (const machine_header_manifest& m: tq.machines) + { + if (m.name == tr.task->machine) + break; + + ++i; + } + + if (i == ms.size ()) + { + error << "task from " << url << " for unknown machine " + << tr.task->machine; + + if (ops.dump_task ()) + return 0; + + continue; + } + + task_manifest& t (*tr.task); + + if (ops.dump_task ()) + { + serialize_manifest (t, cout, "stdout", "task"); + return 0; + } + + // If we have our own repository certificate fingerprints, then use them + // to replace what we have received from the controller. + // + if (!ops.trust ().empty ()) + t.trust = ops.trust (); + + const dir_path& d (ds[i]); // The -<toolchain> directory. + const bootstrapped_machine_manifest& m (ms[i]); + + result_manifest r (perform_task (d, m, t)); + + if (ops.dump_result ()) + { + serialize_manifest (r, cout, "stdout", "result"); + return 0; + } + + // Prepare answer to the private key challenge. + // + optional<vector<char>> challenge; + + if (tr.challenge) + try + { + assert (ops.auth_key_specified ()); + + openssl os (trace, + fdstream_mode::text, path ("-"), 2, + ops.openssl (), "rsautl", + ops.openssl_option (), "-sign", "-inkey", ops.auth_key ()); + + os.out << *tr.challenge; + os.out.close (); + + challenge = os.in.read_binary (); + os.in.close (); + + if (!os.wait ()) + throw_generic_error (EIO); + } + catch (const system_error& e) + { + // The task response challenge is valid (verified by manifest parser), + // so there is something wrong with setup, and so the failure is fatal. + // + fail << "unable to sign task response challenge: " << e; + } + + // Upload the result. + // + result_request_manifest rq {tr.session, move (challenge), move (r)}; + { + const string& u (*tr.result_url); + + try + { + http_curl c (trace, + path ("-"), + nullfd, // Not expecting any data in response. + curl::post, + u, + "--header", "Content-Type: text/manifest", + "--max-time", ops.request_timeout ()); + + // This is tricky/hairy: we may fail hard writing the input before + // seeing that curl exited with an error and failing softly. + // + bool f (false); + + try + { + serialize_manifest (rq, c.out, u, "task request"); + } + catch (const failed&) {f = true;} + + c.out.close (); + + if (!c.wait () || f) + throw_generic_error (EIO); + } + catch (const system_error& e) + { + error << "unable to upload result to " << u << ": " << e; + continue; + } + } + + l2 ([&]{trace << "built " << t.name << '/' << t.version << " " + << "on " << t.machine << " " + << "for " << url;}); + } +} +catch (const failed&) +{ + return 1; // Diagnostics has already been issued. +} +catch (const cli::exception& e) +{ + error << e; + return 1; +} + +namespace bbot +{ + static unsigned int rand_seed; // Seed for rand_r(); + + size_t + genrand () + { + if (rand_seed == 0) + rand_seed = static_cast<unsigned int> ( + chrono::system_clock::now ().time_since_epoch ().count ()); + + return static_cast<size_t> (rand_r (&rand_seed)); + } + + // Note: Linux-specific implementation. + // + string + iface_addr (const string& i) + { + if (i.size () >= IFNAMSIZ) + throw invalid_argument ("interface nama too long"); + + auto_fd fd (socket (AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0)); + + if (fd.get () == -1) + throw_system_error (errno); + + ifreq ifr; + ifr.ifr_addr.sa_family = AF_INET; + strcpy (ifr.ifr_name, i.c_str ()); + + if (ioctl (fd.get (), SIOCGIFADDR, &ifr) == -1) + throw_system_error (errno); + + char buf[3 * 4 + 3 + 1]; // IPv4 address. + if (inet_ntop (AF_INET, + &reinterpret_cast<sockaddr_in*> (&ifr.ifr_addr)->sin_addr, + buf, + sizeof (buf)) == nullptr) + throw_system_error (errno); + + return buf; + } +} |