Add REJECT_LIMIT option to the COPY command.

Previously, when ON_ERROR was set to 'ignore', the COPY command
would skip all rows with data type conversion errors, with no way to
limit the number of skipped rows before failing.

This commit introduces the REJECT_LIMIT option, allowing users to
specify the maximum number of erroneous rows that can be skipped.
If more rows encounter data type conversion errors than allowed by
REJECT_LIMIT, the COPY command will fail with an error, even when
ON_ERROR = 'ignore'.

Author: Atsushi Torikoshi
Reviewed-by: Junwang Zhao, Kirill Reshke, jian he, Fujii Masao
Discussion: https://postgr.es/m/63f99327aa6b404cc951217fa3e61fe4@oss.nttdata.com
This commit is contained in:
Fujii Masao 2024-10-08 18:19:58 +09:00
parent d759c1a0b8
commit 4ac2a9bece
6 changed files with 91 additions and 0 deletions

View File

@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
FORCE_NOT_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
FORCE_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
ON_ERROR <replaceable class="parameter">error_action</replaceable>
REJECT_LIMIT <replaceable class="parameter">maxerror</replaceable>
ENCODING '<replaceable class="parameter">encoding_name</replaceable>'
LOG_VERBOSITY <replaceable class="parameter">verbosity</replaceable>
</synopsis>
@ -413,6 +414,24 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
</listitem>
</varlistentry>
<varlistentry>
<term><literal>REJECT_LIMIT</literal></term>
<listitem>
<para>
Specifies the maximum number of errors tolerated while converting a
column's input value to its data type, when <literal>ON_ERROR</literal> is
set to <literal>ignore</literal>.
If the input causes more errors than the specified value, the <command>COPY</command>
command fails, even with <literal>ON_ERROR</literal> set to <literal>ignore</literal>.
This clause must be used with <literal>ON_ERROR</literal>=<literal>ignore</literal>
and <replaceable class="parameter">maxerror</replaceable> must be positive <type>bigint</type>.
If not specified, <literal>ON_ERROR</literal>=<literal>ignore</literal>
allows an unlimited number of errors, meaning <command>COPY</command> will
skip all erroneous data.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>ENCODING</literal></term>
<listitem>

View File

@ -418,6 +418,23 @@ defGetCopyOnErrorChoice(DefElem *def, ParseState *pstate, bool is_from)
return COPY_ON_ERROR_STOP; /* keep compiler quiet */
}
/*
* Extract REJECT_LIMIT value from a DefElem.
*/
static int64
defGetCopyRejectLimitOption(DefElem *def)
{
int64 reject_limit = defGetInt64(def);
if (reject_limit <= 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("REJECT_LIMIT (%lld) must be greater than zero",
(long long) reject_limit)));
return reject_limit;
}
/*
* Extract a CopyLogVerbosityChoice value from a DefElem.
*/
@ -472,6 +489,7 @@ ProcessCopyOptions(ParseState *pstate,
bool header_specified = false;
bool on_error_specified = false;
bool log_verbosity_specified = false;
bool reject_limit_specified = false;
ListCell *option;
/* Support external use for option sanity checking */
@ -638,6 +656,13 @@ ProcessCopyOptions(ParseState *pstate,
log_verbosity_specified = true;
opts_out->log_verbosity = defGetCopyLogVerbosityChoice(defel, pstate);
}
else if (strcmp(defel->defname, "reject_limit") == 0)
{
if (reject_limit_specified)
errorConflictingDefElem(defel, pstate);
reject_limit_specified = true;
opts_out->reject_limit = defGetCopyRejectLimitOption(defel);
}
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@ -874,6 +899,14 @@ ProcessCopyOptions(ParseState *pstate,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("NULL specification and DEFAULT specification cannot be the same")));
}
/* Check on_error */
if (opts_out->reject_limit && !opts_out->on_error)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
/*- translator: first and second %s are the names of COPY option, e.g.
* ON_ERROR, third is the value of the COPY option, e.g. IGNORE */
errmsg("COPY %s requires %s to be set to %s",
"REJECT_LIMIT", "ON_ERROR", "IGNORE")));
}
/*

View File

@ -1018,6 +1018,13 @@ CopyFrom(CopyFromState cstate)
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
cstate->num_errors);
if (cstate->opts.reject_limit > 0 && \
cstate->num_errors > cstate->opts.reject_limit)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("skipped more than REJECT_LIMIT (%lld) rows due to data type incompatibility",
(long long) cstate->opts.reject_limit)));
/* Repeat NextCopyFrom() until no soft error occurs */
continue;
}

View File

@ -85,6 +85,7 @@ typedef struct CopyFormatOptions
bool convert_selectively; /* do selective binary conversion? */
CopyOnErrorChoice on_error; /* what to do when error happened */
CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */
int64 reject_limit; /* maximum tolerable number of errors */
List *convert_select; /* list of column names (can be NIL) */
} CopyFormatOptions;

View File

@ -116,6 +116,10 @@ COPY x to stdout (log_verbosity unsupported);
ERROR: COPY LOG_VERBOSITY "unsupported" not recognized
LINE 1: COPY x to stdout (log_verbosity unsupported);
^
COPY x from stdin with (reject_limit 1);
ERROR: COPY REJECT_LIMIT requires ON_ERROR to be set to IGNORE
COPY x from stdin with (on_error ignore, reject_limit 0);
ERROR: REJECT_LIMIT (0) must be greater than zero
-- too many columns in column list: should fail
COPY x (a, b, c, d, e, d, c) from stdin;
ERROR: column "d" specified more than once
@ -791,6 +795,12 @@ CONTEXT: COPY check_ign_err, line 1: "1 {1}"
COPY check_ign_err FROM STDIN WITH (on_error ignore);
ERROR: extra data after last expected column
CONTEXT: COPY check_ign_err, line 1: "1 {1} 3 abc"
-- tests for reject_limit option
COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 3);
ERROR: skipped more than REJECT_LIMIT (3) rows due to data type incompatibility
CONTEXT: COPY check_ign_err, line 5, column n: ""
COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 4);
NOTICE: 4 rows were skipped due to data type incompatibility
-- clean up
DROP TABLE forcetest;
DROP TABLE vistest;

View File

@ -82,6 +82,8 @@ COPY x to stdout (format TEXT, force_null(a));
COPY x to stdin (format CSV, force_null(a));
COPY x to stdin (format BINARY, on_error unsupported);
COPY x to stdout (log_verbosity unsupported);
COPY x from stdin with (reject_limit 1);
COPY x from stdin with (on_error ignore, reject_limit 0);
-- too many columns in column list: should fail
COPY x (a, b, c, d, e, d, c) from stdin;
@ -561,6 +563,25 @@ COPY check_ign_err FROM STDIN WITH (on_error ignore);
1 {1} 3 abc
\.
-- tests for reject_limit option
COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 3);
6 {6} 6
a {7} 7
8 {8} 8888888888
9 {a, 9} 9
10 {10} 10
\.
COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 4);
6 {6} 6
a {7} 7
8 {8} 8888888888
9 {a, 9} 9
10 {10} 10
\.
-- clean up
DROP TABLE forcetest;
DROP TABLE vistest;