Fix two issues with HEADER MATCH in COPY

072132f0 used the attnum offset to access the raw_fields array when
checking that the attribute names of the header and of the relation
match, leading to incorrect results or even crashes if the attribute
numbers of a relation are changed, like on a dropped attribute.  This
fixes the logic to use the correct attribute names for the header
matching requirements.

Also, this commit disallows HEADER MATCH in COPY TO as there is no
validation that can be done in this case.

The tests are expanded for HEADER MATCH with COPY FROM and dropped
columns, with cases where a relation has a dropped and re-added column,
as well as a reduced set of columns.

Author: Julien Rouhaud
Reviewed-by: Peter Eisentraut, Michael Paquier
Discussion: https://postgr.es/m/20220607154744.vvmitnqhyxrne5ms@jrouhaud
This commit is contained in:
Michael Paquier 2022-06-23 10:49:20 +09:00
parent eba331ae2a
commit ca7a0d1d36
5 changed files with 97 additions and 8 deletions

View File

@ -282,6 +282,8 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
of the columns in the header line must match the actual column names of
the table, otherwise an error is raised.
This option is not allowed when using <literal>binary</literal> format.
The <literal>MATCH</literal> option is only valid for <command>COPY
FROM</command> commands.
</para>
</listitem>
</varlistentry>

View File

@ -318,7 +318,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
* defGetBoolean() but also accepts the special value "match".
*/
static CopyHeaderChoice
defGetCopyHeaderChoice(DefElem *def)
defGetCopyHeaderChoice(DefElem *def, bool is_from)
{
/*
* If no parameter given, assume "true" is meant.
@ -360,7 +360,14 @@ defGetCopyHeaderChoice(DefElem *def)
if (pg_strcasecmp(sval, "off") == 0)
return COPY_HEADER_FALSE;
if (pg_strcasecmp(sval, "match") == 0)
{
if (!is_from)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use \"%s\" with HEADER in COPY TO",
sval)));
return COPY_HEADER_MATCH;
}
}
break;
}
@ -452,7 +459,7 @@ ProcessCopyOptions(ParseState *pstate,
if (header_specified)
errorConflictingDefElem(defel, pstate);
header_specified = true;
opts_out->header_line = defGetCopyHeaderChoice(defel);
opts_out->header_line = defGetCopyHeaderChoice(defel, is_from);
}
else if (strcmp(defel->defname, "quote") == 0)
{

View File

@ -789,11 +789,12 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
char *colName = cstate->raw_fields[attnum - 1];
char *colName;
Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
fldnum++;
Assert(fldnum < cstate->max_fields);
colName = cstate->raw_fields[fldnum++];
if (colName == NULL)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),

View File

@ -182,9 +182,21 @@ create table header_copytest (
b int,
c text
);
-- Make sure it works with with dropped columns
alter table header_copytest drop column c;
alter table header_copytest add column c text;
copy header_copytest to stdout with (header match);
ERROR: cannot use "match" with HEADER in COPY TO
copy header_copytest from stdin with (header wrong_choice);
ERROR: header requires a Boolean value or "match"
-- works
copy header_copytest from stdin with (header match);
copy header_copytest (c, a, b) from stdin with (header match);
copy header_copytest from stdin with (header match, format csv);
-- errors
copy header_copytest (c, b, a) from stdin with (header match);
ERROR: column name mismatch in header line field 1: got "a", expected "c"
CONTEXT: COPY header_copytest, line 1: "a b c"
copy header_copytest from stdin with (header match);
ERROR: column name mismatch in header line field 3: got null value ("\N"), expected "c"
CONTEXT: COPY header_copytest, line 1: "a b \N"
@ -197,5 +209,34 @@ CONTEXT: COPY header_copytest, line 1: "a b c d"
copy header_copytest from stdin with (header match);
ERROR: column name mismatch in header line field 3: got "d", expected "c"
CONTEXT: COPY header_copytest, line 1: "a b d"
copy header_copytest from stdin with (header match, format csv);
SELECT * FROM header_copytest ORDER BY a;
a | b | c
---+---+-----
1 | 2 | foo
3 | 4 | bar
5 | 6 | baz
(3 rows)
-- Drop an extra column, in the middle of the existing set.
alter table header_copytest drop column b;
-- works
copy header_copytest (c, a) from stdin with (header match);
copy header_copytest (a, c) from stdin with (header match);
-- errors
copy header_copytest from stdin with (header match);
ERROR: wrong number of fields in header line: field count is 3, expected 2
CONTEXT: COPY header_copytest, line 1: "a ........pg.dropped.2........ c"
copy header_copytest (a, c) from stdin with (header match);
ERROR: wrong number of fields in header line: field count is 3, expected 2
CONTEXT: COPY header_copytest, line 1: "a c b"
SELECT * FROM header_copytest ORDER BY a;
a | c
---+-----
1 | foo
3 | bar
5 | baz
7 | foo
8 | foo
(5 rows)
drop table header_copytest;

View File

@ -204,11 +204,29 @@ create table header_copytest (
b int,
c text
);
-- Make sure it works with with dropped columns
alter table header_copytest drop column c;
alter table header_copytest add column c text;
copy header_copytest to stdout with (header match);
copy header_copytest from stdin with (header wrong_choice);
-- works
copy header_copytest from stdin with (header match);
a b c
1 2 foo
\.
copy header_copytest (c, a, b) from stdin with (header match);
c a b
bar 3 4
\.
copy header_copytest from stdin with (header match, format csv);
a,b,c
5,6,baz
\.
-- errors
copy header_copytest (c, b, a) from stdin with (header match);
a b c
1 2 foo
\.
copy header_copytest from stdin with (header match);
a b \N
1 2 foo
@ -225,8 +243,28 @@ copy header_copytest from stdin with (header match);
a b d
1 2 foo
\.
copy header_copytest from stdin with (header match, format csv);
a,b,c
1,2,foo
SELECT * FROM header_copytest ORDER BY a;
-- Drop an extra column, in the middle of the existing set.
alter table header_copytest drop column b;
-- works
copy header_copytest (c, a) from stdin with (header match);
c a
foo 7
\.
copy header_copytest (a, c) from stdin with (header match);
a c
8 foo
\.
-- errors
copy header_copytest from stdin with (header match);
a ........pg.dropped.2........ c
1 2 foo
\.
copy header_copytest (a, c) from stdin with (header match);
a c b
1 foo 2
\.
SELECT * FROM header_copytest ORDER BY a;
drop table header_copytest;