postgresql/contrib/rserv/RServ.pm
2003-06-25 21:37:53 +00:00

762 lines
16 KiB
Perl

# -*- perl -*-
# RServ.pm
# Vadim Mikheev, (c) 2000, PostgreSQL Inc.
package RServ;
require Exporter;
@ISA = qw(Exporter);
@EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog);
@EXPORT_OK = qw();
use Pg;
$debug = 0;
$quiet = 1;
my %Mtables = ();
my %Stables = ();
sub PrepareSnapshot
{
my ($conn, $outf, $server) = @_; # (@_[0], @_[1], @_[2]);
my $result = $conn->exec("BEGIN");
if ($result->resultStatus ne PGRES_COMMAND_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
$result = $conn->exec("set transaction isolation level serializable");
if ($result->resultStatus ne PGRES_COMMAND_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
# MAP oid --> tabname, keyname
$result = $conn->exec("select pgc.oid, pgc.relname, pga.attname" .
" from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
" where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .
" and pga.attnum = rt.key");
if ($result->resultStatus ne PGRES_TUPLES_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
my @row;
while (@row = $result->fetchrow)
{
# printf "$row[0], $row[1], $row[2]\n";
push @{$Mtables{$row[0]}}, $row[1], $row[2];
}
# Read last succeeded sync
$sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
" where server = $server and syncid = (select max(syncid) from" .
" _RSERV_SYNC_ where server = $server and status > 0)";
printf "$sql\n" if $debug;
$result = $conn->exec($sql);
if ($result->resultStatus ne PGRES_TUPLES_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
my @lastsync = $result->fetchrow;
my $sinfo = "";
if ($lastsync[3] ne '') # sync info
{
$sinfo = "and (l.logid >= $lastsync[3]";
$sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
$sinfo .= ")";
}
my $havedeal = 0;
# DELETED rows
$sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
" where l.deleted = 1 $sinfo order by l.reloid";
printf "$sql\n" if $debug;
$result = $conn->exec($sql);
if ($result->resultStatus ne PGRES_TUPLES_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
$lastoid = '';
while (@row = $result->fetchrow)
{
next unless exists $Mtables{$row[0]};
if ($lastoid != $row[0])
{
if ($lastoid eq '')
{
my $syncid = GetSYNCID($conn, $outf);
return($syncid) if $syncid < 0;
$havedeal = 1;
}
else
{
printf $outf "\\.\n";
}
printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
$lastoid = $row[0];
}
if (! defined $row[1])
{
print STDERR "NULL key\n" unless ($quiet);
$conn->exec("ROLLBACK");
return(-2);
}
printf $outf "%s\n", OutputValue($row[1]);
}
printf $outf "\\.\n" if $lastoid ne '';
# UPDATED rows
my ($taboid, $tabname, $tabkey);
foreach $taboid (keys %Mtables)
{
($tabname, $tabkey) = @{$Mtables{$taboid}};
my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
$sql = sprintf "select $oidkey _$tabname.* from _RSERV_LOG_ l," .
" $tabname _$tabname where l.reloid = $taboid and l.deleted = 0 $sinfo" .
" and l.key = _$tabname.${tabkey}::text";
printf "$sql\n" if $debug;
$result = $conn->exec($sql);
if ($result->resultStatus ne PGRES_TUPLES_OK)
{
printf $outf "-- ERROR\n" if $havedeal;
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
next if $result->ntuples <= 0;
if (! $havedeal)
{
my $syncid = GetSYNCID($conn, $outf);
return($syncid) if $syncid < 0;
$havedeal = 1;
}
printf $outf "-- UPDATE $tabname\n";
while (@row = $result->fetchrow)
{
for ($i = 0; $i <= $#row; $i++)
{
printf $outf " " if $i;
printf $outf "%s", OutputValue($row[$i]);
}
printf $outf "\n";
}
printf $outf "\\.\n";
}
unless ($havedeal)
{
$conn->exec("ROLLBACK");
return(0);
}
# Remember this snapshot info
$result = $conn->exec("select _rserv_sync_($server)");
if ($result->resultStatus ne PGRES_TUPLES_OK)
{
printf $outf "-- ERROR\n";
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
$result = $conn->exec("COMMIT");
if ($result->resultStatus ne PGRES_COMMAND_OK)
{
printf $outf "-- ERROR\n";
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
printf $outf "-- OK\n";
return(1);
}
sub OutputValue
{
my ($val) = @_; # @_[0];
return("\\N") unless defined $val;
$val =~ s/\\/\\\\/g;
$val =~ s/ /\\011/g;
$val =~ s/\n/\\012/g;
$val =~ s/\'/\\047/g;
return($val);
}
# Get syncid for new snapshot
sub GetSYNCID
{
my ($conn, $outf) = @_; # (@_[0], @_[1]);
my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
if ($result->resultStatus ne PGRES_TUPLES_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
my @row = $result->fetchrow;
printf $outf "-- SYNCID $row[0]\n";
return($row[0]);
}
sub CleanLog
{
my ($conn, $howold) = @_; # (@_[0], @_[1]);
my $result = $conn->exec("BEGIN");
if ($result->resultStatus ne PGRES_COMMAND_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
" where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
" where rs2.server = rs.server and rs2.status > 0) order by rs.maxid";
printf "$sql\n" if $debug;
$result = $conn->exec($sql);
if ($result->resultStatus ne PGRES_TUPLES_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
return(-1);
}
my $maxid = '';
my %active = ();
while (my @row = $result->fetchrow)
{
$maxid = $row[0] if $maxid eq '';
last if $row[0] > $maxid;
my @ids = split(/[ ]+,[ ]+/, $row[1]);
foreach $aid (@ids)
{
$active{$aid} = 1 unless exists $active{$aid};
}
}
if ($maxid eq '')
{
print STDERR "No Sync IDs\n" unless ($quiet);
return(0);
}
my $alist = join(',', keys %active);
my $sinfo = "logid < $maxid";
$sinfo .= " and logid not in ($alist)" if $alist ne '';
$sql = "delete from _RSERV_LOG_ where " .
"logtime < now() - '$howold second'::interval and $sinfo";
printf "$sql\n" if $debug;
$result = $conn->exec($sql);
if ($result->resultStatus ne PGRES_COMMAND_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
$maxid = $result->cmdTuples;
$result = $conn->exec("COMMIT");
if ($result->resultStatus ne PGRES_COMMAND_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
return($maxid);
}
sub ApplySnapshot
{
my ($conn, $inpf) = @_; # (@_[0], @_[1]);
my $result = $conn->exec("BEGIN");
if ($result->resultStatus ne PGRES_COMMAND_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
$result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");
if ($result->resultStatus ne PGRES_COMMAND_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
# MAP name --> oid, keyname, keynum
my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .
" from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .
" where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .
" and pga.attnum = rt.key";
$result = $conn->exec($sql);
if ($result->resultStatus ne PGRES_TUPLES_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
while (@row = $result->fetchrow)
{
# printf " %s %s\n", $row[1], $row[0];
push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
}
my $ok = 0;
my $syncid = '';
while(<$inpf>)
{
$_ =~ s/\n//;
my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3);
if ($cmt ne '--')
{
printf STDERR "Invalid format\n" unless ($quiet);
$conn->exec("ROLLBACK");
return(-2);
}
if ($cmd eq 'DELETE')
{
if ($syncid eq '')
{
printf STDERR "Sync ID unspecified\n" unless ($quiet);
$conn->exec("ROLLBACK");
return(-2);
}
$result = DoDelete($conn, $inpf, $prm);
if ($result)
{
$conn->exec("ROLLBACK");
return($result);
}
}
elsif ($cmd eq 'UPDATE')
{
if ($syncid eq '')
{
printf STDERR "Sync ID unspecified\n" unless ($quiet);
$conn->exec("ROLLBACK");
return(-2);
}
$result = DoUpdate($conn, $inpf, $prm);
if ($result)
{
$conn->exec("ROLLBACK");
return($result);
}
}
elsif ($cmd eq 'SYNCID')
{
if ($syncid ne '')
{
printf STDERR "Second Sync ID ?!\n" unless ($quiet);
$conn->exec("ROLLBACK");
return(-2);
}
if ($prm !~ /^\d+$/)
{
printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
$conn->exec("ROLLBACK");
return(-2);
}
$syncid = $prm;
printf STDERR "Sync ID $syncid\n" unless ($quiet);
$result = $conn->exec("select syncid, synctime from " .
"_RSERV_SLAVE_SYNC_ where syncid = " .
"(select max(syncid) from _RSERV_SLAVE_SYNC_)");
if ($result->resultStatus ne PGRES_TUPLES_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
my @row = $result->fetchrow;
if (! defined $row[0])
{
$result = $conn->exec("insert into" .
" _RSERV_SLAVE_SYNC_(syncid, synctime) values ($syncid, now())");
}
elsif ($row[0] >= $prm)
{
printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
$conn->exec("ROLLBACK");
return(0);
}
else
{
$result = $conn->exec("update _RSERV_SLAVE_SYNC_" .
" set syncid = $syncid, synctime = now()");
}
if ($result->resultStatus ne PGRES_COMMAND_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
}
elsif ($cmd eq 'OK')
{
$ok = 1;
last;
}
elsif ($cmd eq 'ERROR')
{
printf STDERR "ERROR signaled\n" unless ($quiet);
$conn->exec("ROLLBACK");
return(-2);
}
else
{
printf STDERR "Unknown command $cmd\n" unless ($quiet);
$conn->exec("ROLLBACK");
return(-2);
}
}
if (! $ok)
{
printf STDERR "No OK flag in input\n" unless ($quiet);
$conn->exec("ROLLBACK");
return(-2);
}
$result = $conn->exec("COMMIT");
if ($result->resultStatus ne PGRES_COMMAND_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
return(1);
}
sub DoDelete
{
my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
my $ok = 0;
while(<$inpf>)
{
if ($_ !~ /\n$/)
{
printf STDERR "Invalid format\n" unless ($quiet);
return(-2);
}
my $key = $_;
$key =~ s/\n//;
if ($key eq '\.')
{
$ok = 1;
last;
}
my $sql = "delete from $tabname where $Stables{$tabname}->[1] = '$key'";
printf "$sql\n" if $debug;
my $result = $conn->exec($sql);
if ($result->resultStatus ne PGRES_COMMAND_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
return(-1);
}
}
if (! $ok)
{
printf STDERR "No end of input in DELETE section\n" unless ($quiet);
return(-2);
}
return(0);
}
sub DoUpdate
{
my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
my @CopyBuf = ();
my $CBufLen = 0;
my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
my $sql = "select attnum, attname from pg_attribute" .
" where attrelid = $Stables{$tabname}->[0] and attnum > 0";
my $result = $conn->exec($sql);
if ($result->resultStatus ne PGRES_TUPLES_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
return(-1);
}
my @anames = ();
while (@row = $result->fetchrow)
{
$anames[$row[0]] = $row[1];
}
my $istring;
my $ok = 0;
while(<$inpf>)
{
if ($_ !~ /\n$/)
{
printf STDERR "Invalid format\n" unless ($quiet);
return(-2);
}
$istring = $_;
$istring =~ s/\n//;
if ($istring eq '\.')
{
$ok = 1;
last;
}
my @vals = split(/ /, $istring);
if ($oidkey)
{
if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)
{
printf STDERR "Invalid OID\n" unless ($quiet);
return(-2);
}
$oidkey = $vals[0];
}
else
{
unshift @vals, '';
}
$sql = "update $tabname set ";
my $ocnt = 0;
for (my $i = 1; $i <= $#anames; $i++)
{
if ($vals[$i] eq '\N')
{
if ($i == $Stables{$tabname}->[2])
{
printf STDERR "NULL key\n" unless ($quiet);
return(-2);
}
$vals[$i] = 'null';
}
else
{
$vals[$i] = "'" . $vals[$i] . "'";
next if $i == $Stables{$tabname}->[2];
}
$ocnt++;
$sql .= ', ' if $ocnt > 1;
$sql .= "$anames[$i] = $vals[$i]";
}
if ($oidkey)
{
$sql .= " where $Stables{$tabname}->[1] = $oidkey";
}
else
{
$sql .= " where $Stables{$tabname}->[1] = $vals[$Stables{$tabname}->[2]]";
}
printf "$sql\n" if $debug;
$result = $conn->exec($sql);
if ($result->resultStatus ne PGRES_COMMAND_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
return(-1);
}
next if $result->cmdTuples == 1; # updated
if ($result->cmdTuples > 1)
{
printf STDERR "Duplicate keys\n" unless ($quiet);
return(-2);
}
# no key - copy
push @CopyBuf, "$istring\n";
$CBufLen += length($istring);
if ($CBufLen >= $CBufMax)
{
$result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
return($result) if $result;
@CopyBuf = ();
$CBufLen = 0;
}
}
if (! $ok)
{
printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
return(-2);
}
if ($CBufLen)
{
$result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
return($result) if $result;
}
return(0);
}
sub DoCopy
{
my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
my $sql = "COPY $tabname " . (($withoids) ? "WITH OIDS " : '') .
"FROM STDIN";
my $result = $conn->exec($sql);
if ($result->resultStatus ne PGRES_COPY_IN)
{
print STDERR $conn->errorMessage unless ($quiet);
return(-1);
}
foreach $str (@{$CBuf})
{
$conn->putline($str);
}
$conn->putline("\\.\n");
if ($conn->endcopy)
{
print STDERR $conn->errorMessage unless ($quiet);
return(-1);
}
return(0);
}
#
# Returns last SyncID applied on Slave
#
sub GetSyncID
{
my ($conn) = @_; # (@_[0]);
my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
if ($result->resultStatus ne PGRES_TUPLES_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
return(-1);
}
my @row = $result->fetchrow;
return(undef) unless defined $row[0]; # null
return($row[0]);
}
#
# Updates _RSERV_SYNC_ on Master with Slave SyncID
#
sub SyncSyncID
{
my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);
my $result = $conn->exec("BEGIN");
if ($result->resultStatus ne PGRES_COMMAND_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
$result = $conn->exec("select synctime, status from _RSERV_SYNC_" .
" where server = $server and syncid = $syncid" .
" for update");
if ($result->resultStatus ne PGRES_TUPLES_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
my @row = $result->fetchrow;
if (! defined $row[0])
{
printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);
$conn->exec("ROLLBACK");
return(0);
}
if ($row[1] > 0)
{
printf STDERR "SyncID $syncid for server $server already updated\n" unless ($quiet);
$conn->exec("ROLLBACK");
return(0);
}
$result = $conn->exec("update _RSERV_SYNC_" .
" set synctime = now(), status = 1" .
" where server = $server and syncid = $syncid");
if ($result->resultStatus ne PGRES_COMMAND_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
$result = $conn->exec("delete from _RSERV_SYNC_" .
" where server = $server and syncid < $syncid");
if ($result->resultStatus ne PGRES_COMMAND_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
$result = $conn->exec("COMMIT");
if ($result->resultStatus ne PGRES_COMMAND_OK)
{
print STDERR $conn->errorMessage unless ($quiet);
$conn->exec("ROLLBACK");
return(-1);
}
return(1);
}
1;