aboutsummaryrefslogtreecommitdiff
path: root/bbot/agent/agent.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'bbot/agent/agent.cxx')
-rw-r--r--bbot/agent/agent.cxx1248
1 files changed, 1248 insertions, 0 deletions
diff --git a/bbot/agent/agent.cxx b/bbot/agent/agent.cxx
new file mode 100644
index 0000000..ff697db
--- /dev/null
+++ b/bbot/agent/agent.cxx
@@ -0,0 +1,1248 @@
+// file : bbot/agent/agent.cxx -*- C++ -*-
+// copyright : Copyright (c) 2014-2017 Code Synthesis Ltd
+// license : TBC; see accompanying LICENSE file
+
+#include <bbot/agent/agent.hxx>
+
+#include <pwd.h> // getpwuid()
+#include <limits.h> // PATH_MAX
+#include <signal.h> // signal()
+#include <stdlib.h> // rand_r()
+#include <unistd.h> // sleep(), readlink(), getuid(), fsync()
+
+#include <net/if.h> // ifreq
+#include <netinet/in.h> // sockaddr_in
+#include <arpa/inet.h> // inet_ntop()
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+
+#include <chrono>
+#include <iostream>
+
+#include <libbutl/pager.hxx>
+#include <libbutl/sha256.hxx>
+#include <libbutl/openssl.hxx>
+#include <libbutl/filesystem.hxx> // dir_iterator
+
+#include <libbbot/manifest.hxx>
+
+#include <bbot/types.hxx>
+#include <bbot/utility.hxx>
+#include <bbot/diagnostics.hxx>
+
+#include <bbot/bootstrap-manifest.hxx>
+
+#include <bbot/agent/tftp.hxx>
+#include <bbot/agent/machine.hxx>
+#include <bbot/agent/machine-manifest.hxx>
+
+using namespace std;
+using namespace butl;
+using namespace bbot;
+
+namespace bbot
+{
+ agent_options ops; // Command line options (parsed in main()).
+
+ const string bs_prot ("1"); // Bootstrap protocol (<P> in the <name>-<P> link).
+
+ string tc_name; // Toolchain name (ops.toolchain_name ()).
+ uint16_t tc_num; // Toolchain number (ops.toolchain_num ()); TFTP port offset.
+ standard_version tc_ver; // Toolchain version (defaults to the bbot version).
+ string tc_id; // Toolchain id (ops.toolchain_id ()).
+
+ string hname; // Host name as returned by gethostname().
+ uid_t uid; // User id as returned by getuid().
+ string uname; // User name as returned by getpwuid() for uid.
+}
+
+// Open the specified file for reading and call fsync() on the resulting
+// descriptor. If fsync() fails, report the errno value via
+// throw_system_error().
+//
+static void
+file_sync (const path& f)
+{
+  auto_fd h (fdopen (f, fdopen_mode::in));
+
+  const int rc (fsync (h.get ()));
+
+  if (rc == 0)
+    return;
+
+  throw_system_error (errno);
+}
+
+// Run the btrfs tool with the specified arguments.
+//
+// The tool prints informational messages ("Created snapshot such and such")
+// to stdout and proper diagnostics to stderr. So unless the verbosity level
+// is 4 or higher, send its stdout to the null device. Otherwise, redirect it
+// to our stderr.
+//
+template <typename... A>
+inline void
+run_btrfs (tracer& t, A&&... a)
+{
+  const bool quiet (verb < 4);
+
+  if (quiet)
+    run_io (t, fdnull (), fdnull (), 2, "btrfs", forward<A> (a)...);
+  else
+    run_io (t, fdnull (), 2, 2, "btrfs", forward<A> (a)...);
+}
+
+// Run the btrfs tool with the specified arguments and return its exit code.
+// The stdout redirection logic is the same as in run_btrfs(): to the null
+// device unless the verbosity level is 4 or higher, in which case to stderr.
+//
+template <typename... A>
+inline butl::process_exit::code_type
+btrfs_exit (tracer& t, A&&... a)
+{
+  if (verb >= 4)
+    return run_io_exit (t, fdnull (), 2, 2, "btrfs", forward<A> (a)...);
+
+  return run_io_exit (t, fdnull (), fdnull (), 2, "btrfs", forward<A> (a)...);
+}
+
+// Bootstrap the machine. Return the bootstrapped machine manifest if
+// successful and nullopt otherwise (in which case the machine directory
+// should be cleaned and the machine ignored for now).
+//
+// The md argument is the machine directory to start the machine from, mm is
+// its machine manifest, and obmm, if present, is the previous bootstrapped
+// machine manifest whose MAC address is passed to start_machine(). On
+// success the resulting manifest is also saved as md/manifest. System errors
+// are reported as fatal ("bootstrap error").
+//
+static optional<bootstrapped_machine_manifest>
+bootstrap_machine (const dir_path& md,
+ const machine_manifest& mm,
+ optional<bootstrapped_machine_manifest> obmm)
+{
+ tracer trace ("bootstrap_machine", md.string ().c_str ());
+
+ // Start with the versions of this agent. The bootstrap member is replaced
+ // with the result manifest uploaded by the machine unless the bootstrap is
+ // faked.
+ //
+ bootstrapped_machine_manifest r {
+ mm,
+ toolchain_manifest {tc_id.empty () ? "bogus" : tc_id},
+ bootstrap_manifest {
+ bootstrap_manifest::versions_type {
+ {"bbot", standard_version (BBOT_VERSION_STR)},
+ {"libbbot", standard_version (LIBBBOT_VERSION_STR)},
+ {"libbpkg", standard_version (LIBBPKG_VERSION_STR)},
+ {"libbutl", standard_version (LIBBUTL_VERSION_STR)}
+ }
+ }
+ };
+
+ if (ops.fake_bootstrap ())
+ {
+ // Don't start the machine, just use a dummy MAC address.
+ //
+ r.machine.mac = "de:ad:be:ef:de:ad";
+ }
+ else
+ try
+ {
+ string br ("br1"); // Using private bridge for now.
+
+ // Start the TFTP server (server chroot is --tftp). Map:
+ //
+ // GET requests to .../toolchains/<name>/*
+ // PUT requests to .../bootstrap/<name>/*
+ //
+ auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= tc_name);
+ try_mkdir_p (arm.path ());
+
+ // Bootstrap result manifest.
+ //
+ path mf (arm.path () / "manifest");
+ try_rmfile (mf);
+
+ // Note that unlike build, here we use the same VM snapshot for retries,
+ // which is not ideal.
+ //
+ for (size_t retry (0);; ++retry)
+ {
+ tftp_server tftpd ("Gr ^/?(.+)$ /toolchains/" + tc_name + "/\\1\n" +
+ "Pr ^/?(.+)$ /bootstrap/" + tc_name + "/\\1\n",
+ ops.tftp_port () + tc_num);
+
+ l3 ([&]{trace << "tftp server on port " << tftpd.port ();});
+
+ // Start the machine.
+ //
+ unique_ptr<machine> m (
+ start_machine (md,
+ mm,
+ obmm ? obmm->machine.mac : nullopt,
+ br,
+ tftpd.port ()));
+
+ {
+ // If we are terminating with an exception then force the machine down.
+ // Failing that, the machine's destructor will block waiting for its
+ // completion.
+ //
+ auto mg (
+ make_exception_guard (
+ [&m, &md] ()
+ {
+ info << "trying to force machine " << md << " down";
+ try {m->forcedown ();} catch (const failed&) {}
+ }));
+
+ // What happens if the bootstrap process hangs? The simple thing would
+ // be to force the machine down after some timeout and then fail. But
+ // that won't be very helpful for investigating the cause. So instead
+ // the plan is to suspend it after some timeout, issue diagnostics
+ // (without failing and which Build OS monitor will relay to the
+ // admin), and wait for the external intervention.
+ //
+ // Note that the lambda returns nullopt so that it can be used directly
+ // in the return statements below.
+ //
+ auto soft_fail = [&md, &m] (const char* msg)
+ {
+ {
+ diag_record dr (error);
+ dr << msg << " for machine " << md << ", suspending";
+ m->print_info (dr);
+ }
+ m->suspend ();
+ m->wait ();
+ info << "resuming after machine suspension";
+ return nullopt;
+ };
+
+ // The first request should be the toolchain download. Wait for up to
+ // 5 minutes for that to arrive. In a sense we use it as an indication
+ // that the machine has booted and the bootstrap process has started.
+ // Why wait so long you may wonder? Well, we may be using a new MAC
+ // address and operating systems like Windows may need to digest that.
+ //
+ size_t to;
+ const size_t startup_to (5 * 60);
+ const size_t bootstrap_to (ops.bootstrap_timeout ());
+ const size_t shutdown_to (5 * 60);
+
+ // This can mean two things: machine mis-configuration or what we
+ // euphemistically call a "mis-boot": the VM failed to boot for some
+ // unknown/random reason. Mac OS is particularly known for suffering
+ // from this. So the strategy is to retry it a couple of times and
+ // then suspend for investigation.
+ //
+ if (!tftpd.serve ((to = startup_to)))
+ {
+ if (retry > ops.bootstrap_retries ())
+ return soft_fail ("bootstrap startup timeout");
+
+ warn << "machine " << mm.name << " appears to have "
+ << "mis-booted, retrying";
+
+ try {m->forcedown (false);} catch (const failed&) {}
+ continue;
+ }
+
+ l3 ([&]{trace << "completed startup in " << startup_to - to << "s";});
+
+ // Next the bootstrap process may download additional toolchain
+ // archives, build things, and then upload the result manifest. So on
+ // our side we serve TFTP requests while periodically checking for the
+ // manifest file. To workaround some obscure filesystem races (the
+ // file's mtime/size is updated several seconds later; maybe tmpfs
+ // issue?), we periodically re-check.
+ //
+ for (to = bootstrap_to; to != 0; tftpd.serve (to, 2))
+ {
+ if (file_exists (mf))
+ {
+ file_sync (mf);
+ if (!file_empty (mf))
+ break;
+ }
+ }
+
+ if (to == 0)
+ return soft_fail ("bootstrap timeout");
+
+ l3 ([&]{trace << "completed bootstrap in " << bootstrap_to - to << "s";});
+
+ // Shut the machine down cleanly.
+ //
+ if (!m->shutdown ((to = shutdown_to)))
+ return soft_fail ("bootstrap shutdown timeout");
+
+ l3 ([&]{trace << "completed shutdown in " << shutdown_to - to << "s";});
+ }
+
+ // Parse the result manifest.
+ //
+ r.bootstrap = parse_manifest<bootstrap_manifest> (mf, "bootstrap");
+
+ r.machine.mac = m->mac; // Save the MAC address.
+
+ break;
+ }
+ }
+ catch (const system_error& e)
+ {
+ fail << "bootstrap error: " << e;
+ }
+
+ // Save the bootstrapped machine manifest in the machine directory.
+ //
+ serialize_manifest (r, md / "manifest", "bootstrapped machine");
+ return r;
+}
+
+// Return available machines and their directories as a parallel array.
+//
+// The machines directory is expected to contain machine volumes on the first
+// level and machine directories inside them (hidden and non-directory
+// entries are ignored). For each machine make sure there is an up-to-date
+// bootstrapped <name>-<toolchain> subvolume, (re-)bootstrapping it if
+// necessary. Machines without the current subvolume link, that failed to
+// bootstrap, or that were bootstrapped with a newer bbot are omitted from
+// the result.
+//
+// If a fake machine is specified (ops.fake_machine ()), then return a single
+// entry based on its header manifest without examining the directory.
+//
+// Filesystem iteration and other system errors are reported as fatal.
+//
+static pair<bootstrapped_machine_manifests, dir_paths>
+enumerate_machines (const dir_path& machines)
+try
+{
+  tracer trace ("enumerate_machines", machines.string ().c_str ());
+
+  bootstrapped_machine_manifests rm;
+  dir_paths rd;
+
+  if (ops.fake_machine_specified ())
+  {
+    auto mh (
+      parse_manifest<machine_header_manifest> (
+        ops.fake_machine (), "machine header"));
+
+    rm.push_back (
+      bootstrapped_machine_manifest {
+        machine_manifest {
+          mh.id,
+          mh.name,
+          mh.summary,
+          machine_type::kvm,
+          string ("de:ad:be:ef:de:ad"),
+          nullopt},
+        toolchain_manifest {tc_id},
+        bootstrap_manifest {}
+      });
+
+    rd.push_back (dir_path (ops.machines ()) /= mh.name); // For diagnostics.
+
+    return make_pair (move (rm), move (rd));
+  }
+
+  // The first level are machine volumes.
+  //
+  for (const dir_entry& ve: dir_iterator (machines))
+  {
+    const string vn (ve.path ().string ());
+
+    // Ignore hidden directories.
+    //
+    if (ve.type () != entry_type::directory || vn[0] == '.')
+      continue;
+
+    const dir_path vd (dir_path (machines) /= vn);
+
+    // Inside we have machines.
+    //
+    try
+    {
+      for (const dir_entry& me: dir_iterator (vd))
+      {
+        const string mn (me.path ().string ());
+
+        if (me.type () != entry_type::directory || mn[0] == '.')
+          continue;
+
+        const dir_path md (dir_path (vd) /= mn);
+
+        // Our endgoal here is to obtain a bootstrapped snapshot of this
+        // machine while watching out for potential race conditions (machines
+        // being added/upgraded/removed; see the manual for details).
+        //
+        // So here is our overall plan:
+        //
+        // 1. Resolve current subvolume link for our bootstrap protocol.
+        //
+        // 2. If there is no link, cleanup and ignore this machine.
+        //
+        // 3. Try to create a snapshot of current subvolume (this operation is
+        //    atomic). If failed (e.g., someone changed the link and removed
+        //    the subvolume in the meantime), retry from #1.
+        //
+        // 4. Compare the snapshot to the already bootstrapped version (if
+        //    any) and see if we need to re-bootstrap. If so, use the snapshot
+        //    as a starting point. Rename to bootstrapped at the end (atomic).
+        //
+        dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -<P>
+        dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // -<toolchain>
+        bool te (dir_exists (tp));
+
+        // Delete the <name>-<toolchain> subvolume (clearing its read-only
+        // property first).
+        //
+        auto delete_t = [&tp, &trace] ()
+        {
+          run_btrfs (trace, "property", "set", "-ts", tp, "ro", "false");
+          run_btrfs (trace, "subvolume", "delete", tp);
+        };
+
+        for (size_t retry (0);; ++retry)
+        {
+          if (retry != 0)
+            sleep (1);
+
+          // Resolve the link to subvolume path.
+          //
+          dir_path sp; // <name>-<P>.<R>
+          try
+          {
+            char b [PATH_MAX + 1];
+            ssize_t r (readlink (lp.string ().c_str (), b, sizeof (b)));
+
+            if (r == -1)
+            {
+              if (errno != ENOENT)
+                throw_generic_error (errno);
+            }
+            else if (static_cast<size_t> (r) >= sizeof (b))
+              throw_generic_error (EINVAL);
+            else
+            {
+              b[r] = '\0';
+              sp = dir_path (b);
+              if (sp.relative ())
+                sp = md / sp;
+            }
+          }
+          catch (const system_error& e)
+          {
+            fail << "unable to read subvolume link " << lp << ": " << e;
+          }
+
+          // If the resolution fails, then this means there is no current
+          // machine subvolume (for this bootstrap protocol). In this case we
+          // clean up our toolchain subvolume (<name>-<toolchain>) and ignore
+          // this machine.
+          //
+          if (sp.empty ())
+          {
+            if (te)
+              delete_t ();
+
+            l3 ([&]{trace << "skipping " << md << ": no subvolume link";});
+            break;
+          }
+
+          // <name>-<toolchain>-<xxx>
+          //
+          const dir_path xp (
+            dir_path (md) /= path::traits::temp_name (mn + '-' + tc_name));
+
+          if (btrfs_exit (trace, "subvolume", "snapshot", sp, xp) != 0)
+          {
+            if (retry >= 10)
+              fail << "unable to snapshot subvolume " << sp;
+
+            continue;
+          }
+
+          // Load the (original) machine manifest.
+          //
+          auto mm (
+            parse_manifest<machine_manifest> (sp / "manifest", "machine"));
+
+          // If we already have <name>-<toolchain>, see if it needs to be re-
+          // bootstrapped. Things that render it obsolete:
+          //
+          // 1. New machine revision (compare machine ids).
+          // 2. New toolchain (compare toolchain ids).
+          // 3. New bbot/libbbot (compare versions).
+          //
+          // The last case has a complication: what should we do if we have
+          // bootstrapped a newer version of bbot? This would mean that we are
+          // about to be stopped and upgraded (and the upgraded version will
+          // probably be able to use the result). So we simply ignore this
+          // machine for this run.
+
+          // Return -1 if older, 0 if the same, and +1 if newer.
+          //
+          auto compare_bbot = [] (const bootstrap_manifest& m) -> int
+          {
+            auto cmp = [&m] (const string& n, const char* v) -> int
+            {
+              standard_version sv (v);
+              auto i = m.versions.find (n);
+
+              return (i == m.versions.end () || i->second < sv
+                      ? -1
+                      : i->second > sv ? 1 : 0);
+            };
+
+            // Start from the top assuming a new dependency cannot be added
+            // without changing the dependent's version.
+            //
+            int r;
+            return
+              (r = cmp ("bbot", BBOT_VERSION_STR)) != 0 ? r :
+              (r = cmp ("libbbot", LIBBBOT_VERSION_STR)) != 0 ? r :
+              (r = cmp ("libbpkg", LIBBPKG_VERSION_STR)) != 0 ? r :
+              (r = cmp ("libbutl", LIBBUTL_VERSION_STR)) != 0 ? r : 0;
+          };
+
+          optional<bootstrapped_machine_manifest> bmm;
+          if (te)
+          {
+            bmm = parse_manifest<bootstrapped_machine_manifest> (
+              tp / "manifest", "bootstrapped machine");
+
+            if (bmm->machine.id != mm.id)
+            {
+              l3 ([&]{trace << "re-bootstrapping " << tp << ": new machine";});
+              te = false;
+            }
+
+            if (!tc_id.empty () && bmm->toolchain.id != tc_id)
+            {
+              l3 ([&]{trace << "re-bootstrapping " << tp << ": new toolchain";});
+              te = false;
+            }
+
+            if (int i = compare_bbot (bmm->bootstrap))
+            {
+              if (i < 0)
+              {
+                l3 ([&]{trace << "re-bootstrapping " << tp << ": new bbot";});
+                te = false;
+              }
+              else
+              {
+                l3 ([&]{trace << "ignoring " << tp << ": old bbot";});
+                run_btrfs (trace, "subvolume", "delete", xp);
+                break;
+              }
+            }
+
+            if (!te)
+              delete_t ();
+          }
+          else
+            l3 ([&]{trace << "bootstrapping " << tp;});
+
+          if (!te)
+          {
+            // Use the <name>-<toolchain>-<xxx> snapshot that we have made to
+            // bootstrap the new machine. Then atomically rename it to
+            // <name>-<toolchain>.
+            //
+            bmm = bootstrap_machine (xp, mm, move (bmm));
+
+            if (!bmm)
+            {
+              l3 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";});
+              run_btrfs (trace, "subvolume", "delete", xp);
+              break;
+            }
+
+            try
+            {
+              mvdir (xp, tp);
+            }
+            catch (const system_error& e)
+            {
+              // Include the error description so that the cause of the
+              // failure is not lost.
+              //
+              fail << "unable to rename " << xp << " to " << tp << ": " << e;
+            }
+
+            l2 ([&]{trace << "bootstrapped " << bmm->machine.name;});
+
+            // Check the bootstrapped bbot version as above and ignore this
+            // machine if it's newer than us.
+            //
+            if (int i = compare_bbot (bmm->bootstrap))
+            {
+              if (i > 0)
+              {
+                l3 ([&]{trace << "ignoring " << tp << ": old bbot";});
+                break;
+              }
+              else
+                warn << "bootstrapped " << tp << " bbot worker is older "
+                     << "than agent; assuming test setup";
+            }
+          }
+          else
+            run_btrfs (trace, "subvolume", "delete", xp);
+
+          // Add the machine to the lists.
+          //
+          rm.push_back (move (*bmm));
+          rd.push_back (move (tp));
+
+          break;
+        }
+      }
+    }
+    catch (const system_error& e)
+    {
+      fail << "unable to iterate over " << vd << ": " << e << endf;
+    }
+  }
+
+  return make_pair (move (rm), move (rd));
+}
+catch (const system_error& e)
+{
+  fail << "unable to iterate over " << machines << ": " << e << endf;
+}
+
+// Perform the build task tm on the bootstrapped machine mm whose
+// -<toolchain> directory is md. Return the result manifest. It starts out
+// with the task's package name and version and the abort status and is
+// replaced with the manifest uploaded by the worker, if any. System errors
+// are reported as fatal ("build error").
+//
+static result_manifest
+perform_task (const dir_path& md,
+ const bootstrapped_machine_manifest& mm,
+ const task_manifest& tm)
+try
+{
+ tracer trace ("perform_task", md.string ().c_str ());
+
+ result_manifest r {
+ tm.name,
+ tm.version,
+ result_status::abort,
+ operation_results {}};
+
+ if (ops.fake_build ())
+ return r;
+
+ // The overall plan is as follows:
+ //
+ // 1. Snapshot the (bootstrapped) machine.
+ //
+ // 2. Save the task manifest to the TFTP directory (to be accessed by the
+ // worker).
+ //
+ // 3. Start the TFTP server and the machine.
+ //
+ // 4. Serve TFTP requests while watching out for the result manifest.
+ //
+ // 5. Clean up (force the machine down and delete the snapshot).
+ //
+
+ // TFTP server mapping (server chroot is --tftp):
+ //
+ // GET requests to .../build/<name>/get/*
+ // PUT requests to .../build/<name>/put/*
+ //
+ auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= tc_name);
+
+ dir_path gd (dir_path (arm.path ()) /= "get");
+ dir_path pd (dir_path (arm.path ()) /= "put");
+
+ try_mkdir_p (gd);
+ try_mkdir_p (pd);
+
+ path tf (gd / "manifest"); // Task manifest file.
+ path rf (pd / "manifest"); // Result manifest file.
+
+ serialize_manifest (tm, tf, "task");
+
+ if (ops.fake_machine_specified ())
+ {
+ // Simply wait for the file to appear.
+ //
+ for (size_t i (0); !file_exists (rf); sleep (1))
+ if (i++ % 10 == 0)
+ l3 ([&]{trace << "waiting for result manifest";});
+
+ r = parse_manifest<result_manifest> (rf, "result");
+ }
+ else
+ {
+ try_rmfile (rf);
+
+ // <name>-<toolchain>-<xxx>
+ //
+ const dir_path xp (
+ md.directory () /= path::traits::temp_name (md.leaf ().string ()));
+
+ string br ("br1"); // Using private bridge for now.
+
+ for (size_t retry (0);; ++retry)
+ {
+ // On retries start from a fresh snapshot.
+ //
+ if (retry != 0)
+ run_btrfs (trace, "subvolume", "delete", xp);
+
+ run_btrfs (trace, "subvolume", "snapshot", md, xp);
+
+ // Start the TFTP server.
+ //
+ tftp_server tftpd ("Gr ^/?(.+)$ /build/" + tc_name + "/get/\\1\n" +
+ "Pr ^/?(.+)$ /build/" + tc_name + "/put/\\1\n",
+ ops.tftp_port () + tc_num);
+
+ l3 ([&]{trace << "tftp server on port " << tftpd.port ();});
+
+ // Start the machine.
+ //
+ unique_ptr<machine> m (
+ start_machine (xp,
+ mm.machine,
+ mm.machine.mac,
+ br,
+ tftpd.port ()));
+
+ // Note: the machine handling logic is similar to bootstrap.
+ //
+ {
+ auto mg (
+ make_exception_guard (
+ [&m, &xp] ()
+ {
+ info << "trying to force machine " << xp << " down";
+ try {m->forcedown ();} catch (const failed&) {}
+ }));
+
+ // Issue diagnostics, suspend the machine, and wait for it. Note that
+ // the lambda returns (a copy of) the result manifest so that it can be
+ // used directly in the return statements below.
+ //
+ auto soft_fail = [&xp, &m, &r] (const char* msg)
+ {
+ {
+ diag_record dr (error);
+ dr << msg << " for machine " << xp << ", suspending";
+ m->print_info (dr);
+ }
+ m->suspend ();
+ m->wait ();
+ info << "resuming after machine suspension";
+ return r;
+ };
+
+ // The first request should be the task manifest download. Wait for up
+ // to 60 seconds for that to arrive. In a sense we use it as an
+ // indication that the machine has booted and the worker process has
+ // started.
+ //
+ size_t to;
+ const size_t startup_to (60);
+ const size_t build_to (ops.build_timeout ());
+
+ if (!tftpd.serve ((to = startup_to)))
+ {
+ if (retry > ops.build_retries ())
+ return soft_fail ("build startup timeout");
+
+ warn << "machine " << mm.machine.name << " appears to have "
+ << "mis-booted, retrying";
+
+ try {m->forcedown (false);} catch (const failed&) {}
+ continue;
+ }
+
+ l3 ([&]{trace << "completed startup in " << startup_to - to << "s";});
+
+ // Next the worker builds things and then uploads the result manifest.
+ // So on our side we serve TFTP requests while checking for the
+ // manifest file. To workaround some obscure filesystem races (the
+ // file's mtime/size is updated several seconds later; maybe tmpfs
+ // issue?), we periodically re-check.
+ //
+ for (to = build_to; to != 0; tftpd.serve (to, 2))
+ {
+ if (file_exists (rf))
+ {
+ file_sync (rf);
+ if (!file_empty (rf))
+ break;
+ }
+ }
+
+ if (to == 0)
+ return soft_fail ("build timeout");
+
+ l3 ([&]{trace << "completed build in " << build_to - to << "s";});
+
+ // Parse the result manifest.
+ //
+ try
+ {
+ r = parse_manifest<result_manifest> (rf, "result", false);
+ }
+ catch (const failed&)
+ {
+ r.status = result_status::abnormal; // Soft-fail below.
+ }
+
+ if (r.status == result_status::abnormal)
+ {
+ // If the build terminated abnormally, suspend the machine for
+ // investigation.
+ //
+ return soft_fail ("build terminated abnormally");
+ }
+ else
+ {
+ // Force the machine down (there is no need wasting time on clean
+ // shutdown since the next step is to drop the snapshot). Also fail
+ // softly if things go badly.
+ //
+ try {m->forcedown (false);} catch (const failed&) {}
+ }
+ }
+
+ run_btrfs (trace, "subvolume", "delete", xp);
+ break;
+ }
+ }
+
+ // Update package name/version if the returned value is "unknown".
+ //
+ if (r.version == bpkg::version ("0"))
+ {
+ assert (r.status == result_status::abnormal);
+
+ r.name = tm.name;
+ r.version = tm.version;
+ }
+
+ return r;
+}
+catch (const system_error& e)
+{
+ fail << "build error: " << e << endf;
+}
+
+// Handler for the SIGHUP and SIGTERM signals (installed in main()): exit
+// with code 3 on SIGHUP and with code 0 on SIGTERM. No other signal is
+// expected.
+//
+extern "C" void
+handle_signal (int sig)
+{
+  if (sig == SIGHUP)
+    exit (3); // Unimplemented feature.
+
+  if (sig == SIGTERM)
+    exit (0);
+
+  assert (false);
+}
+
+// The agent entry point.
+//
+// Parse the command line, obtain the user and host names, set up signal
+// handling, handle --version and --help, calculate the authentication key
+// fingerprint (if the key is specified), and then run the work loop
+// (enumerate machines, request a task from the controllers, perform it, and
+// upload the result).
+//
+// Return 0 on success (including after dumping machines, task, or result)
+// and 1 on failure, in which case the diagnostics has been issued.
+//
+int
+main (int argc, char* argv[])
+try
+{
+  cli::argv_scanner scan (argc, argv, true);
+  ops.parse (scan);
+
+  verb = ops.verbose ();
+
+  if (ops.systemd_daemon ())
+    systemd_diagnostics (true); // With critical errors.
+
+  tracer trace ("main");
+
+  uid = getuid ();
+
+  // Note that getpwuid() returns NULL if there is no matching entry or on
+  // error, so check the result before dereferencing it.
+  //
+  {
+    const passwd* pw (getpwuid (uid));
+
+    if (pw == nullptr)
+      fail << "unable to obtain user name for uid " << uid;
+
+    uname = pw->pw_name;
+  }
+
+  {
+    char buf[HOST_NAME_MAX + 1];
+
+    if (gethostname (buf, sizeof (buf)) == -1)
+      fail << "unable to obtain hostname: "
+           << system_error (errno, generic_category ()); // Sanitize.
+
+    hname = buf;
+  }
+
+  // On POSIX ignore SIGPIPE which is signaled to a pipe-writing process if
+  // the pipe reading end is closed. Note that by default this signal
+  // terminates a process. Also note that there is no way to disable this
+  // behavior on a file descriptor basis or for the write() function call.
+  //
+  if (signal (SIGPIPE, SIG_IGN) == SIG_ERR)
+    fail << "unable to ignore broken pipe (SIGPIPE) signal: "
+         << system_error (errno, generic_category ()); // Sanitize.
+
+  // Version.
+  //
+  if (ops.version ())
+  {
+    cout << "bbot-agent " << BBOT_VERSION_ID << endl
+         << "libbbot " << LIBBBOT_VERSION_ID << endl
+         << "libbpkg " << LIBBPKG_VERSION_ID << endl
+         << "libbutl " << LIBBUTL_VERSION_ID << endl
+         << "Copyright (c) 2014-2017 Code Synthesis Ltd" << endl
+         << "TBC; All rights reserved" << endl;
+
+    return 0;
+  }
+
+  // Help.
+  //
+  if (ops.help ())
+  {
+    pager p ("bbot-agent help", false);
+    print_bbot_agent_usage (p.stream ());
+
+    // If the pager failed, assume it has issued some diagnostics.
+    //
+    return p.wait () ? 0 : 1;
+  }
+
+  tc_name = ops.toolchain_name ();
+  tc_num = ops.toolchain_num ();
+  tc_ver = (ops.toolchain_ver_specified ()
+            ? ops.toolchain_ver ()
+            : standard_version (BBOT_VERSION_STR));
+  tc_id = ops.toolchain_id ();
+
+
+  // Controller URLs.
+  //
+  if (argc < 2 &&
+      !ops.dump_machines () &&
+      !ops.fake_request_specified ())
+  {
+    fail << "controller url expected" <<
+      info << "run " << argv[0] << " --help for details";
+  }
+
+  strings controllers;
+
+  for (int i (1); i != argc; ++i)
+    controllers.push_back (argv[i]);
+
+  // Handle SIGHUP and SIGTERM.
+  //
+  if (signal (SIGHUP, &handle_signal) == SIG_ERR ||
+      signal (SIGTERM, &handle_signal) == SIG_ERR)
+    fail << "unable to set signal handler: "
+         << system_error (errno, generic_category ()); // Sanitize.
+
+  optional<string> fingerprint;
+
+  if (ops.auth_key_specified ())
+  try
+  {
+    // Note that the process always prints to STDERR, so we redirect it to the
+    // null device. We also check for the key file existence to print more
+    // meaningful error message if that's not the case.
+    //
+    if (!file_exists (ops.auth_key ()))
+      throw_generic_error (ENOENT);
+
+    openssl os (trace,
+                ops.auth_key (), path ("-"), fdnull (),
+                ops.openssl (), "rsa",
+                ops.openssl_option (), "-pubout", "-outform", "DER");
+
+    vector<char> k (os.in.read_binary ());
+    os.in.close ();
+
+    if (!os.wait ())
+      throw_generic_error (EIO);
+
+    fingerprint = sha256 (k.data (), k.size ()).string ();
+  }
+  catch (const system_error& e)
+  {
+    fail << "unable to obtain authentication public key: " << e;
+  }
+
+  if (ops.systemd_daemon ())
+  {
+    diag_record dr;
+
+    dr << info << "bbot agent " << BBOT_VERSION_ID;
+
+    if (fingerprint)
+      dr << info << "auth key fp " << *fingerprint;
+
+    dr <<
+      info << "toolchain name " << tc_name <<
+      info << "toolchain num " << tc_num <<
+      info << "toolchain ver " << tc_ver.string () <<
+      info << "toolchain id " << tc_id <<
+      info << "CPU(s) " << ops.cpu () <<
+      info << "RAM(kB) " << ops.ram ();
+
+    for (const string& u: controllers)
+      dr << info << "controller url " << u;
+  }
+
+  // The work loop. The steps we go through are:
+  //
+  // 1. Enumerate the available machines, (re-)bootstrapping any if necessary.
+  //
+  // 2. Poll controller(s) for build tasks.
+  //
+  // 3. If no build tasks are available, go to #1 (after sleeping a bit).
+  //
+  // 4. If a build task is returned, do it, upload the result, and go to #1
+  //    (immediately).
+  //
+  for (bool sleep (false);; ::sleep (sleep ? 60 : 0), sleep = false)
+  {
+    // Enumerate the machines.
+    //
+    auto mp (enumerate_machines (ops.machines ()));
+    bootstrapped_machine_manifests& ms (mp.first);
+    dir_paths& ds (mp.second);
+
+    // Prepare task request.
+    //
+    task_request_manifest tq {
+      hname,
+      tc_name,
+      tc_ver,
+      fingerprint,
+      machine_header_manifests {}
+    };
+
+    for (const bootstrapped_machine_manifest& m: ms)
+      tq.machines.emplace_back (m.machine.id,
+                                m.machine.name,
+                                m.machine.summary);
+
+    if (ops.dump_machines ())
+    {
+      for (const machine_header_manifest& m: tq.machines)
+        serialize_manifest (m, cout, "stdout", "machine");
+
+      return 0;
+    }
+
+    if (tq.machines.empty ())
+    {
+      warn << "no build machines for toolchain " << tc_name;
+      sleep = true;
+      continue;
+    }
+
+    // Send task requests.
+    //
+    //
+    string url;
+    task_response_manifest tr;
+
+    if (ops.fake_request_specified ())
+    {
+      auto t (parse_manifest<task_manifest> (ops.fake_request (), "task"));
+
+      tr = task_response_manifest {
+        "fake-session", // Dummy session.
+        nullopt,        // No challenge.
+        url,            // Empty result URL.
+        move (t)};
+
+      url = "http://example.org";
+    }
+    else
+    {
+      for (const string& u: controllers)
+      {
+        try
+        {
+          http_curl c (trace,
+                       path ("-"),
+                       path ("-"),
+                       curl::post,
+                       u,
+                       "--header", "Content-Type: text/manifest",
+                       "--max-time", ops.request_timeout ());
+
+          // This is tricky/hairy: we may fail hard parsing the output before
+          // seeing that curl exited with an error and failing softly.
+          //
+          bool f (false);
+
+          try
+          {
+            serialize_manifest (tq, c.out, u, "task request", false);
+          }
+          catch (const failed&) {f = true;}
+
+          c.out.close ();
+
+          if (!f)
+          try
+          {
+            tr = parse_manifest<task_response_manifest> (
+              c.in, u, "task response", false);
+          }
+          catch (const failed&) {f = true;}
+
+          c.in.close ();
+
+          if (!c.wait () || f)
+            throw_generic_error (EIO);
+        }
+        catch (const system_error& e)
+        {
+          error << "unable to request task from " << u << ": " << e;
+          continue;
+        }
+
+        if (tr.challenge && !fingerprint) // Controller misbehaves.
+        {
+          error << "unexpected challenge from " << u << ": " << *tr.challenge;
+          continue;
+        }
+
+        if (!tr.session.empty ()) // Got a task.
+        {
+          url = u;
+
+          task_manifest& t (*tr.task);
+          l2 ([&]{trace << "task for " << t.name << '/' << t.version << " "
+                        << "on " << t.machine << " "
+                        << "from " << url;});
+          break;
+        }
+      }
+    }
+
+    if (tr.session.empty ()) // No task from any of the controllers.
+    {
+      l2 ([&]{trace << "no tasks from any controllers, sleeping";});
+      sleep = true;
+      continue;
+    }
+
+    // We have a build task.
+    //
+    // First find the index of the machine we were asked to use (and also
+    // verify it is one of those we sent).
+    //
+    size_t i (0);
+    for (const machine_header_manifest& m: tq.machines)
+    {
+      if (m.name == tr.task->machine)
+        break;
+
+      ++i;
+    }
+
+    if (i == ms.size ())
+    {
+      error << "task from " << url << " for unknown machine "
+            << tr.task->machine;
+
+      if (ops.dump_task ())
+        return 0;
+
+      continue;
+    }
+
+    task_manifest& t (*tr.task);
+
+    if (ops.dump_task ())
+    {
+      serialize_manifest (t, cout, "stdout", "task");
+      return 0;
+    }
+
+    // If we have our own repository certificate fingerprints, then use them
+    // to replace what we have received from the controller.
+    //
+    if (!ops.trust ().empty ())
+      t.trust = ops.trust ();
+
+    const dir_path& d (ds[i]); // The -<toolchain> directory.
+    const bootstrapped_machine_manifest& m (ms[i]);
+
+    result_manifest r (perform_task (d, m, t));
+
+    if (ops.dump_result ())
+    {
+      serialize_manifest (r, cout, "stdout", "result");
+      return 0;
+    }
+
+    // Prepare answer to the private key challenge.
+    //
+    optional<vector<char>> challenge;
+
+    if (tr.challenge)
+    try
+    {
+      assert (ops.auth_key_specified ());
+
+      openssl os (trace,
+                  fdstream_mode::text, path ("-"), 2,
+                  ops.openssl (), "rsautl",
+                  ops.openssl_option (), "-sign", "-inkey", ops.auth_key ());
+
+      os.out << *tr.challenge;
+      os.out.close ();
+
+      challenge = os.in.read_binary ();
+      os.in.close ();
+
+      if (!os.wait ())
+        throw_generic_error (EIO);
+    }
+    catch (const system_error& e)
+    {
+      // The task response challenge is valid (verified by manifest parser),
+      // so there is something wrong with setup, and so the failure is fatal.
+      //
+      fail << "unable to sign task response challenge: " << e;
+    }
+
+    // Upload the result.
+    //
+    result_request_manifest rq {tr.session, move (challenge), move (r)};
+    {
+      const string& u (*tr.result_url);
+
+      try
+      {
+        http_curl c (trace,
+                     path ("-"),
+                     nullfd, // Not expecting any data in response.
+                     curl::post,
+                     u,
+                     "--header", "Content-Type: text/manifest",
+                     "--max-time", ops.request_timeout ());
+
+        // This is tricky/hairy: we may fail hard writing the input before
+        // seeing that curl exited with an error and failing softly.
+        //
+        bool f (false);
+
+        try
+        {
+          serialize_manifest (rq, c.out, u, "result request");
+        }
+        catch (const failed&) {f = true;}
+
+        c.out.close ();
+
+        if (!c.wait () || f)
+          throw_generic_error (EIO);
+      }
+      catch (const system_error& e)
+      {
+        error << "unable to upload result to " << u << ": " << e;
+        continue;
+      }
+    }
+
+    l2 ([&]{trace << "built " << t.name << '/' << t.version << " "
+                  << "on " << t.machine << " "
+                  << "for " << url;});
+  }
+}
+catch (const failed&)
+{
+  return 1; // Diagnostics has already been issued.
+}
+catch (const cli::exception& e)
+{
+  error << e;
+  return 1;
+}
+
+namespace bbot
+{
+  static unsigned int rand_seed; // Seed for rand_r();
+
+  // Return the next pseudo-random number as generated by rand_r(). The seed
+  // is initialized from the system clock on the first call (and again
+  // whenever its value is zero).
+  //
+  size_t
+  genrand ()
+  {
+    if (rand_seed == 0)
+      rand_seed = static_cast<unsigned int> (
+        chrono::system_clock::now ().time_since_epoch ().count ());
+
+    return static_cast<size_t> (rand_r (&rand_seed));
+  }
+
+  // Return the IPv4 address of the specified network interface in the
+  // dotted-decimal notation. Throw invalid_argument if the interface name is
+  // too long to fit into ifreq and report the socket(), ioctl(), and
+  // inet_ntop() failures via throw_system_error().
+  //
+  // Note: Linux-specific implementation.
+  //
+  string
+  iface_addr (const string& i)
+  {
+    if (i.size () >= IFNAMSIZ)
+      throw invalid_argument ("interface name too long");
+
+    auto_fd fd (socket (AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0));
+
+    if (fd.get () == -1)
+      throw_system_error (errno);
+
+    // Note that the name (including the terminating NUL) is known to fit
+    // into ifr_name thanks to the size check above.
+    //
+    ifreq ifr;
+    ifr.ifr_addr.sa_family = AF_INET;
+    strcpy (ifr.ifr_name, i.c_str ());
+
+    if (ioctl (fd.get (), SIOCGIFADDR, &ifr) == -1)
+      throw_system_error (errno);
+
+    char buf[INET_ADDRSTRLEN]; // IPv4 address.
+    if (inet_ntop (AF_INET,
+                   &reinterpret_cast<sockaddr_in*> (&ifr.ifr_addr)->sin_addr,
+                   buf,
+                   sizeof (buf)) == nullptr)
+      throw_system_error (errno);
+
+    return buf;
+  }
+}