Modify pg_stat_get_activity to build a tuplestore

This updates pg_stat_get_activity() to build a tuplestore for its
results instead of using the old-style multiple-call method.  This
simplifies the function, though that wasn't the primary motivation for
the change, which is that we may turn it into a helper function which
can filter the results (or not) much more easily.
This commit is contained in:
Stephen Frost 2015-05-08 19:25:30 -04:00
parent 4b342fb591
commit f91feba877

View File

@ -524,137 +524,75 @@ pg_stat_get_backend_idset(PG_FUNCTION_ARGS)
} }
} }
/*
* Returns activity of PG backends.
*/
Datum Datum
pg_stat_get_activity(PG_FUNCTION_ARGS) pg_stat_get_activity(PG_FUNCTION_ARGS)
{ {
FuncCallContext *funcctx; #define PG_STAT_GET_ACTIVITY_COLS 22
int num_backends = pgstat_fetch_stat_numbackends();
int curr_backend;
int pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0);
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
if (SRF_IS_FIRSTCALL()) /* check to see if caller supports us returning a tuplestore */
{ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
MemoryContext oldcontext; ereport(ERROR,
TupleDesc tupdesc; (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context that cannot accept a set")));
if (!(rsinfo->allowedModes & SFRM_Materialize))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not " \
"allowed in this context")));
funcctx = SRF_FIRSTCALL_INIT(); /* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
tupdesc = CreateTemplateTupleDesc(22, false); tupstore = tuplestore_begin_heap(true, false, work_mem);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "datid", rsinfo->returnMode = SFRM_Materialize;
OIDOID, -1, 0); rsinfo->setResult = tupstore;
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "pid", rsinfo->setDesc = tupdesc;
INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "usesysid",
OIDOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "application_name",
TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 5, "state",
TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 6, "query",
TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 7, "waiting",
BOOLOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 8, "act_start",
TIMESTAMPTZOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "query_start",
TIMESTAMPTZOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 10, "backend_start",
TIMESTAMPTZOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 11, "state_change",
TIMESTAMPTZOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 12, "client_addr",
INETOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 13, "client_hostname",
TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 14, "client_port",
INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 15, "backend_xid",
XIDOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 16, "backend_xmin",
XIDOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 17, "ssl",
BOOLOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 18, "sslversion",
TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 19, "sslcipher",
TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 20, "sslbits",
INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 21, "sslcompression",
BOOLOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 22, "sslclientdn",
TEXTOID, -1, 0);
funcctx->tuple_desc = BlessTupleDesc(tupdesc); MemoryContextSwitchTo(oldcontext);
funcctx->user_fctx = palloc0(sizeof(int)); /* 1-based index */
if (PG_ARGISNULL(0)) for (curr_backend = 1; curr_backend <= num_backends; curr_backend++)
{
/* Get all backends */
funcctx->max_calls = pgstat_fetch_stat_numbackends();
}
else
{
/*
* Get one backend - locate by pid.
*
* We lookup the backend early, so we can return zero rows if it
* doesn't exist, instead of returning a single row full of NULLs.
*/
int pid = PG_GETARG_INT32(0);
int i;
int n = pgstat_fetch_stat_numbackends();
for (i = 1; i <= n; i++)
{
PgBackendStatus *be = pgstat_fetch_stat_beentry(i);
if (be)
{
if (be->st_procpid == pid)
{
*(int *) (funcctx->user_fctx) = i;
break;
}
}
}
if (*(int *) (funcctx->user_fctx) == 0)
/* Pid not found, return zero rows */
funcctx->max_calls = 0;
else
funcctx->max_calls = 1;
}
MemoryContextSwitchTo(oldcontext);
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
if (funcctx->call_cntr < funcctx->max_calls)
{ {
/* for each row */ /* for each row */
Datum values[22]; Datum values[PG_STAT_GET_ACTIVITY_COLS];
bool nulls[22]; bool nulls[PG_STAT_GET_ACTIVITY_COLS];
HeapTuple tuple;
LocalPgBackendStatus *local_beentry; LocalPgBackendStatus *local_beentry;
PgBackendStatus *beentry; PgBackendStatus *beentry;
MemSet(values, 0, sizeof(values)); MemSet(values, 0, sizeof(values));
MemSet(nulls, 0, sizeof(nulls)); MemSet(nulls, 0, sizeof(nulls));
if (*(int *) (funcctx->user_fctx) > 0) if (pid != -1)
{ {
/* Get specific pid slot */ /* Skip any which are not the one we're looking for. */
local_beentry = pgstat_fetch_stat_local_beentry(*(int *) (funcctx->user_fctx)); PgBackendStatus *be = pgstat_fetch_stat_beentry(curr_backend);
beentry = &local_beentry->backendStatus;
} if (!be || be->st_procpid != pid)
else continue;
{
/* Get the next one in the list */
local_beentry = pgstat_fetch_stat_local_beentry(funcctx->call_cntr + 1); /* 1-based index */
beentry = &local_beentry->backendStatus;
} }
/* Get the next one in the list */
local_beentry = pgstat_fetch_stat_local_beentry(curr_backend);
if (!local_beentry)
continue;
beentry = &local_beentry->backendStatus;
if (!beentry) if (!beentry)
{ {
int i; int i;
@ -665,8 +603,8 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
nulls[5] = false; nulls[5] = false;
values[5] = CStringGetTextDatum("<backend information not available>"); values[5] = CStringGetTextDatum("<backend information not available>");
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); tuplestore_putvalues(tupstore, tupdesc, values, nulls);
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple)); continue;
} }
/* Values available to all callers */ /* Values available to all callers */
@ -839,15 +777,17 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
nulls[13] = true; nulls[13] = true;
} }
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); tuplestore_putvalues(tupstore, tupdesc, values, nulls);
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple)); /* If only a single backend was requested, and we found it, break. */
} if (pid != -1)
else break;
{
/* nothing left */
SRF_RETURN_DONE(funcctx);
} }
/* clean up and return the tuplestore */
tuplestore_donestoring(tupstore);
return (Datum) 0;
} }