Rearrange dblink's dblink_build_sql_insert() and related routines to open and

lock the target relation just once per SQL function call.  The original coding
obtained and released lock several times per call.  Aside from saving a
not-insignificant number of cycles, this eliminates possible race conditions
if someone tries to modify the relation's schema concurrently.  Also
centralize locking and permission-checking logic.

Problem noted while investigating a trouble report from Robert Voinea --- his
problem is still to be fixed, though.
This commit is contained in:
Tom Lane 2010-06-14 20:49:51 +00:00
parent 7bd31c6715
commit ecb23d8b8b

View File

@ -8,7 +8,7 @@
* Darko Prenosil <Darko.Prenosil@finteh.hr> * Darko Prenosil <Darko.Prenosil@finteh.hr>
* Shridhar Daithankar <shridhar_daithankar@persistent.co.in> * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
* *
* $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.60.2.7 2010/06/09 00:59:54 itagaki Exp $ * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.60.2.8 2010/06/14 20:49:51 tgl Exp $
* Copyright (c) 2001-2006, PostgreSQL Global Development Group * Copyright (c) 2001-2006, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED; * ALL RIGHTS RESERVED;
* *
@ -52,6 +52,7 @@
#include "parser/parse_type.h" #include "parser/parse_type.h"
#include "parser/scansup.h" #include "parser/scansup.h"
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/array.h" #include "utils/array.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/dynahash.h" #include "utils/dynahash.h"
@ -80,20 +81,20 @@ static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void); static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn * rconn); static void createNewConnection(const char *name, remoteConn * rconn);
static void deleteConnection(const char *name); static void deleteConnection(const char *name);
static char **get_pkey_attnames(Oid relid, int16 *numatts); static char **get_pkey_attnames(Relation rel, int16 *numatts);
static char **get_text_array_contents(ArrayType *array, int *numitems); static char **get_text_array_contents(ArrayType *array, int *numitems);
static char *get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); static char *get_sql_insert(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static char *get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals); static char *get_sql_delete(Relation rel, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals);
static char *get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); static char *get_sql_update(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static char *quote_literal_cstr(char *rawstr); static char *quote_literal_cstr(char *rawstr);
static char *quote_ident_cstr(char *rawstr); static char *quote_ident_cstr(char *rawstr);
static int16 get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key); static int16 get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key);
static HeapTuple get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals); static HeapTuple get_tuple_of_interest(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals);
static Oid get_relid_from_relname(text *relname_text); static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
static char *generate_relation_name(Oid relid); static char *generate_relation_name(Relation rel);
static char *connstr_strip_password(const char *connstr); static char *connstr_strip_password(const char *connstr);
static void dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr); static void dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr);
static int get_nondropped_natts(Oid relid); static int get_nondropped_natts(Relation rel);
/* Global */ /* Global */
static remoteConn *pconn = NULL; static remoteConn *pconn = NULL;
@ -1249,7 +1250,6 @@ Datum
dblink_get_pkey(PG_FUNCTION_ARGS) dblink_get_pkey(PG_FUNCTION_ARGS)
{ {
int16 numatts; int16 numatts;
Oid relid;
char **results; char **results;
FuncCallContext *funcctx; FuncCallContext *funcctx;
int32 call_cntr; int32 call_cntr;
@ -1260,7 +1260,8 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
/* stuff done only on the first call of the function */ /* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL()) if (SRF_IS_FIRSTCALL())
{ {
TupleDesc tupdesc = NULL; Relation rel;
TupleDesc tupdesc;
/* create a function context for cross-call persistence */ /* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT(); funcctx = SRF_FIRSTCALL_INIT();
@ -1270,13 +1271,13 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
*/ */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* convert relname to rel Oid */ /* open target relation */
relid = get_relid_from_relname(PG_GETARG_TEXT_P(0)); rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT);
if (!OidIsValid(relid))
ereport(ERROR, /* get the array of attnums */
(errcode(ERRCODE_UNDEFINED_TABLE), results = get_pkey_attnames(rel, &numatts);
errmsg("relation \"%s\" does not exist",
GET_STR(PG_GETARG_TEXT_P(0))))); relation_close(rel, AccessShareLock);
/* /*
* need a tuple descriptor representing one INT and one TEXT column * need a tuple descriptor representing one INT and one TEXT column
@ -1294,9 +1295,6 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
attinmeta = TupleDescGetAttInMetadata(tupdesc); attinmeta = TupleDescGetAttInMetadata(tupdesc);
funcctx->attinmeta = attinmeta; funcctx->attinmeta = attinmeta;
/* get an array of attnums */
results = get_pkey_attnames(relid, &numatts);
if ((results != NULL) && (numatts > 0)) if ((results != NULL) && (numatts > 0))
{ {
funcctx->max_calls = numatts; funcctx->max_calls = numatts;
@ -1383,7 +1381,7 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
int32 pknumatts_tmp = PG_GETARG_INT32(2); int32 pknumatts_tmp = PG_GETARG_INT32(2);
ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4); ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
Oid relid; Relation rel;
int16 pknumatts = 0; int16 pknumatts = 0;
char **src_pkattvals; char **src_pkattvals;
char **tgt_pkattvals; char **tgt_pkattvals;
@ -1393,14 +1391,9 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
int nondropped_natts; int nondropped_natts;
/* /*
* Convert relname to rel OID. * Open target relation.
*/ */
relid = get_relid_from_relname(relname_text); rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
if (!OidIsValid(relid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("relation \"%s\" does not exist",
GET_STR(relname_text))));
/* /*
* There should be at least one key attribute * There should be at least one key attribute
@ -1422,7 +1415,7 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
* ensure we don't ask for more pk attributes than we have * ensure we don't ask for more pk attributes than we have
* non-dropped columns * non-dropped columns
*/ */
nondropped_natts = get_nondropped_natts(relid); nondropped_natts = get_nondropped_natts(rel);
if (pknumatts > nondropped_natts) if (pknumatts > nondropped_natts)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("number of primary key fields exceeds number of specified relation attributes"))); errmsg("number of primary key fields exceeds number of specified relation attributes")));
@ -1460,7 +1453,12 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
/* /*
* Prep work is finally done. Go get the SQL string. * Prep work is finally done. Go get the SQL string.
*/ */
sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
/*
* Now we can close the relation.
*/
relation_close(rel, AccessShareLock);
/* /*
* And send it * And send it
@ -1493,21 +1491,16 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
int32 pknumatts_tmp = PG_GETARG_INT32(2); int32 pknumatts_tmp = PG_GETARG_INT32(2);
ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
int nondropped_natts; int nondropped_natts;
Oid relid; Relation rel;
int16 pknumatts = 0; int16 pknumatts = 0;
char **tgt_pkattvals; char **tgt_pkattvals;
int tgt_nitems; int tgt_nitems;
char *sql; char *sql;
/* /*
* Convert relname to rel OID. * Open target relation.
*/ */
relid = get_relid_from_relname(relname_text); rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
if (!OidIsValid(relid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("relation \"%s\" does not exist",
GET_STR(relname_text))));
/* /*
* There should be at least one key attribute * There should be at least one key attribute
@ -1529,7 +1522,7 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
* ensure we don't ask for more pk attributes than we have * ensure we don't ask for more pk attributes than we have
* non-dropped columns * non-dropped columns
*/ */
nondropped_natts = get_nondropped_natts(relid); nondropped_natts = get_nondropped_natts(rel);
if (pknumatts > nondropped_natts) if (pknumatts > nondropped_natts)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("number of primary key fields exceeds number of specified relation attributes"))); errmsg("number of primary key fields exceeds number of specified relation attributes")));
@ -1552,7 +1545,12 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
/* /*
* Prep work is finally done. Go get the SQL string. * Prep work is finally done. Go get the SQL string.
*/ */
sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals); sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
/*
* Now we can close the relation.
*/
relation_close(rel, AccessShareLock);
/* /*
* And send it * And send it
@ -1590,7 +1588,7 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4); ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
int nondropped_natts; int nondropped_natts;
Oid relid; Relation rel;
int16 pknumatts = 0; int16 pknumatts = 0;
char **src_pkattvals; char **src_pkattvals;
char **tgt_pkattvals; char **tgt_pkattvals;
@ -1599,14 +1597,9 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
char *sql; char *sql;
/* /*
* Convert relname to rel OID. * Open target relation.
*/ */
relid = get_relid_from_relname(relname_text); rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
if (!OidIsValid(relid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("relation \"%s\" does not exist",
GET_STR(relname_text))));
/* /*
* There should be one source array key values for each key attnum * There should be one source array key values for each key attnum
@ -1628,7 +1621,7 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
* ensure we don't ask for more pk attributes than we have * ensure we don't ask for more pk attributes than we have
* non-dropped columns * non-dropped columns
*/ */
nondropped_natts = get_nondropped_natts(relid); nondropped_natts = get_nondropped_natts(rel);
if (pknumatts > nondropped_natts) if (pknumatts > nondropped_natts)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("number of primary key fields exceeds number of specified relation attributes"))); errmsg("number of primary key fields exceeds number of specified relation attributes")));
@ -1666,7 +1659,12 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
/* /*
* Prep work is finally done. Go get the SQL string. * Prep work is finally done. Go get the SQL string.
*/ */
sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
/*
* Now we can close the relation.
*/
relation_close(rel, AccessShareLock);
/* /*
* And send it * And send it
@ -1703,7 +1701,7 @@ dblink_current_query(PG_FUNCTION_ARGS)
* Return NULL, and set numatts = 0, if no primary key exists. * Return NULL, and set numatts = 0, if no primary key exists.
*/ */
static char ** static char **
get_pkey_attnames(Oid relid, int16 *numatts) get_pkey_attnames(Relation rel, int16 *numatts)
{ {
Relation indexRelation; Relation indexRelation;
ScanKeyData entry; ScanKeyData entry;
@ -1711,22 +1709,19 @@ get_pkey_attnames(Oid relid, int16 *numatts)
HeapTuple indexTuple; HeapTuple indexTuple;
int i; int i;
char **result = NULL; char **result = NULL;
Relation rel;
TupleDesc tupdesc; TupleDesc tupdesc;
/* open relation using relid, get tupdesc */
rel = relation_open(relid, AccessShareLock);
tupdesc = rel->rd_att;
/* initialize numatts to 0 in case no primary key exists */ /* initialize numatts to 0 in case no primary key exists */
*numatts = 0; *numatts = 0;
tupdesc = rel->rd_att;
/* use relid to get all related indexes */ /* use relid to get all related indexes */
indexRelation = heap_open(IndexRelationId, AccessShareLock); indexRelation = heap_open(IndexRelationId, AccessShareLock);
ScanKeyInit(&entry, ScanKeyInit(&entry,
Anum_pg_index_indrelid, Anum_pg_index_indrelid,
BTEqualStrategyNumber, F_OIDEQ, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(relid)); ObjectIdGetDatum(RelationGetRelid(rel)));
scan = heap_beginscan(indexRelation, SnapshotNow, 1, &entry); scan = heap_beginscan(indexRelation, SnapshotNow, 1, &entry);
while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
@ -1749,7 +1744,6 @@ get_pkey_attnames(Oid relid, int16 *numatts)
} }
heap_endscan(scan); heap_endscan(scan);
heap_close(indexRelation, AccessShareLock); heap_close(indexRelation, AccessShareLock);
relation_close(rel, AccessShareLock);
return result; return result;
} }
@ -1816,9 +1810,8 @@ get_text_array_contents(ArrayType *array, int *numitems)
} }
static char * static char *
get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) get_sql_insert(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
{ {
Relation rel;
char *relname; char *relname;
HeapTuple tuple; HeapTuple tuple;
TupleDesc tupdesc; TupleDesc tupdesc;
@ -1832,16 +1825,12 @@ get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pka
initStringInfo(&buf); initStringInfo(&buf);
/* get relation name including any needed schema prefix and quoting */ /* get relation name including any needed schema prefix and quoting */
relname = generate_relation_name(relid); relname = generate_relation_name(rel);
/*
* Open relation using relid
*/
rel = relation_open(relid, AccessShareLock);
tupdesc = rel->rd_att; tupdesc = rel->rd_att;
natts = tupdesc->natts; natts = tupdesc->natts;
tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
if (!tuple) if (!tuple)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION), (errcode(ERRCODE_CARDINALITY_VIOLATION),
@ -1898,14 +1887,12 @@ get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pka
} }
appendStringInfo(&buf, ")"); appendStringInfo(&buf, ")");
relation_close(rel, AccessShareLock);
return (buf.data); return (buf.data);
} }
static char * static char *
get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals) get_sql_delete(Relation rel, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals)
{ {
Relation rel;
char *relname; char *relname;
TupleDesc tupdesc; TupleDesc tupdesc;
int natts; int natts;
@ -1915,12 +1902,8 @@ get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pka
initStringInfo(&buf); initStringInfo(&buf);
/* get relation name including any needed schema prefix and quoting */ /* get relation name including any needed schema prefix and quoting */
relname = generate_relation_name(relid); relname = generate_relation_name(rel);
/*
* Open relation using relid
*/
rel = relation_open(relid, AccessShareLock);
tupdesc = rel->rd_att; tupdesc = rel->rd_att;
natts = tupdesc->natts; natts = tupdesc->natts;
@ -1946,14 +1929,12 @@ get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pka
appendStringInfo(&buf, " IS NULL"); appendStringInfo(&buf, " IS NULL");
} }
relation_close(rel, AccessShareLock);
return (buf.data); return (buf.data);
} }
static char * static char *
get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) get_sql_update(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
{ {
Relation rel;
char *relname; char *relname;
HeapTuple tuple; HeapTuple tuple;
TupleDesc tupdesc; TupleDesc tupdesc;
@ -1967,16 +1948,12 @@ get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pka
initStringInfo(&buf); initStringInfo(&buf);
/* get relation name including any needed schema prefix and quoting */ /* get relation name including any needed schema prefix and quoting */
relname = generate_relation_name(relid); relname = generate_relation_name(rel);
/*
* Open relation using relid
*/
rel = relation_open(relid, AccessShareLock);
tupdesc = rel->rd_att; tupdesc = rel->rd_att;
natts = tupdesc->natts; natts = tupdesc->natts;
tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
if (!tuple) if (!tuple)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION), (errcode(ERRCODE_CARDINALITY_VIOLATION),
@ -2042,7 +2019,6 @@ get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pka
appendStringInfo(&buf, " IS NULL"); appendStringInfo(&buf, " IS NULL");
} }
relation_close(rel, AccessShareLock);
return (buf.data); return (buf.data);
} }
@ -2098,9 +2074,8 @@ get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key)
} }
static HeapTuple static HeapTuple
get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals) get_tuple_of_interest(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals)
{ {
Relation rel;
char *relname; char *relname;
TupleDesc tupdesc; TupleDesc tupdesc;
StringInfoData buf; StringInfoData buf;
@ -2111,14 +2086,9 @@ get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **
initStringInfo(&buf); initStringInfo(&buf);
/* get relation name including any needed schema prefix and quoting */ /* get relation name including any needed schema prefix and quoting */
relname = generate_relation_name(relid); relname = generate_relation_name(rel);
/* tupdesc = rel->rd_att;
* Open relation using relid
*/
rel = relation_open(relid, AccessShareLock);
tupdesc = CreateTupleDescCopy(rel->rd_att);
relation_close(rel, AccessShareLock);
/* /*
* Connect to SPI manager * Connect to SPI manager
@ -2189,52 +2159,49 @@ get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **
return NULL; return NULL;
} }
static Oid /*
get_relid_from_relname(text *relname_text) * Open the relation named by relname_text, acquire specified type of lock,
* verify we have specified permissions.
* Caller must close rel when done with it.
*/
static Relation
get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
{ {
RangeVar *relvar; RangeVar *relvar;
Relation rel; Relation rel;
Oid relid; AclResult aclresult;
relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text)); relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
rel = heap_openrv(relvar, AccessShareLock); rel = heap_openrv(relvar, lockmode);
relid = RelationGetRelid(rel);
relation_close(rel, AccessShareLock);
return relid; aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
aclmode);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_CLASS,
RelationGetRelationName(rel));
return rel;
} }
/* /*
* generate_relation_name - copied from ruleutils.c * generate_relation_name - copied from ruleutils.c
* Compute the name to display for a relation specified by OID * Compute the name to display for a relation
* *
* The result includes all necessary quoting and schema-prefixing. * The result includes all necessary quoting and schema-prefixing.
*/ */
static char * static char *
generate_relation_name(Oid relid) generate_relation_name(Relation rel)
{ {
HeapTuple tp;
Form_pg_class reltup;
char *nspname; char *nspname;
char *result; char *result;
tp = SearchSysCache(RELOID,
ObjectIdGetDatum(relid),
0, 0, 0);
if (!HeapTupleIsValid(tp))
elog(ERROR, "cache lookup failed for relation %u", relid);
reltup = (Form_pg_class) GETSTRUCT(tp);
/* Qualify the name if not visible in search path */ /* Qualify the name if not visible in search path */
if (RelationIsVisible(relid)) if (RelationIsVisible(RelationGetRelid(rel)))
nspname = NULL; nspname = NULL;
else else
nspname = get_namespace_name(reltup->relnamespace); nspname = get_namespace_name(rel->rd_rel->relnamespace);
result = quote_qualified_identifier(nspname, NameStr(reltup->relname)); result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
ReleaseSysCache(tp);
return result; return result;
} }
@ -2479,15 +2446,13 @@ dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr)
} }
static int static int
get_nondropped_natts(Oid relid) get_nondropped_natts(Relation rel)
{ {
int nondropped_natts = 0; int nondropped_natts = 0;
TupleDesc tupdesc; TupleDesc tupdesc;
Relation rel;
int natts; int natts;
int i; int i;
rel = relation_open(relid, AccessShareLock);
tupdesc = rel->rd_att; tupdesc = rel->rd_att;
natts = tupdesc->natts; natts = tupdesc->natts;
@ -2498,7 +2463,6 @@ get_nondropped_natts(Oid relid)
nondropped_natts++; nondropped_natts++;
} }
relation_close(rel, AccessShareLock);
return nondropped_natts; return nondropped_natts;
} }