mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-01-24 18:55:04 +08:00
Prevent adding relations to a concurrently dropped schema.
In the previous coding, it was possible for a relation to be created via CREATE TABLE, CREATE VIEW, CREATE SEQUENCE, CREATE FOREIGN TABLE, etc. in a schema while that schema was meanwhile being concurrently dropped. This led to a pg_class entry with an invalid relnamespace value. The same problem could occur if a relation was moved using ALTER .. SET SCHEMA while the target schema was being concurrently dropped. This patch prevents both of those scenarios by locking the schema to which the relation is being added using AccessShareLock, which conflicts with the AccessExclusiveLock taken by DROP. As a desirable side effect, this also prevents the use of CREATE OR REPLACE VIEW to queue for an AccessExclusiveLock on a relation on which you have no rights: that will now fail immediately with a permissions error, before trying to obtain a lock. We need similar protection for all other object types, but as everything other than relations uses a slightly different set of code paths, I'm leaving that for a separate commit. Original complaint (as far as I could find) about CREATE by Nikhil Sontakke; risk for ALTER .. SET SCHEMA pointed out by Tom Lane; further details by Dan Farina; patch by me; review by Hitoshi Harada.
This commit is contained in:
parent
01d83ffdca
commit
1575fbcb79
@ -480,31 +480,131 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation)
|
||||
|
||||
/*
|
||||
* RangeVarGetAndCheckCreationNamespace
|
||||
* As RangeVarGetCreationNamespace, but with a permissions check.
|
||||
*
|
||||
* This function returns the OID of the namespace in which a new relation
|
||||
* with a given name should be created. If the user does not have CREATE
|
||||
* permission on the target namespace, this function will instead signal
|
||||
* an ERROR.
|
||||
*
|
||||
* If non-NULL, *existing_oid is set to the OID of any existing relation with
|
||||
* the same name which already exists in that namespace, or to InvalidOid if
|
||||
* no such relation exists.
|
||||
*
|
||||
* If lockmode != NoLock, the specified lock mode is acquire on the existing
|
||||
* relation, if any, provided that the current user owns the target relation.
|
||||
* However, if lockmode != NoLock and the user does not own the target
|
||||
* relation, we throw an ERROR, as we must not try to lock relations the
|
||||
* user does not have permissions on.
|
||||
*
|
||||
* As a side effect, this function acquires AccessShareLock on the target
|
||||
* namespace. Without this, the namespace could be dropped before our
|
||||
* transaction commits, leaving behind relations with relnamespace pointing
|
||||
* to a no-longer-exstant namespace.
|
||||
*
|
||||
* As a further side-effect, if the select namespace is a temporary namespace,
|
||||
* we mark the RangeVar as RELPERSISTENCE_TEMP.
|
||||
*/
|
||||
Oid
|
||||
RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation)
|
||||
RangeVarGetAndCheckCreationNamespace(RangeVar *relation,
|
||||
LOCKMODE lockmode,
|
||||
Oid *existing_relation_id)
|
||||
{
|
||||
Oid namespaceId;
|
||||
|
||||
namespaceId = RangeVarGetCreationNamespace(newRelation);
|
||||
uint64 inval_count;
|
||||
Oid relid;
|
||||
Oid oldrelid = InvalidOid;
|
||||
Oid nspid;
|
||||
Oid oldnspid = InvalidOid;
|
||||
bool retry = false;
|
||||
|
||||
/*
|
||||
* Check we have permission to create there. Skip check if bootstrapping,
|
||||
* since permissions machinery may not be working yet.
|
||||
* We check the catalog name and then ignore it.
|
||||
*/
|
||||
if (!IsBootstrapProcessingMode())
|
||||
if (relation->catalogname)
|
||||
{
|
||||
if (strcmp(relation->catalogname, get_database_name(MyDatabaseId)) != 0)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cross-database references are not implemented: \"%s.%s.%s\"",
|
||||
relation->catalogname, relation->schemaname,
|
||||
relation->relname)));
|
||||
}
|
||||
|
||||
/*
|
||||
* As in RangeVarGetRelidExtended(), we guard against concurrent DDL
|
||||
* operations by tracking whether any invalidation messages are processed
|
||||
* while we're doing the name lookups and acquiring locks. See comments
|
||||
* in that function for a more detailed explanation of this logic.
|
||||
*/
|
||||
for (;;)
|
||||
{
|
||||
AclResult aclresult;
|
||||
|
||||
aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
|
||||
ACL_CREATE);
|
||||
inval_count = SharedInvalidMessageCounter;
|
||||
|
||||
/* Look up creation namespace and check for existing relation. */
|
||||
nspid = RangeVarGetCreationNamespace(relation);
|
||||
Assert(OidIsValid(nspid));
|
||||
if (existing_relation_id != NULL)
|
||||
relid = get_relname_relid(relation->relname, nspid);
|
||||
else
|
||||
relid = InvalidOid;
|
||||
|
||||
/*
|
||||
* In bootstrap processing mode, we don't bother with permissions
|
||||
* or locking. Permissions might not be working yet, and locking is
|
||||
* unnecessary.
|
||||
*/
|
||||
if (IsBootstrapProcessingMode())
|
||||
break;
|
||||
|
||||
/* Check namespace permissions. */
|
||||
aclresult = pg_namespace_aclcheck(nspid, GetUserId(), ACL_CREATE);
|
||||
if (aclresult != ACLCHECK_OK)
|
||||
aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
|
||||
get_namespace_name(namespaceId));
|
||||
get_namespace_name(nspid));
|
||||
|
||||
if (retry)
|
||||
{
|
||||
/* If nothing changed, we're done. */
|
||||
if (relid == oldrelid && nspid == oldnspid)
|
||||
break;
|
||||
/* If creation namespace has changed, give up old lock. */
|
||||
if (nspid != oldnspid)
|
||||
UnlockDatabaseObject(NamespaceRelationId, oldnspid, 0,
|
||||
AccessShareLock);
|
||||
/* If name points to something different, give up old lock. */
|
||||
if (relid != oldrelid && OidIsValid(oldrelid) && lockmode != NoLock)
|
||||
UnlockRelationOid(oldrelid, lockmode);
|
||||
}
|
||||
|
||||
/* Lock namespace. */
|
||||
if (nspid != oldnspid)
|
||||
LockDatabaseObject(NamespaceRelationId, nspid, 0, AccessShareLock);
|
||||
|
||||
/* Lock relation, if required if and we have permission. */
|
||||
if (lockmode != NoLock && OidIsValid(relid))
|
||||
{
|
||||
if (!pg_class_ownercheck(relid, GetUserId()))
|
||||
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
|
||||
relation->relname);
|
||||
if (relid != oldrelid)
|
||||
LockRelationOid(relid, lockmode);
|
||||
}
|
||||
|
||||
/* If no invalidation message were processed, we're done! */
|
||||
if (inval_count == SharedInvalidMessageCounter)
|
||||
break;
|
||||
|
||||
/* Something may have changed, so recheck our work. */
|
||||
retry = true;
|
||||
oldrelid = relid;
|
||||
oldnspid = nspid;
|
||||
}
|
||||
|
||||
return namespaceId;
|
||||
RangeVarAdjustRelationPersistence(relation, nspid);
|
||||
if (existing_relation_id != NULL)
|
||||
*existing_relation_id = relid;
|
||||
return nspid;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -451,10 +451,12 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId)
|
||||
|
||||
/*
|
||||
* Look up the namespace in which we are supposed to create the relation,
|
||||
* and check we have permission to create there.
|
||||
* check we have permission to create there, lock it against concurrent
|
||||
* drop, and mark stmt->relation as RELPERSISTENCE_TEMP if a temporary
|
||||
* namespace is selected.
|
||||
*/
|
||||
namespaceId = RangeVarGetAndCheckCreationNamespace(stmt->relation);
|
||||
RangeVarAdjustRelationPersistence(stmt->relation, namespaceId);
|
||||
namespaceId =
|
||||
RangeVarGetAndCheckCreationNamespace(stmt->relation, NoLock, NULL);
|
||||
|
||||
/*
|
||||
* Security check: disallow creating temp tables from security-restricted
|
||||
@ -9417,6 +9419,7 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt)
|
||||
Oid oldNspOid;
|
||||
Oid nspOid;
|
||||
Relation classRel;
|
||||
RangeVar *newrv;
|
||||
|
||||
relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
|
||||
false, false,
|
||||
@ -9441,8 +9444,9 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt)
|
||||
get_rel_name(tableId))));
|
||||
}
|
||||
|
||||
/* get schema OID and check its permissions */
|
||||
nspOid = LookupCreationNamespace(stmt->newschema);
|
||||
/* Get and lock schema OID and check its permissions. */
|
||||
newrv = makeRangeVar(stmt->newschema, RelationGetRelationName(rel), -1);
|
||||
nspOid = RangeVarGetAndCheckCreationNamespace(newrv, NoLock, NULL);
|
||||
|
||||
/* common checks on switching namespaces */
|
||||
CheckSetNamespace(oldNspOid, nspOid, RelationRelationId, relid);
|
||||
|
@ -2005,7 +2005,8 @@ DefineCompositeType(const RangeVar *typevar, List *coldeflist)
|
||||
* check is here mainly to get a better error message about a "type"
|
||||
* instead of below about a "relation".
|
||||
*/
|
||||
typeNamespace = RangeVarGetCreationNamespace(createStmt->relation);
|
||||
typeNamespace = RangeVarGetAndCheckCreationNamespace(createStmt->relation,
|
||||
NoLock, NULL);
|
||||
RangeVarAdjustRelationPersistence(createStmt->relation, typeNamespace);
|
||||
old_type_oid =
|
||||
GetSysCacheOid2(TYPENAMENSP,
|
||||
|
@ -98,10 +98,12 @@ isViewOnTempTable_walker(Node *node, void *context)
|
||||
*---------------------------------------------------------------------
|
||||
*/
|
||||
static Oid
|
||||
DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
|
||||
Oid namespaceId, List *options)
|
||||
DefineVirtualRelation(RangeVar *relation, List *tlist, bool replace,
|
||||
List *options)
|
||||
{
|
||||
Oid viewOid;
|
||||
Oid namespaceId;
|
||||
LOCKMODE lockmode;
|
||||
CreateStmt *createStmt = makeNode(CreateStmt);
|
||||
List *attrList;
|
||||
ListCell *t;
|
||||
@ -159,9 +161,14 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
|
||||
errmsg("view must have at least one column")));
|
||||
|
||||
/*
|
||||
* Check to see if we want to replace an existing view.
|
||||
* Look up, check permissions on, and lock the creation namespace; also
|
||||
* check for a preexisting view with the same name. This will also set
|
||||
* relation->relpersistence to RELPERSISTENCE_TEMP if the selected
|
||||
* namespace is temporary.
|
||||
*/
|
||||
viewOid = get_relname_relid(relation->relname, namespaceId);
|
||||
lockmode = replace ? AccessExclusiveLock : NoLock;
|
||||
namespaceId =
|
||||
RangeVarGetAndCheckCreationNamespace(relation, lockmode, &viewOid);
|
||||
|
||||
if (OidIsValid(viewOid) && replace)
|
||||
{
|
||||
@ -170,24 +177,16 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
|
||||
List *atcmds = NIL;
|
||||
AlterTableCmd *atcmd;
|
||||
|
||||
/*
|
||||
* Yes. Get exclusive lock on the existing view ...
|
||||
*/
|
||||
rel = relation_open(viewOid, AccessExclusiveLock);
|
||||
/* Relation is already locked, but we must build a relcache entry. */
|
||||
rel = relation_open(viewOid, NoLock);
|
||||
|
||||
/*
|
||||
* Make sure it *is* a view, and do permissions checks.
|
||||
*/
|
||||
/* Make sure it *is* a view. */
|
||||
if (rel->rd_rel->relkind != RELKIND_VIEW)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("\"%s\" is not a view",
|
||||
RelationGetRelationName(rel))));
|
||||
|
||||
if (!pg_class_ownercheck(viewOid, GetUserId()))
|
||||
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
|
||||
RelationGetRelationName(rel));
|
||||
|
||||
/* Also check it's not in use already */
|
||||
CheckTableNotInUse(rel, "CREATE OR REPLACE VIEW");
|
||||
|
||||
@ -428,7 +427,6 @@ DefineView(ViewStmt *stmt, const char *queryString)
|
||||
{
|
||||
Query *viewParse;
|
||||
Oid viewOid;
|
||||
Oid namespaceId;
|
||||
RangeVar *view;
|
||||
|
||||
/*
|
||||
@ -514,10 +512,6 @@ DefineView(ViewStmt *stmt, const char *queryString)
|
||||
view->relname)));
|
||||
}
|
||||
|
||||
/* Might also need to make it temporary if placed in temp schema. */
|
||||
namespaceId = RangeVarGetCreationNamespace(view);
|
||||
RangeVarAdjustRelationPersistence(view, namespaceId);
|
||||
|
||||
/*
|
||||
* Create the view relation
|
||||
*
|
||||
@ -525,7 +519,7 @@ DefineView(ViewStmt *stmt, const char *queryString)
|
||||
* aborted.
|
||||
*/
|
||||
viewOid = DefineVirtualRelation(view, viewParse->targetList,
|
||||
stmt->replace, namespaceId, stmt->options);
|
||||
stmt->replace, stmt->options);
|
||||
|
||||
/*
|
||||
* The relation we have just created is not visible to any other commands
|
||||
|
@ -2532,11 +2532,13 @@ OpenIntoRel(QueryDesc *queryDesc)
|
||||
}
|
||||
|
||||
/*
|
||||
* Find namespace to create in, check its permissions
|
||||
* Find namespace to create in, check its permissions, lock it against
|
||||
* concurrent drop, and mark into->rel as RELPERSISTENCE_TEMP if the
|
||||
* selected namespace is temporary.
|
||||
*/
|
||||
intoName = into->rel->relname;
|
||||
namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel);
|
||||
RangeVarAdjustRelationPersistence(into->rel, namespaceId);
|
||||
namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel, NoLock,
|
||||
NULL);
|
||||
|
||||
/*
|
||||
* Security check: disallow creating temp tables from security-restricted
|
||||
|
@ -146,6 +146,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
|
||||
List *save_alist;
|
||||
ListCell *elements;
|
||||
Oid namespaceid;
|
||||
Oid existing_relid;
|
||||
|
||||
/*
|
||||
* We must not scribble on the passed-in CreateStmt, so copy it. (This is
|
||||
@ -155,30 +156,25 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
|
||||
|
||||
/*
|
||||
* Look up the creation namespace. This also checks permissions on the
|
||||
* target namespace, so that we throw any permissions error as early as
|
||||
* possible.
|
||||
* target namespace, locks it against concurrent drops, checks for a
|
||||
* preexisting relation in that namespace with the same name, and updates
|
||||
* stmt->relation->relpersistence if the select namespace is temporary.
|
||||
*/
|
||||
namespaceid = RangeVarGetAndCheckCreationNamespace(stmt->relation);
|
||||
RangeVarAdjustRelationPersistence(stmt->relation, namespaceid);
|
||||
namespaceid =
|
||||
RangeVarGetAndCheckCreationNamespace(stmt->relation, NoLock,
|
||||
&existing_relid);
|
||||
|
||||
/*
|
||||
* If the relation already exists and the user specified "IF NOT EXISTS",
|
||||
* bail out with a NOTICE.
|
||||
*/
|
||||
if (stmt->if_not_exists)
|
||||
if (stmt->if_not_exists && OidIsValid(existing_relid))
|
||||
{
|
||||
Oid existing_relid;
|
||||
|
||||
existing_relid = get_relname_relid(stmt->relation->relname,
|
||||
namespaceid);
|
||||
if (existing_relid != InvalidOid)
|
||||
{
|
||||
ereport(NOTICE,
|
||||
(errcode(ERRCODE_DUPLICATE_TABLE),
|
||||
errmsg("relation \"%s\" already exists, skipping",
|
||||
stmt->relation->relname)));
|
||||
return NIL;
|
||||
}
|
||||
ereport(NOTICE,
|
||||
(errcode(ERRCODE_DUPLICATE_TABLE),
|
||||
errmsg("relation \"%s\" already exists, skipping",
|
||||
stmt->relation->relname)));
|
||||
return NIL;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -58,7 +58,9 @@ extern Oid RangeVarGetRelidExtended(const RangeVar *relation,
|
||||
RangeVarGetRelidCallback callback,
|
||||
void *callback_arg);
|
||||
extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation);
|
||||
extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation);
|
||||
extern Oid RangeVarGetAndCheckCreationNamespace(RangeVar *newRelation,
|
||||
LOCKMODE lockmode,
|
||||
Oid *existing_relation_id);
|
||||
extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid);
|
||||
extern Oid RelnameGetRelid(const char *relname);
|
||||
extern bool RelationIsVisible(Oid relid);
|
||||
|
Loading…
Reference in New Issue
Block a user