// file : bbot/agent.cxx -*- C++ -*- // copyright : Copyright (c) 2014-2017 Code Synthesis Ltd // license : TBC; see accompanying LICENSE file #include #include // getpwuid() #include // PATH_MAX #include // signal() #include // rand_r() #include // sleep(), realink(), getuid() #include // ifreq #include // sockaddr_in #include // inet_ntop() #include #include #include #include #include #include // dir_iterator #include #include #include #include #include #include #include #include using namespace std; using namespace butl; using namespace bbot; namespace bbot { agent_options ops; const string bs_prot ("1"); string tc_name; size_t tc_num; string tc_id; string hname; uid_t uid; string uname; } // The btrfs tool likes to print informational messages, like "Created // snapshot such and such". Luckily, it writes them to stdout while proper // diagnostics to stderr. // template inline void run_btrfs (tracer& t, A&&... a) { if (verb >= 4) run_io (t, fdnull (), 2, 2, "btrfs", forward (a)...); else run_io (t, fdnull (), fdnull (), 2, "btrfs", forward (a)...); } template inline butl::process_exit::code_type btrfs_exit (tracer& t, A&&... a) { return verb >= 4 ? run_io_exit (t, fdnull (), 2, 2, "btrfs", forward (a)...) : run_io_exit (t, fdnull (), fdnull (), 2, "btrfs", forward (a)...); } // Bootstrap the machine. Return the bootstrapped machine manifest if // successful and nullopt otherwise (in which case the machine directory // should be cleaned and the machine ignored for now). // static optional bootstrap_machine (const dir_path& md, const machine_manifest& mm, optional obmm) { tracer trace ("bootstrap_machine"); bootstrapped_machine_manifest r { mm, toolchain_manifest {tc_id.empty () ? "bogus" : tc_id}, bootstrap_manifest { bootstrap_manifest::versions_type { {"bbot", BBOT_VERSION}, {"libbbot", LIBBBOT_VERSION}, {"libbpkg", LIBBPKG_VERSION}, {"libbutl", LIBBUTL_VERSION} } } }; if (ops.fake_bootstrap ()) { r.machine.mac = "de:ad:be:ef:de:ad"; } else try { string br ("br1"); // Using private bridge for now. // Start the TFTP server (server chroot is --tftp). Map: // // GET requests to .../toolchains//* // PUT requests to .../bootstrap//* // auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= tc_name); try_mkdir_p (arm.path ()); // Bootstrap result manifest. // path mf (arm.path () / "manifest"); try_rmfile (mf); tftp_server tftpd ("Gr ^/?(.+)$ /toolchains/" + tc_name + "/\\1\n" + "Pr ^/?(.+)$ /bootstrap/" + tc_name + "/\\1\n"); l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); // Start the machine. // unique_ptr m ( start_machine (md, mm, obmm ? obmm->machine.mac : nullopt, br, tftpd.port ())); { // If we are terminating with an exception then force the machine down. // Failed that, the machine's destructor will block waiting for its // completion. // auto mg ( make_exception_guard ( [&m, &md] () { info << "trying to force machine " << md << " down"; try {m->forcedown ();} catch (const failed&) {} })); // What happens if the bootstrap process hangs? The simple thing would // be to force the machine down after some timeout and then fail. But // that won't be very helpful for investigating the cause. So instead // the plan is to suspend it after some timeout, issue diagnostics // (without failing and which Build OS monitor will relay to the admin), // and wait for the external intervention. // auto soft_fail = [&md, &m] (const char* msg) { { diag_record dr (error); dr << msg << " for machine " << md << ", suspending"; m->print_info (dr); } m->suspend (); m->wait (); return nullopt; }; // The first request should be the toolchain download. Wait for up to 5 // minutes for that to arrive. In a sense we use it as an indication // that the machine has booted and the bootstrap process has started. // Why wait so long you may wonder? Well, we may be using a new MAC // address and operating systems like Windows may need to digest that. // size_t to; const size_t startup_to (5 * 60); const size_t bootstrap_to (ops.bootstrap_timeout ()); const size_t shutdown_to (5 * 60); if (!tftpd.serve ((to = startup_to))) return soft_fail ("bootstrap startup timeout"); l3 ([&]{trace << "completed startup in " << startup_to - to << "s";}); // Next the bootstrap process may download additional toolchain // archives, build things, and then upload the result manifest. So on // our side we serve TFTP requests while periodically checking for the // manifest file. // for (to = bootstrap_to; to != 0 && !file_exists (mf); tftpd.serve (to)) ; if (to == 0) return soft_fail ("bootstrap timeout"); l3 ([&]{trace << "completed bootstrap in " << bootstrap_to - to << "s";}); // Shut the machine down cleanly. // if (!m->shutdown ((to = shutdown_to))) return soft_fail ("bootstrap shutdown timeout"); l3 ([&]{trace << "completed shutdown in " << shutdown_to - to << "s";}); } // Parse the result manifest. // r.bootstrap = parse_manifest (mf, "bootstrap"); r.machine.mac = m->mac; // Save the MAC address. } catch (const system_error& e) { fail << "bootstrap error: " << e; } serialize_manifest (r, md / "manifest", "bootstrapped machine"); return r; } // Return available machines and their directories as a parallel array. // static pair enumerate_machines (const dir_path& machines) try { tracer trace ("enumerate_machines"); bootstrapped_machine_manifests rm; dir_paths rd; if (ops.fake_machine_specified ()) { auto mh ( parse_manifest ( ops.fake_machine (), "machine header")); rm.push_back ( bootstrapped_machine_manifest { machine_manifest { mh.id, mh.name, mh.summary, machine_type::kvm, string ("de:ad:be:ef:de:ad"), nullopt}, toolchain_manifest {tc_id}, bootstrap_manifest {} }); rd.push_back (dir_path (ops.machines ()) /= mh.name); // For diagnostics. return make_pair (move (rm), move (rd)); } // The first level are machine volumes. // for (const dir_entry& ve: dir_iterator (machines)) { const string vn (ve.path ().string ()); // Ignore hidden directories. // if (ve.type () != entry_type::directory || vn[0] == '.') continue; const dir_path vd (dir_path (machines) /= vn); // Inside we have machines. // try { for (const dir_entry& me: dir_iterator (vd)) { const string mn (me.path ().string ()); if (me.type () != entry_type::directory || mn[0] == '.') continue; const dir_path md (dir_path (vd) /= mn); // Our endgoal here is to obtain a bootstrapped snapshot of this // machine while watching out for potential race conditions (machines // being added/upgraded/removed; see the manual for details). // // So here is our overall plan: // // 1. Resolve current subvolume link for our bootstrap protocol. // // 2. If there is no link, cleanup and ignore this machine. // // 3. Try to create a snapshot of current subvolume (this operation is // atomic). If failed (e.g., someone changed the link and removed // the subvolume in the meantime), retry from #1. // // 4. Compare the snapshot to the already bootstrapped version (if // any) and see if we need to re-bootstrap. If so, use the snapshot // as a starting point. Rename to bootstrapped at the end (atomic). // dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -

dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // - bool te (dir_exists (tp)); auto delete_t = [&tp, &trace] () { run_btrfs (trace, "property", "set", "-ts", tp, "ro", "false"); run_btrfs (trace, "subvolume", "delete", tp); }; for (size_t retry (0);; ++retry) { if (retry != 0) sleep (1); // Resolve the link to subvolume path. // dir_path sp; // -

. try { char b [PATH_MAX + 1]; ssize_t r (readlink (lp.string ().c_str (), b, sizeof (b))); if (r == -1) { if (errno != ENOENT) throw_generic_error (errno); } else if (static_cast (r) >= sizeof (b)) throw_generic_error (EINVAL); else { b[r] = '\0'; sp = dir_path (b); if (sp.relative ()) sp = md / sp; } } catch (const system_error& e) { fail << "unable to read subvolume link " << lp << ": " << e; } // If the resolution fails, then this means there is no current // machine subvolume (for this bootstrap protocol). In this case we // clean up our toolchain subvolume (-) and ignore // this machine. // if (sp.empty ()) { if (te) delete_t (); l3 ([&]{trace << "skipping " << md << ": no subvolume link";}); break; } // -- // const dir_path xp ( dir_path (md) /= path::traits::temp_name (mn + '-' + tc_name)); if (btrfs_exit (trace, "subvolume", "snapshot", sp, xp) != 0) { if (retry >= 10) fail << "unable to snapshot subvolume " << sp; continue; } // Load the (original) machine manifest. // auto mm ( parse_manifest (sp / "manifest", "machine")); // If we already have -, see if it needs to be re- // bootstrapped. Things that render it obsolete: // // 1. New machine revision (compare machine ids). // 2. New toolchain (compare toolchain ids). // 3. New bbot/libbbot (compare versions). // // The last case has a complication: what should we do if we have // bootstrapped a newer version of bbot? This would mean that we are // about to be stopped and upgraded (and the upgraded version will // probably be able to use the result). So we simply ignore this // machine for this run. // Return -1 if older, 0 if the same, and +1 if newer. // auto compare_bbot = [] (const bootstrap_manifest& m) -> int { auto cmp = [&m] (const string& n, uint64_t v) -> int { auto i = m.versions.find (n); return i == m.versions.end () || i->second < v ? -1 : i->second > v ? 1 : 0; }; // Start from the top assuming a new dependency cannot be added // without changing the dependent's version. // int r; return (r = cmp ("bbot", BBOT_VERSION)) != 0 ? r : (r = cmp ("libbbot", LIBBBOT_VERSION)) != 0 ? r : (r = cmp ("libbpkg", LIBBPKG_VERSION)) != 0 ? r : (r = cmp ("libbutl", LIBBUTL_VERSION)) != 0 ? r : 0; }; optional bmm; if (te) { bmm = parse_manifest ( tp / "manifest", "bootstrapped machine"); if (bmm->machine.id != mm.id) { l3 ([&]{trace << "re-bootstrapping " << tp << ": new machine";}); te = false; } if (!tc_id.empty () && bmm->toolchain.id != tc_id) { l3 ([&]{trace << "re-bootstrapping " << tp << ": new toolchain";}); te = false; } if (int i = compare_bbot (bmm->bootstrap)) { if (i < 0) { l3 ([&]{trace << "re-bootstrapping " << tp << ": new bbot";}); te = false; } else { l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); run_btrfs (trace, "subvolume", "delete", xp); break; } } if (!te) delete_t (); } else l3 ([&]{trace << "bootstrapping " << tp;}); if (!te) { // Use the -- snapshot that we have made to // bootstrap the new machine. Then atomically rename it to // -. // bmm = bootstrap_machine (xp, mm, move (bmm)); if (!bmm) { l3 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";}); run_btrfs (trace, "subvolume", "delete", xp); break; } try { mvdir (xp, tp); } catch (const system_error& e) { fail << "unable to rename " << xp << " to " << tp; } l2 ([&]{trace << "bootstrapped " << bmm->machine.name;}); // Check the bootstrapped bbot version as above and ignore this // machine if it's newer than us. // if (int i = compare_bbot (bmm->bootstrap)) { assert (i > 0); l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); break; } } else run_btrfs (trace, "subvolume", "delete", xp); // Add the machine to the lists. // rm.push_back (move (*bmm)); rd.push_back (move (tp)); break; } } } catch (const system_error& e) { fail << "unable to iterate over " << vd << ": " << e << endf; } } return make_pair (move (rm), move (rd)); } catch (const system_error& e) { fail << "unable to iterate over " << machines << ": " << e << endf; } static result_manifest perform_task (const dir_path& md, const bootstrapped_machine_manifest& mm, const task_manifest& tm) try { tracer trace ("perform_task"); result_manifest r { tm.name, tm.version, result_status::abort, operation_results {}}; if (ops.fake_build ()) return r; // The overall plan is as follows: // // 1. Snapshot the (bootstrapped) machine. // // 2. Save the task manifest to the TFTP directory (to be accessed by the // worker). // // 3. Start the TFTP server and the machine. // // 4. Serve TFTP requests while watching out for the result manifest. // // 5. Clean up (force the machine down and delete the snapshot). // // TFTP server mapping (server chroot is --tftp): // // GET requests to .../build//get/* // PUT requests to .../build//put/* // auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= tc_name); dir_path gd (dir_path (arm.path ()) /= "get"); dir_path pd (dir_path (arm.path ()) /= "put"); try_mkdir_p (gd); try_mkdir_p (pd); path tf (gd / "manifest"); // Task manifest file. path rf (pd / "manifest"); // Result manifest file. serialize_manifest (tm, tf, "task"); if (ops.fake_machine_specified ()) { // Simply wait for the file to appear. // for (size_t i (0); !file_exists (rf); sleep (1)) if (i++ % 10 == 0) l3 ([&]{trace << "waiting for result manifest";}); r = parse_manifest (rf, "result"); } else { // -- // const dir_path xp ( md.directory () /= path::traits::temp_name (md.leaf ().string ())); run_btrfs (trace, "subvolume", "snapshot", md, xp); string br ("br1"); // Using private bridge for now. // Start the TFTP server. // tftp_server tftpd ("Gr ^/?(.+)$ /build/" + tc_name + "/get/\\1\n" + "Pr ^/?(.+)$ /build/" + tc_name + "/put/\\1\n"); l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); // Start the machine. // unique_ptr m ( start_machine (xp, mm.machine, mm.machine.mac, br, tftpd.port ())); // Note: the machine handling logic is similar to bootstrap. // { auto mg ( make_exception_guard ( [&m, &xp] () { info << "trying to force machine " << xp << " down"; try {m->forcedown ();} catch (const failed&) {} })); auto soft_fail = [&xp, &m, &r] (const char* msg, bool wait = true) { { diag_record dr (error); dr << msg << " for machine " << xp << ", suspending"; m->print_info (dr); } m->suspend (); if (wait) m->wait (); return r; }; // The first request should be the task manifest download. Wait for up // to 60 seconds for that to arrive. In a sense we use it as an // indication that the machine has booted and the worker process has // started. // size_t to; const size_t startup_to (60); const size_t build_to (ops.build_timeout ()); if (!tftpd.serve ((to = startup_to))) return soft_fail ("build startup timeout"); l3 ([&]{trace << "completed startup in " << startup_to - to << "s";}); // Next the worker builds things and then uploads the result manifest. // So on our side we serve TFTP requests while checking for the manifest // file. // for (to = build_to; to != 0 && !file_exists (rf); tftpd.serve (to)) ; if (to == 0) return soft_fail ("build timeout"); l3 ([&]{trace << "completed build in " << build_to - to << "s";}); // Parse the result manifest. // r = parse_manifest (rf, "result"); // If the build terminated abnormally, suspent the machine for // investigation (note that here we don't wait or return). // if (r.status == result_status::abnormal) soft_fail ("build terminated abnormally", false); // Force the machine down (there is no need wasting time on clean // shutdown since the next step is to drop the snapshot). // m->forcedown (); } run_btrfs (trace, "subvolume", "delete", xp); } // Update package name/version if the returned value as "unknown". // if (r.version == bpkg::version ("0")) { assert (r.status == result_status::abnormal); r.name = tm.name; r.version = tm.version; } return r; } catch (const system_error& e) { fail << "build error: " << e << endf; } extern "C" void handle_signal (int sig) { switch (sig) { case SIGHUP: exit (3); // Unimplemented feature. case SIGTERM: exit (0); default: assert (false); } } int main (int argc, char* argv[]) try { cli::argv_scanner scan (argc, argv, true); ops.parse (scan); verb = ops.verbose (); if (ops.systemd_daemon ()) systemd_diagnostics (true); // With critical errors. tracer trace ("main"); uid = getuid (); uname = getpwuid (uid)->pw_name; { char buf[HOST_NAME_MAX + 1]; if (gethostname (buf, sizeof (buf)) == -1) fail << "unable to obtain hostname: " << system_error (errno, generic_category ()); // Sanitize. hname = buf; } // On POSIX ignore SIGPIPE which is signaled to a pipe-writing process if // the pipe reading end is closed. Note that by default this signal // terminates a process. Also note that there is no way to disable this // behavior on a file descriptor basis or for the write() function call. // if (signal (SIGPIPE, SIG_IGN) == SIG_ERR) fail << "unable to ignore broken pipe (SIGPIPE) signal: " << system_error (errno, generic_category ()); // Sanitize. // Version. // if (ops.version ()) { cout << "bbot-agent " << BBOT_VERSION_STR << endl << "libbbot " << LIBBBOT_VERSION_STR << endl << "libbutl " << LIBBUTL_VERSION_STR << endl << "Copyright (c) 2014-2017 Code Synthesis Ltd" << endl << "TBC; All rights reserved" << endl; return 0; } // Help. // if (ops.help ()) { pager p ("bbot-agent help", false); print_bbot_agent_usage (p.stream ()); // If the pager failed, assume it has issued some diagnostics. // return p.wait () ? 0 : 1; } tc_name = ops.toolchain_name (); tc_num = ops.toolchain_num (); tc_id = ops.toolchain_id (); // Controller URLs. // if (argc < 2 && !ops.dump_machines () && !ops.fake_request_specified ()) { fail << "controller url expected" << info << "run " << argv[0] << " --help for details"; } strings controllers; for (int i (1); i != argc; ++i) controllers.push_back (argv[i]); // Handle SIGHUP and SIGTERM. // if (signal (SIGHUP, &handle_signal) == SIG_ERR || signal (SIGTERM, &handle_signal) == SIG_ERR) fail << "unable to set signal handler: " << system_error (errno, generic_category ()); // Sanitize. if (ops.systemd_daemon ()) { diag_record dr; dr << info << "bbot agent " << BBOT_VERSION_STR << info << "toolchain name " << tc_name << info << "toolchain num " << tc_num << info << "toolchain id " << tc_id << info << "CPU(s) " << ops.cpu () << info << "RAM(kB) " << ops.ram (); for (const string& u: controllers) dr << info << "controller url " << u; } // The work loop. The steps we go through are: // // 1. Enumerate the available machines, (re-)bootstrapping any if necessary. // // 2. Poll controller(s) for build tasks. // // 3. If no build tasks are available, go to #1 (after sleeping a bit). // // 4. If a build task is returned, do it, upload the result, and go to #1 // (immediately). // for (bool sleep (false);; ::sleep (sleep ? 60 : 0), sleep = false) { // Enumerate the machines. // auto mp (enumerate_machines (ops.machines ())); bootstrapped_machine_manifests& ms (mp.first); dir_paths& ds (mp.second); // Prepare task request. // // @@ TODO: key fingerprint. // task_request_manifest tq {hname, nullopt, machine_header_manifests {}}; for (const bootstrapped_machine_manifest& m: ms) tq.machines.emplace_back (m.machine.id, m.machine.name, m.machine.summary); if (ops.dump_machines ()) { for (const machine_header_manifest& m: tq.machines) serialize_manifest (m, cout, "stdout", "machine"); return 0; } if (tq.machines.empty ()) { warn << "no build machines for toolchain " << tc_name; sleep = true; continue; } // Send task requests. // // string url; task_response_manifest tr; if (ops.fake_request_specified ()) { auto t (parse_manifest (ops.fake_request (), "task")); tr = task_response_manifest { "fake-session", // Dummy session. string (), // Empty challange. url, // Empty result URL. move (t)}; url = "http://example.org"; } else { for (const string& u: controllers) { try { http_curl c (trace, path ("-"), path ("-"), curl::post, u, "--header", "Content-Type: text/manifest", "--max-time", ops.request_timeout ()); // This is tricky/hairy: we may fail hard parsing the output before // seeing that curl exited with an error and failing softly. // bool f (false); try { serialize_manifest (tq, c.out, u, "task request", false); } catch (const failed&) {f = true;} c.out.close (); if (!f) try { tr = parse_manifest ( c.in, u, "task response", false); } catch (const failed&) {f = true;} c.in.close (); if (!c.wait () || f) throw_generic_error (EIO); } catch (const system_error& e) { error << "unable to request task from " << u << ": " << e; continue; } if (!tr.session.empty ()) // Got a task. { url = u; break; } } } if (tr.session.empty ()) // No task from any of the controllers. { sleep = true; continue; } // We have a build task. // // First find the index of the machine we were asked to use (and also // verify it is one of those we sent). // size_t i (0); for (const machine_header_manifest& m: tq.machines) { if (m.name == tr.task->machine) break; ++i; } if (i == ms.size ()) { error << "task from " << url << " for unknown machine " << tr.task->machine; if (ops.dump_task ()) return 0; continue; } task_manifest& t (*tr.task); if (ops.dump_task ()) { serialize_manifest (t, cout, "stdout", "task"); return 0; } // If we have our own repository certificate fingerprints, then use them // to replace what we have received from the controller. // if (!ops.trust ().empty ()) t.trust = ops.trust (); const dir_path& d (ds[i]); // The - directory. const bootstrapped_machine_manifest& m (ms[i]); result_manifest r (perform_task (d, m, t)); if (ops.dump_result ()) { serialize_manifest (r, cout, "stdout", "result"); return 0; } // Upload the result. // // @@ TODO challange // result_request_manifest rq {tr.session, nullopt, move (r)}; { const string& u (*tr.result_url); try { http_curl c (trace, path ("-"), nullfd, // Not expecting any data in response. curl::post, u, "--header", "Content-Type: text/manifest", "--max-time", ops.request_timeout ()); // This is tricky/hairy: we may fail hard writing the input before // seeing that curl exited with an error and failing softly. // bool f (false); try { serialize_manifest (rq, c.out, u, "task request"); } catch (const failed&) {f = true;} c.out.close (); if (!c.wait () || f) throw_generic_error (EIO); } catch (const system_error& e) { error << "unable to upload result to " << u << ": " << e; continue; } } l2 ([&]{trace << "built " << t.name << '/' << t.version << " " << "on " << t.machine << " " << "for " << url;}); } } catch (const failed&) { return 1; // Diagnostics has already been issued. } catch (const cli::exception& e) { error << e; return 1; } namespace bbot { static unsigned int rand_seed; // Seed for rand_r(); size_t genrand () { if (rand_seed == 0) rand_seed = static_cast ( chrono::system_clock::now ().time_since_epoch ().count ()); return static_cast (rand_r (&rand_seed)); } // Note: Linux-specific implementation. // string iface_addr (const string& i) { if (i.size () >= IFNAMSIZ) throw invalid_argument ("interface nama too long"); auto_fd fd (socket (AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0)); if (fd.get () == -1) throw_system_error (errno); ifreq ifr; ifr.ifr_addr.sa_family = AF_INET; strcpy (ifr.ifr_name, i.c_str ()); if (ioctl (fd.get (), SIOCGIFADDR, &ifr) == -1) throw_system_error (errno); char buf[3 * 4 + 3 + 1]; // IPv4 address. if (inet_ntop (AF_INET, &reinterpret_cast (&ifr.ifr_addr)->sin_addr, buf, sizeof (buf)) == nullptr) throw_system_error (errno); return buf; } }