aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Boris Kolpackov <boris@codesynthesis.com> 2023-05-14 12:02:53 +0200
committer Boris Kolpackov <boris@codesynthesis.com> 2023-05-14 12:03:35 +0200
commit 46ea50772238213fd1696b9d0094714c77faed9c (patch)
tree d66aa91d95ec4b18a639f696037546efdeab32c9
parent 67d3980a2aefaa9962c3011804171fc529dc5b2b (diff)
Handle SIGUSR1 interrupt during perform_task()
Also add a few missing snapshot cleanups (after suspension and unexpected exit).
-rw-r--r-- bbot/agent/agent.cxx 133
1 files changed, 101 insertions, 32 deletions
diff --git a/bbot/agent/agent.cxx b/bbot/agent/agent.cxx
index d58743b..1647fcf 100644
--- a/bbot/agent/agent.cxx
+++ b/bbot/agent/agent.cxx
@@ -21,6 +21,7 @@
#include <sys/socket.h>
#include <map>
+#include <atomic>
#include <chrono>
#include <random>
#include <iomanip> // setw()
@@ -51,6 +52,34 @@ using namespace bbot;
using std::cout;
using std::endl;
+// According to the standard, atomic's use in the signal handler is only safe
+// if it's lock-free.
+//
+#if !defined(ATOMIC_INT_LOCK_FREE) || ATOMIC_INT_LOCK_FREE != 2
+#error int is not lock-free on this architecture
+#endif
+
+// While we can use memory_order_relaxed in a single-threaded program, let's
+// use consume/release in case this process becomes multi-threaded in the
+// future.
+//
+static std::atomic<unsigned int> sigurs1;
+
+using std::memory_order_consume;
+using std::memory_order_release;
+
+extern "C" void
+handle_signal (int sig)
+{
+ switch (sig)
+ {
+ case SIGHUP: exit (3); // Unimplemented feature.
+ case SIGTERM: exit (0);
+ case SIGUSR1: sigurs1.fetch_add (1, std::memory_order_release); break;
+ default: assert (false);
+ }
+}
+
namespace bbot
{
agent_options ops;
@@ -224,6 +253,8 @@ bootstrap_machine (const dir_path& md,
m->wait (false);
m->cleanup ();
info << "resuming after machine suspension";
+
+ // Note: snapshot cleaned up by the caller of bootstrap_machine().
}
catch (const failed&) {}
@@ -274,7 +305,10 @@ bootstrap_machine (const dir_path& md,
break;
if (!check_machine ())
+ {
+ // Note: snapshot cleaned up by the caller of bootstrap_machine().
return nullopt;
+ }
}
// This can mean two things: machine mis-configuration or what we
@@ -317,7 +351,10 @@ bootstrap_machine (const dir_path& md,
// The exit/upload is racy so we re-check.
//
if (!(file_not_empty (mf) || file_not_empty (mfo)))
+ {
+ // Note: snapshot cleaned up by the caller of bootstrap_machine().
return nullopt;
+ }
}
bool old (false);
@@ -1211,7 +1248,12 @@ try
{
tracer trace ("perform_task", md.string ().c_str ());
- tl.unlock (); // @@ TMP (we have to arm the signal handler under lock).
+ // Arm the interrupt handler and release the global toolchain lock.
+ //
+ // Note that there can be no interrupt while we are holding the global lock.
+ //
+ sigurs1.store (0, std::memory_order_release);
+ tl.unlock ();
result_manifest r {
tm.name,
@@ -1261,6 +1303,8 @@ try
if (ops.fake_machine_specified ())
{
+ // Note: not handling interrupts here.
+
// Simply wait for the file to appear.
//
for (size_t i (0);; sleep (1))
@@ -1335,28 +1379,33 @@ try
l3 ([&]{trace << "tftp server on port " << tftpd.port ();});
- // Start the machine.
- //
- unique_ptr<machine> m (
- start_machine (xp,
- mm.machine,
- mm.machine.mac,
- ops.bridge (),
- tftpd.port (),
- tm.interactive.has_value ()));
-
- // Note: the machine handling logic is similar to bootstrap.
+ // Note: the machine handling logic is similar to bootstrap, except that
+ // here we have to clean up the snapshot ourselves in case of suspension
+ // or unexpected exit.
//
{
+ // Start the machine.
+ //
+ unique_ptr<machine> m (
+ start_machine (xp,
+ mm.machine,
+ mm.machine.mac,
+ ops.bridge (),
+ tftpd.port (),
+ tm.interactive.has_value ()));
+
auto mg (
make_exception_guard (
[&m, &xp] ()
{
- info << "trying to force machine " << xp << " down";
- try {m->forcedown (false);} catch (const failed&) {}
+ if (m != nullptr)
+ {
+ info << "trying to force machine " << xp << " down";
+ try {m->forcedown (false);} catch (const failed&) {}
+ }
}));
- auto soft_fail = [&ml, &xp, &m, &r] (const char* msg)
+ auto soft_fail = [&trace, &ml, &xp, &m, &r] (const char* msg)
{
{
diag_record dr (error);
@@ -1374,6 +1423,7 @@ try
m->suspend (false);
m->wait (false);
m->cleanup ();
+ run_btrfs (trace, "subvolume", "delete", xp);
info << "resuming after machine suspension";
}
catch (const failed&) {}
@@ -1389,9 +1439,7 @@ try
if (!m->wait (t /* seconds */, false /* fail_hard */))
return true;
}
- catch (const failed&)
- {
- }
+ catch (const failed&) {}
diag_record dr (warn);
dr << "machine " << xp << " exited unexpectedly";
@@ -1400,6 +1448,22 @@ try
return false;
};
+ auto check_interrupt = [&trace, &xp, &m] ()
+ {
+ if (sigurs1.load (std::memory_order_consume) == 0)
+ return;
+
+ // @@ TMP: presumably this should be traced at l3 rather than l1.
+ l1 ([&]{trace << "machine " << xp << " interruped";});
+
+ try {m->forcedown (false);} catch (const failed&) {}
+ m->cleanup ();
+ m = nullptr; // Disable exceptions guard above.
+ run_btrfs (trace, "subvolume", "delete", xp);
+
+ throw interrupt ();
+ };
+
// The first request should be the task manifest download. Wait for up
// to 2 minutes for that to arrive (again, that long to deal with
// flaky Windows networking). In a sense we use it as an indication
@@ -1411,15 +1475,21 @@ try
? ops.intactive_timeout ()
: ops.build_timeout ());
- // Wait periodically making sure the machine is still alive.
+ // Wait periodically making sure the machine is still alive and
+ // checking for interrupts.
//
for (to = startup_to; to != 0; )
{
+ check_interrupt ();
+
if (tftpd.serve (to, 2))
break;
if (!check_machine ())
+ {
+ run_btrfs (trace, "subvolume", "delete", xp);
return r;
+ }
}
if (to == 0)
@@ -1448,13 +1518,18 @@ try
//
for (to = build_to; to != 0; )
{
+ check_interrupt ();
+
if (tftpd.serve (to, 2))
continue;
if (!check_machine ())
{
if (!file_not_empty (rf))
+ {
+ run_btrfs (trace, "subvolume", "delete", xp);
return r;
+ }
}
if (file_not_empty (rf))
@@ -1490,6 +1565,10 @@ try
// while uploading the archive and so the partially uploaded file
// may exist. Thus, we check if the result status is not an error.
//
+ // Note also that we will not bother with interrupting this process
+ // assuming it will be quick (relative to the amount of work that
+ // would be wasted).
+ //
bool err (!rm || !rm->status);
if (!err && file_exists (af))
{
@@ -1579,7 +1658,7 @@ try
}
}
- // Update package name/version if the returned value as "unknown".
+ // Update package name/version if the returned value is "unknown".
//
if (r.version == bpkg::version ("0"))
{
@@ -1596,17 +1675,6 @@ catch (const system_error& e)
fail << "build error: " << e << endf;
}
-extern "C" void
-handle_signal (int sig)
-{
- switch (sig)
- {
- case SIGHUP: exit (3); // Unimplemented feature.
- case SIGTERM: exit (0);
- default: assert (false);
- }
-}
-
static const string agent_checksum ("2"); // Logic version.
int
@@ -1807,7 +1875,8 @@ try
// Handle SIGHUP, SIGTERM, and SIGUSR1.
//
if (signal (SIGHUP, &handle_signal) == SIG_ERR ||
- signal (SIGTERM, &handle_signal) == SIG_ERR)
+ signal (SIGTERM, &handle_signal) == SIG_ERR ||
+ signal (SIGUSR1, &handle_signal) == SIG_ERR)
fail << "unable to set signal handler: "
<< system_error (errno, std::generic_category ()); // Sanitize.