From 72e9ec3d7028765836851b515d80816f2da74060 Mon Sep 17 00:00:00 2001 From: Boris Kolpackov Date: Thu, 27 Oct 2022 11:00:47 +0200 Subject: Initial work on child process diagnostics buffering Currently this is implemented for C/C++ compile and link rules. --- libbuild2/diagnostics.cxx | 256 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 255 insertions(+), 1 deletion(-) (limited to 'libbuild2/diagnostics.cxx') diff --git a/libbuild2/diagnostics.cxx b/libbuild2/diagnostics.cxx index a2a8444..7bdfea2 100644 --- a/libbuild2/diagnostics.cxx +++ b/libbuild2/diagnostics.cxx @@ -3,7 +3,7 @@ #include -#include // strchr() +#include // strchr(), memcpy() #include @@ -13,6 +13,7 @@ #include using namespace std; +using namespace butl; namespace build2 { @@ -46,6 +47,259 @@ namespace build2 // const int stream_verb_index = ostream::xalloc (); + // diag_buffer + // + process::pipe diag_buffer:: + open (const char* args0, bool force, bool blocking) + { + assert (state_ == state::closed && args0 != nullptr); + + assert (blocking); // @@ TODO (also in read() and close () below). + + serial = ctx_.sched.serial (); + + process::pipe r; + if (!serial || force) + { + try + { + fdpipe p (fdopen_pipe ()); + + // Note that we must return non-owning fd to our end of the pipe (see + // the process class for details). + // + r = process::pipe (p.in.get (), move (p.out)); + + is.open (move (p.in), fdstream_mode::text, ifdstream::badbit); + } + catch (const io_error& e) + { + fail << "unable to read from " << args0 << " stderr:" << e; + } + } + else + r = process::pipe (-1, 2); + + this->args0 = args0; + state_ = state::opened; + return r; + } + + bool diag_buffer:: + read () + { + assert (state_ == state::opened); + + bool r; + if (is.is_open ()) + { + try + { + if (is.blocking ()) + { + if (serial) + { + // This is the case where we are called after custom processing. + // + assert (buf.empty ()); + + // Note that the eof check is important: if the stream is at eof, + // this and all subsequent writes to the diagnostics stream will + // fail (and you won't see a thing). + // + if (is.peek () != ifdstream::traits_type::eof ()) + { + // Holding the diag lock while waiting for diagnostics from the + // child process would be a bad idea in the parallel build. But + // it should be harmless in serial. + // + diag_stream_lock l; + *diag_stream << is.rdbuf (); + } + } + else + { + fdstreambuf& sb (*static_cast (is.rdbuf ())); + + while (is.peek () != istream::traits_type::eof ()) + { + const char* p (sb.gptr ()); + size_t n (sb.egptr () - p); + + // Allocate at least fdstreambuf::buffer_size to reduce + // reallocations and memory fragmentation. + // + size_t i (buf.size ()); + if (i == 0 && n < fdstreambuf::buffer_size) + buf.reserve (fdstreambuf::buffer_size); + + buf.resize (i + n); + memcpy (buf.data () + i, p, n); + + sb.gbump (static_cast (n)); + } + } + + r = false; + } + else + { + // @@ TODO (maybe we can unify the two?) + r = true; + } + + if (!r) + is.close (); + } + catch (const io_error& e) + { + // For now we assume (here and pretty much everywhere else) that the + // output can't fail. + // + fail << "unable to read from " << args0 << " stderr:" << e; + } + } + else + r = false; + + if (!r) + state_ = state::eof; + + return r; + } + + void diag_buffer:: + write (const string& s, bool nl) + { + // Similar logic to read() above. + // + if (serial) + { + assert (buf.empty ()); + + diag_stream_lock l; + *diag_stream << s; + if (nl) + *diag_stream << '\n'; + } + else + { + size_t n (s.size () + (nl ? 1 : 0)); + + size_t i (buf.size ()); + if (i == 0 && n < fdstreambuf::buffer_size) + buf.reserve (fdstreambuf::buffer_size); + + buf.resize (i + n); + memcpy (buf.data () + i, s.c_str (), s.size ()); + + if (nl) + buf.back () = '\n'; + } + } + + void diag_buffer:: + close (const char* const* args, size_t args_size, + const process_exit& pe, + uint16_t v, + const location& loc) + { + assert (state_ != state::closed); + + // We may still be in the open state in case of custom processing. + // + if (state_ == state::opened) + { + if (is.is_open ()) + { + try + { + // @@ TODO: is it ok to call peek() in non-blocking? + // + assert (is.peek () == ifdstream::traits_type::eof ()); + is.close (); + } + catch (const io_error& e) + { + fail << "unable to read from " << args0 << " stderr:" << e; + } + } + + state_ = state::eof; + } + + // We need to make sure the command line we print on the unsuccessful exit + // is inseparable from any buffered diagnostics. So we prepare the record + // first and then write both while holding the diagnostics stream lock. + // + diag_record dr; + if (!pe) + { + // Note: see similar code in run_finish_impl(). + + // It's unclear whether we should print this only if printing the + // command line (we could also do things differently for normal/abnormal + // exit). Let's print this always and see how it wears. + // + dr << error (loc) << "process " << args[0] << " " << pe; + + if (verb >= 1 && verb <= v) + { + dr << info << "command line: "; + print_process (dr, args, args_size); + } + } + + if (!buf.empty () || !dr.empty ()) + { + diag_stream_lock l; + + if (!buf.empty ()) + diag_stream->write (buf.data (), static_cast (buf.size ())); + + if (!dr.empty ()) + dr.flush ([] (const butl::diag_record& r) + { + // Similar to default_writer(). + // + *diag_stream << r.os.str () << '\n'; + }); + + diag_stream->flush (); + } + + buf.clear (); + args0 = nullptr; + state_ = state::closed; + } + + void diag_buffer:: + finish (const char* const* args, size_t args_size, + process& pr, + uint16_t v, + const location& loc) + { + // Note: see similar code in run_finish_impl(). + // + try + { + pr.wait (); + } + catch (const process_error& e) + { + fail (loc) << "unable to execute " << args[0] << ": " << e << endf; + } + + const process_exit& pe (*pr.exit); + + close (args, args_size, pe, v, loc); + + if (!pe) + throw failed (); + } + + // print_process() + // void print_process (const char* const* args, size_t n) { -- cgit v1.1