aboutsummaryrefslogtreecommitdiff
path: root/mod/mod-submit.cxx
diff options
context:
space:
mode:
authorKaren Arutyunov <karen@codesynthesis.com>2018-08-09 00:42:46 +0300
committerKaren Arutyunov <karen@codesynthesis.com>2018-08-21 13:56:36 +0300
commite3533fd4c2fc90969b77d2ddaccbda649dd74973 (patch)
tree6c25d1ee37cbe04a9bce7b846798c5d636ba7503 /mod/mod-submit.cxx
parente3a8a11f7fdc64a560810cf021080c61c7d69dc5 (diff)
Implement submit-git
Diffstat (limited to 'mod/mod-submit.cxx')
-rw-r--r--mod/mod-submit.cxx429
1 files changed, 347 insertions, 82 deletions
diff --git a/mod/mod-submit.cxx b/mod/mod-submit.cxx
index accd56d..c68b773 100644
--- a/mod/mod-submit.cxx
+++ b/mod/mod-submit.cxx
@@ -4,8 +4,16 @@
#include <mod/mod-submit.hxx>
-#include <cstdlib> // strtoul()
+#include <sys/time.h> // timeval
+#include <sys/select.h>
+
+#include <ratio> // ratio_greater_equal
+#include <chrono>
+#include <cstdlib> // strtoul()
#include <istream>
+#include <sstream>
+#include <type_traits> // static_assert
+#include <system_error> // error_code, generic_category()
#include <libbutl/sha256.mxx>
#include <libbutl/process.mxx>
@@ -501,10 +509,56 @@ handle (request& rq, response& rs)
}
// Given that the submission data is now successfully persisted we are no
- // longer in charge of removing it, even in case of a subsequent error.
+ // longer in charge of removing it, except for the cases when the submission
+ // handler terminates with an error (see below for details).
//
tdr.cancel ();
+ // If the handler terminates with non-zero exit status or specifies 5XX
+ // (HTTP server error) submission result manifest status value, then we
+ // stash the submission data directory for troubleshooting. Otherwise, if
+ // it's the 4XX (HTTP client error) status value, then we remove the
+ // directory.
+ //
+ // Note that leaving the directory in place in case of a submission error
+ // would have prevent the user from re-submitting until we research the
+ // issue and manually remove the directory.
+ //
+ auto stash_submit_dir = [&dd, error] ()
+ {
+ try
+ {
+ if (!dir_exists (dd))
+ return;
+
+ for (size_t n (1); true; ++n) // Eventually we should find the free one.
+ {
+ string ext ('.' + to_string (n));
+ dir_path d (dd + ext);
+
+ if (!dir_exists (d))
+ try
+ {
+ mvdir (dd, d);
+ break;
+ }
+ catch (const system_error& e)
+ {
+ int ec (e.code ().value ());
+ if (ec != ENOTEMPTY && ec != EEXIST) // Note: there can be a race.
+ throw;
+ }
+ }
+ }
+ catch (const system_error& e)
+ {
+ // Not much we can do here. Let's just log the issue and bail out
+ // leaving the directory in place.
+ //
+ error << "unable to rename directory '" << dd << "': " << e;
+ }
+ };
+
auto print_args = [&trace, this] (const char* args[], size_t n)
{
l2 ([&]{trace << process_args {args, n};});
@@ -518,13 +572,44 @@ handle (request& rq, response& rs)
// containing at least the status value. Thus, an empty cache indicates that
// the handler is not configured.
//
- status_code sc;
+ status_code sc (200);
vector<manifest_name_value> rvs;
if (options_->submit_handler_specified ())
{
+ // For the sake of the documentation we will call the handler's normal
+ // exit with 0 code "successful termination".
+ //
+ // To make sure the handler process execution doesn't exceed the specified
+ // timeout we set the non-blocking mode for the process stdout-reading
+ // stream, try to read from it with the 10 milliseconds timeout and check
+ // the process execution time between the reads. We then kill the process
+ // if the execution time is exceeded.
+ //
+ using namespace chrono;
+
+ using time_point = system_clock::time_point;
+ using duration = system_clock::duration;
+
+ // Make sure that the system clock has at least milliseconds resolution.
+ //
+ static_assert(
+ ratio_greater_equal<milliseconds::period, duration::period>::value,
+ "The system clock resolution is too low");
+
+ optional<milliseconds> timeout;
+
+ if (options_->submit_handler_timeout_specified ())
+ timeout = milliseconds (options_->submit_handler_timeout () * 1000);
+
const path& handler (options_->submit_handler ());
+ // Note that due to the non-blocking mode we cannot just pass the stream
+ // to the manifest parser constructor. So we buffer the data in the string
+ // stream and then parse that.
+ //
+ stringstream ss;
+
for (;;) // Breakout loop.
try
{
@@ -542,86 +627,177 @@ handle (request& rq, response& rs)
dd));
pipe.out.close ();
- try
+ auto kill = [&pr, &warn, &handler, &ac] ()
{
- ifdstream is (move (pipe.in));
-
- // Parse and verify the manifest. Obtain the HTTP status code (must go
- // first) and cache it for the subsequent responding to the client.
- //
- parser p (is, "handler");
- manifest_name_value nv (p.next ());
-
- auto bad_value ([&p, &nv] (const string& d) {
- throw parsing (p.name (), nv.value_line, nv.value_column, d);});
-
- if (nv.empty ())
- bad_value ("empty manifest");
-
- const string& n (nv.name);
- const string& v (nv.value);
-
- // The format version pair is verified by the parser.
- //
- assert (n.empty () && v == "1");
-
- // Cache the format version pair.
- //
- rvs.push_back (move (nv));
-
- // Get and verify the HTTP status.
- //
- nv = p.next ();
- if (n != "status")
- bad_value ("no status specified");
-
- char* e (nullptr);
- unsigned long c (strtoul (v.c_str (), &e, 10)); // Can't throw.
-
- assert (e != nullptr);
-
- if (!(*e == '\0' && c >= 100 && c < 600))
- bad_value ("invalid http status '" + v + "'");
-
- // Cache the HTTP status.
+ // We may still end up well (see below), thus this is a warning.
//
- sc = static_cast<status_code> (c);
- rvs.push_back (move (nv));
+ warn << "ref " << ac << ": process " << handler
+ << " execution timeout expired";
- // Cache the remaining name/value pairs.
- //
- for (nv = p.next (); !nv.empty (); nv = p.next ())
- rvs.push_back (move (nv));
+ pr.kill ();
+ };
- // Cache end of manifest.
+ try
+ {
+ ifdstream is (move (pipe.in), fdstream_mode::non_blocking);
+
+ const size_t nbuf (8192);
+ char buf[nbuf];
+
+ while (is.is_open ())
+ {
+ time_point start;
+ milliseconds wd (10); // Max time to wait for the data portion.
+
+ if (timeout)
+ {
+ start = system_clock::now ();
+
+ if (*timeout < wd)
+ wd = *timeout;
+ }
+
+ timeval tm {wd.count () / 1000 /* seconds */,
+ wd.count () % 1000 * 1000 /* microseconds */};
+
+ fd_set rd;
+ FD_ZERO (&rd);
+ FD_SET (is.fd (), &rd);
+
+ int r (select (is.fd () + 1, &rd, nullptr, nullptr, &tm));
+
+ if (r == -1)
+ {
+ // Don't fail if the select() call was interrupted by the signal.
+ //
+ if (errno != EINTR)
+ throw io_error ("select failed",
+ error_code (errno, generic_category ()));
+ }
+ else if (r != 0) // Is data available?
+ {
+ assert (FD_ISSET (is.fd (), &rd));
+
+ // The only leagal way to read from non-blocking ifdstream.
+ //
+ streamsize n (is.readsome (buf, nbuf));
+
+ // Close the stream (and bail out) if the end of the data is
+ // reached. Otherwise cache the read data.
+ //
+ if (is.eof ())
+ is.close ();
+ else
+ {
+ // The data must be available.
+ //
+ // Note that we could keep reading until the readsome() call
+ // returns 0. However, this way we could potentially exceed the
+ // timeout significantly for some broken handler that floods us
+ // with data. So instead, we will be checking the process
+ // execution time after every data chunk read.
+ //
+ assert (n != 0);
+
+ ss.write (buf, n);
+ }
+ }
+ else // Timeout occured.
+ {
+ // Normally, we don't expect timeout to occur on the pipe read
+ // operation if the process has terminated successfully, as all its
+ // output must already be buffered (including eof). However, there
+ // can be some still running handler's child that has inherited
+ // the parent's stdout. In this case we assume that we have read
+ // all the handler's output, close the stream, log the warning and
+ // bail out.
+ //
+ if (pr.exit)
+ {
+ // We keep reading only upon successful handler termination.
+ //
+ assert (*pr.exit);
+
+ is.close ();
+
+ warn << "ref " << ac << ": process " << handler
+ << " stdout is not closed after termination (possibly "
+ << "handler's child still running)";
+ }
+ }
+
+ if (timeout)
+ {
+ time_point now (system_clock::now ());
+
+ // Assume we have waited the full amount if the time adjustment is
+ // detected.
+ //
+ duration d (now > start ? now - start : wd);
+
+ // If the timeout is not fully exhausted, then decrement it and
+ // try to read some more data from the handler' stdout. Otherwise,
+ // kill the process, if not done yet.
+ //
+ // Note that it may happen that we are killing an already
+ // terminated process, in which case kill() just sets the process
+ // exit information. On the other hand it's guaranteed that the
+ // process is terminated after the kill() call, and so the pipe is
+ // presumably closed on the write end (see above for details).
+ // Thus, if the process terminated successfully, we will continue
+ // reading until eof is reached or read timeout occurred. Yes, it
+ // may happen that we end up with a successful submission even
+ // with the kill.
+ //
+ if (*timeout > d)
+ *timeout -= duration_cast<milliseconds> (d);
+ else if (!pr.exit)
+ {
+ kill ();
+
+ assert (pr.exit);
+
+ // Close the stream (and bail out) if the process hasn't
+ // terminate successfully.
+ //
+ if (!*pr.exit)
+ is.close ();
+
+ *timeout = milliseconds::zero ();
+ }
+ }
+ }
+
+ assert (!is.is_open ());
+
+ if (!timeout)
+ pr.wait ();
+
+ // If the process is not terminated yet, then wait for its termination
+ // for the remaining time. Kill it if the timeout has been exceeded
+ // and the process still hasn't terminate.
//
- rvs.push_back (move (nv));
+ else if (!pr.exit && !pr.timed_wait (*timeout))
+ kill ();
- is.close ();
+ assert (pr.exit); // The process must finally be terminated.
- if (pr.wait ())
+ if (*pr.exit)
break; // Get out of the breakout loop.
- assert (pr.exit);
- error << "process " << handler << " " << *pr.exit;
-
- // Fall through.
- }
- catch (const parsing& e)
- {
- if (pr.wait ())
- error << "unable to parse handler's output: " << e;
+ error << "ref " << ac << ": process " << handler << " " << *pr.exit;
// Fall through.
}
catch (const io_error& e)
{
if (pr.wait ())
- error << "unable to read handler's output: " << e;
+ error << "ref " << ac << ": unable to read handler's output: " << e;
// Fall through.
}
+ stash_submit_dir ();
return respond_error ();
}
// Handle process_error and io_error (both derive from system_error).
@@ -629,6 +805,73 @@ handle (request& rq, response& rs)
catch (const system_error& e)
{
error << "unable to execute '" << handler << "': " << e;
+
+ stash_submit_dir ();
+ return respond_error ();
+ }
+
+ try
+ {
+ // Parse and verify the manifest. Obtain the HTTP status code (must go
+ // first) and cache it for the subsequent response to the client.
+ //
+ parser p (ss, "handler");
+ manifest_name_value nv (p.next ());
+
+ auto bad_value ([&p, &nv] (const string& d) {
+ throw parsing (p.name (), nv.value_line, nv.value_column, d);});
+
+ if (nv.empty ())
+ bad_value ("empty manifest");
+
+ const string& n (nv.name);
+ const string& v (nv.value);
+
+ // The format version pair is verified by the parser.
+ //
+ assert (n.empty () && v == "1");
+
+ // Cache the format version pair.
+ //
+ rvs.push_back (move (nv));
+
+ // Get and verify the HTTP status.
+ //
+ nv = p.next ();
+ if (n != "status")
+ bad_value ("no status specified");
+
+ char* e (nullptr);
+ unsigned long c (strtoul (v.c_str (), &e, 10)); // Can't throw.
+
+ assert (e != nullptr);
+
+ if (!(*e == '\0' && c >= 100 && c < 600))
+ bad_value ("invalid HTTP status '" + v + "'");
+
+ // Cache the HTTP status.
+ //
+ sc = static_cast<status_code> (c);
+ rvs.push_back (move (nv));
+
+ // Cache the remaining name/value pairs.
+ //
+ for (nv = p.next (); !nv.empty (); nv = p.next ())
+ rvs.push_back (move (nv));
+
+ // Cache end of manifest.
+ //
+ rvs.push_back (move (nv));
+ }
+ catch (const parsing& e)
+ {
+ error << "ref " << ac << ": unable to parse handler's output: " << e;
+
+ // It appears the handler had misbehaved, so let's stash the submission
+ // directory for troubleshooting.
+ //
+ stash_submit_dir ();
+
return respond_error ();
}
}
@@ -637,7 +880,7 @@ handle (request& rq, response& rs)
// serialization error log the error description and return false, on the
// stream error pass through the io_error exception, otherwise return true.
//
- auto rsm = [&rvs, &error] (ostream& os) -> bool
+ auto rsm = [&rvs, &error, &ac] (ostream& os) -> bool
{
assert (!rvs.empty ());
@@ -651,30 +894,52 @@ handle (request& rq, response& rs)
}
catch (const serialization& e)
{
- error << "unable to serialize handler's output: " << e;
+ error << "ref " << ac << ": unable to serialize handler's output: " << e;
return false;
}
};
- // Save the result manifest, if generated, into the submission directory
- // if it still exists (note that the handler could move or remove it).
+ // If the submission data directory still exists then perform an appropriate
+ // action on it, depending on the submission result status. Note that the
+ // handler could move or remove the directory.
//
- path rsf (dd / "result.manifest");
-
- if (!rvs.empty () && dir_exists (dd))
- try
+ if (dir_exists (dd))
{
- ofdstream os (rsf);
- bool r (rsm (os));
- os.close ();
+ // Remove the directory if the client error is detected.
+ //
+ if (sc >= 400 && sc < 500)
+ rmdir_r (dd);
- if (!r)
- return respond_error (); // The error description is already logged.
- }
- catch (const io_error& e)
- {
- error << "unable to write to '" << rsf << "': " << e;
- return respond_error ();
+ // Otherwise, save the result manifest, if generated, into the directory.
+ // Also stash the directory for troubleshooting in case of the server
+ // error.
+ //
+ else
+ {
+ path rsf (dd / "result.manifest");
+
+ if (!rvs.empty ())
+ try
+ {
+ ofdstream os (rsf);
+
+ // Not being able to stash the result manifest is not a reason to
+ // claim the submission failed. The error is logged nevertheless.
+ //
+ rsm (os);
+
+ os.close ();
+ }
+ catch (const io_error& e)
+ {
+ // Not fatal (see above).
+ //
+ error << "unable to write to '" << rsf << "': " << e;
+ }
+
+ if (sc >= 500 && sc < 600)
+ stash_submit_dir ();
+ }
}
// Send email, if configured, and the submission is not simulated.