From 2fca6d23f87304ceed78e93d2a52d137c5ffd0c7 Mon Sep 17 00:00:00 2001 From: Karen Arutyunov Date: Wed, 26 Apr 2023 21:34:12 +0300 Subject: Add support for build artifacts upload in agent --- bbot/agent/agent.cxx | 438 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 312 insertions(+), 126 deletions(-) (limited to 'bbot/agent/agent.cxx') diff --git a/bbot/agent/agent.cxx b/bbot/agent/agent.cxx index 6163c14..e56f742 100644 --- a/bbot/agent/agent.cxx +++ b/bbot/agent/agent.cxx @@ -31,6 +31,7 @@ #include // generic_category() #include +#include #include #include #include // dir_iterator, try_rmfile(), readsymlink() @@ -47,6 +48,7 @@ #include #include +#include using namespace butl; using namespace bbot; @@ -1249,7 +1251,35 @@ catch (const system_error& e) // struct interrupt {}; -static result_manifest +struct perform_task_result +{ + auto_rmdir work_dir; // /build/-/ + result_manifest manifest; + + // Uploaded archive, if any (somewhere inside work_dir). + // + optional upload_archive; + + // Create the special empty result. + // + perform_task_result () = default; + + // Create task result without build artifacts. + // + explicit + perform_task_result (auto_rmdir&& d, result_manifest&& m) + : work_dir (move (d)), + manifest (move (m)) {} + + // Create task result with build artifacts. + // + perform_task_result (auto_rmdir&& d, result_manifest&& m, path&& a) + : work_dir (move (d)), + manifest (move (m)), + upload_archive (move (a)) {} +}; + +static perform_task_result perform_task (toolchain_lock tl, // Note: assumes ownership. machine_lock& ml, const dir_path& md, @@ -1267,6 +1297,11 @@ try sigurs1.store (0, std::memory_order_release); tl.unlock (); + const string in_name (tc_name + '-' + to_string (inst)); + auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= in_name); + + try_mkdir_p (arm.path); + result_manifest r { tm.name, tm.version, @@ -1276,7 +1311,7 @@ try nullopt /* dependency_checksum */}; if (ops.fake_build ()) - return r; + return perform_task_result (move (arm), move (r)); // The overall plan is as follows: // @@ -1295,12 +1330,9 @@ try // TFTP server mapping (server chroot is --tftp): // - // GET requests to .../build/-/get/* - // PUT requests to .../build/-/put/* + // GET requests to .../build/-/get/* + // PUT requests to .../build/-/put/* // - const string in_name (tc_name + '-' + to_string (inst)); - auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= in_name); - dir_path gd (dir_path (arm.path) /= "get"); dir_path pd (dir_path (arm.path) /= "put"); @@ -1334,43 +1366,11 @@ try } r = parse_manifest (rf, "result"); - - // If archive of build artifacts is present, then just list its content as - // a sanity check. - // - bool err (!r.status); - if (!err && file_exists (af)) - { - try - { - auto_fd null (fdopen_null ()); - - // Redirect stdout to stderr if the command is traced and to /dev/null - // otherwise. - // - process_exit pe ( - process_run_callback ( - trace, - null.get (), // Don't expect to read from stdin. - verb >= 3 ? 2 : null.get (), - 2, - "tar", - "-tf", af)); - - if (!pe) - fail << "tar " << pe; - } - catch (const process_error& e) - { - fail << "unable execute tar: " << e; - } - } } else { try_rmfile (rf); try_rmfile (af); - try_rmdir_r (pd / dir_path ("upload")); // -- // @@ -1418,7 +1418,7 @@ try } })); - auto soft_fail = [&trace, &ml, &xp, &m, &r] (const char* msg) + auto soft_fail = [&trace, &ml, &xp, &m, &arm, &r] (const char* msg) { { diag_record dr (error); @@ -1441,7 +1441,7 @@ try } catch (const failed&) {} - return r; + return perform_task_result (move (arm), move (r)); }; auto check_machine = [&xp, &m] () @@ -1501,7 +1501,7 @@ try if (!check_machine ()) { run_btrfs (trace, "subvolume", "delete", xp); - return r; + return perform_task_result (move (arm), move (r)); } } @@ -1541,7 +1541,7 @@ try if (!file_not_empty (rf)) { run_btrfs (trace, "subvolume", "delete", xp); - return r; + return perform_task_result (move (arm), move (r)); } } @@ -1558,81 +1558,14 @@ try // Parse the result manifest. // - optional rm; - try { - rm = parse_manifest (rf, "result", false); + r = parse_manifest (rf, "result", false); } catch (const failed&) { r.status = result_status::abnormal; // Soft-fail below. } - - // Upload the build artifacts if the result manifest is parsed - // successfully, the result status is not an error, and upload.tar - // exists. - // - // Note that while the worker doesn't upload the build artifacts - // archives on errors, there can be the case when the error occurred - // while uploading the archive and so the partially uploaded file - // may exist. Thus, we check if the result status is not an error. - // - // Note also that we will not bother with interrupting this process - // assuming it will be quick (relative to the amount of work that - // would be wasted). - // - bool err (!rm || !rm->status); - if (!err && file_exists (af)) - { - // Extract the build artifacts from the archive and upload them to - // the controller. On error keep the result status as abort for - // transient errors (network failure, etc) and set it to abnormal - // otherwise (for subsequent machine suspension and - // investigation). - // - optional err; // True if the error is transient. - - try - { - process_exit pe ( - process_run_callback ( - trace, - fdopen_null (), // Don't expect to read from stdin. - 2, // Redirect stdout to stderr. - 2, - "tar", - "-xf", af, - "-C", pd)); - - if (!pe) - { - err = false; - error << "tar " << pe; - } - } - catch (const process_error& e) - { - err = false; - error << "unable execute tar: " << e; - } - - if (!err) - { - // @@ Upload the extracted artifacts. - } - - if (err) - { - if (!*err) // Non-transient? - r.status = result_status::abnormal; // Soft-fail below. - - rm = nullopt; // Drop the parsed manifest. - } - } - - if (rm) - r = move (*rm); } else { @@ -1681,7 +1614,9 @@ try r.version = tm.version; } - return r; + return (!r.status || !file_exists (af) + ? perform_task_result (move (arm), move (r)) + : perform_task_result (move (arm), move (r), move (af))); } catch (const system_error& e) { @@ -2206,9 +2141,10 @@ try auto t (parse_manifest (ops.fake_request (), "task")); tr = task_response_manifest { - "fake-session", // Dummy session. - nullopt, // No challenge. - string (), // Empty result URL. + "fake-session", // Dummy session. + nullopt, // No challenge. + string (), // Empty result URL. + vector (), agent_checksum, move (t)}; @@ -2381,7 +2317,7 @@ try // that we have already received a task, responding with an interrupt // feels like the most sensible option). // - result_manifest r; + perform_task_result r; bootstrapped_machine* pm (nullptr); try { @@ -2592,21 +2528,27 @@ try } catch (const interrupt&) { - r = result_manifest { - t.name, - t.version, - result_status::interrupt, - operation_results {}, - nullopt /* worker_checksum */, - nullopt /* dependency_checksum */}; + // Note: no work_dir. + // + r = perform_task_result ( + auto_rmdir (), + result_manifest { + t.name, + t.version, + result_status::interrupt, + operation_results {}, + nullopt /* worker_checksum */, + nullopt /* dependency_checksum */}); } if (pm != nullptr && pm->lock.locked ()) pm->lock.unlock (); // No need to hold the lock any longer. + result_manifest& rm (r.manifest); + if (ops.dump_result ()) { - serialize_manifest (r, cout, "stdout", "result"); + serialize_manifest (rm, cout, "stdout", "result"); return 0; } @@ -2642,14 +2584,258 @@ try fail << "unable to sign task response challenge: " << e; } - result_status rs (r.status); + // Re-package the build artifacts, if present, into the type/instance- + // specific archives and upload them to the type-specific URLs, if + // provided. + // + // Note that the initial upload archive content is organized as a bunch of + // upload///*, where the second level directories are the + // upload types and the third level sub-directories are their instances. + // The resulting .tar archives content (which is what we submit + // to the type-specific handler) are organized as /*. + // + if (r.upload_archive && !tr.upload_urls.empty ()) + { + const path& ua (*r.upload_archive); + + // Extract the archive content into the parent directory of the archive + // file. But first, make sure the resulting directory doesn't exist. + // + // Note that while we don't assume where inside the working directory + // the archive is, we do assume that there is nothing clashing/precious + // in the upload/ directory which we are going to cleanup. + // + dir_path d (ua.directory ()); + + const dir_path ud (d / dir_path ("upload")); + try_rmdir_r (ud); + + try + { + process_exit pe ( + process_run_callback ( + trace, + fdopen_null (), // Don't expect to read from stdin. + 2, // Redirect stdout to stderr. + 2, + "tar", + "-xf", ua, + "-C", d)); + + if (!pe) + fail << "tar " << pe; + } + catch (const system_error& e) + { + // There must be something wrong with the setup or there is no space + // left on the disk, thus the failure is fatal. + // + fail << "unable to extract build artifacts from archive: " << e; + } + + try_rmfile (ua); // Let's free up the disk space. + + // To decrease nesting a bit, let's collect the type-specific upload + // directories and the corresponding URLs first. This way we can also + // create the archive files as the upload/ directory sub-entries without + // interfering with iterating over this directory. + // + vector> urls; + + try + { + for (const dir_entry& te: dir_iterator (ud, dir_iterator::no_follow)) + { + const string& t (te.path ().string ()); + + // Can only be a result of the worker malfunction, thus the failure + // is fatal. + // + if (te.type () != entry_type::directory) + fail << "unexpected filesystem entry '" << t << "' in " << ud; + + auto i (find_if (tr.upload_urls.begin (), tr.upload_urls.end (), + [&t] (const upload_url& u) {return u.type == t;})); + + if (i == tr.upload_urls.end ()) + continue; + + urls.emplace_back (ud / path_cast (te.path ()), i->url); + } + } + catch (const system_error& e) + { + fail << "unable to iterate over " << ud << ": " << e; + } + + // Now create archives and upload. + // + for (const pair& p: urls) + { + const dir_path& td (p.first); // / + const string& url (p.second); + + try + { + for (const dir_entry& ie: dir_iterator (td, dir_iterator::no_follow)) + { + const string& i (ie.path ().string ()); // + + // Can only be a result of the worker malfunction, thus the + // failure is fatal. + // + if (ie.type () != entry_type::directory) + fail << "unexpected filesystem entry '" << i << "' in " << td; + + // Archive the upload instance files and, while at it, calculate + // the resulting archive checksum. + // + sha256 sha; + auto_rmfile ari (ud / (i + ".tar")); + + try + { + // Instruct tar to print the archive to stdout. + // + fdpipe in_pipe (fdopen_pipe (fdopen_mode::binary)); + + process pr ( + process_start_callback ( + trace, + fdopen_null (), // Don't expect to read from stdin. + in_pipe, + 2 /* stderr */, + "tar", + "--format", "ustar", + "-c", + "-C", td, + i)); + + // Shouldn't throw, unless something is severely damaged. + // + in_pipe.out.close (); + + ifdstream is ( + move (in_pipe.in), fdstream_mode::skip, ifdstream::badbit); + + ofdstream os (ari.path, fdopen_mode::binary); + + char buf[8192]; + while (!eof (is)) + { + is.read (buf, sizeof (buf)); + + if (size_t n = static_cast (is.gcount ())) + { + sha.append (buf, n); + os.write (buf, n); + } + } + + os.close (); + + if (!pr.wait ()) + fail << "tar " << *pr.exit; + } + catch (const system_error& e) + { + // There must be something wrong with the setup or there is no + // space left on the disk, thus the failure is fatal. + // + fail << "unable to archive " << td << i << "/: " << e; + } + + // Post the upload instance archive. + // + using namespace http_service; + + parameters params ({ + {parameter::text, "session", tr.session}, + {parameter::text, "instance", i}, + {parameter::file, "archive", ari.path.string ()}, + {parameter::text, "sha256sum", sha.string ()}}); + + if (challenge) + params.push_back ({ + parameter::text, "challenge", base64_encode (*challenge)}); + + result pr (post (ops, url, params)); + + // Turn the potential upload failure into the "upload" operation + // error, amending the task result manifest. + // + if (pr.error) + { + // The "upload" operation result must be present (otherwise + // there would be nothing to upload). We can assume it is last. + // + assert (!rm.results.empty ()); + + operation_result& r (rm.results.back ()); + + // The "upload" operation result must be the last, if present. + // + assert (r.operation == "upload"); + + auto log = [&r, indent = false] (const string& t, + const string& l) mutable + { + if (indent) + r.log += " "; + else + indent = true; + + r.log += t; + r.log += ": "; + r.log += l; + r.log += '\n'; + }; + + log ("error", + "unable to upload " + td.leaf ().string () + '/' + i + + " build artifacts"); + + log ("error", *pr.error); + + if (!pr.message.empty ()) + log ("reason", pr.message); + + if (pr.reference) + log ("reference", *pr.reference); + + for (const manifest_name_value& nv: pr.body) + { + if (!nv.name.empty ()) + log (nv.name, nv.value); + } + + r.status |= result_status::error; + rm.status |= r.status; + + break; + } + } + + // Bail out on the instance archive upload failure. + // + if (!rm.status) + break; + } + catch (const system_error& e) + { + fail << "unable to iterate over " << td << ": " << e; + } + } + } + + result_status rs (rm.status); // Upload the result. // result_request_manifest rq {tr.session, move (challenge), agent_checksum, - move (r)}; + move (rm)}; { const string& u (*tr.result_url); -- cgit v1.1