Support custom wait events for wait event type "Extension"

Two backend routines are added to allow extension to allocate and define
custom wait events, all of these being allocated in the type
"Extension":
* WaitEventExtensionNew(), that allocates a wait event ID computed from
a counter in shared memory.
* WaitEventExtensionRegisterName(), to associate a custom string to the
wait event ID allocated.

Note that this includes an example of how to use this new facility in
worker_spi with tests in TAP for various scenarios, and some
documentation about how to use them.

Any code in the tree that currently uses WAIT_EVENT_EXTENSION could
switch to this new facility to define custom wait events.  This is left
as work for future patches.

Author: Masahiro Ikeda
Reviewed-by: Andres Freund, Michael Paquier, Tristan Partin, Bharath
Rupireddy
Discussion: https://postgr.es/m/b9f5411acda0cf15c8fbb767702ff43e@oss.nttdata.com
This commit is contained in:
Michael Paquier 2023-07-31 17:09:24 +09:00
parent 39055cb4cc
commit c9af054653
11 changed files with 393 additions and 23 deletions

View File

@ -1117,11 +1117,14 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
<note>
<para>
Extensions can add <literal>LWLock</literal> types to the list shown in
<xref linkend="wait-event-lwlock-table"/>. In some cases, the name
Extensions can add <literal>Extension</literal> and
<literal>LWLock</literal> types
to the list shown in <xref linkend="wait-event-extension-table"/> and
<xref linkend="wait-event-lwlock-table"/>. In some cases, the name
assigned by an extension will not be available in all server processes;
so an <literal>LWLock</literal> wait event might be reported as
just <quote><literal>extension</literal></quote> rather than the
so an <literal>Extension</literal> or <literal>LWLock</literal> wait
event might be reported as just
<quote><literal>extension</literal></quote> rather than the
extension-assigned name.
</para>
</note>

View File

@ -3453,6 +3453,51 @@ if (!ptr)
</para>
</sect2>
<sect2 id="xfunc-addin-wait-events">
<title>Shared Memory and Custom Wait Events</title>
<para>
Add-ins can define custom wait events under the wait event type
<literal>Extension</literal>. The add-in's shared library must be
preloaded by specifying it in <literal>shared_preload_libraries</literal>,
and register a <literal>shmem_request_hook</literal> and a
<literal>shmem_startup_hook</literal> in its
<function>_PG_init</function> function.
<literal>shmem_request_hook</literal> can request a shared memory size
to be later used at startup by calling:
<programlisting>
void RequestAddinShmemSpace(int size)
</programlisting>
</para>
<para>
<literal>shmem_startup_hook</literal> can allocate in shared memory
custom wait events by calling while holding the LWLock
<function>AddinShmemInitLock</function> to avoid any race conditions:
<programlisting>
uint32 WaitEventExtensionNew(void)
</programlisting>
Next, each process needs to associate the wait event allocated previously
to a user-facing custom string, which is something done by calling:
<programlisting>
void WaitEventExtensionRegisterName(uint32 wait_event_info, const char *wait_event_name)
</programlisting>
An example can be found in <filename>src/test/modules/worker_spi</filename>
in the PostgreSQL source tree.
</para>
<para>
Custom wait events can be viewed in
<link linkend="monitoring-pg-stat-activity-view"><structname>pg_stat_activity</structname></link>:
<screen>
=# SELECT wait_event_type, wait_event FROM pg_stat_activity
WHERE backend_type ~ 'worker_spi';
wait_event_type | wait_event
-----------------+-----------------
Extension | worker_spi_main
(1 row)
</screen>
</para>
</sect2>
<sect2 id="extend-cpp">
<title>Using C++ for Extensibility</title>

View File

@ -49,6 +49,7 @@
#include "storage/spin.h"
#include "utils/guc.h"
#include "utils/snapmgr.h"
#include "utils/wait_event.h"
/* GUCs */
int shared_memory_type = DEFAULT_SHARED_MEMORY_TYPE;
@ -142,6 +143,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
size = add_size(size, StatsShmemSize());
size = add_size(size, WaitEventExtensionShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@ -301,6 +303,7 @@ CreateSharedMemoryAndSemaphores(void)
SyncScanShmemInit();
AsyncShmemInit();
StatsShmemInit();
WaitEventExtensionShmemInit();
#ifdef EXEC_BACKEND

View File

@ -133,10 +133,11 @@ if ($gen_code)
foreach my $waitclass (sort { uc($a) cmp uc($b) } keys %hashwe)
{
# Don't generate .c and .h files for LWLock and Lock, these are
# handled independently.
# Don't generate .c and .h files for Extension, LWLock and
# Lock, these are handled independently.
next
if ( $waitclass eq 'WaitEventLWLock'
if ( $waitclass eq 'WaitEventExtension'
|| $waitclass eq 'WaitEventLWLock'
|| $waitclass eq 'WaitEventLock');
my $last = $waitclass;

View File

@ -22,15 +22,18 @@
*/
#include "postgres.h"
#include "miscadmin.h"
#include "port/pg_bitutils.h"
#include "storage/lmgr.h" /* for GetLockNameFromTagType */
#include "storage/lwlock.h" /* for GetLWLockIdentifier */
#include "storage/spin.h"
#include "utils/memutils.h"
#include "utils/wait_event.h"
static const char *pgstat_get_wait_activity(WaitEventActivity w);
static const char *pgstat_get_wait_bufferpin(WaitEventBufferPin w);
static const char *pgstat_get_wait_client(WaitEventClient w);
static const char *pgstat_get_wait_extension(WaitEventExtension w);
static const char *pgstat_get_wait_ipc(WaitEventIPC w);
static const char *pgstat_get_wait_timeout(WaitEventTimeout w);
static const char *pgstat_get_wait_io(WaitEventIO w);
@ -42,6 +45,169 @@ uint32 *my_wait_event_info = &local_my_wait_event_info;
#define WAIT_EVENT_CLASS_MASK 0xFF000000
#define WAIT_EVENT_ID_MASK 0x0000FFFF
/* dynamic allocation counter for custom wait events in extensions */
typedef struct WaitEventExtensionCounterData
{
int nextId; /* next ID to assign */
slock_t mutex; /* protects the counter */
} WaitEventExtensionCounterData;
/* pointer to the shared memory */
static WaitEventExtensionCounterData *WaitEventExtensionCounter;
/* first event ID of custom wait events for extensions */
#define NUM_BUILTIN_WAIT_EVENT_EXTENSION \
(WAIT_EVENT_EXTENSION_FIRST_USER_DEFINED - WAIT_EVENT_EXTENSION)
/*
* This is indexed by event ID minus NUM_BUILTIN_WAIT_EVENT_EXTENSION, and
* stores the names of all dynamically-created event IDs known to the current
* process. Any unused entries in the array will contain NULL.
*/
static const char **WaitEventExtensionNames = NULL;
static int WaitEventExtensionNamesAllocated = 0;
static const char *GetWaitEventExtensionIdentifier(uint16 eventId);
/*
* Return the space for dynamic allocation counter.
*/
Size
WaitEventExtensionShmemSize(void)
{
return sizeof(WaitEventExtensionCounterData);
}
/*
* Allocate shmem space for dynamic allocation counter.
*/
void
WaitEventExtensionShmemInit(void)
{
bool found;
WaitEventExtensionCounter = (WaitEventExtensionCounterData *)
ShmemInitStruct("WaitEventExtensionCounterData",
WaitEventExtensionShmemSize(), &found);
if (!found)
{
/* initialize the allocation counter and its spinlock. */
WaitEventExtensionCounter->nextId = NUM_BUILTIN_WAIT_EVENT_EXTENSION;
SpinLockInit(&WaitEventExtensionCounter->mutex);
}
}
/*
* Allocate a new event ID and return the wait event.
*/
uint32
WaitEventExtensionNew(void)
{
uint16 eventId;
Assert(LWLockHeldByMeInMode(AddinShmemInitLock, LW_EXCLUSIVE));
SpinLockAcquire(&WaitEventExtensionCounter->mutex);
if (WaitEventExtensionCounter->nextId > PG_UINT16_MAX)
{
SpinLockRelease(&WaitEventExtensionCounter->mutex);
ereport(ERROR,
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("too many wait events for extensions"));
}
eventId = WaitEventExtensionCounter->nextId++;
SpinLockRelease(&WaitEventExtensionCounter->mutex);
return PG_WAIT_EXTENSION | eventId;
}
/*
* Register a dynamic wait event name for extension in the lookup table
* of the current process.
*
* This routine will save a pointer to the wait event name passed as an argument,
* so the name should be allocated in a backend-lifetime context
* (shared memory, TopMemoryContext, static constant, or similar).
*
* The "wait_event_name" will be user-visible as a wait event name, so try to
* use a name that fits the style for those.
*/
void
WaitEventExtensionRegisterName(uint32 wait_event_info,
const char *wait_event_name)
{
uint32 classId;
uint16 eventId;
classId = wait_event_info & WAIT_EVENT_CLASS_MASK;
eventId = wait_event_info & WAIT_EVENT_ID_MASK;
/* Check the wait event class. */
if (classId != PG_WAIT_EXTENSION)
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid wait event class %u", classId));
/* This should only be called for user-defined wait event. */
if (eventId < NUM_BUILTIN_WAIT_EVENT_EXTENSION)
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid wait event ID %u", eventId));
/* Convert to array index. */
eventId -= NUM_BUILTIN_WAIT_EVENT_EXTENSION;
/* If necessary, create or enlarge array. */
if (eventId >= WaitEventExtensionNamesAllocated)
{
uint32 newalloc;
newalloc = pg_nextpower2_32(Max(8, eventId + 1));
if (WaitEventExtensionNames == NULL)
WaitEventExtensionNames = (const char **)
MemoryContextAllocZero(TopMemoryContext,
newalloc * sizeof(char *));
else
WaitEventExtensionNames =
repalloc0_array(WaitEventExtensionNames, const char *,
WaitEventExtensionNamesAllocated, newalloc);
WaitEventExtensionNamesAllocated = newalloc;
}
WaitEventExtensionNames[eventId] = wait_event_name;
}
/*
* Return the name of an wait event ID for extension.
*/
static const char *
GetWaitEventExtensionIdentifier(uint16 eventId)
{
/* Built-in event? */
if (eventId < NUM_BUILTIN_WAIT_EVENT_EXTENSION)
return "Extension";
/*
* It is a user-defined wait event, so look at WaitEventExtensionNames[].
* However, it is possible that the name has never been registered by
* calling WaitEventExtensionRegisterName() in the current process, in
* which case give up and return "extension".
*/
eventId -= NUM_BUILTIN_WAIT_EVENT_EXTENSION;
if (eventId >= WaitEventExtensionNamesAllocated ||
WaitEventExtensionNames[eventId] == NULL)
return "extension";
return WaitEventExtensionNames[eventId];
}
/*
* Configure wait event reporting to report wait events to *wait_event_info.
* *wait_event_info needs to be valid until pgstat_reset_wait_event_storage()
@ -151,6 +317,9 @@ pgstat_get_wait_event(uint32 wait_event_info)
case PG_WAIT_LOCK:
event_name = GetLockNameFromTagType(eventId);
break;
case PG_WAIT_EXTENSION:
event_name = GetWaitEventExtensionIdentifier(eventId);
break;
case PG_WAIT_BUFFERPIN:
{
WaitEventBufferPin w = (WaitEventBufferPin) wait_event_info;
@ -172,13 +341,6 @@ pgstat_get_wait_event(uint32 wait_event_info)
event_name = pgstat_get_wait_client(w);
break;
}
case PG_WAIT_EXTENSION:
{
WaitEventExtension w = (WaitEventExtension) wait_event_info;
event_name = pgstat_get_wait_extension(w);
break;
}
case PG_WAIT_IPC:
{
WaitEventIPC w = (WaitEventIPC) wait_event_info;

View File

@ -261,7 +261,7 @@ WAIT_EVENT_BUFFER_PIN BufferPin "Waiting to acquire an exclusive pin on a buffer
Section: ClassName - WaitEventExtension
WAIT_EVENT_EXTENSION Extension "Waiting in an extension."
WAIT_EVENT_DOCONLY Extension "Waiting in an extension."
#
# Wait events - LWLock

View File

@ -38,6 +38,32 @@ extern void pgstat_reset_wait_event_storage(void);
extern PGDLLIMPORT uint32 *my_wait_event_info;
/* ----------
* Wait Events - Extension
*
* Use this category when the server process is waiting for some condition
* defined by an extension module.
*
* Extensions can define their own wait events in this category. First,
* they should call WaitEventExtensionNew() to get one or more wait event
* IDs that are allocated from a shared counter. These can be used directly
* with pgstat_report_wait_start() or equivalent. Next, each individual
* process should call WaitEventExtensionRegisterName() to associate a wait
* event string to the number allocated previously.
*/
typedef enum
{
WAIT_EVENT_EXTENSION = PG_WAIT_EXTENSION,
WAIT_EVENT_EXTENSION_FIRST_USER_DEFINED
} WaitEventExtension;
extern void WaitEventExtensionShmemInit(void);
extern Size WaitEventExtensionShmemSize(void);
extern uint32 WaitEventExtensionNew(void);
extern void WaitEventExtensionRegisterName(uint32 wait_event_info,
const char *wait_event_name);
/* ----------
* pgstat_report_wait_start() -
*

View File

@ -39,6 +39,28 @@ $node->poll_query_until('postgres',
$result = $node->safe_psql('postgres', 'SELECT * FROM schema4.counted;');
is($result, qq(total|1), 'dynamic bgworker correctly consumed tuple data');
# Check the wait event used by the dynamic bgworker. For a session without
# the state in shared memory known, the default of "extension" is the value
# waited on.
$result = $node->poll_query_until(
'postgres',
qq[SELECT wait_event FROM pg_stat_activity WHERE backend_type ~ 'worker_spi';],
'extension');
is($result, 1, 'dynamic bgworker has reported "extension" as wait event');
# If the shared memory state is loaded (here with worker_spi_init within
# the same connection as the one querying pg_stat_activity), the wait
# event is the custom one.
# The expected result is a special pattern here with a newline coming from the
# first query where the shared memory state is set.
$result = $node->poll_query_until(
'postgres',
qq[SELECT worker_spi_init(); SELECT wait_event FROM pg_stat_activity WHERE backend_type ~ 'worker_spi';],
qq[
worker_spi_main]);
is($result, 1,
'dynamic bgworker has reported "worker_spi_main" as wait event');
note "testing bgworkers loaded with shared_preload_libraries";
# Create the database first so as the workers can connect to it when
@ -58,9 +80,9 @@ $node->restart;
# Check that bgworkers have been registered and launched.
ok( $node->poll_query_until(
'mydb',
qq[SELECT datname, count(datname) FROM pg_stat_activity
WHERE backend_type = 'worker_spi' GROUP BY datname;],
'mydb|3'),
qq[SELECT datname, count(datname), wait_event FROM pg_stat_activity
WHERE backend_type = 'worker_spi' GROUP BY datname, wait_event;],
'mydb|3|worker_spi_main'),
'bgworkers all launched'
) or die "Timed out while waiting for bgworkers to be launched";
@ -72,10 +94,10 @@ my $worker2_pid = $node->safe_psql('mydb', 'SELECT worker_spi_launch(11);');
ok( $node->poll_query_until(
'mydb',
qq[SELECT datname, count(datname) FROM pg_stat_activity
qq[SELECT datname, count(datname), wait_event FROM pg_stat_activity
WHERE backend_type = 'worker_spi dynamic' AND
pid IN ($worker1_pid, $worker2_pid) GROUP BY datname;],
'mydb|2'),
pid IN ($worker1_pid, $worker2_pid) GROUP BY datname, wait_event;],
'mydb|2|worker_spi_main'),
'dynamic bgworkers all launched'
) or die "Timed out while waiting for dynamic bgworkers to be launched";

View File

@ -7,3 +7,8 @@ CREATE FUNCTION worker_spi_launch(pg_catalog.int4)
RETURNS pg_catalog.int4 STRICT
AS 'MODULE_PATHNAME'
LANGUAGE C;
CREATE FUNCTION worker_spi_init()
RETURNS VOID STRICT
AS 'MODULE_PATHNAME'
LANGUAGE C;

View File

@ -44,10 +44,28 @@
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(worker_spi_init);
PG_FUNCTION_INFO_V1(worker_spi_launch);
PGDLLEXPORT void worker_spi_main(Datum main_arg) pg_attribute_noreturn();
/* Shared memory state */
typedef struct worker_spi_state
{
/* the wait event defined during initialization phase */
uint32 wait_event;
} worker_spi_state;
static worker_spi_state *wsstate = NULL; /* pointer to shared memory */
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static shmem_request_hook_type prev_shmem_startup_hook = NULL;
static void worker_spi_shmem_request(void);
static void worker_spi_shmem_startup(void);
static void worker_spi_shmem_init(void);
static Size worker_spi_memsize(void);
/* GUC variables */
static int worker_spi_naptime = 10;
static int worker_spi_total_workers = 2;
@ -60,6 +78,63 @@ typedef struct worktable
const char *name;
} worktable;
static void
worker_spi_shmem_request(void)
{
if (prev_shmem_request_hook)
prev_shmem_request_hook();
RequestAddinShmemSpace(worker_spi_memsize());
}
static void
worker_spi_shmem_startup(void)
{
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
worker_spi_shmem_init();
}
static Size
worker_spi_memsize(void)
{
return MAXALIGN(sizeof(worker_spi_state));
}
/*
* Initialize the shared memory state of worker_spi.
*
* This routine allocates a new wait event when called the first time.
* On follow-up calls, the name of the wait event associated with the
* existing shared memory state is registered.
*/
static void
worker_spi_shmem_init(void)
{
bool found;
wsstate = NULL;
/* Create or attach to the shared memory state */
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
wsstate = ShmemInitStruct("worker_spi State",
sizeof(worker_spi_state),
&found);
/* Define a new wait event */
if (!found)
wsstate->wait_event = WaitEventExtensionNew();
LWLockRelease(AddinShmemInitLock);
/*
* Register the wait event in the lookup table of the current process.
*/
WaitEventExtensionRegisterName(wsstate->wait_event, "worker_spi_main");
return;
}
/*
* Initialize workspace for a worker process: create the schema if it doesn't
* already exist.
@ -149,6 +224,9 @@ worker_spi_main(Datum main_arg)
/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
/* Create (if necessary) and attach to our shared memory area. */
worker_spi_shmem_init();
/* Connect to our database */
BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0);
@ -199,7 +277,7 @@ worker_spi_main(Datum main_arg)
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
worker_spi_naptime * 1000L,
WAIT_EVENT_EXTENSION);
wsstate->wait_event);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
@ -328,6 +406,11 @@ _PG_init(void)
MarkGUCPrefixReserved("worker_spi");
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = worker_spi_shmem_request;
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = worker_spi_shmem_startup;
/* set up common data for all our workers */
memset(&worker, 0, sizeof(worker));
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
@ -351,6 +434,21 @@ _PG_init(void)
}
}
/*
* Wrapper to initialize a session with the shared memory state
* used by this module. This is a convenience routine to be able to
* see the custom wait event stored in shared memory without loading
* through shared_preload_libraries.
*/
Datum
worker_spi_init(PG_FUNCTION_ARGS)
{
/* Create (if necessary) and attach to our shared memory area. */
worker_spi_shmem_init();
PG_RETURN_VOID();
}
/*
* Dynamically launch an SPI worker.
*/
@ -363,6 +461,9 @@ worker_spi_launch(PG_FUNCTION_ARGS)
BgwHandleStatus status;
pid_t pid;
/* Create (if necessary) and attach to our shared memory area. */
worker_spi_shmem_init();
memset(&worker, 0, sizeof(worker));
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;

View File

@ -2992,6 +2992,7 @@ WaitEventActivity
WaitEventBufferPin
WaitEventClient
WaitEventExtension
WaitEventExtensionCounterData
WaitEventIO
WaitEventIPC
WaitEventSet
@ -3862,6 +3863,7 @@ wchar2mb_with_len_converter
wchar_t
win32_deadchild_waitinfo
wint_t
worker_spi_state
worker_state
worktable
wrap