mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-01-06 15:24:56 +08:00
6a605cd6bd
Report by Robert Haas
265 lines
7.4 KiB
C
265 lines
7.4 KiB
C
/*--------------------------------------------------------------------------
|
|
*
|
|
* test.c
|
|
* Test harness code for shared memory message queues.
|
|
*
|
|
* Copyright (C) 2013, PostgreSQL Global Development Group
|
|
*
|
|
* IDENTIFICATION
|
|
* contrib/test_shm_mq/test.c
|
|
*
|
|
* -------------------------------------------------------------------------
|
|
*/
|
|
|
|
#include "postgres.h"
|
|
|
|
#include "fmgr.h"
|
|
#include "miscadmin.h"
|
|
|
|
#include "test_shm_mq.h"
|
|
|
|
PG_MODULE_MAGIC;
|
|
|
|
PG_FUNCTION_INFO_V1(test_shm_mq);
|
|
PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
|
|
|
|
void _PG_init(void);
|
|
|
|
static void verify_message(Size origlen, char *origdata, Size newlen,
|
|
char *newdata);
|
|
|
|
/*
|
|
* Simple test of the shared memory message queue infrastructure.
|
|
*
|
|
* We set up a ring of message queues passing through 1 or more background
|
|
* processes and eventually looping back to ourselves. We then send a message
|
|
* through the ring a number of times indicated by the loop count. At the end,
|
|
* we check whether the final message matches the one we started with.
|
|
*/
|
|
Datum
|
|
test_shm_mq(PG_FUNCTION_ARGS)
|
|
{
|
|
int64 queue_size = PG_GETARG_INT64(0);
|
|
text *message = PG_GETARG_TEXT_PP(1);
|
|
char *message_contents = VARDATA_ANY(message);
|
|
int message_size = VARSIZE_ANY_EXHDR(message);
|
|
int32 loop_count = PG_GETARG_INT32(2);
|
|
int32 nworkers = PG_GETARG_INT32(3);
|
|
dsm_segment *seg;
|
|
shm_mq_handle *outqh;
|
|
shm_mq_handle *inqh;
|
|
shm_mq_result res;
|
|
Size len;
|
|
void *data;
|
|
|
|
/* A negative loopcount is nonsensical. */
|
|
if (loop_count < 0)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("repeat count size must be a non-negative integer")));
|
|
|
|
/*
|
|
* Since this test sends data using the blocking interfaces, it cannot
|
|
* send data to itself. Therefore, a minimum of 1 worker is required. Of
|
|
* course, a negative worker count is nonsensical.
|
|
*/
|
|
if (nworkers < 1)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("number of workers must be a positive integer")));
|
|
|
|
/* Set up dynamic shared memory segment and background workers. */
|
|
test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
|
|
|
|
/* Send the initial message. */
|
|
res = shm_mq_send(outqh, message_size, message_contents, false);
|
|
if (res != SHM_MQ_SUCCESS)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("could not send message")));
|
|
|
|
/*
|
|
* Receive a message and send it back out again. Do this a number of
|
|
* times equal to the loop count.
|
|
*/
|
|
for (;;)
|
|
{
|
|
/* Receive a message. */
|
|
res = shm_mq_receive(inqh, &len, &data, false);
|
|
if (res != SHM_MQ_SUCCESS)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("could not receive message")));
|
|
|
|
/* If this is supposed to be the last iteration, stop here. */
|
|
if (--loop_count <= 0)
|
|
break;
|
|
|
|
/* Send it back out. */
|
|
res = shm_mq_send(outqh, len, data, false);
|
|
if (res != SHM_MQ_SUCCESS)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("could not send message")));
|
|
}
|
|
|
|
/*
|
|
* Finally, check that we got back the same message from the last
|
|
* iteration that we originally sent.
|
|
*/
|
|
verify_message(message_size, message_contents, len, data);
|
|
|
|
/* Clean up. */
|
|
dsm_detach(seg);
|
|
|
|
PG_RETURN_VOID();
|
|
}
|
|
|
|
/*
|
|
* Pipelined test of the shared memory message queue infrastructure.
|
|
*
|
|
* As in the basic test, we set up a ring of message queues passing through
|
|
* 1 or more background processes and eventually looping back to ourselves.
|
|
* Then, we send N copies of the user-specified message through the ring and
|
|
* receive them all back. Since this might fill up all message queues in the
|
|
* ring and then stall, we must be prepared to begin receiving the messages
|
|
* back before we've finished sending them.
|
|
*/
|
|
Datum
|
|
test_shm_mq_pipelined(PG_FUNCTION_ARGS)
|
|
{
|
|
int64 queue_size = PG_GETARG_INT64(0);
|
|
text *message = PG_GETARG_TEXT_PP(1);
|
|
char *message_contents = VARDATA_ANY(message);
|
|
int message_size = VARSIZE_ANY_EXHDR(message);
|
|
int32 loop_count = PG_GETARG_INT32(2);
|
|
int32 nworkers = PG_GETARG_INT32(3);
|
|
bool verify = PG_GETARG_BOOL(4);
|
|
int32 send_count = 0;
|
|
int32 receive_count = 0;
|
|
dsm_segment *seg;
|
|
shm_mq_handle *outqh;
|
|
shm_mq_handle *inqh;
|
|
shm_mq_result res;
|
|
Size len;
|
|
void *data;
|
|
|
|
/* A negative loopcount is nonsensical. */
|
|
if (loop_count < 0)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("repeat count size must be a non-negative integer")));
|
|
|
|
/*
|
|
* Using the nonblocking interfaces, we can even send data to ourselves,
|
|
* so the minimum number of workers for this test is zero.
|
|
*/
|
|
if (nworkers < 0)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("number of workers must be a non-negative integer")));
|
|
|
|
/* Set up dynamic shared memory segment and background workers. */
|
|
test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
|
|
|
|
/* Main loop. */
|
|
for (;;)
|
|
{
|
|
bool wait = true;
|
|
|
|
/*
|
|
* If we haven't yet sent the message the requisite number of times,
|
|
* try again to send it now. Note that when shm_mq_send() returns
|
|
* SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
|
|
* same message size and contents; that's not an issue here because
|
|
* we're sending the same message every time.
|
|
*/
|
|
if (send_count < loop_count)
|
|
{
|
|
res = shm_mq_send(outqh, message_size, message_contents, true);
|
|
if (res == SHM_MQ_SUCCESS)
|
|
{
|
|
++send_count;
|
|
wait = false;
|
|
}
|
|
else if (res == SHM_MQ_DETACHED)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("could not send message")));
|
|
}
|
|
|
|
/*
|
|
* If we haven't yet received the message the requisite number of
|
|
* times, try to receive it again now.
|
|
*/
|
|
if (receive_count < loop_count)
|
|
{
|
|
res = shm_mq_receive(inqh, &len, &data, true);
|
|
if (res == SHM_MQ_SUCCESS)
|
|
{
|
|
++receive_count;
|
|
/* Verifying every time is slow, so it's optional. */
|
|
if (verify)
|
|
verify_message(message_size, message_contents, len, data);
|
|
wait = false;
|
|
}
|
|
else if (res == SHM_MQ_DETACHED)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("could not receive message")));
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
* Otherwise, we've received the message enough times. This
|
|
* shouldn't happen unless we've also sent it enough times.
|
|
*/
|
|
if (send_count != receive_count)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INTERNAL_ERROR),
|
|
errmsg("message sent %d times, but received %d times",
|
|
send_count, receive_count)));
|
|
break;
|
|
}
|
|
|
|
if (wait)
|
|
{
|
|
/*
|
|
* If we made no progress, wait for one of the other processes to
|
|
* which we are connected to set our latch, indicating that they
|
|
* have read or written data and therefore there may now be work
|
|
* for us to do.
|
|
*/
|
|
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
|
|
CHECK_FOR_INTERRUPTS();
|
|
ResetLatch(&MyProc->procLatch);
|
|
}
|
|
}
|
|
|
|
/* Clean up. */
|
|
dsm_detach(seg);
|
|
|
|
PG_RETURN_VOID();
|
|
}
|
|
|
|
/*
|
|
* Verify that two messages are the same.
|
|
*/
|
|
static void
|
|
verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
|
|
{
|
|
Size i;
|
|
|
|
if (origlen != newlen)
|
|
ereport(ERROR,
|
|
(errmsg("message corrupted"),
|
|
errdetail("The original message was %zu bytes but the final message is %zu bytes.",
|
|
origlen, newlen)));
|
|
|
|
for (i = 0; i < origlen; ++i)
|
|
if (origdata[i] != newdata[i])
|
|
ereport(ERROR,
|
|
(errmsg("message corrupted"),
|
|
errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
|
|
}
|