COPY and pg_dump failed to cope with zero-column tables. Fix 'em.

This commit is contained in:
Tom Lane 2003-04-25 02:28:22 +00:00
parent db7e46a76d
commit 6687650ce6
2 changed files with 75 additions and 53 deletions

View File

@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.196 2003/04/24 21:16:42 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.197 2003/04/25 02:28:22 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -808,10 +808,11 @@ CopyTo(Relation rel, List *attnumlist, bool binary, bool oids,
* Get info about the columns we need to process. * Get info about the columns we need to process.
* *
* For binary copy we really only need isvarlena, but compute it all... * For binary copy we really only need isvarlena, but compute it all...
* +1's here are to avoid palloc(0) in a zero-column table.
*/ */
out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); out_functions = (FmgrInfo *) palloc((num_phys_attrs + 1) * sizeof(FmgrInfo));
elements = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); elements = (Oid *) palloc((num_phys_attrs + 1) * sizeof(Oid));
isvarlena = (bool *) palloc(num_phys_attrs * sizeof(bool)); isvarlena = (bool *) palloc((num_phys_attrs + 1) * sizeof(bool));
foreach(cur, attnumlist) foreach(cur, attnumlist)
{ {
int attnum = lfirsti(cur); int attnum = lfirsti(cur);
@ -1078,12 +1079,13 @@ CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
* relation, including the input function, the element type (to pass * relation, including the input function, the element type (to pass
* to the input function), and info about defaults and constraints. * to the input function), and info about defaults and constraints.
* (We don't actually use the input function if it's a binary copy.) * (We don't actually use the input function if it's a binary copy.)
* +1's here are to avoid palloc(0) in a zero-column table.
*/ */
in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); in_functions = (FmgrInfo *) palloc((num_phys_attrs + 1) * sizeof(FmgrInfo));
elements = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); elements = (Oid *) palloc((num_phys_attrs + 1) * sizeof(Oid));
defmap = (int *) palloc(num_phys_attrs * sizeof(int)); defmap = (int *) palloc((num_phys_attrs + 1) * sizeof(int));
defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); defexprs = (ExprState **) palloc((num_phys_attrs + 1) * sizeof(ExprState *));
constraintexprs = (ExprState **) palloc0(num_phys_attrs * sizeof(ExprState *)); constraintexprs = (ExprState **) palloc0((num_phys_attrs + 1) * sizeof(ExprState *));
for (i = 0; i < num_phys_attrs; i++) for (i = 0; i < num_phys_attrs; i++)
{ {
@ -1196,8 +1198,8 @@ CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
} }
} }
values = (Datum *) palloc(num_phys_attrs * sizeof(Datum)); values = (Datum *) palloc((num_phys_attrs + 1) * sizeof(Datum));
nulls = (char *) palloc(num_phys_attrs * sizeof(char)); nulls = (char *) palloc((num_phys_attrs + 1) * sizeof(char));
/* Make room for a PARAM_EXEC value for domain constraint checks */ /* Make room for a PARAM_EXEC value for domain constraint checks */
if (hasConstraints) if (hasConstraints)
@ -1304,9 +1306,31 @@ CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
if (done) if (done)
break; /* out of per-row loop */ break; /* out of per-row loop */
/* Complain if there are more fields on the input line */ /*
* Complain if there are more fields on the input line.
*
* Special case: if we're reading a zero-column table, we
* won't yet have called CopyReadAttribute() at all; so do that
* and check we have an empty line. Fortunately we can keep that
* silly corner case out of the main line of execution.
*/
if (result == NORMAL_ATTR) if (result == NORMAL_ATTR)
elog(ERROR, "Extra data after last expected column"); {
if (attnumlist == NIL && !file_has_oids)
{
string = CopyReadAttribute(delim, &result);
if (result == NORMAL_ATTR || *string != '\0')
elog(ERROR, "Extra data after last expected column");
if (result == END_OF_FILE)
{
/* EOF at start of line: all is well */
done = true;
break;
}
}
else
elog(ERROR, "Extra data after last expected column");
}
/* /*
* If we got some data on the line, but it was ended by EOF, * If we got some data on the line, but it was ended by EOF,

View File

@ -12,7 +12,7 @@
* by PostgreSQL * by PostgreSQL
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.326 2003/04/04 20:42:12 momjian Exp $ * $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.327 2003/04/25 02:28:22 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -838,9 +838,6 @@ dumpClasses_nodumpData(Archive *fout, char *oid, void *dctxv)
* possibility of retrieving data in the wrong column order. (The * possibility of retrieving data in the wrong column order. (The
* default column ordering of COPY will not be what we want in certain * default column ordering of COPY will not be what we want in certain
* corner cases involving ADD COLUMN and inheritance.) * corner cases involving ADD COLUMN and inheritance.)
*
* NB: caller should have already determined that there are dumpable
* columns, so that fmtCopyColumnList will return something.
*/ */
if (g_fout->remoteVersion >= 70300) if (g_fout->remoteVersion >= 70300)
column_list = fmtCopyColumnList(tbinfo); column_list = fmtCopyColumnList(tbinfo);
@ -975,6 +972,7 @@ dumpClasses_dumpData(Archive *fout, char *oid, void *dctxv)
PQExpBuffer q = createPQExpBuffer(); PQExpBuffer q = createPQExpBuffer();
PGresult *res; PGresult *res;
int tuple; int tuple;
int nfields;
int field; int field;
/* /*
@ -1024,14 +1022,21 @@ dumpClasses_dumpData(Archive *fout, char *oid, void *dctxv)
exit_nicely(); exit_nicely();
} }
nfields = PQnfields(res);
for (tuple = 0; tuple < PQntuples(res); tuple++) for (tuple = 0; tuple < PQntuples(res); tuple++)
{ {
archprintf(fout, "INSERT INTO %s ", fmtId(classname)); archprintf(fout, "INSERT INTO %s ", fmtId(classname));
if (nfields == 0)
{
/* corner case for zero-column table */
archprintf(fout, "DEFAULT VALUES;\n");
continue;
}
if (attrNames == true) if (attrNames == true)
{ {
resetPQExpBuffer(q); resetPQExpBuffer(q);
appendPQExpBuffer(q, "("); appendPQExpBuffer(q, "(");
for (field = 0; field < PQnfields(res); field++) for (field = 0; field < nfields; field++)
{ {
if (field > 0) if (field > 0)
appendPQExpBuffer(q, ", "); appendPQExpBuffer(q, ", ");
@ -1041,7 +1046,7 @@ dumpClasses_dumpData(Archive *fout, char *oid, void *dctxv)
archprintf(fout, "%s", q->data); archprintf(fout, "%s", q->data);
} }
archprintf(fout, "VALUES ("); archprintf(fout, "VALUES (");
for (field = 0; field < PQnfields(res); field++) for (field = 0; field < nfields; field++)
{ {
if (field > 0) if (field > 0)
archprintf(fout, ", "); archprintf(fout, ", ");
@ -1154,45 +1159,38 @@ dumpClasses(const TableInfo *tblinfo, const int numTables, Archive *fout,
if (tblinfo[i].dump) if (tblinfo[i].dump)
{ {
const char *column_list;
if (g_verbose) if (g_verbose)
write_msg(NULL, "preparing to dump the contents of table %s\n", write_msg(NULL, "preparing to dump the contents of table %s\n",
classname); classname);
/* Get column list first to check for zero-column table */ dumpCtx = (DumpContext *) malloc(sizeof(DumpContext));
column_list = fmtCopyColumnList(&(tblinfo[i])); dumpCtx->tblinfo = (TableInfo *) tblinfo;
if (column_list) dumpCtx->tblidx = i;
dumpCtx->oids = oids;
if (!dumpData)
{ {
dumpCtx = (DumpContext *) malloc(sizeof(DumpContext)); /* Dump/restore using COPY */
dumpCtx->tblinfo = (TableInfo *) tblinfo; dumpFn = dumpClasses_nodumpData;
dumpCtx->tblidx = i; resetPQExpBuffer(copyBuf);
dumpCtx->oids = oids; appendPQExpBuffer(copyBuf, "COPY %s %s %sFROM stdin;\n",
fmtId(tblinfo[i].relname),
if (!dumpData) fmtCopyColumnList(&(tblinfo[i])),
{
/* Dump/restore using COPY */
dumpFn = dumpClasses_nodumpData;
resetPQExpBuffer(copyBuf);
appendPQExpBuffer(copyBuf, "COPY %s %s %sFROM stdin;\n",
fmtId(tblinfo[i].relname),
column_list,
(oids && tblinfo[i].hasoids) ? "WITH OIDS " : ""); (oids && tblinfo[i].hasoids) ? "WITH OIDS " : "");
copyStmt = copyBuf->data; copyStmt = copyBuf->data;
}
else
{
/* Restore using INSERT */
dumpFn = dumpClasses_dumpData;
copyStmt = NULL;
}
ArchiveEntry(fout, tblinfo[i].oid, tblinfo[i].relname,
tblinfo[i].relnamespace->nspname,
tblinfo[i].usename,
"TABLE DATA", NULL, "", "", copyStmt,
dumpFn, dumpCtx);
} }
else
{
/* Restore using INSERT */
dumpFn = dumpClasses_dumpData;
copyStmt = NULL;
}
ArchiveEntry(fout, tblinfo[i].oid, tblinfo[i].relname,
tblinfo[i].relnamespace->nspname,
tblinfo[i].usename,
"TABLE DATA", NULL, "", "", copyStmt,
dumpFn, dumpCtx);
} }
} }
@ -6980,7 +6978,7 @@ fmtQualifiedId(const char *schema, const char *id)
* Return a column list clause for the given relation. * Return a column list clause for the given relation.
* *
* Special case: if there are no undropped columns in the relation, return * Special case: if there are no undropped columns in the relation, return
* NULL, not an invalid "()" column list. * "", not an invalid "()" column list.
*/ */
static const char * static const char *
fmtCopyColumnList(const TableInfo *ti) fmtCopyColumnList(const TableInfo *ti)
@ -7010,7 +7008,7 @@ fmtCopyColumnList(const TableInfo *ti)
} }
if (!needComma) if (!needComma)
return NULL; /* no undropped columns */ return ""; /* no undropped columns */
appendPQExpBuffer(q, ")"); appendPQExpBuffer(q, ")");
return q->data; return q->data;