Refactor broken CREATE TABLE IF NOT EXISTS support.

Per bug #5988, reported by Marko Tiikkaja, and further analyzed by Tom
Lane, the previous coding was broken in several respects: even if the
target table already existed, a subsequent CREATE TABLE IF NOT EXISTS
might try to add additional constraints or sequences-for-serial
specified in the new CREATE TABLE statement.

In passing, this also fixes a minor information leak: it's no longer
possible to figure out whether a schema to which you don't have CREATE
access contains a sequence named like "x_y_seq" by attempting to create a
table in that schema called "x" with a serial column called "y".

Some more refactoring of this code in the future might be warranted,
but that will need to wait for a later major release.
This commit is contained in:
Robert Haas 2011-04-25 16:55:11 -04:00
parent be90032e0d
commit 68ef051f5c
11 changed files with 73 additions and 79 deletions

View File

@ -247,8 +247,7 @@ Boot_CreateStmt:
ONCOMMIT_NOOP, ONCOMMIT_NOOP,
(Datum) 0, (Datum) 0,
false, false,
true, true);
false);
elog(DEBUG4, "relation created with oid %u", id); elog(DEBUG4, "relation created with oid %u", id);
} }
do_end(); do_end();

View File

@ -973,8 +973,7 @@ heap_create_with_catalog(const char *relname,
OnCommitAction oncommit, OnCommitAction oncommit,
Datum reloptions, Datum reloptions,
bool use_user_acl, bool use_user_acl,
bool allow_system_table_mods, bool allow_system_table_mods)
bool if_not_exists)
{ {
Relation pg_class_desc; Relation pg_class_desc;
Relation new_rel_desc; Relation new_rel_desc;
@ -994,26 +993,14 @@ heap_create_with_catalog(const char *relname,
CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods); CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods);
/* /*
* If the relation already exists, it's an error, unless the user * This would fail later on anyway, if the relation already exists. But
* specifies "IF NOT EXISTS". In that case, we just print a notice and do * by catching it here we can emit a nicer error message.
* nothing further.
*/ */
existing_relid = get_relname_relid(relname, relnamespace); existing_relid = get_relname_relid(relname, relnamespace);
if (existing_relid != InvalidOid) if (existing_relid != InvalidOid)
{
if (if_not_exists)
{
ereport(NOTICE,
(errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("relation \"%s\" already exists, skipping",
relname)));
heap_close(pg_class_desc, RowExclusiveLock);
return InvalidOid;
}
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_TABLE), (errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("relation \"%s\" already exists", relname))); errmsg("relation \"%s\" already exists", relname)));
}
/* /*
* Since we are going to create a rowtype as well, also check for * Since we are going to create a rowtype as well, also check for

View File

@ -360,6 +360,35 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation)
return namespaceId; return namespaceId;
} }
/*
* RangeVarGetAndCheckCreationNamespace
* As RangeVarGetCreationNamespace, but with a permissions check.
*/
Oid
RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation)
{
Oid namespaceId;
namespaceId = RangeVarGetCreationNamespace(newRelation);
/*
* Check we have permission to create there. Skip check if bootstrapping,
* since permissions machinery may not be working yet.
*/
if (!IsBootstrapProcessingMode())
{
AclResult aclresult;
aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
ACL_CREATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
get_namespace_name(namespaceId));
}
return namespaceId;
}
/* /*
* RelnameGetRelid * RelnameGetRelid
* Try to resolve an unqualified relation name. * Try to resolve an unqualified relation name.

View File

@ -227,8 +227,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid, Datum reloptio
ONCOMMIT_NOOP, ONCOMMIT_NOOP,
reloptions, reloptions,
false, false,
true, true);
false);
Assert(toast_relid != InvalidOid); Assert(toast_relid != InvalidOid);
/* make the toast relation visible, else heap_open will fail */ /* make the toast relation visible, else heap_open will fail */

View File

@ -646,8 +646,7 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace)
ONCOMMIT_NOOP, ONCOMMIT_NOOP,
reloptions, reloptions,
false, false,
true, true);
false);
Assert(OIDNewHeap != InvalidOid); Assert(OIDNewHeap != InvalidOid);
ReleaseSysCache(tuple); ReleaseSysCache(tuple);

View File

@ -439,22 +439,10 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId)
errmsg("cannot create temporary table within security-restricted operation"))); errmsg("cannot create temporary table within security-restricted operation")));
/* /*
* Look up the namespace in which we are supposed to create the relation. * Look up the namespace in which we are supposed to create the relation,
* Check we have permission to create there. Skip check if bootstrapping, * and check we have permission to create there.
* since permissions machinery may not be working yet.
*/ */
namespaceId = RangeVarGetCreationNamespace(stmt->relation); namespaceId = RangeVarGetAndCheckCreationNamespace(stmt->relation);
if (!IsBootstrapProcessingMode())
{
AclResult aclresult;
aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
ACL_CREATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
get_namespace_name(namespaceId));
}
/* /*
* Select tablespace to use. If not specified, use default tablespace * Select tablespace to use. If not specified, use default tablespace
@ -602,16 +590,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId)
stmt->oncommit, stmt->oncommit,
reloptions, reloptions,
true, true,
allowSystemTableMods, allowSystemTableMods);
stmt->if_not_exists);
/*
* If heap_create_with_catalog returns InvalidOid, it means that the user
* specified "IF NOT EXISTS" and the relation already exists. In that
* case we do nothing further.
*/
if (relationId == InvalidOid)
return InvalidOid;
/* Store inheritance information for new rel. */ /* Store inheritance information for new rel. */
StoreCatalogInheritance(relationId, inheritOids); StoreCatalogInheritance(relationId, inheritOids);

View File

@ -2341,7 +2341,6 @@ OpenIntoRel(QueryDesc *queryDesc)
Oid namespaceId; Oid namespaceId;
Oid tablespaceId; Oid tablespaceId;
Datum reloptions; Datum reloptions;
AclResult aclresult;
Oid intoRelationId; Oid intoRelationId;
TupleDesc tupdesc; TupleDesc tupdesc;
DR_intorel *myState; DR_intorel *myState;
@ -2378,13 +2377,7 @@ OpenIntoRel(QueryDesc *queryDesc)
* Find namespace to create in, check its permissions * Find namespace to create in, check its permissions
*/ */
intoName = into->rel->relname; intoName = into->rel->relname;
namespaceId = RangeVarGetCreationNamespace(into->rel); namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel);
aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
ACL_CREATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
get_namespace_name(namespaceId));
/* /*
* Select tablespace to use. If not specified, use default tablespace * Select tablespace to use. If not specified, use default tablespace
@ -2444,8 +2437,7 @@ OpenIntoRel(QueryDesc *queryDesc)
into->onCommit, into->onCommit,
reloptions, reloptions,
true, true,
allowSystemTableMods, allowSystemTableMods);
false);
Assert(intoRelationId != InvalidOid); Assert(intoRelationId != InvalidOid);
FreeTupleDesc(tupdesc); FreeTupleDesc(tupdesc);

View File

@ -148,6 +148,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
List *result; List *result;
List *save_alist; List *save_alist;
ListCell *elements; ListCell *elements;
Oid namespaceid;
/* /*
* We must not scribble on the passed-in CreateStmt, so copy it. (This is * We must not scribble on the passed-in CreateStmt, so copy it. (This is
@ -155,6 +156,33 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
*/ */
stmt = (CreateStmt *) copyObject(stmt); stmt = (CreateStmt *) copyObject(stmt);
/*
* Look up the creation namespace. This also checks permissions on the
* target namespace, so that we throw any permissions error as early as
* possible.
*/
namespaceid = RangeVarGetAndCheckCreationNamespace(stmt->relation);
/*
* If the relation already exists and the user specified "IF NOT EXISTS",
* bail out with a NOTICE.
*/
if (stmt->if_not_exists)
{
Oid existing_relid;
existing_relid = get_relname_relid(stmt->relation->relname,
namespaceid);
if (existing_relid != InvalidOid)
{
ereport(NOTICE,
(errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("relation \"%s\" already exists, skipping",
stmt->relation->relname)));
return NIL;
}
}
/* /*
* If the target relation name isn't schema-qualified, make it so. This * If the target relation name isn't schema-qualified, make it so. This
* prevents some corner cases in which added-on rewritten commands might * prevents some corner cases in which added-on rewritten commands might
@ -164,11 +192,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
*/ */
if (stmt->relation->schemaname == NULL if (stmt->relation->schemaname == NULL
&& stmt->relation->relpersistence != RELPERSISTENCE_TEMP) && stmt->relation->relpersistence != RELPERSISTENCE_TEMP)
{
Oid namespaceid = RangeVarGetCreationNamespace(stmt->relation);
stmt->relation->schemaname = get_namespace_name(namespaceid); stmt->relation->schemaname = get_namespace_name(namespaceid);
}
/* Set up pstate and CreateStmtContext */ /* Set up pstate and CreateStmtContext */
pstate = make_parsestate(NULL); pstate = make_parsestate(NULL);

View File

@ -529,13 +529,6 @@ standard_ProcessUtility(Node *parsetree,
RELKIND_RELATION, RELKIND_RELATION,
InvalidOid); InvalidOid);
/*
* If "IF NOT EXISTS" was specified and the relation
* already exists, do nothing further.
*/
if (relOid == InvalidOid)
continue;
/* /*
* Let AlterTableCreateToastTable decide if this one * Let AlterTableCreateToastTable decide if this one
* needs a secondary relation too. * needs a secondary relation too.
@ -559,15 +552,8 @@ standard_ProcessUtility(Node *parsetree,
relOid = DefineRelation((CreateStmt *) stmt, relOid = DefineRelation((CreateStmt *) stmt,
RELKIND_FOREIGN_TABLE, RELKIND_FOREIGN_TABLE,
InvalidOid); InvalidOid);
CreateForeignTable((CreateForeignTableStmt *) stmt,
/* relOid);
* Unless "IF NOT EXISTS" was specified and the
* relation already exists, create the
* pg_foreign_table entry.
*/
if (relOid != InvalidOid)
CreateForeignTable((CreateForeignTableStmt *) stmt,
relOid);
} }
else else
{ {

View File

@ -63,8 +63,7 @@ extern Oid heap_create_with_catalog(const char *relname,
OnCommitAction oncommit, OnCommitAction oncommit,
Datum reloptions, Datum reloptions,
bool use_user_acl, bool use_user_acl,
bool allow_system_table_mods, bool allow_system_table_mods);
bool if_not_exists);
extern void heap_drop_with_catalog(Oid relid); extern void heap_drop_with_catalog(Oid relid);

View File

@ -49,6 +49,7 @@ typedef struct OverrideSearchPath
extern Oid RangeVarGetRelid(const RangeVar *relation, bool failOK); extern Oid RangeVarGetRelid(const RangeVar *relation, bool failOK);
extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation); extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation);
extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation);
extern Oid RelnameGetRelid(const char *relname); extern Oid RelnameGetRelid(const char *relname);
extern bool RelationIsVisible(Oid relid); extern bool RelationIsVisible(Oid relid);