runtests: change runner interface to be asynchronous

Program arguments are marshalled and then written to the end of a pipe
which is later read from and the arguments unmarshalled before the
desired function is called normally.  The function return values are
then marshalled and written into another pipe when is later read from
and unmarshalled before being returned to the caller.

The implementation is currently blocking but can be made non-blocking
without any changes to the API.  This allows calling multiple runners
without blocking in the future.

Ref: #10818
This commit is contained in:
Dan Fandrich 2023-04-25 18:03:54 -07:00
parent 0754de758a
commit a98277fcc7
2 changed files with 210 additions and 14 deletions

View File

@ -28,6 +28,7 @@ package runner;
use strict;
use warnings;
use 5.006;
BEGIN {
use base qw(Exporter);
@ -37,10 +38,13 @@ BEGIN {
prepro
restore_test_env
runner_init
runner_clearlocks
runner_stopservers
runner_test_preprocess
runner_test_run
runnerac_clearlocks
runnerac_shutdown
runnerac_stopservers
runnerac_test_preprocess
runnerac_test_run
runnerar
runnerar_ready
stderrfilename
stdoutfilename
$DBGCURL
@ -60,6 +64,14 @@ BEGIN {
);
}
use B qw(
svref_2object
);
use Storable qw(
freeze
thaw
);
use pathhelp qw(
exe_ext
);
@ -105,6 +117,10 @@ my $CURLLOG="$LOGDIR/commands.log"; # all command lines run
my $SERVERLOGS_LOCK="$LOGDIR/serverlogs.lock"; # server logs advisor read lock
my $defserverlogslocktimeout = 2; # timeout to await server logs lock removal
my $defpostcommanddelay = 0; # delay between command and postcheck sections
my $controllerw; # pipe that controller writes to
my $runnerr; # pipe that runner reads from
my $runnerw; # pipe that runner writes to
my $controllerr; # pipe that controller reads from
# redirected stdout/stderr to these files
@ -120,7 +136,9 @@ sub stderrfilename {
#######################################################################
# Initialize the runner and prepare it to run tests
#
# The runner ID returned by this function must be passed into the other
# runnerac_* functions
# Called by controller
sub runner_init {
my ($logdir)=@_;
@ -138,6 +156,13 @@ sub runner_init {
$ENV{'CURL_HOME'}=$ENV{'HOME'};
$ENV{'XDG_CONFIG_HOME'}=$ENV{'HOME'};
$ENV{'COLUMNS'}=79; # screen width!
# create pipes for communication with runner
pipe $runnerr, $controllerw;
pipe $controllerr, $runnerw;
# There is only one runner right now
return "singleton";
}
#######################################################################
@ -1034,6 +1059,151 @@ sub runner_test_run {
return (0, clearlogs(), \%testtimings, $cmdres, $CURLOUT, $tool, $usedvalgrind);
}
# Async call runner_clearlocks
# Called by controller
sub runnerac_clearlocks {
controlleripccall(\&runner_clearlocks, @_);
}
# Async call runner_shutdown
# This call does NOT generate an IPC response and must be the last IPC call
# received.
# Called by controller
sub runnerac_shutdown {
controlleripccall(\&runner_shutdown, @_);
# These have no more use
close($controllerw);
undef $controllerw;
close($controllerr);
undef $controllerr;
}
# Async call of runner_stopservers
# Called by controller
sub runnerac_stopservers {
controlleripccall(\&runner_stopservers, @_);
}
# Async call of runner_test_preprocess
# Called by controller
sub runnerac_test_preprocess {
controlleripccall(\&runner_test_preprocess, @_);
}
# Async call of runner_test_run
# Called by controller
sub runnerac_test_run {
controlleripccall(\&runner_test_run, @_);
}
###################################################################
# Call an arbitrary function via IPC
# The first argument is the function reference, the second is the runner ID
# Called by controller (indirectly, via a more specific function)
sub controlleripccall {
my $funcref = shift @_;
my $runnerid = shift @_;
# Get the name of the function from the reference
my $cv = svref_2object($funcref);
my $gv = $cv->GV;
# Prepend the name to the function arguments so it's marshalled along with them
unshift @_, $gv->NAME;
# Marshall the arguments into a flat string
my $margs = freeze \@_;
# Send IPC call via pipe
syswrite($controllerw, (pack "L", length($margs)) . $margs);
# Call the remote function
# TODO: this will eventually be done in a separate runner process
# kicked off by runner_init()
ipcrecv();
}
###################################################################
# Receive async response of a previous call via IPC
# The first return value is the runner ID
# Called by controller
sub runnerar {
my $datalen;
if (sysread($controllerr, $datalen, 4) <= 0) {
die "error in runnerar\n";
}
my $len=unpack("L", $datalen);
my $buf;
if (sysread($controllerr, $buf, $len) <= 0) {
die "error in runnerar\n";
}
# Decode response values
my $resarrayref = thaw $buf;
# First argument is runner ID
unshift @$resarrayref, "singleton";
return @$resarrayref;
}
###################################################################
# Returns nonzero if a response from an async call is ready
# Called by controller
sub runnerar_ready {
my ($blocking) = @_;
my $rin = "";
vec($rin, fileno($controllerr), 1) = 1;
return select(my $rout=$rin, undef, my $eout=$rin, $blocking ? undef : 0);
}
###################################################################
# Receive an IPC call in the runner and execute it
# The IPC is read from the $runnerr pipe and the response is
# written to the $runnerw pipe
sub ipcrecv {
my $datalen;
if (sysread($runnerr, $datalen, 4) <= 0) {
die "error in ipcrecv\n";
}
my $len=unpack("L", $datalen);
my $buf;
if (sysread($runnerr, $buf, $len) <= 0) {
die "error in ipcrecv\n";
}
# Decode the function name and arguments
my $argsarrayref = thaw $buf;
# The name of the function to call is the frist argument
my $funcname = shift @$argsarrayref;
# print "ipcrecv $funcname\n";
# Synchronously call the desired function
my @res;
if($funcname eq "runner_clearlocks") {
@res = runner_clearlocks(@$argsarrayref);
}
elsif($funcname eq "runner_shutdown") {
runner_shutdown(@$argsarrayref);
# Special case: no response
return;
}
elsif($funcname eq "runner_stopservers") {
@res = runner_stopservers(@$argsarrayref);
}
elsif($funcname eq "runner_test_preprocess") {
@res = runner_test_preprocess(@$argsarrayref);
}
elsif($funcname eq "runner_test_run") {
@res = runner_test_run(@$argsarrayref);
} else {
die "Unknown IPC function $funcname\n";
}
# print "ipcrecv results\n";
# Marshall the results to return
$buf = freeze \@res;
syswrite($runnerw, (pack "L", length($buf)) . $buf);
}
###################################################################
# Kill the server processes that still have lock files in a directory
@ -1055,5 +1225,14 @@ sub runner_stopservers {
return ($error, $logs);
}
###################################################################
# Shut down this runner
sub runner_shutdown {
close($runnerr);
undef $runnerr;
close($runnerw);
undef $runnerw;
}
1;

View File

@ -150,6 +150,7 @@ my %timetoolini; # timestamp for each test command run starting
my %timetoolend; # timestamp for each test command run stopping
my %timesrvrlog; # timestamp for each test server logs lock removal
my %timevrfyend; # timestamp for each test result verification end
my $runnerid; # ID for runner async calls
#######################################################################
# variables that command line options may set
@ -185,8 +186,13 @@ sub logmsg {
sub catch_zap {
my $signame = shift;
logmsg "runtests.pl received SIG$signame, exiting\n";
my ($unexpected, $logs) = runner_stopservers();
logmsg $logs;
# TODO: make this set a flag that is checked in the main test loop
if($runnerid) {
runnerac_stopservers($runnerid);
runnerar(); # ignore the results
# Kill the runner entirely
runnerac_shutdown($runnerid);
}
die "Somebody sent me a SIG$signame";
}
$SIG{INT} = \&catch_zap;
@ -1403,7 +1409,8 @@ sub singletest_check {
if(!$filename) {
logmsg "ERROR: section verify=>file$partsuffix ".
"has no name attribute\n";
my ($unexpected, $logs) = runner_stopservers();
runnerac_stopservers($runnerid);
my ($rid, $unexpected, $logs) = runnerar();
logmsg $logs;
# timestamp test result verification end
$timevrfyend{$testnum} = Time::HiRes::time();
@ -1620,7 +1627,8 @@ sub singletest {
# first, remove all lingering log files
if(!cleardir($logdir) && $clearlocks) {
my $logs = runner_clearlocks($logdir);
runnerac_clearlocks($runnerid, $logdir);
my ($rid, $logs) = runnerar();
logmsg $logs;
cleardir($logdir);
}
@ -1641,7 +1649,8 @@ sub singletest {
# Register the test case with the CI environment
citest_starttest($testnum);
my ($why, $error, $logs, $testtimings) = runner_test_preprocess($testnum);
runnerac_test_preprocess($runnerid, $testnum);
my ($rid, $why, $error, $logs, $testtimings) = runnerar();
logmsg $logs;
if($error == -2) {
if($postmortem) {
@ -1667,7 +1676,8 @@ sub singletest {
my $CURLOUT;
my $tool;
my $usedvalgrind;
($error, $logs, $testtimings, $cmdres, $CURLOUT, $tool, $usedvalgrind) = runner_test_run($testnum);
runnerac_test_run($runnerid, $testnum);
($rid, $error, $logs, $testtimings, $cmdres, $CURLOUT, $tool, $usedvalgrind) = runnerar();
logmsg $logs;
updatetesttimings($testnum, %$testtimings);
if($error == -1) {
@ -2251,7 +2261,6 @@ setlogfunc(\&logmsg);
#
if(!$listonly) {
unlink("$LOGDIR/$MEMDUMP"); # remove this if there was one left
checksystemfeatures();
}
@ -2513,7 +2522,7 @@ citest_starttestrun();
# Initialize the runner to prepare to run tests
cleardir($LOGDIR);
mkdir($LOGDIR, 0777);
runner_init($LOGDIR);
$runnerid = runner_init($LOGDIR);
#######################################################################
# The main test-loop
@ -2567,9 +2576,17 @@ my $sofar = time() - $start;
citest_finishtestrun();
# Tests done, stop the servers
my ($unexpected, $logs) = runner_stopservers();
runnerac_stopservers($runnerid);
my ($rid, $unexpected, $logs) = runnerar();
logmsg $logs;
# Kill the runner
# There is a race condition here since we don't know exactly when the runner
# has finished shutting itself down
runnerac_shutdown($runnerid);
undef $runnerid;
sleep 0; # give runner a chance to run
my $numskipped = %skipped ? sum values %skipped : 0;
my $all = $total + $numskipped;