From 8ddc05fb01ee2c423bf5613426726351e860d4b4 Mon Sep 17 00:00:00 2001 From: Itagaki Takahiro Date: Wed, 16 Feb 2011 11:19:11 +0900 Subject: [PATCH] Export the external file reader used in COPY FROM as APIs. They are expected to be used by extension modules like file_fdw. There are no user-visible changes. Itagaki Takahiro Reviewed and tested by Kevin Grittner and Noah Misch. --- src/backend/commands/copy.c | 1292 ++++++++++++++++++++--------------- src/include/commands/copy.h | 12 + 2 files changed, 771 insertions(+), 533 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 3350ca0b6e..9f7263d59a 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -93,13 +93,11 @@ typedef struct CopyStateData FILE *copy_file; /* used if copy_dest == COPY_FILE */ StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for * dest == COPY_NEW_FE in COPY FROM */ - bool fe_copy; /* true for all FE copy dests */ bool fe_eof; /* true if detected end of copy data */ EolType eol_type; /* EOL type of input */ int client_encoding; /* remote side's character encoding */ bool need_transcoding; /* client encoding diff from server? */ bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ - uint64 processed; /* # of tuples processed */ /* parameters from the COPY command */ Relation rel; /* relation to copy to or from */ @@ -119,18 +117,35 @@ typedef struct CopyStateData bool *force_quote_flags; /* per-column CSV FQ flags */ bool *force_notnull_flags; /* per-column CSV FNN flags */ - /* these are just for error messages, see copy_in_error_callback */ + /* these are just for error messages, see CopyFromErrorCallback */ const char *cur_relname; /* table name for error messages */ int cur_lineno; /* line number for error messages */ const char *cur_attname; /* current att for error messages */ const char *cur_attval; /* current att value for error messages */ + /* + * Working state for COPY TO/FROM + */ + MemoryContext copycontext; /* per-copy execution context */ + /* * Working state for COPY TO */ FmgrInfo *out_functions; /* lookup info for output functions */ MemoryContext rowcontext; /* per-row evaluation context */ + /* + * Working state for COPY FROM + */ + AttrNumber num_defaults; + bool file_has_oids; + FmgrInfo oid_in_function; + Oid oid_typioparam; + FmgrInfo *in_functions; /* array of input functions for each attrs */ + Oid *typioparams; /* array of element types for in_functions */ + int *defmap; /* array of default att numbers */ + ExprState **defexprs; /* array of default att expressions */ + /* * These variables are used to reduce overhead in textual COPY FROM. * @@ -169,13 +184,12 @@ typedef struct CopyStateData int raw_buf_len; /* total # of bytes stored */ } CopyStateData; -typedef CopyStateData *CopyState; - /* DestReceiver for COPY (SELECT) TO */ typedef struct { DestReceiver pub; /* publicly-known function pointers */ CopyState cstate; /* CopyStateData for the command */ + uint64 processed; /* # of tuples processed */ } DR_copy; @@ -248,11 +262,17 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; /* non-export function prototypes */ -static void DoCopyTo(CopyState cstate); -static void CopyTo(CopyState cstate); +static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query, + const char *queryString, List *attnamelist, List *options); +static void EndCopy(CopyState cstate); +static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString, + const char *filename, List *attnamelist, List *options); +static void EndCopyTo(CopyState cstate); +static uint64 DoCopyTo(CopyState cstate); +static uint64 CopyTo(CopyState cstate); static void CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls); -static void CopyFrom(CopyState cstate); +static uint64 CopyFrom(CopyState cstate); static bool CopyReadLine(CopyState cstate); static bool CopyReadLineText(CopyState cstate); static int CopyReadAttributesText(CopyState cstate); @@ -700,18 +720,6 @@ CopyLoadRawBuf(CopyState cstate) * input/output stream. The latter could be either stdin/stdout or a * socket, depending on whether we're running under Postmaster control. * - * Iff , unload or reload in the binary format, as opposed to the - * more wasteful but more robust and portable text format. - * - * Iff , unload or reload the format that includes OID information. - * On input, we accept OIDs whether or not the table has an OID column, - * but silently drop them if it does not. On output, we report an error - * if the user asks for OIDs in a table that has none (not providing an - * OID column might seem friendlier, but could seriously confuse programs). - * - * If in the text format, delimit columns with delimiter and print - * NULL values as . - * * Do not allow a Postgres user without superuser privilege to read from * or write to a file. * @@ -724,22 +732,137 @@ DoCopy(const CopyStmt *stmt, const char *queryString) CopyState cstate; bool is_from = stmt->is_from; bool pipe = (stmt->filename == NULL); - List *attnamelist = stmt->attlist; + Relation rel; + uint64 processed; + + /* Disallow file COPY except to superusers. */ + if (!pipe && !superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to COPY to or from a file"), + errhint("Anyone can COPY to stdout or from stdin. " + "psql's \\copy command also works for anyone."))); + + if (stmt->relation) + { + TupleDesc tupDesc; + AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT); + RangeTblEntry *rte; + List *attnums; + ListCell *cur; + + Assert(!stmt->query); + + /* Open and lock the relation, using the appropriate lock type. */ + rel = heap_openrv(stmt->relation, + (is_from ? RowExclusiveLock : AccessShareLock)); + + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = RelationGetRelid(rel); + rte->requiredPerms = required_access; + + tupDesc = RelationGetDescr(rel); + attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist); + foreach (cur, attnums) + { + int attno = lfirst_int(cur) - + FirstLowInvalidHeapAttributeNumber; + + if (is_from) + rte->modifiedCols = bms_add_member(rte->modifiedCols, attno); + else + rte->selectedCols = bms_add_member(rte->selectedCols, attno); + } + ExecCheckRTPerms(list_make1(rte), true); + } + else + { + Assert(stmt->query); + + rel = NULL; + } + + if (is_from) + { + /* check read-only transaction */ + if (XactReadOnly && rel->rd_backend != MyBackendId) + PreventCommandIfReadOnly("COPY FROM"); + + cstate = BeginCopyFrom(rel, stmt->filename, + stmt->attlist, stmt->options); + processed = CopyFrom(cstate); /* copy from file to database */ + EndCopyFrom(cstate); + } + else + { + cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename, + stmt->attlist, stmt->options); + processed = DoCopyTo(cstate); /* copy from database to file */ + EndCopyTo(cstate); + } + + /* + * Close the relation. If reading, we can release the AccessShareLock we + * got; if writing, we should hold the lock until end of transaction to + * ensure that updates will be committed before lock is released. + */ + if (rel != NULL) + heap_close(rel, (is_from ? NoLock : AccessShareLock)); + + return processed; +} + +/* + * Common setup routines used by BeginCopyFrom and BeginCopyTo. + * + * Iff , unload or reload in the binary format, as opposed to the + * more wasteful but more robust and portable text format. + * + * Iff , unload or reload the format that includes OID information. + * On input, we accept OIDs whether or not the table has an OID column, + * but silently drop them if it does not. On output, we report an error + * if the user asks for OIDs in a table that has none (not providing an + * OID column might seem friendlier, but could seriously confuse programs). + * + * If in the text format, delimit columns with delimiter and print + * NULL values as . + */ +static CopyState +BeginCopy(bool is_from, + Relation rel, + Node *raw_query, + const char *queryString, + List *attnamelist, + List *options) +{ + CopyState cstate; List *force_quote = NIL; List *force_notnull = NIL; bool force_quote_all = false; bool format_specified = false; - AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT); ListCell *option; TupleDesc tupDesc; int num_phys_attrs; - uint64 processed; + MemoryContext oldcontext; /* Allocate workspace and zero all fields */ cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); + /* + * We allocate everything used by a cstate in a new memory context. + * This would avoid memory leaks repeated uses of COPY in a query. + */ + cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext, + "COPY", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + /* Extract options from the statement node tree */ - foreach(option, stmt->options) + foreach(option, options) { DefElem *defel = (DefElem *) lfirst(option); @@ -980,51 +1103,14 @@ DoCopy(const CopyStmt *stmt, const char *queryString) (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CSV quote character must not appear in the NULL specification"))); - /* Disallow file COPY except to superusers. */ - if (!pipe && !superuser()) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("must be superuser to COPY to or from a file"), - errhint("Anyone can COPY to stdout or from stdin. " - "psql's \\copy command also works for anyone."))); - - if (stmt->relation) + if (rel) { - RangeTblEntry *rte; - List *attnums; - ListCell *cur; + Assert(!raw_query); - Assert(!stmt->query); - cstate->queryDesc = NULL; - - /* Open and lock the relation, using the appropriate lock type. */ - cstate->rel = heap_openrv(stmt->relation, - (is_from ? RowExclusiveLock : AccessShareLock)); + cstate->rel = rel; tupDesc = RelationGetDescr(cstate->rel); - /* Check relation permissions. */ - rte = makeNode(RangeTblEntry); - rte->rtekind = RTE_RELATION; - rte->relid = RelationGetRelid(cstate->rel); - rte->requiredPerms = required_access; - - attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); - foreach (cur, attnums) - { - int attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber; - - if (is_from) - rte->modifiedCols = bms_add_member(rte->modifiedCols, attno); - else - rte->selectedCols = bms_add_member(rte->selectedCols, attno); - } - ExecCheckRTPerms(list_make1(rte), true); - - /* check read-only transaction */ - if (XactReadOnly && is_from && cstate->rel->rd_backend != MyBackendId) - PreventCommandIfReadOnly("COPY FROM"); - /* Don't allow COPY w/ OIDs to or from a table without them */ if (cstate->oids && !cstate->rel->rd_rel->relhasoids) ereport(ERROR, @@ -1058,7 +1144,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString) * function and is executed repeatedly. (See also the same hack in * DECLARE CURSOR and PREPARE.) XXX FIXME someday. */ - rewritten = pg_analyze_and_rewrite((Node *) copyObject(stmt->query), + rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query), queryString, NULL, 0); /* We don't expect more or less than one result query */ @@ -1160,14 +1246,6 @@ DoCopy(const CopyStmt *stmt, const char *queryString) } } - /* Set up variables to avoid per-attribute overhead. */ - initStringInfo(&cstate->attribute_buf); - initStringInfo(&cstate->line_buf); - cstate->line_buf_converted = false; - cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); - cstate->raw_buf_index = cstate->raw_buf_len = 0; - cstate->processed = 0; - /* * Set up encoding conversion info. Even if the client and server * encodings are the same, we must apply pg_client_to_server() to validate @@ -1181,84 +1259,75 @@ DoCopy(const CopyStmt *stmt, const char *queryString) cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding); cstate->copy_dest = COPY_FILE; /* default */ - cstate->filename = stmt->filename; - if (is_from) - CopyFrom(cstate); /* copy from file to database */ - else - DoCopyTo(cstate); /* copy from database to file */ + MemoryContextSwitchTo(oldcontext); - /* - * Close the relation or query. If reading, we can release the - * AccessShareLock we got; if writing, we should hold the lock until end - * of transaction to ensure that updates will be committed before lock is - * released. - */ - if (cstate->rel) - heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock)); - else - { - /* Close down the query and free resources. */ - ExecutorEnd(cstate->queryDesc); - FreeQueryDesc(cstate->queryDesc); - PopActiveSnapshot(); - } - - /* Clean up storage (probably not really necessary) */ - processed = cstate->processed; - - pfree(cstate->attribute_buf.data); - pfree(cstate->line_buf.data); - pfree(cstate->raw_buf); - pfree(cstate); - - return processed; + return cstate; } - /* - * This intermediate routine exists mainly to localize the effects of setjmp - * so we don't need to plaster a lot of variables with "volatile". + * Release resources allocated in a cstate for COPY TO/FROM. */ static void -DoCopyTo(CopyState cstate) +EndCopy(CopyState cstate) { - bool pipe = (cstate->filename == NULL); + if (cstate->filename != NULL && FreeFile(cstate->copy_file)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", + cstate->filename))); - if (cstate->rel) + MemoryContextDelete(cstate->copycontext); + pfree(cstate); +} + +/* + * Setup CopyState to read tuples from a table or a query for COPY TO. + */ +static CopyState +BeginCopyTo(Relation rel, + Node *query, + const char *queryString, + const char *filename, + List *attnamelist, + List *options) +{ + CopyState cstate; + bool pipe = (filename == NULL); + MemoryContext oldcontext; + + if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION) { - if (cstate->rel->rd_rel->relkind != RELKIND_RELATION) - { - if (cstate->rel->rd_rel->relkind == RELKIND_VIEW) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy from view \"%s\"", - RelationGetRelationName(cstate->rel)), - errhint("Try the COPY (SELECT ...) TO variant."))); - else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy from foreign table \"%s\"", - RelationGetRelationName(cstate->rel)), - errhint("Try the COPY (SELECT ...) TO variant."))); - else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy from sequence \"%s\"", - RelationGetRelationName(cstate->rel)))); - else - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy from non-table relation \"%s\"", - RelationGetRelationName(cstate->rel)))); - } + if (rel->rd_rel->relkind == RELKIND_VIEW) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot copy from view \"%s\"", + RelationGetRelationName(rel)), + errhint("Try the COPY (SELECT ...) TO variant."))); + else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot copy from foreign table \"%s\"", + RelationGetRelationName(rel)), + errhint("Try the COPY (SELECT ...) TO variant."))); + else if (rel->rd_rel->relkind == RELKIND_SEQUENCE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot copy from sequence \"%s\"", + RelationGetRelationName(rel)))); + else + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot copy from non-table relation \"%s\"", + RelationGetRelationName(rel)))); } + cstate = BeginCopy(false, rel, query, queryString, attnamelist, options); + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + if (pipe) { - if (whereToSendOutput == DestRemote) - cstate->fe_copy = true; - else + if (whereToSendOutput != DestRemote) cstate->copy_file = stdout; } else @@ -1270,11 +1339,12 @@ DoCopyTo(CopyState cstate) * Prevent write to relative path ... too easy to shoot oneself in the * foot by overwriting a database file ... */ - if (!is_absolute_path(cstate->filename)) + if (!is_absolute_path(filename)) ereport(ERROR, (errcode(ERRCODE_INVALID_NAME), errmsg("relative path not allowed for COPY to file"))); + cstate->filename = pstrdup(filename); oumask = umask(S_IWGRP | S_IWOTH); cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); umask(oumask); @@ -1292,14 +1362,30 @@ DoCopyTo(CopyState cstate) errmsg("\"%s\" is a directory", cstate->filename))); } + MemoryContextSwitchTo(oldcontext); + + return cstate; +} + +/* + * This intermediate routine exists mainly to localize the effects of setjmp + * so we don't need to plaster a lot of variables with "volatile". + */ +static uint64 +DoCopyTo(CopyState cstate) +{ + bool pipe = (cstate->filename == NULL); + bool fe_copy = (pipe && whereToSendOutput == DestRemote); + uint64 processed; + PG_TRY(); { - if (cstate->fe_copy) + if (fe_copy) SendCopyBegin(cstate); - CopyTo(cstate); + processed = CopyTo(cstate); - if (cstate->fe_copy) + if (fe_copy) SendCopyEnd(cstate); } PG_CATCH(); @@ -1314,26 +1400,38 @@ DoCopyTo(CopyState cstate) } PG_END_TRY(); - if (!pipe) + return processed; +} + +/* + * Clean up storage and release resources for COPY TO. + */ +static void +EndCopyTo(CopyState cstate) +{ + if (cstate->queryDesc != NULL) { - if (FreeFile(cstate->copy_file)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not close file \"%s\": %m", - cstate->filename))); + /* Close down the query and free resources. */ + ExecutorEnd(cstate->queryDesc); + FreeQueryDesc(cstate->queryDesc); + PopActiveSnapshot(); } + + /* Clean up storage */ + EndCopy(cstate); } /* * Copy from relation or query TO file. */ -static void +static uint64 CopyTo(CopyState cstate) { TupleDesc tupDesc; int num_phys_attrs; Form_pg_attribute *attr; ListCell *cur; + uint64 processed; if (cstate->rel) tupDesc = RelationGetDescr(cstate->rel); @@ -1439,6 +1537,7 @@ CopyTo(CopyState cstate) scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL); + processed = 0; while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL) { CHECK_FOR_INTERRUPTS(); @@ -1448,14 +1547,19 @@ CopyTo(CopyState cstate) /* Format and send the data */ CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls); + processed++; } heap_endscan(scandesc); + + pfree(values); + pfree(nulls); } else { /* run the plan --- the dest receiver will send tuples */ ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L); + processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } if (cstate->binary) @@ -1467,6 +1571,8 @@ CopyTo(CopyState cstate) } MemoryContextDelete(cstate->rowcontext); + + return processed; } /* @@ -1558,16 +1664,16 @@ CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls) CopySendEndOfRow(cstate); MemoryContextSwitchTo(oldcontext); - - cstate->processed++; } /* * error context callback for COPY FROM + * + * The argument for the error context must be CopyState. */ -static void -copy_in_error_callback(void *arg) +void +CopyFromErrorCallback(void *arg) { CopyState cstate = (CopyState) arg; @@ -1669,41 +1775,23 @@ limit_printout_length(const char *str) /* * Copy FROM file to relation. */ -static void +static uint64 CopyFrom(CopyState cstate) { - bool pipe = (cstate->filename == NULL); HeapTuple tuple; TupleDesc tupDesc; - Form_pg_attribute *attr; - AttrNumber num_phys_attrs, - attr_count, - num_defaults; - FmgrInfo *in_functions; - FmgrInfo oid_in_function; - Oid *typioparams; - Oid oid_typioparam; - int attnum; - int i; - Oid in_func_oid; Datum *values; bool *nulls; - int nfields; - char **field_strings; - bool done = false; - bool isnull; ResultRelInfo *resultRelInfo; EState *estate = CreateExecutorState(); /* for ExecConstraints() */ + ExprContext *econtext; TupleTableSlot *slot; - bool file_has_oids; - int *defmap; - ExprState **defexprs; /* array of default att expressions */ - ExprContext *econtext; /* used for ExecEvalExpr for default atts */ MemoryContext oldcontext = CurrentMemoryContext; ErrorContextCallback errcontext; CommandId mycid = GetCurrentCommandId(true); int hi_options = 0; /* start with default heap_insert options */ BulkInsertState bistate; + uint64 processed = 0; Assert(cstate->rel); @@ -1731,6 +1819,8 @@ CopyFrom(CopyState cstate) RelationGetRelationName(cstate->rel)))); } + tupDesc = RelationGetDescr(cstate->rel); + /*---------- * Check to see if we can avoid writing WAL * @@ -1766,38 +1856,6 @@ CopyFrom(CopyState cstate) hi_options |= HEAP_INSERT_SKIP_WAL; } - if (pipe) - { - if (whereToSendOutput == DestRemote) - ReceiveCopyBegin(cstate); - else - cstate->copy_file = stdin; - } - else - { - struct stat st; - - cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); - - if (cstate->copy_file == NULL) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\" for reading: %m", - cstate->filename))); - - fstat(fileno(cstate->copy_file), &st); - if (S_ISDIR(st.st_mode)) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is a directory", cstate->filename))); - } - - tupDesc = RelationGetDescr(cstate->rel); - attr = tupDesc->attrs; - num_phys_attrs = tupDesc->natts; - attr_count = list_length(cstate->attnumlist); - num_defaults = 0; - /* * We need a ResultRelInfo so we can use the regular executor's * index-entry-making machinery. (There used to be a huge amount of code @@ -1826,51 +1884,6 @@ CopyFrom(CopyState cstate) slot = ExecInitExtraTupleSlot(estate); ExecSetSlotDescriptor(slot, tupDesc); - econtext = GetPerTupleExprContext(estate); - - /* - * Pick up the required catalog information for each attribute in the - * relation, including the input function, the element type (to pass to - * the input function), and info about defaults and constraints. (Which - * input function we use depends on text/binary format choice.) - */ - in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); - typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); - defmap = (int *) palloc(num_phys_attrs * sizeof(int)); - defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); - - for (attnum = 1; attnum <= num_phys_attrs; attnum++) - { - /* We don't need info for dropped attributes */ - if (attr[attnum - 1]->attisdropped) - continue; - - /* Fetch the input function and typioparam info */ - if (cstate->binary) - getTypeBinaryInputInfo(attr[attnum - 1]->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - else - getTypeInputInfo(attr[attnum - 1]->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); - - /* Get default info if needed */ - if (!list_member_int(cstate->attnumlist, attnum)) - { - /* attribute is NOT to be copied from input */ - /* use default value if one exists */ - Node *defexpr = build_column_default(cstate->rel, attnum); - - if (defexpr != NULL) - { - defexprs[num_defaults] = ExecPrepareExpr((Expr *) defexpr, - estate); - defmap[num_defaults] = attnum - 1; - num_defaults++; - } - } - } - /* Prepare to catch AFTER triggers. */ AfterTriggerBeginQuery(); @@ -1882,296 +1895,38 @@ CopyFrom(CopyState cstate) */ ExecBSInsertTriggers(estate, resultRelInfo); - if (!cstate->binary) - file_has_oids = cstate->oids; /* must rely on user to tell us... */ - else - { - /* Read and verify binary header */ - char readSig[11]; - int32 tmp; - - /* Signature */ - if (CopyGetData(cstate, readSig, 11, 11) != 11 || - memcmp(readSig, BinarySignature, 11) != 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("COPY file signature not recognized"))); - /* Flags field */ - if (!CopyGetInt32(cstate, &tmp)) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid COPY file header (missing flags)"))); - file_has_oids = (tmp & (1 << 16)) != 0; - tmp &= ~(1 << 16); - if ((tmp >> 16) != 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unrecognized critical flags in COPY file header"))); - /* Header extension length */ - if (!CopyGetInt32(cstate, &tmp) || - tmp < 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid COPY file header (missing length)"))); - /* Skip extension header, if present */ - while (tmp-- > 0) - { - if (CopyGetData(cstate, readSig, 1, 1) != 1) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid COPY file header (wrong length)"))); - } - } - - if (file_has_oids && cstate->binary) - { - getTypeBinaryInputInfo(OIDOID, - &in_func_oid, &oid_typioparam); - fmgr_info(in_func_oid, &oid_in_function); - } - - values = (Datum *) palloc(num_phys_attrs * sizeof(Datum)); - nulls = (bool *) palloc(num_phys_attrs * sizeof(bool)); - - /* create workspace for CopyReadAttributes results */ - nfields = file_has_oids ? (attr_count + 1) : attr_count; - if (! cstate->binary) - { - cstate->max_fields = nfields; - cstate->raw_fields = (char **) palloc(nfields * sizeof(char *)); - } - - /* Initialize state variables */ - cstate->fe_eof = false; - cstate->eol_type = EOL_UNKNOWN; - cstate->cur_relname = RelationGetRelationName(cstate->rel); - cstate->cur_lineno = 0; - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; + values = (Datum *) palloc(tupDesc->natts * sizeof(Datum)); + nulls = (bool *) palloc(tupDesc->natts * sizeof(bool)); bistate = GetBulkInsertState(); + econtext = GetPerTupleExprContext(estate); /* Set up callback to identify error line number */ - errcontext.callback = copy_in_error_callback; + errcontext.callback = CopyFromErrorCallback; errcontext.arg = (void *) cstate; errcontext.previous = error_context_stack; error_context_stack = &errcontext; - /* on input just throw the header line away */ - if (cstate->header_line) - { - cstate->cur_lineno++; - done = CopyReadLine(cstate); - } - - while (!done) + for (;;) { bool skip_tuple; Oid loaded_oid = InvalidOid; CHECK_FOR_INTERRUPTS(); - cstate->cur_lineno++; - /* Reset the per-tuple exprcontext */ ResetPerTupleExprContext(estate); /* Switch into its memory context */ MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - /* Initialize all values for row to NULL */ - MemSet(values, 0, num_phys_attrs * sizeof(Datum)); - MemSet(nulls, true, num_phys_attrs * sizeof(bool)); - - if (!cstate->binary) - { - ListCell *cur; - int fldct; - int fieldno; - char *string; - - /* Actually read the line into memory here */ - done = CopyReadLine(cstate); - - /* - * EOF at start of line means we're done. If we see EOF after - * some characters, we act as though it was newline followed by - * EOF, ie, process the line and then exit loop on next iteration. - */ - if (done && cstate->line_buf.len == 0) - break; - - /* Parse the line into de-escaped field values */ - if (cstate->csv_mode) - fldct = CopyReadAttributesCSV(cstate); - else - fldct = CopyReadAttributesText(cstate); - - /* check for overflowing fields */ - if (nfields > 0 && fldct > nfields) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); - - fieldno = 0; - field_strings = cstate->raw_fields; - - /* Read the OID field if present */ - if (file_has_oids) - { - if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for OID column"))); - string = field_strings[fieldno++]; - - if (string == NULL) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("null OID in COPY data"))); - else - { - cstate->cur_attname = "oid"; - cstate->cur_attval = string; - loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin, - CStringGetDatum(string))); - if (loaded_oid == InvalidOid) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid OID in COPY data"))); - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; - } - } - - /* Loop to read the user attributes on the line. */ - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - - if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(attr[m]->attname)))); - string = field_strings[fieldno++]; - - if (cstate->csv_mode && string == NULL && - cstate->force_notnull_flags[m]) - { - /* Go ahead and read the NULL string */ - string = cstate->null_print; - } - - cstate->cur_attname = NameStr(attr[m]->attname); - cstate->cur_attval = string; - values[m] = InputFunctionCall(&in_functions[m], - string, - typioparams[m], - attr[m]->atttypmod); - if (string != NULL) - nulls[m] = false; - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; - } - - Assert(fieldno == nfields); - } - else - { - /* binary */ - int16 fld_count; - ListCell *cur; - - if (!CopyGetInt16(cstate, &fld_count)) - { - /* EOF detected (end of file, or protocol-level EOF) */ - done = true; - break; - } - - if (fld_count == -1) - { - /* - * Received EOF marker. In a V3-protocol copy, wait for - * the protocol-level EOF, and complain if it doesn't come - * immediately. This ensures that we correctly handle - * CopyFail, if client chooses to send that now. - * - * Note that we MUST NOT try to read more data in an - * old-protocol copy, since there is no protocol-level EOF - * marker then. We could go either way for copy from file, - * but choose to throw error if there's data after the EOF - * marker, for consistency with the new-protocol case. - */ - char dummy; - - if (cstate->copy_dest != COPY_OLD_FE && - CopyGetData(cstate, &dummy, 1, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - done = true; - break; - } - - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); - - if (file_has_oids) - { - cstate->cur_attname = "oid"; - loaded_oid = - DatumGetObjectId(CopyReadBinaryAttribute(cstate, - 0, - &oid_in_function, - oid_typioparam, - -1, - &isnull)); - if (isnull || loaded_oid == InvalidOid) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid OID in COPY data"))); - cstate->cur_attname = NULL; - } - - i = 0; - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - - cstate->cur_attname = NameStr(attr[m]->attname); - i++; - values[m] = CopyReadBinaryAttribute(cstate, - i, - &in_functions[m], - typioparams[m], - attr[m]->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; - } - } - - /* - * Now compute and insert any defaults available for the columns not - * provided by the input data. Anything not processed here or above - * will remain NULL. - */ - for (i = 0; i < num_defaults; i++) - { - values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext, - &nulls[defmap[i]], NULL); - } + if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid)) + break; /* And now we can form the input tuple. */ tuple = heap_form_tuple(tupDesc, values, nulls); - if (cstate->oids && file_has_oids) + if (loaded_oid != InvalidOid) HeapTupleSetOid(tuple, loaded_oid); /* Triggers and stuff need to be invoked in query context. */ @@ -2225,7 +1980,7 @@ CopyFrom(CopyState cstate) * this is the same definition used by execMain.c for counting * tuples inserted by an INSERT command. */ - cstate->processed++; + processed++; } } @@ -2244,13 +1999,6 @@ CopyFrom(CopyState cstate) pfree(values); pfree(nulls); - if (! cstate->binary) - pfree(cstate->raw_fields); - - pfree(in_functions); - pfree(typioparams); - pfree(defmap); - pfree(defexprs); ExecResetTupleTable(estate->es_tupleTable, false); @@ -2258,23 +2006,500 @@ CopyFrom(CopyState cstate) FreeExecutorState(estate); - if (!pipe) - { - if (FreeFile(cstate->copy_file)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not close file \"%s\": %m", - cstate->filename))); - } - /* * If we skipped writing WAL, then we need to sync the heap (but not * indexes since those use WAL anyway) */ if (hi_options & HEAP_INSERT_SKIP_WAL) heap_sync(cstate->rel); + + return processed; } +/* + * Setup to read tuples from a file for COPY FROM. + * + * 'rel': Used as a template for the tuples + * 'filename': Name of server-local file to read + * 'attnamelist': List of char *, columns to include. NIL selects all cols. + * 'options': List of DefElem. See copy_opt_item in gram.y for selections. + * + * Returns a CopyState, to be passed to NextCopyFrom and related functions. + */ +CopyState +BeginCopyFrom(Relation rel, + const char *filename, + List *attnamelist, + List *options) +{ + CopyState cstate; + bool pipe = (filename == NULL); + TupleDesc tupDesc; + Form_pg_attribute *attr; + AttrNumber num_phys_attrs, + num_defaults; + FmgrInfo *in_functions; + Oid *typioparams; + int attnum; + Oid in_func_oid; + int *defmap; + ExprState **defexprs; + MemoryContext oldcontext; + + cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options); + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + + /* Initialize state variables */ + cstate->fe_eof = false; + cstate->eol_type = EOL_UNKNOWN; + cstate->cur_relname = RelationGetRelationName(cstate->rel); + cstate->cur_lineno = 0; + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + + /* Set up variables to avoid per-attribute overhead. */ + initStringInfo(&cstate->attribute_buf); + initStringInfo(&cstate->line_buf); + cstate->line_buf_converted = false; + cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); + cstate->raw_buf_index = cstate->raw_buf_len = 0; + + tupDesc = RelationGetDescr(cstate->rel); + attr = tupDesc->attrs; + num_phys_attrs = tupDesc->natts; + num_defaults = 0; + + /* + * Pick up the required catalog information for each attribute in the + * relation, including the input function, the element type (to pass to + * the input function), and info about defaults and constraints. (Which + * input function we use depends on text/binary format choice.) + */ + in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); + defmap = (int *) palloc(num_phys_attrs * sizeof(int)); + defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); + + for (attnum = 1; attnum <= num_phys_attrs; attnum++) + { + /* We don't need info for dropped attributes */ + if (attr[attnum - 1]->attisdropped) + continue; + + /* Fetch the input function and typioparam info */ + if (cstate->binary) + getTypeBinaryInputInfo(attr[attnum - 1]->atttypid, + &in_func_oid, &typioparams[attnum - 1]); + else + getTypeInputInfo(attr[attnum - 1]->atttypid, + &in_func_oid, &typioparams[attnum - 1]); + fmgr_info(in_func_oid, &in_functions[attnum - 1]); + + /* Get default info if needed */ + if (!list_member_int(cstate->attnumlist, attnum)) + { + /* attribute is NOT to be copied from input */ + /* use default value if one exists */ + Node *defexpr = build_column_default(cstate->rel, attnum); + + if (defexpr != NULL) + { + /* Initialize expressions in copycontext. */ + defexprs[num_defaults] = ExecInitExpr( + expression_planner((Expr *) defexpr), NULL); + defmap[num_defaults] = attnum - 1; + num_defaults++; + } + } + } + + /* We keep those variables in cstate. */ + cstate->in_functions = in_functions; + cstate->typioparams = typioparams; + cstate->defmap = defmap; + cstate->defexprs = defexprs; + cstate->num_defaults = num_defaults; + + if (pipe) + { + if (whereToSendOutput == DestRemote) + ReceiveCopyBegin(cstate); + else + cstate->copy_file = stdin; + } + else + { + struct stat st; + + cstate->filename = pstrdup(filename); + cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); + + if (cstate->copy_file == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" for reading: %m", + cstate->filename))); + + fstat(fileno(cstate->copy_file), &st); + if (S_ISDIR(st.st_mode)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a directory", cstate->filename))); + } + + if (!cstate->binary) + { + /* must rely on user to tell us... */ + cstate->file_has_oids = cstate->oids; + } + else + { + /* Read and verify binary header */ + char readSig[11]; + int32 tmp; + + /* Signature */ + if (CopyGetData(cstate, readSig, 11, 11) != 11 || + memcmp(readSig, BinarySignature, 11) != 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("COPY file signature not recognized"))); + /* Flags field */ + if (!CopyGetInt32(cstate, &tmp)) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid COPY file header (missing flags)"))); + cstate->file_has_oids = (tmp & (1 << 16)) != 0; + tmp &= ~(1 << 16); + if ((tmp >> 16) != 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unrecognized critical flags in COPY file header"))); + /* Header extension length */ + if (!CopyGetInt32(cstate, &tmp) || + tmp < 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid COPY file header (missing length)"))); + /* Skip extension header, if present */ + while (tmp-- > 0) + { + if (CopyGetData(cstate, readSig, 1, 1) != 1) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid COPY file header (wrong length)"))); + } + } + + if (cstate->file_has_oids && cstate->binary) + { + getTypeBinaryInputInfo(OIDOID, + &in_func_oid, &cstate->oid_typioparam); + fmgr_info(in_func_oid, &cstate->oid_in_function); + } + + /* create workspace for CopyReadAttributes results */ + if (!cstate->binary) + { + AttrNumber attr_count = list_length(cstate->attnumlist); + int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count; + + cstate->max_fields = nfields; + cstate->raw_fields = (char **) palloc(nfields * sizeof(char *)); + } + + MemoryContextSwitchTo(oldcontext); + + return cstate; +} + +/* + * Read raw fields in the next line for COPY FROM in text or csv mode. + * Return false if no more lines. + * + * An internal temporary buffer is returned via 'fields'. It is valid until + * the next call of the function. Since the function returns all raw fields + * in the input file, 'nfields' could be different from the number of columns + * in the relation. + * + * NOTE: force_not_null option are not applied to the returned fields. + */ +bool +NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields) +{ + int fldct; + bool done; + + /* only available for text or csv input */ + Assert(!cstate->binary); + + /* on input just throw the header line away */ + if (cstate->cur_lineno == 0 && cstate->header_line) + { + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + return false; /* done */ + } + + cstate->cur_lineno++; + + /* Actually read the line into memory here */ + done = CopyReadLine(cstate); + + /* + * EOF at start of line means we're done. If we see EOF after + * some characters, we act as though it was newline followed by + * EOF, ie, process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + return false; + + /* Parse the line into de-escaped field values */ + if (cstate->csv_mode) + fldct = CopyReadAttributesCSV(cstate); + else + fldct = CopyReadAttributesText(cstate); + + *fields = cstate->raw_fields; + *nfields = fldct; + return true; +} + +/* + * Read next tuple from file for COPY FROM. Return false if no more tuples. + * + * 'econtext' is used to evaluate default expression for each columns not + * read from the file. It can be NULL when no default values are used, i.e. + * when all columns are read from the file. + * + * 'values' and 'nulls' arrays must be the same length as columns of the + * relation passed to BeginCopyFrom. This function fills the arrays. + * Oid of the tuple is returned with 'tupleOid' separately. + */ +bool +NextCopyFrom(CopyState cstate, ExprContext *econtext, + Datum *values, bool *nulls, Oid *tupleOid) +{ + TupleDesc tupDesc; + Form_pg_attribute *attr; + AttrNumber num_phys_attrs, + attr_count, + num_defaults = cstate->num_defaults; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + int i; + int nfields; + bool isnull; + bool file_has_oids = cstate->file_has_oids; + int *defmap = cstate->defmap; + ExprState **defexprs = cstate->defexprs; + + tupDesc = RelationGetDescr(cstate->rel); + attr = tupDesc->attrs; + num_phys_attrs = tupDesc->natts; + attr_count = list_length(cstate->attnumlist); + nfields = file_has_oids ? (attr_count + 1) : attr_count; + + /* Initialize all values for row to NULL */ + MemSet(values, 0, num_phys_attrs * sizeof(Datum)); + MemSet(nulls, true, num_phys_attrs * sizeof(bool)); + + if (!cstate->binary) + { + char **field_strings; + ListCell *cur; + int fldct; + int fieldno; + char *string; + + /* read raw fields in the next line */ + if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) + return false; + + /* check for overflowing fields */ + if (nfields > 0 && fldct > nfields) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + + fieldno = 0; + + /* Read the OID field if present */ + if (file_has_oids) + { + if (fieldno >= fldct) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for OID column"))); + string = field_strings[fieldno++]; + + if (string == NULL) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("null OID in COPY data"))); + else if (cstate->oids && tupleOid != NULL) + { + cstate->cur_attname = "oid"; + cstate->cur_attval = string; + *tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin, + CStringGetDatum(string))); + if (*tupleOid == InvalidOid) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid OID in COPY data"))); + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + } + } + + /* Loop to read the user attributes on the line. */ + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + + if (fieldno >= fldct) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(attr[m]->attname)))); + string = field_strings[fieldno++]; + + if (cstate->csv_mode && string == NULL && + cstate->force_notnull_flags[m]) + { + /* Go ahead and read the NULL string */ + string = cstate->null_print; + } + + cstate->cur_attname = NameStr(attr[m]->attname); + cstate->cur_attval = string; + values[m] = InputFunctionCall(&in_functions[m], + string, + typioparams[m], + attr[m]->atttypmod); + if (string != NULL) + nulls[m] = false; + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + } + + Assert(fieldno == nfields); + } + else + { + /* binary */ + int16 fld_count; + ListCell *cur; + + cstate->cur_lineno++; + + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } + + if (fld_count == -1) + { + /* + * Received EOF marker. In a V3-protocol copy, wait for + * the protocol-level EOF, and complain if it doesn't come + * immediately. This ensures that we correctly handle + * CopyFail, if client chooses to send that now. + * + * Note that we MUST NOT try to read more data in an + * old-protocol copy, since there is no protocol-level EOF + * marker then. We could go either way for copy from file, + * but choose to throw error if there's data after the EOF + * marker, for consistency with the new-protocol case. + */ + char dummy; + + if (cstate->copy_dest != COPY_OLD_FE && + CopyGetData(cstate, &dummy, 1, 1) > 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("received copy data after EOF marker"))); + return false; + } + + if (fld_count != attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + if (file_has_oids) + { + Oid loaded_oid; + + cstate->cur_attname = "oid"; + loaded_oid = + DatumGetObjectId(CopyReadBinaryAttribute(cstate, + 0, + &cstate->oid_in_function, + cstate->oid_typioparam, + -1, + &isnull)); + if (isnull || loaded_oid == InvalidOid) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid OID in COPY data"))); + cstate->cur_attname = NULL; + if (cstate->oids && tupleOid != NULL) + *tupleOid = loaded_oid; + } + + i = 0; + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + + cstate->cur_attname = NameStr(attr[m]->attname); + i++; + values[m] = CopyReadBinaryAttribute(cstate, + i, + &in_functions[m], + typioparams[m], + attr[m]->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + } + + /* + * Now compute and insert any defaults available for the columns not + * provided by the input data. Anything not processed here or above + * will remain NULL. + */ + for (i = 0; i < num_defaults; i++) + { + /* + * The caller must supply econtext and have switched into the + * per-tuple memory context in it. + */ + Assert(econtext != NULL); + Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); + + values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext, + &nulls[defmap[i]], NULL); + } + + return true; +} + +/* + * Clean up storage and release resources for COPY FROM. + */ +void +EndCopyFrom(CopyState cstate) +{ + /* No COPY FROM related resources except memory. */ + + EndCopy(cstate); +} /* * Read the next input line and stash it in line_buf, with conversion to @@ -3537,6 +3762,7 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) /* And send the data */ CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull); + myState->processed++; } /* diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 9e2bbe8d8e..afe4b5e450 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -14,12 +14,24 @@ #ifndef COPY_H #define COPY_H +#include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "tcop/dest.h" +typedef struct CopyStateData *CopyState; + extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString); +extern CopyState BeginCopyFrom(Relation rel, const char *filename, + List *attnamelist, List *options); +extern void EndCopyFrom(CopyState cstate); +extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext, + Datum *values, bool *nulls, Oid *tupleOid); +extern bool NextCopyFromRawFields(CopyState cstate, + char ***fields, int *nfields); +extern void CopyFromErrorCallback(void *arg); + extern DestReceiver *CreateCopyDestReceiver(void); #endif /* COPY_H */