reindexdb: Fix the index-level REINDEX with multiple jobs

47f99a407d introduced a parallel index-level REINDEX.  The code was written
assuming that running run_reindex_command() with 'async == true' can schedule
a number of queries for a connection.  That's not true, and the second query
sent using run_reindex_command() will wait for the completion of the previous
one.

This commit fixes that by putting REINDEX commands for the same table into a
single query.

Also, this commit removes the 'async' argument from run_reindex_command(),
as only its call always passes 'async == true'.

Reported-by: Álvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/202503071820.j25zn3lo4hvn%40alvherre.pgsql
Reviewed-by: Álvaro Herrera <alvherre@alvh.no-ip.org>
Backpatch-through: 17
This commit is contained in:
Alexander Korotkov 2025-03-16 13:28:22 +02:00
parent 83e5763d4d
commit 682c5be25c

View File

@ -49,10 +49,13 @@ static void reindex_all_databases(ConnParams *cparams,
bool syscatalog, SimpleStringList *schemas,
SimpleStringList *tables,
SimpleStringList *indexes);
static void run_reindex_command(PGconn *conn, ReindexType type,
static void gen_reindex_command(PGconn *conn, ReindexType type,
const char *name, bool echo, bool verbose,
bool concurrently, bool async,
const char *tablespace);
bool concurrently, const char *tablespace,
PQExpBufferData *sql);
static void run_reindex_command(PGconn *conn, ReindexType type,
const char *name, bool echo,
PQExpBufferData *sq);
static void help(const char *progname);
@ -284,7 +287,6 @@ reindex_one_database(ConnParams *cparams, ReindexType type,
ParallelSlotArray *sa;
bool failed = false;
int items_count = 0;
char *prev_index_table_name = NULL;
ParallelSlot *free_slot = NULL;
conn = connectDatabase(cparams, progname, echo, false, true);
@ -430,8 +432,8 @@ reindex_one_database(ConnParams *cparams, ReindexType type,
cell = process_list->head;
do
{
PQExpBufferData sql;
const char *objname = cell->val;
bool need_new_slot = true;
if (CancelRequested)
{
@ -439,35 +441,45 @@ reindex_one_database(ConnParams *cparams, ReindexType type,
goto finish;
}
/*
* For parallel index-level REINDEX, the indices of the same table are
* ordered together and they are to be processed by the same job. So,
* we don't switch the job as soon as the index belongs to the same
* table as the previous one.
*/
free_slot = ParallelSlotsGetIdle(sa, NULL);
if (!free_slot)
{
failed = true;
goto finish;
}
ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL);
initPQExpBuffer(&sql);
if (parallel && process_type == REINDEX_INDEX)
{
if (prev_index_table_name != NULL &&
strcmp(prev_index_table_name, indices_tables_cell->val) == 0)
need_new_slot = false;
prev_index_table_name = indices_tables_cell->val;
/*
* For parallel index-level REINDEX, the indices of the same table
* are ordered together and they are to be processed by the same
* job. So, we put all the relevant REINDEX commands into the
* same SQL query to be processed by this job at once.
*/
gen_reindex_command(free_slot->connection, process_type, objname,
echo, verbose, concurrently, tablespace, &sql);
while (indices_tables_cell->next &&
strcmp(indices_tables_cell->val, indices_tables_cell->next->val) == 0)
{
indices_tables_cell = indices_tables_cell->next;
cell = cell->next;
objname = cell->val;
appendPQExpBufferChar(&sql, '\n');
gen_reindex_command(free_slot->connection, process_type, objname,
echo, verbose, concurrently, tablespace, &sql);
}
indices_tables_cell = indices_tables_cell->next;
}
if (need_new_slot)
else
{
free_slot = ParallelSlotsGetIdle(sa, NULL);
if (!free_slot)
{
failed = true;
goto finish;
}
ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL);
gen_reindex_command(free_slot->connection, process_type, objname,
echo, verbose, concurrently, tablespace, &sql);
}
run_reindex_command(free_slot->connection, process_type, objname,
echo, verbose, concurrently, true, tablespace);
echo, &sql);
termPQExpBuffer(&sql);
cell = cell->next;
} while (cell != NULL);
@ -495,57 +507,57 @@ finish:
exit(1);
}
/*
* Append a SQL command required to reindex a given database object to the
* '*sql' string.
*/
static void
run_reindex_command(PGconn *conn, ReindexType type, const char *name,
bool echo, bool verbose, bool concurrently, bool async,
const char *tablespace)
gen_reindex_command(PGconn *conn, ReindexType type, const char *name,
bool echo, bool verbose, bool concurrently,
const char *tablespace, PQExpBufferData *sql)
{
const char *paren = "(";
const char *comma = ", ";
const char *sep = paren;
PQExpBufferData sql;
bool status;
Assert(name);
/* build the REINDEX query */
initPQExpBuffer(&sql);
appendPQExpBufferStr(&sql, "REINDEX ");
appendPQExpBufferStr(sql, "REINDEX ");
if (verbose)
{
appendPQExpBuffer(&sql, "%sVERBOSE", sep);
appendPQExpBuffer(sql, "%sVERBOSE", sep);
sep = comma;
}
if (tablespace)
{
appendPQExpBuffer(&sql, "%sTABLESPACE %s", sep,
appendPQExpBuffer(sql, "%sTABLESPACE %s", sep,
fmtIdEnc(tablespace, PQclientEncoding(conn)));
sep = comma;
}
if (sep != paren)
appendPQExpBufferStr(&sql, ") ");
appendPQExpBufferStr(sql, ") ");
/* object type */
switch (type)
{
case REINDEX_DATABASE:
appendPQExpBufferStr(&sql, "DATABASE ");
appendPQExpBufferStr(sql, "DATABASE ");
break;
case REINDEX_INDEX:
appendPQExpBufferStr(&sql, "INDEX ");
appendPQExpBufferStr(sql, "INDEX ");
break;
case REINDEX_SCHEMA:
appendPQExpBufferStr(&sql, "SCHEMA ");
appendPQExpBufferStr(sql, "SCHEMA ");
break;
case REINDEX_SYSTEM:
appendPQExpBufferStr(&sql, "SYSTEM ");
appendPQExpBufferStr(sql, "SYSTEM ");
break;
case REINDEX_TABLE:
appendPQExpBufferStr(&sql, "TABLE ");
appendPQExpBufferStr(sql, "TABLE ");
break;
}
@ -555,37 +567,43 @@ run_reindex_command(PGconn *conn, ReindexType type, const char *name,
* object type.
*/
if (concurrently)
appendPQExpBufferStr(&sql, "CONCURRENTLY ");
appendPQExpBufferStr(sql, "CONCURRENTLY ");
/* object name */
switch (type)
{
case REINDEX_DATABASE:
case REINDEX_SYSTEM:
appendPQExpBufferStr(&sql,
appendPQExpBufferStr(sql,
fmtIdEnc(name, PQclientEncoding(conn)));
break;
case REINDEX_INDEX:
case REINDEX_TABLE:
appendQualifiedRelation(&sql, name, conn, echo);
appendQualifiedRelation(sql, name, conn, echo);
break;
case REINDEX_SCHEMA:
appendPQExpBufferStr(&sql, name);
appendPQExpBufferStr(sql, name);
break;
}
/* finish the query */
appendPQExpBufferChar(&sql, ';');
appendPQExpBufferChar(sql, ';');
}
if (async)
{
if (echo)
printf("%s\n", sql.data);
/*
* Run one or more reindex commands accumulated in the '*sql' string against
* a given database connection.
*/
static void
run_reindex_command(PGconn *conn, ReindexType type, const char *name,
bool echo, PQExpBufferData *sql)
{
bool status;
status = PQsendQuery(conn, sql.data) == 1;
}
else
status = executeMaintenanceCommand(conn, sql.data, echo);
if (echo)
printf("%s\n", sql->data);
status = PQsendQuery(conn, sql->data) == 1;
if (!status)
{
@ -612,14 +630,7 @@ run_reindex_command(PGconn *conn, ReindexType type, const char *name,
name, PQdb(conn), PQerrorMessage(conn));
break;
}
if (!async)
{
PQfinish(conn);
exit(1);
}
}
termPQExpBuffer(&sql);
}
/*