aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bbot/agent/agent.cli25
-rw-r--r--bbot/agent/agent.cxx1362
-rw-r--r--bbot/agent/agent.hxx4
-rw-r--r--bbot/agent/machine.cxx86
-rw-r--r--bbot/agent/machine.hxx14
-rw-r--r--bbot/bbot-agent@.service2
-rw-r--r--bbot/buildfile2
-rw-r--r--bbot/utility.hxx1
8 files changed, 1281 insertions, 215 deletions
diff --git a/bbot/agent/agent.cli b/bbot/agent/agent.cli
index 060cba0..23765cf 100644
--- a/bbot/agent/agent.cli
+++ b/bbot/agent/agent.cli
@@ -119,7 +119,7 @@ namespace bbot
"<num>",
"Toolchain number, 1 by default. If agents are running for several
toolchains, then each of them should have a unique toolchain number
- between 1 and 99. This number is used as an offset for network ports,
+ between 1 and 9. This number is used as an offset for network ports,
interfaces, etc."
}
@@ -195,7 +195,7 @@ namespace bbot
"Amount of RAM (in KiB) to use for the build machine, 4GiB by default."
}
- size_t --auxiliary-ram
+ size_t --auxiliary-ram = 0
{
"<num>",
"Amount of RAM (in KiB) to use for auxiliary machines. To disable
@@ -251,28 +251,35 @@ namespace bbot
}
// Low 23401+, 23501+, 23601+, etc., all look good collision-wise with
- // with anything useful.
+ // anything useful.
//
uint16_t --tftp-port = 23400
{
"<num>",
"TFTP server port base, 23400 by default. The actual port is calculated
- by adding an offset calculated based on the toolchain and instance
- numbers."
+ by adding an offset calculated based on the toolchain, instance, and
+ machine numbers."
}
size_t --bootstrap-startup = 300
{
"<sec>",
- "Maximum number of seconds to wait for machine bootstrap startup,
+ "Maximum number of seconds to wait for build machine bootstrap startup,
300 (5 minutes) by default."
}
size_t --bootstrap-timeout = 3600
{
"<sec>",
- "Maximum number of seconds to wait for machine bootstrap completion,
- 3600 (60 minutes) by default."
+ "Maximum number of seconds to wait for build machine bootstrap
+ completion, 3600 (60 minutes) by default."
+ }
+
+ size_t --bootstrap-auxiliary = 900
+ {
+ "<sec>",
+ "Maximum number of seconds to wait for auxiliary machine bootstrap
+ completion, 900 (15 minutes) by default."
}
size_t --bootstrap-retries = 2
@@ -286,7 +293,7 @@ namespace bbot
{
"<sec>",
"Maximum number of seconds to wait for build startup, 240 (4 minutes) by
- default."
+ default. This value is used for both build and auxiliary machines."
}
size_t --build-timeout = 5400
diff --git a/bbot/agent/agent.cxx b/bbot/agent/agent.cxx
index 8f860a2..8316d1c 100644
--- a/bbot/agent/agent.cxx
+++ b/bbot/agent/agent.cxx
@@ -56,6 +56,27 @@ using namespace bbot;
using std::cout;
using std::endl;
+// If RAM minimum is not specified for a machine, then let's assume something
+// plausible like 256MiB. This way we won't end up with degenerate cases where
+// we attempt to start a machine with some absurd amount of RAM.
+//
+const std::uint64_t default_ram_minimum = 252144;
+
+static inline std::uint64_t
+effective_ram_minimum (const machine_header_manifest& m)
+{
+ // Note: neither ram_minimum nor ram_maximum should be 0.
+ //
+ assert ((!m.ram_minimum || *m.ram_minimum != 0) &&
+ (!m.ram_maximum || *m.ram_maximum != 0));
+
+ return (m.ram_minimum
+ ? *m.ram_minimum
+ : (m.ram_maximum && *m.ram_maximum < default_ram_minimum
+ ? *m.ram_maximum
+ : default_ram_minimum));
+}
+
static std::mt19937 rand_gen (std::random_device {} ());
// According to the standard, atomic's use in the signal handler is only safe
@@ -153,16 +174,16 @@ btrfs_exit (tracer& t, A&&... a)
"btrfs", forward<A> (a)...);
}
-// Bootstrap the machine. Return the bootstrapped machine manifest if
-// successful and nullopt otherwise (in which case the machine directory
-// should be cleaned and the machine ignored for now).
+// Bootstrap a build machine. Return the bootstrapped machine manifest if
+// successful and nullopt otherwise (in which case the caller should clean up
+// the machine directory and ignore the machine for now).
//
static optional<bootstrapped_machine_manifest>
-bootstrap_machine (const dir_path& md,
- const machine_manifest& mm,
- optional<bootstrapped_machine_manifest> obmm)
+bootstrap_build_machine (const dir_path& md,
+ const machine_manifest& mm,
+ optional<bootstrapped_machine_manifest> obmm)
{
- tracer trace ("bootstrap_machine", md.string ().c_str ());
+ tracer trace ("bootstrap_build_machine", md.string ().c_str ());
bootstrapped_machine_manifest r {
mm,
@@ -184,10 +205,12 @@ bootstrap_machine (const dir_path& md,
else
try
{
+ // Note: similar code in bootstrap_auxiliary_machine().
+
// Start the TFTP server (server chroot is --tftp). Map:
//
- // GET requests to .../toolchains/<name>/*
- // PUT requests to .../bootstrap/<name>-<instance>/*
+ // GET requests to .../toolchains/<toolchain>/*
+ // PUT requests to .../bootstrap/<toolchain>-<instance>/*
//
const string in_name (tc_name + '-' + to_string (inst));
auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= in_name);
@@ -211,7 +234,7 @@ bootstrap_machine (const dir_path& md,
{
tftp_server tftpd ("Gr ^/?(.+)$ /toolchains/" + tc_name + "/\\1\n" +
"Pr ^/?(.+)$ /bootstrap/" + in_name + "/\\1\n",
- ops.tftp_port () + offset);
+ ops.tftp_port () + offset + 0 /* build machine */);
l3 ([&]{trace << "tftp server on port " << tftpd.port ();});
@@ -220,6 +243,9 @@ bootstrap_machine (const dir_path& md,
unique_ptr<machine> m (
start_machine (md,
mm,
+ 0 /* machine_num (build) */,
+ ops.cpu (),
+ ops.build_ram (),
obmm ? obmm->machine.mac : nullopt,
ops.bridge (),
tftpd.port (),
@@ -260,7 +286,7 @@ bootstrap_machine (const dir_path& md,
m->cleanup ();
info << "resuming after machine suspension";
- // Note: snapshot cleaned up by the caller of bootstrap_machine().
+ // Note: snapshot cleaned up by the caller.
}
catch (const failed&) {}
@@ -313,8 +339,7 @@ bootstrap_machine (const dir_path& md,
if (!check_machine ())
{
- // Note: snapshot cleaned up by the caller of bootstrap_machine().
- return nullopt;
+ return nullopt; // Note: snapshot cleaned up by the caller.
}
}
@@ -336,6 +361,7 @@ bootstrap_machine (const dir_path& md,
m->print_info (dr);
try {m->forcedown (false);} catch (const failed&) {}
+ m = nullptr; // Disable exceptions guard above.
continue;
}
@@ -359,8 +385,7 @@ bootstrap_machine (const dir_path& md,
//
if (!(file_not_empty (mf) || file_not_empty (mfo)))
{
- // Note: snapshot cleaned up by the caller of bootstrap_machine().
- return nullopt;
+ return nullopt; // Note: snapshot cleaned up by the caller.
}
}
@@ -411,6 +436,220 @@ bootstrap_machine (const dir_path& md,
return r;
}
+// Bootstrap an auxiliary machine. Return the bootstrapped machine manifest if
+// successful and nullopt otherwise (in which case the caller should clean up
+// the machine directory and ignore the machine for now).
+//
+static vector<size_t>
+divide_auxiliary_ram (const vector<const machine_manifest*>&);
+
+static optional<bootstrapped_machine_manifest>
+bootstrap_auxiliary_machine (const dir_path& md,
+ const machine_manifest& mm,
+ optional<bootstrapped_machine_manifest> obmm)
+{
+ tracer trace ("bootstrap_auxiliary_machine", md.string ().c_str ());
+
+ bootstrapped_machine_manifest r {
+ mm,
+ toolchain_manifest {}, // Unused for auxiliary,
+ bootstrap_manifest {} // Unused for auxiliary.
+ };
+
+ if (ops.fake_bootstrap ())
+ {
+ r.machine.mac = "de:ad:be:ef:de:ad";
+ }
+ else
+ try
+ {
+ // Similar to bootstrap_build_machine() except here we just wait for the
+ // upload of the environment.
+
+ // Start the TFTP server (server chroot is --tftp). Map:
+ //
+ // GET requests to /dev/null
+ // PUT requests to .../bootstrap/<toolchain>-<instance>/*
+ //
+ const string in_name (tc_name + '-' + to_string (inst));
+ auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= in_name);
+ try_mkdir_p (arm.path);
+
+ // Environment upload.
+ //
+ path ef (arm.path / "environment");
+ try_rmfile (ef);
+
+ // Note that unlike build, here we use the same VM snapshot for retries,
+ // which is not ideal.
+ //
+ for (size_t retry (0);; ++retry)
+ {
+ tftp_server tftpd ("Gr ^/?(.+)$ " + string ("/dev/null") + '\n' +
+ "Pr ^/?(.+)$ /bootstrap/" + in_name + "/\\1\n",
+ ops.tftp_port () + offset + 1 /* auxiliary machine */);
+
+ l3 ([&]{trace << "tftp server on port " << tftpd.port ();});
+
+ // If the machine specified RAM minimum, use that to make sure the
+ // machine can actually function with this amount of RAM. Otherwise, use
+ // the minium of RAM maximum (if specified) and the available auxiliary
+ // RAM (so we know this machine will at least work alone). For the
+ // latter case use divide_auxiliary_ram() to be consistent with the
+ // build case (see that function implementation for nuances).
+ //
+ size_t ram;
+ if (mm.ram_minimum)
+ ram = *mm.ram_minimum;
+ else
+ {
+ vector<size_t> rams (divide_auxiliary_ram ({&mm}));
+ assert (!rams.empty ()); // We should have skipped such a machine.
+ ram = rams.front ();
+ }
+
+ // Start the machine.
+ //
+ unique_ptr<machine> m (
+ start_machine (md,
+ mm,
+ 1 /* machine_num (first auxiliary) */,
+ ops.cpu (),
+ ram,
+ obmm ? obmm->machine.mac : nullopt,
+ ops.bridge (),
+ tftpd.port (),
+ false /* pub_vnc */));
+
+ {
+ // NOTE: see bootstrap_build_machine() for comments.
+
+ auto mg (
+ make_exception_guard (
+ [&m, &md] ()
+ {
+ info << "trying to force machine " << md << " down";
+ try {m->forcedown (false);} catch (const failed&) {}
+ }));
+
+ auto soft_fail = [&md, &m] (const char* msg)
+ {
+ {
+ diag_record dr (error);
+ dr << msg << " for machine " << md << ", suspending";
+ m->print_info (dr);
+ }
+
+ try
+ {
+ m->suspend (false);
+ m->wait (false);
+ m->cleanup ();
+ info << "resuming after machine suspension";
+
+ // Note: snapshot cleaned up by the caller.
+ }
+ catch (const failed&) {}
+
+ return nullopt;
+ };
+
+ auto check_machine = [&md, &m] ()
+ {
+ try
+ {
+ size_t t (0);
+ if (!m->wait (t /* seconds */, false /* fail_hard */))
+ return true; // Still running.
+
+ // Exited successfully.
+ }
+ catch (const failed&)
+ {
+ // Failed, exit code diagnostics has already been issued.
+ }
+
+ diag_record dr (error);
+ dr << "machine " << md << " exited unexpectedly";
+ m->print_info (dr);
+
+ return false;
+ };
+
+ // Wait up to the specified timeout for the auxiliary machine to
+ // bootstrap. Note that such a machine may do extra setup work on the
+ // first boot (such as install some packages, etc) which may take some
+ // time.
+ //
+ size_t to;
+ const size_t bootstrap_to (ops.bootstrap_auxiliary ());
+ const size_t shutdown_to (5 * 60);
+
+ // Serve TFTP requests while periodically checking for the environment
+ // file.
+ //
+ for (to = bootstrap_to; to != 0; )
+ {
+ if (tftpd.serve (to, 2))
+ continue;
+
+ if (!check_machine ())
+ {
+ if (!file_not_empty (ef))
+ {
+ return nullopt; // Note: snapshot cleaned up by the caller.
+ }
+ }
+
+ if (file_not_empty (ef))
+ {
+ if (!tftpd.serve (to, 5))
+ break;
+ }
+ }
+
+ if (to == 0)
+ {
+ if (retry > ops.bootstrap_retries ())
+ return soft_fail ("bootstrap timeout");
+
+ // Note: keeping the logs behind (no cleanup).
+
+ diag_record dr (warn);
+ dr << "machine " << mm.name << " mis-booted, retrying";
+ m->print_info (dr);
+
+ try {m->forcedown (false);} catch (const failed&) {}
+ m = nullptr; // Disable exceptions guard above.
+ continue;
+ }
+
+ l3 ([&]{trace << "completed bootstrap in " << bootstrap_to - to << "s";});
+
+ // Shut the machine down cleanly.
+ //
+ if (!m->shutdown ((to = shutdown_to)))
+ return soft_fail ("bootstrap shutdown timeout");
+
+ l3 ([&]{trace << "completed shutdown in " << shutdown_to - to << "s";});
+
+ m->cleanup ();
+ }
+
+ r.machine.mac = m->mac; // Save the MAC address.
+
+ break;
+ }
+ }
+ catch (const system_error& e)
+ {
+ fail << "bootstrap error: " << e;
+ }
+
+ serialize_manifest (r, md / "manifest", "bootstrapped machine");
+ return r;
+}
+
// Global toolchain lock.
//
// The overall locking protocol is as follows:
@@ -426,10 +665,11 @@ bootstrap_machine (const dir_path& md,
// proceeds to bootstrap the machine, releases its lock, and restarts the
// process from scratch.
//
-// 4. Otherwise, upon receiving a task response for one of the machines, the
-// agent releases all the other machine locks followed by the global lock,
-// proceeds to perform the task on the selected machine, releases its lock,
-// and restarts the process from scratch.
+// 4. Otherwise, upon receiving a task response for one of the machines (plus,
+// potentially, a number of auxiliary machines), the agent releases all the
+// other machine locks followed by the global lock, proceeds to perform the
+// task on the selected machine(s), releases their locks, and restarts the
+// process from scratch.
//
// One notable implication of this protocol is that the machine locks are
// only acquired while holding the global toolchain lock but can be released
@@ -528,6 +768,13 @@ lock_toolchain (unsigned int timeout)
// guaranteed to be atomic (in case later we want to support exclusive
// bootstrap and shared build).
//
+// Note also that we per-toolchain lock auxiliary machines even though they
+// are not toolchain-specific. Doing it this way allows us to handle both
+// types of machines consistently with regards to priorities, interrupts, etc.
+// It also means we will have each auxiliary machine available per-toolchain
+// rather than a single machine shared between all the toolchains, which is
+// a good thing.
+//
class machine_lock
{
public:
@@ -807,10 +1054,13 @@ compare_bbot (const bootstrap_manifest& m)
// bootstrapping/suspended machines have to be returned to get the correct
// count of currently active instances for the inst_max comparison.)
//
+// Note that both build and auxiliary machines are returned. For auxiliary,
+// toolchain and bootstrap manifests are unused and therefore always empty.
+//
struct bootstrapped_machine
{
- dir_path path;
machine_lock lock;
+ const dir_path path;
bootstrapped_machine_manifest manifest;
};
using bootstrapped_machines = vector<bootstrapped_machine>;
@@ -850,8 +1100,8 @@ try
r.push_back (
bootstrapped_machine {
- dir_path (ops.machines ()) /= mh.name, // For diagnostics.
machine_lock (path (), nullfd), // Fake lock.
+ dir_path (ops.machines ()) /= mh.name, // For diagnostics.
bootstrapped_machine_manifest {
machine_manifest {
move (mh.id),
@@ -867,8 +1117,8 @@ try
return pr;
}
- // Notice and warn if there are no machines (as opposed to all of them
- // being busy).
+ // Notice and warn if there are no build machines (as opposed to all of
+ // them being busy).
//
bool none (true);
@@ -976,8 +1226,6 @@ try
fail << "unable to read subvolume link " << lp << ": " << e;
}
- none = none && sp.empty ();
-
// Try to lock the machine.
//
machine_lock ml (lock_machine (tl, tp));
@@ -1010,18 +1258,25 @@ try
mm = parse_manifest<machine_manifest> (
sp / "manifest", "machine");
+
+ none = none && mm.effective_role () == machine_role::auxiliary;
}
else // Bootstrapping/suspended.
{
l3 ([&]{trace << "keeping " << md << ": being bootstrapped "
<< "or suspened by " << ml.pid;});
+
+ // Assume it is a build machine (we cannot determine whether
+ // it is build or auxiliary without loading its manifest).
+ //
+ none = false;
}
// Add the machine to the lists and bail out.
//
r.push_back (bootstrapped_machine {
- move (tp),
move (ml),
+ move (tp),
bootstrapped_machine_manifest {move (mm), {}, {}}});
break;
@@ -1057,15 +1312,29 @@ try
// Load the (original) machine manifest.
//
- auto mm (
+ machine_manifest mm (
parse_manifest<machine_manifest> (sp / "manifest", "machine"));
+ bool aux (mm.effective_role () == machine_role::auxiliary);
+
+ // Skip machines for which we don't have sufficient RAM.
+ //
+ if (effective_ram_minimum (mm) <
+ (aux ? ops.auxiliary_ram () : ops.build_ram ()))
+ {
+ l3 ([&]{trace << "skipping " << md << ": insufficient RAM";});
+ run_btrfs (trace, "subvolume", "delete", xp);
+ break;
+ }
+
+ none = none && aux;
+
// If we already have <name>-<toolchain>, see if it needs to be
// re-bootstrapped. Things that render it obsolete:
//
// 1. New machine revision (compare machine ids).
- // 2. New toolchain (compare toolchain ids).
- // 3. New bbot/libbbot (compare versions).
+ // 2. New toolchain (compare toolchain ids, not auxiliary).
+ // 3. New bbot/libbbot (compare versions, not auxiliary).
//
// The last case has a complication: what should we do if we have
// bootstrapped a newer version of bbot? This would mean that we
@@ -1087,24 +1356,27 @@ try
te = false;
}
- if (!tc_id.empty () && bmm->toolchain.id != tc_id)
+ if (!aux)
{
- l3 ([&]{trace << "re-bootstrap " << tp << ": new toolchain";});
- te = false;
- }
-
- if (int i = compare_bbot (bmm->bootstrap))
- {
- if (i < 0)
+ if (!tc_id.empty () && bmm->toolchain.id != tc_id)
{
- l3 ([&]{trace << "re-bootstrap " << tp << ": new bbot";});
+ l3 ([&]{trace << "re-bootstrap " << tp << ": new toolchain";});
te = false;
}
- else
+
+ if (int i = compare_bbot (bmm->bootstrap))
{
- l3 ([&]{trace << "ignoring " << tp << ": old bbot";});
- run_btrfs (trace, "subvolume", "delete", xp);
- break;
+ if (i < 0)
+ {
+ l3 ([&]{trace << "re-bootstrap " << tp << ": new bbot";});
+ te = false;
+ }
+ else
+ {
+ l3 ([&]{trace << "ignoring " << tp << ": old bbot";});
+ run_btrfs (trace, "subvolume", "delete", xp);
+ break;
+ }
}
}
@@ -1134,7 +1406,7 @@ try
// Add the machine to the lists.
//
r.push_back (
- bootstrapped_machine {move (tp), move (ml), move (*bmm)});
+ bootstrapped_machine {move (ml), move (tp), move (*bmm)});
break;
} // Retry loop.
@@ -1161,12 +1433,19 @@ try
// Determine how many machines are busy (locked by other processes) and
// make sure it's below the --instance-max limit, if specified.
//
+ // We should only count build machines unless being bootstrapped (see
+ // above).
+ //
if (inst_max != 0)
{
size_t busy (0);
for (const bootstrapped_machine& m: r)
- if (!m.lock.locked ())
+ {
+ if (!m.lock.locked () &&
+ (!m.lock.prio ||
+ m.manifest.machine.effective_role () != machine_role::auxiliary))
++busy;
+ }
assert (busy <= inst_max);
@@ -1196,8 +1475,12 @@ try
ml.bootstrap (tl);
tl.unlock ();
+ bool aux (pboot->mm.effective_role () == machine_role::auxiliary);
+
optional<bootstrapped_machine_manifest> bmm (
- bootstrap_machine (xp, pboot->mm, move (pboot->bmm)));
+ aux
+ ? bootstrap_auxiliary_machine (xp, pboot->mm, move (pboot->bmm))
+ : bootstrap_build_machine (xp, pboot->mm, move (pboot->bmm)));
if (!bmm)
{
@@ -1217,16 +1500,19 @@ try
l2 ([&]{trace << "bootstrapped " << bmm->machine.name;});
- // Check the bootstrapped bbot version as above and ignore this machine
- // if it's newer than us.
+ // Check the bootstrapped bbot version as above and ignore this build
+ // machine if it's newer than us.
//
- if (int i = compare_bbot (bmm->bootstrap))
+ if (!aux)
{
- if (i > 0)
- l3 ([&]{trace << "ignoring " << tp << ": old bbot";});
- else
- warn << "bootstrapped " << tp << " bbot worker is older "
- << "than agent; assuming test setup";
+ if (int i = compare_bbot (bmm->bootstrap))
+ {
+ if (i > 0)
+ l3 ([&]{trace << "ignoring " << tp << ": old bbot";});
+ else
+ warn << "bootstrapped " << tp << " bbot worker is older "
+ << "than agent; assuming test setup";
+ }
}
continue; // Re-enumerate from scratch.
@@ -1250,6 +1536,495 @@ catch (const system_error& e)
//
struct interrupt {};
+// Start an auxiliary machine (steps 1-3 described in perfrom_task() below).
+//
+// Note that if the returned machine is NULL, then it means it has failed to
+// start up (in which case the diagnostics has already been issued and
+// snapshot cleaned up).
+//
+// Note: can throw interrupt.
+//
+struct auxiliary_machine_result
+{
+ dir_path snapshot;
+ unique_ptr<bbot::machine> machine;
+};
+
+using auxiliary_machine_results = vector<auxiliary_machine_result>;
+
+static pair<auxiliary_machine_result, string /* environment */>
+start_auxiliary_machine (bootstrapped_machine& am,
+ const string& env_name,
+ uint16_t machine_num,
+ size_t ram,
+ const string& in_name, // <toolchain>-<instance>
+ const dir_path& tftp_put_dir,
+ optional<size_t> boost_cpus)
+try
+{
+ tracer trace ("start_auxiliary_machine", am.path.string ().c_str ());
+
+ // NOTE: a simplified version of perform_task() below.
+
+ machine_lock& ml (am.lock);
+ const dir_path& md (am.path);
+ const bootstrapped_machine_manifest& mm (am.manifest);
+
+ path ef (tftp_put_dir / "environment"); // Environment upload file.
+ try_rmfile (ef);
+
+ // <name>-<toolchain>-<xxx>
+ //
+ const dir_path xp (snapshot_path (md));
+
+ for (size_t retry (0);; ++retry)
+ {
+ if (retry != 0)
+ run_btrfs (trace, "subvolume", "delete", xp);
+
+ run_btrfs (trace, "subvolume", "snapshot", md, xp);
+
+ // Start the TFTP server. Map:
+ //
+ // GET requests to /dev/null
+ // PUT requests to .../build/<toolchain>-<instance>/put/*
+ //
+ // Note that we only need to run the TFTP server until we get the
+ // environment upload. Which means we could have reused the same port as
+ // the build machine. But let's keep things parallel to the VNC ports and
+ // use a seperate TFTP port for each auxiliary machine.
+ //
+ tftp_server tftpd ("Gr ^/?(.+)$ " + string ("/dev/null") + '\n' +
+ "Pr ^/?(.+)$ /build/" + in_name + "/put/\\1\n",
+ ops.tftp_port () + offset + machine_num);
+
+ l3 ([&]{trace << "tftp server on port " << tftpd.port ();});
+
+ // Note: the machine handling logic is similar to bootstrap. Except here
+ // we have to cleanup the snapshot ourselves in case of suspension or
+ // unexpected exit.
+
+ // Start the machine.
+ //
+ // Note that for now we don't support logging in into auxiliary machines
+ // in the interactive mode. Maybe one day.
+ //
+ unique_ptr<machine> m (
+ start_machine (xp,
+ mm.machine,
+ machine_num,
+ boost_cpus ? *boost_cpus : ops.cpu (),
+ ram,
+ mm.machine.mac,
+ ops.bridge (),
+ tftpd.port (),
+ false /* public_vnc */));
+
+ auto mg (
+ make_exception_guard (
+ [&m, &xp] ()
+ {
+ if (m != nullptr)
+ {
+ info << "trying to force machine " << xp << " down";
+ try {m->forcedown (false);} catch (const failed&) {}
+ }
+ }));
+
+ auto soft_fail = [&trace, &ml, &xp, &m] (const char* msg)
+ {
+ {
+ diag_record dr (error);
+ dr << msg << " for machine " << xp << ", suspending";
+ m->print_info (dr);
+ }
+
+ try
+ {
+ // Update the information in the machine lock to signal that the
+ // machine is suspended and cannot be interrupted.
+ //
+ ml.suspend_task ();
+
+ m->suspend (false);
+ m->wait (false);
+ m->cleanup ();
+ run_btrfs (trace, "subvolume", "delete", xp);
+ info << "resuming after machine suspension";
+ }
+ catch (const failed&) {}
+
+ return make_pair (auxiliary_machine_result {move (xp), nullptr},
+ string ());
+ };
+
+ auto check_machine = [&xp, &m] ()
+ {
+ try
+ {
+ size_t t (0);
+ if (!m->wait (t /* seconds */, false /* fail_hard */))
+ return true;
+ }
+ catch (const failed&) {}
+
+ diag_record dr (warn);
+ dr << "machine " << xp << " exited unexpectedly";
+ m->print_info (dr);
+
+ return false;
+ };
+
+ auto check_interrupt = [&trace, &xp, &m] ()
+ {
+ if (sigurs1.load (std::memory_order_consume) == 0)
+ return;
+
+ l2 ([&]{trace << "machine " << xp << " interruped";});
+
+ try {m->forcedown (false);} catch (const failed&) {}
+ m->cleanup ();
+ m = nullptr; // Disable exceptions guard above.
+ run_btrfs (trace, "subvolume", "delete", xp);
+
+ throw interrupt ();
+ };
+
+ // Wait for up to 4 minutes (by default) for the environment upload (the
+ // same logic as in bootstrap_auxiliary_machine() except here the machine
+ // cannot just exit).
+ //
+ size_t to;
+ const size_t startup_to (ops.build_startup ());
+
+ for (to = startup_to; to != 0; )
+ {
+ check_interrupt ();
+
+ if (tftpd.serve (to, 2))
+ continue;
+
+ if (!check_machine ())
+ {
+ // An auxiliary machine should not just exit.
+ //
+ return make_pair (auxiliary_machine_result {move (xp), nullptr},
+ string ());
+ }
+
+ if (file_not_empty (ef))
+ {
+ if (!tftpd.serve (to, 5))
+ break;
+ }
+ }
+
+ if (to == 0)
+ {
+ if (retry > ops.build_retries ())
+ return soft_fail ("build startup timeout");
+
+ // Note: keeping the logs behind (no cleanup).
+
+ diag_record dr (warn);
+ dr << "machine " << mm.machine.name << " mis-booted, retrying";
+ m->print_info (dr);
+
+ try {m->forcedown (false);} catch (const failed&) {}
+ m = nullptr; // Disable exceptions guard above.
+ continue;
+ }
+
+ l3 ([&]{trace << "completed startup in " << startup_to - to << "s";});
+
+ // Read the uploaded environment and, if necessary, append the name prefix
+ // (which we first make a valid C identifier).
+ //
+ // Note that it may seem like a good idea to validate the format here.
+ // But that means we will essentially need to parse it twice (here and in
+ // worker). Plus, in worker we can comminucate some diagnostics by writing
+ // it to the build log (here all we can easily do is abort the task). So
+ // here we just append the name prefix to trimmed non-blank/comment lines.
+ //
+ string env_pfx (env_name.empty ()
+ ? string ()
+ : sanitize_identifier (env_name) + '_');
+ string env;
+ try
+ {
+ ifdstream is (ef, ifdstream::badbit);
+ for (string l; !eof (getline (is, l)); )
+ {
+ trim (l);
+
+ if (!env_pfx.empty () && !l.empty () && l.front () != '#')
+ l.insert (0, env_pfx);
+
+ env += l; env += '\n';
+ }
+ }
+ catch (const io_error& e)
+ {
+ fail << "unable to read from " << ef << ": " << e;
+ }
+
+ // Rename and keep the environment file for debugging (it will be removed
+ // at the end as part of the tftp_put_dir cleanup).
+ //
+ mvfile (ef, ef + '-' + mm.machine.name);
+
+ return make_pair (auxiliary_machine_result {move (xp), move (m)},
+ move (env));
+ }
+
+ // Unreachable.
+}
+catch (const system_error& e)
+{
+ fail << "auxiliary machine startup error: " << e << endf;
+}
+
+// Divide the auxiliary RAM among the specified machines.
+//
+// Issue diagnostics and return empty vector if the auxiliary RAM is
+// insufficient.
+//
+static vector<size_t> // Parallel to mms.
+divide_auxiliary_ram (const vector<const machine_manifest*>& mms)
+{
+ size_t ram (ops.auxiliary_ram ());
+
+ vector<size_t> rams;
+ vector<size_t> rnds; // Allocation rounds (see below).
+
+ // First pass: allocate the minimums.
+ //
+ for (const machine_manifest* mm: mms)
+ {
+ size_t v (effective_ram_minimum (*mm));
+
+ assert (!mm->ram_maximum || v <= *mm->ram_maximum); // Sanity check.
+
+ rams.push_back (v);
+ rnds.push_back (0);
+
+ if (ram >= v)
+ ram -= v;
+ else
+ {
+ diag_record dr (error);
+ dr << "insufficient auxiliary RAM " << ops.auxiliary_ram () << "KiB";
+
+ for (size_t i (0); i != rams.size (); ++i)
+ dr << info << mms[i]->name << " requires minimum " << rams[i] << "KiB";
+
+ return {};
+ }
+ }
+
+ // Second pass: distribute the remaining RAM.
+ //
+ // We are going to do it in the ram_minimum increments to avoid ending up
+ // with odd amounts (while Linux can probably grok anything, who knows about
+ // Windows).
+ //
+ // To make the distribution fair we are going to count how many times we
+ // have increased each machine's allocation (the rnds vector).
+ //
+ for (size_t a (1); ram != 0; ) // Allocation round.
+ {
+ // Find a machine that would be satisfied with the least amount of RAM but
+ // which hasn't yet been given anything on this allocation round.
+ //
+ size_t min_i; // Min index.
+ size_t min_v (0); // Min value.
+
+ // We are done if we couldn't give out any RAM and haven't seen any
+ // machines that have already been given something on this allocation
+ // round.
+ //
+ bool done (true);
+
+ for (size_t i (0); i != rams.size (); ++i)
+ {
+ if (rnds[i] != a)
+ {
+ const machine_manifest& mm (*mms[i]);
+
+ size_t o (rams[i]);
+ size_t v (effective_ram_minimum (mm));
+
+ // Don't allocate past maximum.
+ //
+ if (mm.ram_maximum && *mm.ram_maximum < o + v)
+ {
+ v = *mm.ram_maximum - o;
+
+ if (v == 0)
+ continue;
+ }
+
+ if (v <= ram && (min_v == 0 || min_v > v))
+ {
+ min_i = i;
+ min_v = v;
+ }
+ }
+ else
+ done = false;
+ }
+
+ if (min_v != 0)
+ {
+ rnds[min_i] = a;
+ rams[min_i] += min_v;
+ ram -= min_v;
+ }
+ else
+ {
+ if (done)
+ break;
+
+ ++a; // Next allocation round.
+ }
+ }
+
+ return rams;
+}
+
+// Stop all the auxiliary machines and clear the passed list.
+//
+static void
+stop_auxiliary_machines (auxiliary_machine_results& amrs)
+{
+ tracer trace ("stop_auxiliary_machines");
+
+ if (!amrs.empty ())
+ {
+ // Do it in two passes to make sure all the machines are at least down.
+ //
+ for (const auxiliary_machine_result& amr: amrs)
+ {
+ if (amr.machine != nullptr)
+ {
+ try {amr.machine->forcedown (false);} catch (const failed&) {}
+ }
+ }
+
+ // Make sure we don't retry the above even if the below fails.
+ //
+ auxiliary_machine_results tmp;
+ tmp.swap (amrs);
+
+ for (const auxiliary_machine_result& amr: tmp)
+ {
+ if (amr.machine != nullptr)
+ {
+ amr.machine->cleanup ();
+ run_btrfs (trace, "subvolume", "delete", amr.snapshot);
+ }
+ }
+ }
+};
+
+// Start all the auxiliary machines and patch in their combined environment
+// into tm.auxiliary_environment.
+//
+// Return the started machines or empty list if any of them failed to start up
+// (which means this function should only be called for non-empty ams).
+//
+// Note that the order of auxiliary machines in ams may not match that in
+// tm.auxiliary_machines.
+//
+static auxiliary_machine_results
+start_auxiliary_machines (const vector<bootstrapped_machine*>& ams,
+ task_manifest& tm,
+ const string& in_name, // <toolchain>-<instance>
+ const dir_path& tftp_put_dir,
+ optional<size_t> boost_cpus)
+{
+ tracer trace ("start_auxiliary_machines");
+
+ size_t n (tm.auxiliary_machines.size ());
+
+ assert (n != 0 && ams.size () == n);
+
+ auxiliary_machine_results amrs;
+
+ // Divide the auxiliary RAM among the machines.
+ //
+ vector<size_t> rams;
+ {
+ vector<const machine_manifest*> mms;
+ mms.reserve (n);
+ for (bootstrapped_machine* am: ams)
+ mms.push_back (&am->manifest.machine);
+
+ rams = divide_auxiliary_ram (mms);
+ if (rams.empty ())
+ return amrs;
+
+ if (verb > 3) // l3
+ for (size_t i (0); i != n; ++i)
+ trace << mms[i]->name << " allocated " << rams[i] << "KiB";
+ }
+
+ // Start the machines.
+ //
+ // Let's use the order in which they were specified in the task manifest
+ // (which will naturally be the order in which they are specified in the
+ // package manifest). This way amrs and tm.auxiliary_machines will be
+ // parallel.
+ //
+ string envs; // Combined environments.
+
+ for (size_t i (0); i != n; ++i)
+ {
+ const auxiliary_machine& tam (tm.auxiliary_machines[i]);
+
+ auto b (ams.begin ()), e (ams.end ());
+ auto j (find_if (b, e,
+ [&tam] (const bootstrapped_machine* m)
+ {
+ return m->manifest.machine.name == tam.name;
+ }));
+ assert (j != e);
+
+ pair<auxiliary_machine_result, string> p (
+ start_auxiliary_machine (**j,
+ tam.environment_name,
+ i + 1,
+ rams[j - b], // Parallel to ams.
+ in_name,
+ tftp_put_dir,
+ boost_cpus));
+
+ if (p.first.machine == nullptr)
+ {
+ if (!amrs.empty ())
+ {
+ info << "trying to force auxiliary machines down";
+ stop_auxiliary_machines (amrs); // amrs is now empty.
+ }
+
+ return amrs;
+ }
+
+ amrs.push_back (move (p.first));
+
+ // Add the machine name as a header before its environment.
+ //
+ envs += "# "; envs += tam.name; envs += '\n';
+ envs += "#\n";
+ envs += p.second;
+ envs += '\n';
+ }
+
+ tm.auxiliary_environment = move (envs);
+
+ return amrs;
+}
+
struct perform_task_result
{
auto_rmdir work_dir; // <tftp>/build/<toolchain>-<instance>/
@@ -1278,16 +2053,18 @@ struct perform_task_result
upload_archive (move (a)) {}
};
+// Note that the task manifest is not const since we may need to patch in the
+// auxiliary_environment value.
+//
static perform_task_result
perform_task (toolchain_lock tl, // Note: assumes ownership.
- machine_lock& ml,
- const dir_path& md,
- const bootstrapped_machine_manifest& mm,
- const task_manifest& tm,
+ bootstrapped_machine& bm, // Build machine.
+ const vector<bootstrapped_machine*>& ams, // Auxiliary machines.
+ task_manifest& tm,
optional<size_t> boost_cpus)
try
{
- tracer trace ("perform_task", md.string ().c_str ());
+ tracer trace ("perform_task", bm.path.string ().c_str ());
// Arm the interrupt handler and release the global toolchain lock.
//
@@ -1296,6 +2073,10 @@ try
sigurs1.store (0, std::memory_order_release);
tl.unlock ();
+ machine_lock& ml (bm.lock);
+ const dir_path& md (bm.path);
+ const bootstrapped_machine_manifest& mm (bm.manifest);
+
const string in_name (tc_name + '-' + to_string (inst));
auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= in_name);
@@ -1314,7 +2095,7 @@ try
// The overall plan is as follows:
//
- // 1. Snapshot the (bootstrapped) machine.
+ // 1. Snapshot the (bootstrapped) build machine.
//
// 2. Save the task manifest to the TFTP directory (to be accessed by the
// worker).
@@ -1326,6 +2107,18 @@ try
//
// 5. Clean up (force the machine down and delete the snapshot).
//
+ // If the task requires any auxiliary machines, then for each such machine
+ // perform the following steps 1-3 before step 1 above, and step 4 after
+ // step 5 above (that is, start all the auxiliary machines before the build
+ // machine and clean them up after):
+ //
+ // 1. Snapshot the (bootstrapped) auxiliary machine.
+ //
+ // 2. Start the TFTP server and the machine.
+ //
+ // 3. Handle TFTP upload requests until received the environment upload.
+ //
+ // 4. Clean up (force the machine down and delete the snapshot).
// TFTP server mapping (server chroot is --tftp):
//
@@ -1342,11 +2135,12 @@ try
path rf (pd / "result.manifest.lz4"); // Result manifest file.
path af (pd / "upload.tar"); // Archive of build artifacts to upload.
- serialize_manifest (tm, tf, "task");
-
if (ops.fake_machine_specified ())
{
- // Note: not handling interrupts here.
+ // Note: not handling interrupts here. Nor starting any auxiliary
+ // machines, naturally.
+
+ serialize_manifest (tm, tf, "task");
// Simply wait for the file to appear.
//
@@ -1368,6 +2162,38 @@ try
}
else
{
+ // Start the auxiliary machines if any.
+ //
+ // If anything goes wrong, force them all down (failed that, the machine
+ // destructor will block waiting for their exit).
+ //
+ auxiliary_machine_results amrs;
+
+ auto amg (
+ make_exception_guard (
+ [&amrs] ()
+ {
+ if (!amrs.empty ())
+ {
+ info << "trying to force auxiliary machines down";
+ stop_auxiliary_machines (amrs);
+ }
+ }));
+
+ if (!ams.empty ())
+ {
+ amrs = start_auxiliary_machines (ams, tm, in_name, pd, boost_cpus);
+
+ if (amrs.empty ())
+ return perform_task_result (move (arm), move (r)); // Abort.
+ }
+
+ // Note: tm.auxiliary_environment patched in by start_auxiliary_machines().
+ //
+ serialize_manifest (tm, tf, "task");
+
+ // Start the build machine and perform the build.
+ //
try_rmfile (rf);
try_rmfile (af);
@@ -1386,7 +2212,7 @@ try
//
tftp_server tftpd ("Gr ^/?(.+)$ /build/" + in_name + "/get/\\1\n" +
"Pr ^/?(.+)$ /build/" + in_name + "/put/\\1\n",
- ops.tftp_port () + offset);
+ ops.tftp_port () + offset + 0 /* build machine */);
l3 ([&]{trace << "tftp server on port " << tftpd.port ();});
@@ -1394,17 +2220,21 @@ try
// we have to cleanup the snapshot ourselves in case of suspension or
// unexpected exit.
//
+ // NOTE: see similar code in start_auxiliary_machine() above.
+ //
{
// Start the machine.
//
unique_ptr<machine> m (
start_machine (xp,
mm.machine,
+ 0 /* machine_num (build) */,
+ boost_cpus ? *boost_cpus : ops.cpu (),
+ ops.build_ram (),
mm.machine.mac,
ops.bridge (),
tftpd.port (),
- tm.interactive.has_value (),
- boost_cpus));
+ tm.interactive.has_value () /* public_vnc */));
auto mg (
make_exception_guard (
@@ -1417,7 +2247,10 @@ try
}
}));
- auto soft_fail = [&trace, &ml, &xp, &m, &arm, &r] (const char* msg)
+ auto soft_fail = [&trace,
+ &amrs,
+ &ml, &xp, &m,
+ &arm, &r] (const char* msg)
{
{
diag_record dr (error);
@@ -1425,6 +2258,18 @@ try
m->print_info (dr);
}
+ // What should we do about auxiliary machines? We could force them
+ // all down before suspending (and thus freeing them for use). That
+ // is the easy option. We could suspend them as well, but that feels
+ // like it will be a pain (will need to resume all of them when done
+ // investigating). Theoretically we could just let them run, but
+ // that won't play well with our interrupt logic since someone may
+ // attempt to interrupt us via one of them. So let's do easy for
+ // now.
+ //
+ // Note: always stop/suspend the build machine before the auxiliary
+ // machines to avoid any errors due the auxiliary machines being
+ // unavailable.
try
{
// Update the information in the machine lock to signal that the
@@ -1433,8 +2278,10 @@ try
ml.suspend_task ();
m->suspend (false);
+ stop_auxiliary_machines (amrs);
m->wait (false);
m->cleanup ();
+ m = nullptr; // Disable exceptions guard above.
run_btrfs (trace, "subvolume", "delete", xp);
info << "resuming after machine suspension";
}
@@ -1460,7 +2307,29 @@ try
return false;
};
- auto check_interrupt = [&trace, &xp, &m] ()
+ auto check_auxiliary_machines = [&amrs] ()
+ {
+ for (auxiliary_machine_result& amr: amrs)
+ {
+ try
+ {
+ size_t t (0);
+ if (!amr.machine->wait (t /* seconds */, false /* fail_hard */))
+ continue;
+ }
+ catch (const failed&) {}
+
+ diag_record dr (warn);
+ dr << "machine " << amr.snapshot << " exited unexpectedly";
+ amr.machine->print_info (dr);
+
+ return false;
+ }
+
+ return true;
+ };
+
+ auto check_interrupt = [&trace, &amrs, &xp, &m] ()
{
if (sigurs1.load (std::memory_order_consume) == 0)
return;
@@ -1468,6 +2337,7 @@ try
l2 ([&]{trace << "machine " << xp << " interruped";});
try {m->forcedown (false);} catch (const failed&) {}
+ stop_auxiliary_machines (amrs);
m->cleanup ();
m = nullptr; // Disable exceptions guard above.
run_btrfs (trace, "subvolume", "delete", xp);
@@ -1497,8 +2367,13 @@ try
if (tftpd.serve (to, 2))
break;
- if (!check_machine ())
+ bool bm; // Build machine still running.
+ if (!(bm = check_machine ()) || !check_auxiliary_machines ())
{
+ if (bm)
+ try {m->forcedown (false);} catch (const failed&) {}
+ stop_auxiliary_machines (amrs);
+ m = nullptr; // Disable exceptions guard above.
run_btrfs (trace, "subvolume", "delete", xp);
return perform_task_result (move (arm), move (r));
}
@@ -1516,6 +2391,7 @@ try
m->print_info (dr);
try {m->forcedown (false);} catch (const failed&) {}
+ m = nullptr; // Disable exceptions guard above.
continue;
}
@@ -1535,10 +2411,15 @@ try
if (tftpd.serve (to, 2))
continue;
- if (!check_machine ())
+ bool bm; // Build machine still running.
+ if (!(bm = check_machine ()) || !check_auxiliary_machines ())
{
- if (!file_not_empty (rf))
+ if (bm || !file_not_empty (rf))
{
+ if (bm)
+ try {m->forcedown (false);} catch (const failed&) {}
+ stop_auxiliary_machines (amrs);
+ m = nullptr; // Disable exceptions guard above.
run_btrfs (trace, "subvolume", "delete", xp);
return perform_task_result (move (arm), move (r));
}
@@ -1594,7 +2475,9 @@ try
// lease instead of a new one.
//
try {m->forcedown (false);} catch (const failed&) {}
+ stop_auxiliary_machines (amrs);
m->cleanup ();
+ m = nullptr; // Disable exceptions guard above.
}
}
@@ -1755,17 +2638,20 @@ try
: standard_version (BBOT_VERSION_STR));
tc_id = ops.toolchain_id ();
- if (tc_num == 0 || tc_num > 99)
- fail << "invalid --toolchain-num value " << tc_num;
+ if (tc_num == 0 || tc_num > 9)
+ fail << "--toolchain-num value " << tc_num << " out of range";
inst = ops.instance ();
if (inst == 0 || inst > 99)
- fail << "invalid --instance value " << inst;
+ fail << "--instance value " << inst << " out of range";
inst_max = ops.instance_max ();
- offset = (tc_num - 1) * 100 + inst;
+ // The last decimal position is used for machine number, 0 for the build
+ // machine, non-0 for auxiliary machines (of which we can have maximum 9).
+ //
+ offset = (tc_num - 1) * 1000 + inst * 10;
// Controller priority to URLs map.
//
@@ -1917,7 +2803,7 @@ try
if (ops.interactive () != interactive_mode::false_)
{
imode = ops.interactive ();
- ilogin = machine_vnc (true /* public */);
+ ilogin = machine_vnc (0 /* machine_num */, true /* public */);
}
// Use the pkeyutl openssl command for signing the task response challenge
@@ -1959,25 +2845,29 @@ try
uint64_t prio_max (0);
bool prio_mon (false);
{
- uint16_t busy (0); // Number of machines locked by other processes.
- bool task (false); // There is a machine performing a task.
+ uint16_t busy (0); // Number of build machines locked by other processes.
+ bool task (false); // There is a build machine performing a task.
for (const bootstrapped_machine& m: ms)
{
if (!m.lock.locked ())
{
- ++busy;
-
if (m.lock.prio) // Not bootstrapping/suspended.
{
- task = true;
+ if (m.manifest.machine.effective_role () != machine_role::auxiliary)
+ {
+ ++busy;
+ task = true;
- if (prio_min > *m.lock.prio)
- prio_min = *m.lock.prio;
+ if (prio_min > *m.lock.prio)
+ prio_min = *m.lock.prio;
- if (prio_max < *m.lock.prio)
- prio_max = *m.lock.prio;
+ if (prio_max < *m.lock.prio)
+ prio_max = *m.lock.prio;
+ }
}
+ else
+ ++busy; // Assume build machine (see enumerate_machines()).
}
}
@@ -2067,15 +2957,19 @@ try
imode,
ilogin,
fingerprint,
- nullopt /* auxiliary_ram */, // @@ TMP AUXILIARY
+ ops.auxiliary_ram (),
machine_header_manifests {}};
// Determine which machines we need to offer for this priority.
//
+ bool aux_only (true); // Only auxiliary machines are available.
{
- bool interruptable (false);
+ bool interruptable (false); // There is build machine we can interrupt.
for (const bootstrapped_machine& m: ms)
{
+ const machine_manifest& mm (m.manifest.machine);
+ machine_role role (mm.effective_role ());
+
if (!m.lock.locked ())
{
if (!m.lock.prio) // Skip bootstrapping/suspended.
@@ -2093,18 +2987,18 @@ try
if ((prio / 100) <= (eprio / 100))
continue;
- interruptable = true;
+ if (role != machine_role::auxiliary)
+ interruptable = true;
}
- tq.machines.emplace_back (m.manifest.machine.id,
- m.manifest.machine.name,
- m.manifest.machine.summary,
- //
- // @@ TMP AUXILIARY
- //
- nullopt /* role */,
- nullopt /* ram_minimum */,
- nullopt /* ram_maximum */);
+ tq.machines.emplace_back (mm.id,
+ mm.name,
+ mm.summary,
+ role,
+ effective_ram_minimum (mm),
+ mm.ram_maximum);
+
+ aux_only = aux_only && role == machine_role::auxiliary;
}
// Sanity check: in the priority monitor mode we should only ask for a
@@ -2122,10 +3016,13 @@ try
return 0;
}
+ if (aux_only)
+ tq.machines.clear ();
+
if (tq.machines.empty ())
{
- // If we have no machines for this priority then we won't have any
- // for any lower priority so bail out.
+ // If we have no build machines for this priority then we won't have
+ // any for any lower priority so bail out.
//
break;
}
@@ -2133,7 +3030,7 @@ try
// Send task requests.
//
// Note that we have to do it while holding the lock on all the machines
- // since we don't know which machine we will need.
+ // since we don't know which machine(s) we will need.
//
vector<strings::const_iterator> rurls (urls.size ());
std::iota (rurls.begin (), rurls.end (), urls.begin ());
@@ -2174,9 +3071,8 @@ try
"--max-time", ops.request_timeout (),
"--connect-timeout", ops.connect_timeout ());
- // This is tricky/hairy: we may fail hard parsing the output
- // before seeing that curl exited with an error and failing
- // softly.
+ // This is tricky/hairy: we may fail hard parsing the output before
+ // seeing that curl exited with an error and failing softly.
//
bool f (false);
@@ -2259,7 +3155,7 @@ try
} // prio loop.
- if (tq.machines.empty ()) // No machines.
+ if (tq.machines.empty ()) // No machines (auxiliary-only already handled).
{
// Normally this means all the machines are busy so sleep a bit less.
//
@@ -2279,15 +3175,67 @@ try
//
task_manifest& t (*tr.task);
- // First verify the requested machine is one of those we sent in tq.
+ // First verify the requested machines are from those we sent in tq and
+ // their roles match.
+ //
+ // Also verify the same machine is not picked multiple times by blanking
+ // out the corresponding entry in tq.machines. (Currently we are only
+ // capable of running one instance of each machine though we may want to
+ // relax that in the future, at which point we should send as many entries
+ // for the same machine in the task request as we are capable of running,
+ // applying the priority logic for each entry, etc).
//
- if (find_if (tq.machines.begin (), tq.machines.end (),
- [&t] (const machine_header_manifest& mh)
- {
- return mh.name == t.machine; // Yes, names, not ids.
- }) == tq.machines.end ())
{
- error << "task from " << url << " for unknown machine " << t.machine;
+ auto check = [&tq, &url] (const string& name, machine_role r)
+ {
+ auto i (find_if (tq.machines.begin (), tq.machines.end (),
+ [&name] (const machine_header_manifest& mh)
+ {
+ return mh.name == name; // Yes, names, not ids.
+ }));
+
+ if (i == tq.machines.end ())
+ {
+ error << "task from " << url << " for unknown machine " << name;
+ return false;
+ }
+
+ if (i->effective_role () != r)
+ {
+ error << "task from " << url << " with mismatched role "
+ << " for machine " << name;
+ return false;
+ }
+
+ i->name.clear (); // Blank out.
+
+ return true;
+ };
+
+ auto check_aux = [&check] (const vector<auxiliary_machine>& ams)
+ {
+ for (const auxiliary_machine& am: ams)
+ if (!check (am.name, machine_role::auxiliary))
+ return false;
+ return true;
+ };
+
+ if (!check (t.machine, machine_role::build) ||
+ !check_aux (t.auxiliary_machines))
+ {
+ if (ops.dump_task ())
+ return 0;
+
+ continue;
+ }
+ }
+
+ // Also verify there are no more than 9 auxiliary machines (see the offset
+ // global variable for details).
+ //
+ if (t.auxiliary_machines.size () > 9)
+ {
+ error << "task from " << url << " with more than 9 auxiliary machines";
if (ops.dump_task ())
return 0;
@@ -2325,21 +3273,42 @@ try
// feels like the most sensible option).
//
perform_task_result r;
- bootstrapped_machine* pm (nullptr);
+ bootstrapped_machine* pm (nullptr); // Build machine.
+ vector<bootstrapped_machine*> ams; // Auxiliary machines.
try
{
- // Next find the corresponding bootstrapped_machine instance in ms. Also
- // unlock all the other machines.
+ // First find the bootstrapped_machine instance in ms corresponding to
+ // the requested build machine. Also unlock all the other machines.
//
// While at it also see if we need to interrupt the selected machine (if
// busy), one of the existing (if we are at the max allowed instances,
// that is in the priority monitor mode), or all existing (if this is a
// priority level 4 task).
//
- vector<bootstrapped_machine*> ims;
+ // Auxiliary machines complicate the matter a bit: we may now need to
+ // interrupt some subset of {build machine, auxiliary machines} that are
+ // necessary to perform this task. Note, however, that auxiliary
+ // machines are always subordinate to build machines, meaning that if
+ // there is a busy auxiliary machine, then there will be a busy build
+ // machine with the same pid/priority (and so if we interrup one
+ // auxiliary, then we will also interrupt the corresponding build plus
+ // any other auxiliaries it may be using). Based on that let's try to
+ // divide and conquer this by first dealing with build machines and then
+ // adding any auxiliary ones.
+ //
+ vector<bootstrapped_machine*> ims; // Machines to be interrupted.
+ size_t imt (0); // Number of "target" machines to interrupt (see below).
+
+ // First pass: build machines.
+ //
for (bootstrapped_machine& m: ms)
{
- if (m.manifest.machine.name == t.machine)
+ const machine_manifest& mm (m.manifest.machine);
+
+ if (mm.effective_role () == machine_role::auxiliary)
+ continue;
+
+ if (mm.name == t.machine)
{
assert (pm == nullptr); // Sanity check.
pm = &m;
@@ -2367,16 +3336,71 @@ try
}
}
- assert (pm != nullptr);
+ assert (pm != nullptr); // Sanity check.
if (!pm->lock.locked ())
{
+ assert (pm->lock.prio); // Sanity check (not bootstrapping/suspended).
+
if (prio >= 1000)
ims.insert (ims.begin (), pm); // Interrupt first (see below).
else
ims = {pm};
+
+ imt++;
}
+ // Second pass: auxiliary machines.
+ //
+ for (bootstrapped_machine& m: ms)
+ {
+ const machine_manifest& mm (m.manifest.machine);
+
+ if (mm.effective_role () != machine_role::auxiliary)
+ continue;
+
+ if (find_if (t.auxiliary_machines.begin (), t.auxiliary_machines.end (),
+ [&mm] (const auxiliary_machine& am)
+ {
+ return am.name == mm.name;
+ }) != t.auxiliary_machines.end ())
+ {
+ if (!m.lock.locked ())
+ {
+ assert (m.lock.prio); // Sanity check (not bootstrapping/suspended).
+
+ if (ims.empty ())
+ {
+ ims.push_back (&m);
+ }
+ else if (ims.front () == pm)
+ {
+ ims.insert (ims.begin () + 1, &m); // Interrupt early (see below).
+ }
+ else if (prio < 1000 && prio_mon && ams.empty () /* first */)
+ {
+ // Tricky: replace the lowest priority task we have picked on
+ // the first pass with this one.
+ //
+ assert (ims.size () == 1); // Sanity check.
+ ims.back () = &m;
+ }
+ else
+ ims.insert (ims.begin (), &m); // Interrupt first (see below).
+
+ imt++;
+ }
+
+ ams.push_back (&m);
+ }
+ else if (m.lock.locked ())
+ m.lock.unlock ();
+ }
+
+ // Note: the order of machines may not match.
+ //
+ assert (ams.size () == t.auxiliary_machines.size ()); // Sanity check.
+
assert (!prio_mon || !ims.empty ()); // We should have at least one.
// Move the toolchain lock into this scope so that it's automatically
@@ -2389,23 +3413,26 @@ try
// Interrupt the machines, if necessary.
//
// Note that if we are interrupting multiple machines, then the target
- // machine, if needs to be interrupted, must be first. This way if we
- // are unable to successfully interrupt it, we don't interrupt the rest.
+ // build machine, if needs to be interrupted, must be first, followed
+ // but all the target auxiliary machines. This way if we are unable to
+ // successfully interrupt them, we don't interrupt the rest.
//
- for (bootstrapped_machine* im: ims)
+ vector<pid_t> pids; // Avoid re-interrupting the same pid.
+ for (size_t i (0); i != ims.size (); ++i)
{
- bool first (im == ims.front ());
+ bootstrapped_machine* im (ims[i]);
// Sanity checks.
//
assert (!im->lock.locked () && im->lock.prio);
- assert (im != pm || first);
+ assert (im != pm || i == 0);
const dir_path& tp (im->path); // -<toolchain> path.
+ pid_t pid (im->lock.pid);
l2 ([&]{trace << "interrupting "
- << (im == pm ? "target" : "lower priority")
- << " machine " << tp << ", pid " << im->lock.pid;});
+ << (i < imt ? "target" : "lower priority")
+ << " machine " << tp << ", pid " << pid;});
// The plan is to send the interrupt and then wait for the lock.
//
@@ -2417,21 +3444,26 @@ try
// But what can happen is the other task becomes suspended, which we
// will not be able to interrupt.
//
- if (kill (im->lock.pid, SIGUSR1) == -1)
+ if (find (pids.begin (), pids.end (), pid) == pids.end ())
{
- // Ignore the case where there is no such process (e.g., the other
- // process has terminated in which case the lock should be released
- // automatically).
- //
- if (errno != ESRCH)
- throw_generic_error (errno);
+ if (kill (pid, SIGUSR1) == -1)
+ {
+ // Ignore the case where there is no such process (e.g., the other
+ // process has terminated in which case the lock should be
+ // released automatically).
+ //
+ if (errno != ESRCH)
+ throw_generic_error (errno);
+ }
+
+ pids.push_back (pid);
}
- // If we are interrupting multiple machine, there is no use acquiring
- // the lock (or failing if unable to) for subsequent machines since
- // this is merely a performance optimization.
+ // If we are interrupting additional machine in order to free up
+ // resources, there is no use acquiring their lock (or failing if
+ // unable to) since this is merely a performance optimization.
//
- if (!first)
+ if (i >= imt)
continue;
// Try to lock the machine.
@@ -2453,7 +3485,7 @@ try
if (ml.locked ())
break;
- if (ml.pid != im->lock.pid)
+ if (ml.pid != pid)
{
error << "interrupted machine " << tp << " changed pid";
throw interrupt ();
@@ -2473,26 +3505,27 @@ try
throw interrupt ();
}
- // If the interrupted machine is what we will use, see if it needs a
- // re-bootstrap, the same as in enumerate_machines(). If not, then
- // transfer the bootstrap manifest and lock.
+ // This is an interrupted machine (build or auxiliary) that we will be
+ // using. See if it needs a re-bootstrap, the same as in
+ // enumerate_machines(). If not, then transfer the bootstrap manifest
+ // and lock.
//
- if (im == pm)
- {
- const machine_manifest& mm (im->manifest.machine);
+ const machine_manifest& mm (im->manifest.machine);
- bootstrapped_machine_manifest bmm (
- parse_manifest<bootstrapped_machine_manifest> (
- tp / "manifest", "bootstrapped machine"));
+ bootstrapped_machine_manifest bmm (
+ parse_manifest<bootstrapped_machine_manifest> (
+ tp / "manifest", "bootstrapped machine"));
- bool rb (false);
+ bool rb (false);
- if (bmm.machine.id != mm.id)
- {
- l3 ([&]{trace << "re-bootstrap " << tp << ": new machine";});
- rb = true;
- }
+ if (bmm.machine.id != mm.id)
+ {
+ l3 ([&]{trace << "re-bootstrap " << tp << ": new machine";});
+ rb = true;
+ }
+ if (im == pm) // Only for build machine.
+ {
if (!tc_id.empty () && bmm.toolchain.id != tc_id)
{
l3 ([&]{trace << "re-bootstrap " << tp << ": new toolchain";});
@@ -2512,15 +3545,15 @@ try
rb = true;
}
}
+ }
- // We are not going to try to re-bootstrap this machine "inline".
- //
- if (rb)
- throw interrupt ();
+ // We are not going to try to re-bootstrap this machine "inline".
+ //
+ if (rb)
+ throw interrupt ();
- im->manifest = move (bmm);
- im->lock = move (ml);
- }
+ im->manifest = move (bmm);
+ im->lock = move (ml);
}
// Check if we need to boost the number of CPUs to the full hardware
@@ -2530,8 +3563,11 @@ try
if (prio >= 10000)
bcpus = std::thread::hardware_concurrency ();
- pm->lock.perform_task (tl, prio);
- r = perform_task (move (tl), pm->lock, pm->path, pm->manifest, t, bcpus);
+ pm->lock.perform_task (tl, prio); // Build machine.
+ for (bootstrapped_machine* am: ams) // Auxiliary machines.
+ am->lock.perform_task (tl, prio);
+
+ r = perform_task (move (tl), *pm, ams, t, bcpus);
}
catch (const interrupt&)
{
@@ -2548,8 +3584,14 @@ try
nullopt /* dependency_checksum */});
}
+ // No need to hold the locks any longer.
+ //
if (pm != nullptr && pm->lock.locked ())
- pm->lock.unlock (); // No need to hold the lock any longer.
+ pm->lock.unlock ();
+
+ for (bootstrapped_machine* am: ams)
+ if (am->lock.locked ())
+ am->lock.unlock ();
result_manifest& rm (r.manifest);
diff --git a/bbot/agent/agent.hxx b/bbot/agent/agent.hxx
index 72b819b..9c8400f 100644
--- a/bbot/agent/agent.hxx
+++ b/bbot/agent/agent.hxx
@@ -22,14 +22,14 @@ namespace bbot
extern standard_version tc_ver; // Toolchain version.
extern string tc_id; // Toolchain id.
- extern uint16_t inst; // Instance number.
+ extern uint16_t inst; // Instance number (1-based).
extern string hname; // Our host name.
extern string hip; // Our IP address.
extern uid_t uid; // Our effective user id.
extern string uname; // Our effective user name.
- extern uint16_t offset; // Agent offset.
+ extern uint16_t offset; // Agent offset (10-9990; used for ports).
// Random number generator (currently not MT-safe and limited to RAND_MAX).
//
diff --git a/bbot/agent/machine.cxx b/bbot/agent/machine.cxx
index 2d9ad4f..74c9b93 100644
--- a/bbot/agent/machine.cxx
+++ b/bbot/agent/machine.cxx
@@ -83,9 +83,9 @@ namespace bbot
}
static string
- create_tap (const string& br, uint16_t port)
+ create_tap (const string& br, uint16_t machine_num, uint16_t port)
{
- string t ("tap" + to_string (offset));
+ string t ("tap" + to_string (offset + machine_num));
tracer trace ("create_tap", t.c_str ());
@@ -126,8 +126,10 @@ namespace bbot
string bridge; // Bridge interface to which this tap belongs
uint16_t port; // UDP port to forward TFTP traffic to.
- tap (string b, uint16_t p)
- : iface (create_tap (b, p)), bridge (move (b)), port (p) {}
+ tap (string b, uint16_t machine_num, uint16_t p)
+ : iface (create_tap (b, machine_num, p)),
+ bridge (move (b)),
+ port (p) {}
~tap ()
{
@@ -169,11 +171,13 @@ namespace bbot
public:
kvm_machine (const dir_path&,
const machine_manifest&,
+ uint16_t machine_num,
+ size_t cpus,
+ size_t ram,
const optional<string>& mac,
const string& br_iface,
uint16_t tftp_port,
- bool pub_vnc,
- optional<size_t> boost_cpus = nullopt);
+ bool pub_vnc);
virtual bool
shutdown (size_t& seconds) override;
@@ -214,31 +218,47 @@ namespace bbot
kvm_machine::
kvm_machine (const dir_path& md,
const machine_manifest& mm,
+ uint16_t m_num,
+ size_t cpus,
+ size_t ram,
const optional<string>& omac,
const string& br,
- uint16_t port,
- bool pub_vnc,
- optional<size_t> bcpus)
+ uint16_t tftp_port,
+ bool pub_vnc)
: machine (mm.mac ? *mm.mac : // Fixed mac from machine manifest.
omac ? *omac : // Generated mac from previous bootstrap.
generate_mac ()),
kvm ("kvm"),
- net (br, port),
- vnc (machine_vnc (pub_vnc)),
+ net (br, m_num, tftp_port),
+ vnc (machine_vnc (m_num, pub_vnc)),
monitor ("/tmp/monitor-" + tc_name + '-' + to_string (inst))
{
tracer trace ("kvm_machine", md.string ().c_str ());
+ // Monitor path.
+ //
+ if (m_num != 0)
+ {
+ monitor += '-';
+ monitor += to_string (m_num);
+ }
+
if (sizeof (sockaddr_un::sun_path) <= monitor.size ())
throw invalid_argument ("monitor unix socket path too long");
// Machine name.
//
// While we currently can only have one running machine per toolchain, add
- // the instance number for debuggability.
+ // the instance number and non-0 machine number for debuggability.
//
string name (mm.name + '-' + tc_name + '-' + to_string (inst));
+ if (m_num != 0)
+ {
+ name += '-';
+ name += to_string (m_num);
+ }
+
// Machine log. Note that it is only removed with an explicit cleanup()
// call.
//
@@ -252,7 +272,7 @@ namespace bbot
// Note that for best results you may want to adjust (e.g., by over-
// committing) the number of CPUs to be power of 2.
//
- size_t cpus (bcpus ? *bcpus : ops.cpu ()), cores (cpus);
+ size_t cores (cpus);
size_t sockets (cores >= 256 && cores % 8 == 0 ? 4 :
cores >= 128 && cores % 4 == 0 ? 2 : 1);
@@ -261,26 +281,6 @@ namespace bbot
size_t threads (cores >= 16 && cores % 4 == 0 ? 2 : 1);
cores /= threads;
- // We probably don't want to commit all the available RAM to the VM since
- // some of it could be used on the host side for caching, etc. So the
- // heuristics that we will use is 4G or 1G per CPU, whichever is greater
- // and the rest divide equally between the host and the VM.
- //
- // But the experience showed that we actually want to be able to precisely
- // control the amount of RAM assigned to VMs (e.g., for tmpfs size) and
- // without back-fudging for this heuristics.
- //
-#if 0
- size_t ram ((cpus < 4 ? 4 : cpus) * 1024 * 1024); // Kb.
-
- if (ram > ops.ram ())
- ram = ops.ram ();
- else
- ram += (ops.ram () - ram) / 2;
-#else
- size_t ram (ops.build_ram ());
-#endif
-
// If we have options, use that instead of the default network and
// disk configuration.
//
@@ -434,7 +434,7 @@ namespace bbot
// collision-wise with anything useful.
//
"-vnc",
- (pub_vnc ? ":" : "127.0.0.1:") + to_string (offset), // 5900 + offset
+ (pub_vnc ? ":" : "127.0.0.1:") + to_string (offset + m_num), // 5900-base
// QMP.
//
@@ -672,31 +672,37 @@ namespace bbot
unique_ptr<machine>
start_machine (const dir_path& md,
const machine_manifest& mm,
+ uint16_t machine_num,
+ size_t cpus,
+ size_t ram,
const optional<string>& mac,
const string& br_iface,
uint16_t tftp_port,
- bool pub_vnc,
- optional<size_t> bcpus)
+ bool pub_vnc)
{
+ assert (machine_num < 10);
+
switch (mm.type)
{
case machine_type::kvm:
return make_unique<kvm_machine> (
- md, mm, mac, br_iface, tftp_port, pub_vnc, bcpus);
+ md, mm, machine_num, cpus, ram, mac, br_iface, tftp_port, pub_vnc);
case machine_type::nspawn:
- assert (false); //@@ TODO
+ assert (false); // @@ TODO
}
return nullptr;
}
string
- machine_vnc (bool pub)
+ machine_vnc (uint16_t num, bool pub)
{
+ assert (num < 10);
+
string r (pub ? hip : "127.0.0.1");
r += ':';
- r += to_string (5900 + offset);
+ r += to_string (5900 + offset + num);
return r;
}
}
diff --git a/bbot/agent/machine.hxx b/bbot/agent/machine.hxx
index 0bb74b9..13646db 100644
--- a/bbot/agent/machine.hxx
+++ b/bbot/agent/machine.hxx
@@ -78,20 +78,28 @@ namespace bbot
class machine_manifest;
+ // The machine number should be between 0-9 with 0 for the build machine and
+ // 1-9 for the auxiliary machines.
+ //
+ // Note that tftp_port is not a base (in other words, it is expected to
+ // already be appropriately offset).
+ //
unique_ptr<machine>
start_machine (const dir_path&,
const machine_manifest&,
+ uint16_t machine_num,
+ size_t cpus,
+ size_t ram, // In KiB.
const optional<string>& mac,
const string& br_iface,
uint16_t tftp_port,
- bool pub_vnc,
- optional<size_t> boost_cpus = nullopt);
+ bool public_vnc);
// Return the machine's public or private VNC session endpoint in the
// '<ip>:<port>' form.
//
string
- machine_vnc (bool pub_vnc);
+ machine_vnc (uint16_t machine_num, bool public_vnc);
}
#endif // BBOT_AGENT_MACHINE_HXX
diff --git a/bbot/bbot-agent@.service b/bbot/bbot-agent@.service
index d379b3c..253cc61 100644
--- a/bbot/bbot-agent@.service
+++ b/bbot/bbot-agent@.service
@@ -23,6 +23,7 @@ Environment=AUTH_KEY=
Environment=INTERACTIVE=false
Environment=BOOTSTRAP_TIMEOUT=3600
+Environment=BOOTSTRAP_AUXILIARY=900
Environment=BOOTSTRAP_RETRIES=2
Environment=BUILD_TIMEOUT=5400
@@ -56,6 +57,7 @@ ExecStart=/build/bots/default/bin/bbot-agent \
--auth-key ${AUTH_KEY} \
--interactive ${INTERACTIVE} \
--bootstrap-timeout ${BOOTSTRAP_TIMEOUT} \
+ --bootstrap-auxiliary ${BOOTSTRAP_AUXILIARY} \
--bootstrap-retries ${BOOTSTRAP_RETRIES} \
--build-timeout ${BUILD_TIMEOUT} \
--build-retries ${BUILD_RETRIES} \
diff --git a/bbot/buildfile b/bbot/buildfile
index cb7b576..bbca810 100644
--- a/bbot/buildfile
+++ b/bbot/buildfile
@@ -99,7 +99,7 @@ if $cli.configured
# Usage options.
#
cli.options += --suppress-undocumented --long-usage --ansi-color \
---page-usage 'bbot::print_$name$_' --option-length 23
+--page-usage 'bbot::print_$name$_' --option-length 25
agent/cli.cxx{agent-options}: cli.options += --include-prefix bbot/agent \
--guard-prefix BBOT_AGENT
diff --git a/bbot/utility.hxx b/bbot/utility.hxx
index 9bc517c..7758db4 100644
--- a/bbot/utility.hxx
+++ b/bbot/utility.hxx
@@ -37,6 +37,7 @@ namespace bbot
// <libbutl/utility.hxx>
//
using butl::icasecmp;
+ using butl::sanitize_identifier;
using butl::reverse_iterate;
using butl::make_guard;