diff options
author | Karen Arutyunov <karen@codesynthesis.com> | 2018-08-09 00:42:46 +0300 |
---|---|---|
committer | Karen Arutyunov <karen@codesynthesis.com> | 2018-08-21 13:56:36 +0300 |
commit | e3533fd4c2fc90969b77d2ddaccbda649dd74973 (patch) | |
tree | 6c25d1ee37cbe04a9bce7b846798c5d636ba7503 /mod/mod-submit.cxx | |
parent | e3a8a11f7fdc64a560810cf021080c61c7d69dc5 (diff) |
Implement submit-git
Diffstat (limited to 'mod/mod-submit.cxx')
-rw-r--r-- | mod/mod-submit.cxx | 429 |
1 files changed, 347 insertions, 82 deletions
diff --git a/mod/mod-submit.cxx b/mod/mod-submit.cxx index accd56d..c68b773 100644 --- a/mod/mod-submit.cxx +++ b/mod/mod-submit.cxx @@ -4,8 +4,16 @@ #include <mod/mod-submit.hxx> -#include <cstdlib> // strtoul() +#include <sys/time.h> // timeval +#include <sys/select.h> + +#include <ratio> // ratio_greater_equal +#include <chrono> +#include <cstdlib> // strtoul() #include <istream> +#include <sstream> +#include <type_traits> // static_assert +#include <system_error> // error_code, generic_category() #include <libbutl/sha256.mxx> #include <libbutl/process.mxx> @@ -501,10 +509,56 @@ handle (request& rq, response& rs) } // Given that the submission data is now successfully persisted we are no - // longer in charge of removing it, even in case of a subsequent error. + // longer in charge of removing it, except for the cases when the submission + // handler terminates with an error (see below for details). // tdr.cancel (); + // If the handler terminates with non-zero exit status or specifies 5XX + // (HTTP server error) submission result manifest status value, then we + // stash the submission data directory for troubleshooting. Otherwise, if + // it's the 4XX (HTTP client error) status value, then we remove the + // directory. + // + // Note that leaving the directory in place in case of a submission error + // would have prevent the user from re-submitting until we research the + // issue and manually remove the directory. + // + auto stash_submit_dir = [&dd, error] () + { + try + { + if (!dir_exists (dd)) + return; + + for (size_t n (1); true; ++n) // Eventually we should find the free one. + { + string ext ('.' + to_string (n)); + dir_path d (dd + ext); + + if (!dir_exists (d)) + try + { + mvdir (dd, d); + break; + } + catch (const system_error& e) + { + int ec (e.code ().value ()); + if (ec != ENOTEMPTY && ec != EEXIST) // Note: there can be a race. + throw; + } + } + } + catch (const system_error& e) + { + // Not much we can do here. Let's just log the issue and bail out + // leaving the directory in place. + // + error << "unable to rename directory '" << dd << "': " << e; + } + }; + auto print_args = [&trace, this] (const char* args[], size_t n) { l2 ([&]{trace << process_args {args, n};}); @@ -518,13 +572,44 @@ handle (request& rq, response& rs) // containing at least the status value. Thus, an empty cache indicates that // the handler is not configured. // - status_code sc; + status_code sc (200); vector<manifest_name_value> rvs; if (options_->submit_handler_specified ()) { + // For the sake of the documentation we will call the handler's normal + // exit with 0 code "successful termination". + // + // To make sure the handler process execution doesn't exceed the specified + // timeout we set the non-blocking mode for the process stdout-reading + // stream, try to read from it with the 10 milliseconds timeout and check + // the process execution time between the reads. We then kill the process + // if the execution time is exceeded. + // + using namespace chrono; + + using time_point = system_clock::time_point; + using duration = system_clock::duration; + + // Make sure that the system clock has at least milliseconds resolution. + // + static_assert( + ratio_greater_equal<milliseconds::period, duration::period>::value, + "The system clock resolution is too low"); + + optional<milliseconds> timeout; + + if (options_->submit_handler_timeout_specified ()) + timeout = milliseconds (options_->submit_handler_timeout () * 1000); + const path& handler (options_->submit_handler ()); + // Note that due to the non-blocking mode we cannot just pass the stream + // to the manifest parser constructor. So we buffer the data in the string + // stream and then parse that. + // + stringstream ss; + for (;;) // Breakout loop. try { @@ -542,86 +627,177 @@ handle (request& rq, response& rs) dd)); pipe.out.close (); - try + auto kill = [&pr, &warn, &handler, &ac] () { - ifdstream is (move (pipe.in)); - - // Parse and verify the manifest. Obtain the HTTP status code (must go - // first) and cache it for the subsequent responding to the client. - // - parser p (is, "handler"); - manifest_name_value nv (p.next ()); - - auto bad_value ([&p, &nv] (const string& d) { - throw parsing (p.name (), nv.value_line, nv.value_column, d);}); - - if (nv.empty ()) - bad_value ("empty manifest"); - - const string& n (nv.name); - const string& v (nv.value); - - // The format version pair is verified by the parser. - // - assert (n.empty () && v == "1"); - - // Cache the format version pair. - // - rvs.push_back (move (nv)); - - // Get and verify the HTTP status. - // - nv = p.next (); - if (n != "status") - bad_value ("no status specified"); - - char* e (nullptr); - unsigned long c (strtoul (v.c_str (), &e, 10)); // Can't throw. - - assert (e != nullptr); - - if (!(*e == '\0' && c >= 100 && c < 600)) - bad_value ("invalid http status '" + v + "'"); - - // Cache the HTTP status. + // We may still end up well (see below), thus this is a warning. // - sc = static_cast<status_code> (c); - rvs.push_back (move (nv)); + warn << "ref " << ac << ": process " << handler + << " execution timeout expired"; - // Cache the remaining name/value pairs. - // - for (nv = p.next (); !nv.empty (); nv = p.next ()) - rvs.push_back (move (nv)); + pr.kill (); + }; - // Cache end of manifest. + try + { + ifdstream is (move (pipe.in), fdstream_mode::non_blocking); + + const size_t nbuf (8192); + char buf[nbuf]; + + while (is.is_open ()) + { + time_point start; + milliseconds wd (10); // Max time to wait for the data portion. + + if (timeout) + { + start = system_clock::now (); + + if (*timeout < wd) + wd = *timeout; + } + + timeval tm {wd.count () / 1000 /* seconds */, + wd.count () % 1000 * 1000 /* microseconds */}; + + fd_set rd; + FD_ZERO (&rd); + FD_SET (is.fd (), &rd); + + int r (select (is.fd () + 1, &rd, nullptr, nullptr, &tm)); + + if (r == -1) + { + // Don't fail if the select() call was interrupted by the signal. + // + if (errno != EINTR) + throw io_error ("select failed", + error_code (errno, generic_category ())); + } + else if (r != 0) // Is data available? + { + assert (FD_ISSET (is.fd (), &rd)); + + // The only leagal way to read from non-blocking ifdstream. + // + streamsize n (is.readsome (buf, nbuf)); + + // Close the stream (and bail out) if the end of the data is + // reached. Otherwise cache the read data. + // + if (is.eof ()) + is.close (); + else + { + // The data must be available. + // + // Note that we could keep reading until the readsome() call + // returns 0. However, this way we could potentially exceed the + // timeout significantly for some broken handler that floods us + // with data. So instead, we will be checking the process + // execution time after every data chunk read. + // + assert (n != 0); + + ss.write (buf, n); + } + } + else // Timeout occured. + { + // Normally, we don't expect timeout to occur on the pipe read + // operation if the process has terminated successfully, as all its + // output must already be buffered (including eof). However, there + // can be some still running handler's child that has inherited + // the parent's stdout. In this case we assume that we have read + // all the handler's output, close the stream, log the warning and + // bail out. + // + if (pr.exit) + { + // We keep reading only upon successful handler termination. + // + assert (*pr.exit); + + is.close (); + + warn << "ref " << ac << ": process " << handler + << " stdout is not closed after termination (possibly " + << "handler's child still running)"; + } + } + + if (timeout) + { + time_point now (system_clock::now ()); + + // Assume we have waited the full amount if the time adjustment is + // detected. + // + duration d (now > start ? now - start : wd); + + // If the timeout is not fully exhausted, then decrement it and + // try to read some more data from the handler' stdout. Otherwise, + // kill the process, if not done yet. + // + // Note that it may happen that we are killing an already + // terminated process, in which case kill() just sets the process + // exit information. On the other hand it's guaranteed that the + // process is terminated after the kill() call, and so the pipe is + // presumably closed on the write end (see above for details). + // Thus, if the process terminated successfully, we will continue + // reading until eof is reached or read timeout occurred. Yes, it + // may happen that we end up with a successful submission even + // with the kill. + // + if (*timeout > d) + *timeout -= duration_cast<milliseconds> (d); + else if (!pr.exit) + { + kill (); + + assert (pr.exit); + + // Close the stream (and bail out) if the process hasn't + // terminate successfully. + // + if (!*pr.exit) + is.close (); + + *timeout = milliseconds::zero (); + } + } + } + + assert (!is.is_open ()); + + if (!timeout) + pr.wait (); + + // If the process is not terminated yet, then wait for its termination + // for the remaining time. Kill it if the timeout has been exceeded + // and the process still hasn't terminate. // - rvs.push_back (move (nv)); + else if (!pr.exit && !pr.timed_wait (*timeout)) + kill (); - is.close (); + assert (pr.exit); // The process must finally be terminated. - if (pr.wait ()) + if (*pr.exit) break; // Get out of the breakout loop. - assert (pr.exit); - error << "process " << handler << " " << *pr.exit; - - // Fall through. - } - catch (const parsing& e) - { - if (pr.wait ()) - error << "unable to parse handler's output: " << e; + error << "ref " << ac << ": process " << handler << " " << *pr.exit; // Fall through. } catch (const io_error& e) { if (pr.wait ()) - error << "unable to read handler's output: " << e; + error << "ref " << ac << ": unable to read handler's output: " << e; // Fall through. } + stash_submit_dir (); return respond_error (); } // Handle process_error and io_error (both derive from system_error). @@ -629,6 +805,73 @@ handle (request& rq, response& rs) catch (const system_error& e) { error << "unable to execute '" << handler << "': " << e; + + stash_submit_dir (); + return respond_error (); + } + + try + { + // Parse and verify the manifest. Obtain the HTTP status code (must go + // first) and cache it for the subsequent response to the client. + // + parser p (ss, "handler"); + manifest_name_value nv (p.next ()); + + auto bad_value ([&p, &nv] (const string& d) { + throw parsing (p.name (), nv.value_line, nv.value_column, d);}); + + if (nv.empty ()) + bad_value ("empty manifest"); + + const string& n (nv.name); + const string& v (nv.value); + + // The format version pair is verified by the parser. + // + assert (n.empty () && v == "1"); + + // Cache the format version pair. + // + rvs.push_back (move (nv)); + + // Get and verify the HTTP status. + // + nv = p.next (); + if (n != "status") + bad_value ("no status specified"); + + char* e (nullptr); + unsigned long c (strtoul (v.c_str (), &e, 10)); // Can't throw. + + assert (e != nullptr); + + if (!(*e == '\0' && c >= 100 && c < 600)) + bad_value ("invalid HTTP status '" + v + "'"); + + // Cache the HTTP status. + // + sc = static_cast<status_code> (c); + rvs.push_back (move (nv)); + + // Cache the remaining name/value pairs. + // + for (nv = p.next (); !nv.empty (); nv = p.next ()) + rvs.push_back (move (nv)); + + // Cache end of manifest. + // + rvs.push_back (move (nv)); + } + catch (const parsing& e) + { + error << "ref " << ac << ": unable to parse handler's output: " << e; + + // It appears the handler had misbehaved, so let's stash the submission + // directory for troubleshooting. + // + stash_submit_dir (); + return respond_error (); } } @@ -637,7 +880,7 @@ handle (request& rq, response& rs) // serialization error log the error description and return false, on the // stream error pass through the io_error exception, otherwise return true. // - auto rsm = [&rvs, &error] (ostream& os) -> bool + auto rsm = [&rvs, &error, &ac] (ostream& os) -> bool { assert (!rvs.empty ()); @@ -651,30 +894,52 @@ handle (request& rq, response& rs) } catch (const serialization& e) { - error << "unable to serialize handler's output: " << e; + error << "ref " << ac << ": unable to serialize handler's output: " << e; return false; } }; - // Save the result manifest, if generated, into the submission directory - // if it still exists (note that the handler could move or remove it). + // If the submission data directory still exists then perform an appropriate + // action on it, depending on the submission result status. Note that the + // handler could move or remove the directory. // - path rsf (dd / "result.manifest"); - - if (!rvs.empty () && dir_exists (dd)) - try + if (dir_exists (dd)) { - ofdstream os (rsf); - bool r (rsm (os)); - os.close (); + // Remove the directory if the client error is detected. + // + if (sc >= 400 && sc < 500) + rmdir_r (dd); - if (!r) - return respond_error (); // The error description is already logged. - } - catch (const io_error& e) - { - error << "unable to write to '" << rsf << "': " << e; - return respond_error (); + // Otherwise, save the result manifest, if generated, into the directory. + // Also stash the directory for troubleshooting in case of the server + // error. + // + else + { + path rsf (dd / "result.manifest"); + + if (!rvs.empty ()) + try + { + ofdstream os (rsf); + + // Not being able to stash the result manifest is not a reason to + // claim the submission failed. The error is logged nevertheless. + // + rsm (os); + + os.close (); + } + catch (const io_error& e) + { + // Not fatal (see above). + // + error << "unable to write to '" << rsf << "': " << e; + } + + if (sc >= 500 && sc < 600) + stash_submit_dir (); + } } // Send email, if configured, and the submission is not simulated. |