postgresql/contrib/test_shm_mq/test.c
2014-03-18 14:26:44 -04:00

266 lines
7.5 KiB
C

/*--------------------------------------------------------------------------
*
* test.c
* Test harness code for shared memory message queues.
*
* Copyright (C) 2013, PostgreSQL Global Development Group
*
* IDENTIFICATION
* contrib/test_shm_mq/test.c
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "test_shm_mq.h"
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(test_shm_mq);
PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
void _PG_init(void);
Datum test_shm_mq(PG_FUNCTION_ARGS);
Datum test_shm_mq_pipelined(PG_FUNCTION_ARGS);
static void verify_message(Size origlen, char *origdata, Size newlen,
char *newdata);
/*
* Simple test of the shared memory message queue infrastructure.
*
* We set up a ring of message queues passing through 1 or more background
* processes and eventually looping back to ourselves. We then send a message
* through the ring a number of times indicated by the loop count. At the end,
* we check whether the final message matches the one we started with.
*/
Datum
test_shm_mq(PG_FUNCTION_ARGS)
{
int64 queue_size = PG_GETARG_INT64(0);
text *message = PG_GETARG_TEXT_PP(1);
char *message_contents = VARDATA_ANY(message);
int message_size = VARSIZE_ANY_EXHDR(message);
int32 loop_count = PG_GETARG_INT32(2);
int32 nworkers = PG_GETARG_INT32(3);
dsm_segment *seg;
shm_mq_handle *outqh;
shm_mq_handle *inqh;
shm_mq_result res;
Size len;
void *data;
/* A negative loopcount is nonsensical. */
if (loop_count < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("repeat count size must be a non-negative integer")));
/*
* Since this test sends data using the blocking interfaces, it cannot
* send data to itself. Therefore, a minimum of 1 worker is required.
* Of course, a negative worker count is nonsensical.
*/
if (nworkers < 1)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("number of workers must be a positive integer")));
/* Set up dynamic shared memory segment and background workers. */
test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
/* Send the initial message. */
res = shm_mq_send(outqh, message_size, message_contents, false);
if (res != SHM_MQ_SUCCESS)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not send message")));
/*
* Receive a message and send it back out again. Do this a number of
* times equal to the loop count.
*/
for (;;)
{
/* Receive a message. */
res = shm_mq_receive(inqh, &len, &data, false);
if (res != SHM_MQ_SUCCESS)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not receive message")));
/* If this is supposed to be the last iteration, stop here. */
if (--loop_count <= 0)
break;
/* Send it back out. */
res = shm_mq_send(outqh, len, data, false);
if (res != SHM_MQ_SUCCESS)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not send message")));
}
/*
* Finally, check that we got back the same message from the last
* iteration that we originally sent.
*/
verify_message(message_size, message_contents, len, data);
/* Clean up. */
dsm_detach(seg);
PG_RETURN_VOID();
}
/*
* Pipelined test of the shared memory message queue infrastructure.
*
* As in the basic test, we set up a ring of message queues passing through
* 1 or more background processes and eventually looping back to ourselves.
* Then, we send N copies of the user-specified message through the ring and
* receive them all back. Since this might fill up all message queues in the
* ring and then stall, we must be prepared to begin receiving the messages
* back before we've finished sending them.
*/
Datum
test_shm_mq_pipelined(PG_FUNCTION_ARGS)
{
int64 queue_size = PG_GETARG_INT64(0);
text *message = PG_GETARG_TEXT_PP(1);
char *message_contents = VARDATA_ANY(message);
int message_size = VARSIZE_ANY_EXHDR(message);
int32 loop_count = PG_GETARG_INT32(2);
int32 nworkers = PG_GETARG_INT32(3);
bool verify = PG_GETARG_BOOL(4);
int32 send_count = 0;
int32 receive_count = 0;
dsm_segment *seg;
shm_mq_handle *outqh;
shm_mq_handle *inqh;
shm_mq_result res;
Size len;
void *data;
/* A negative loopcount is nonsensical. */
if (loop_count < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("repeat count size must be a non-negative integer")));
/*
* Using the nonblocking interfaces, we can even send data to ourselves,
* so the minimum number of workers for this test is zero.
*/
if (nworkers < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("number of workers must be a non-negative integer")));
/* Set up dynamic shared memory segment and background workers. */
test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
/* Main loop. */
for (;;)
{
bool wait = true;
/*
* If we haven't yet sent the message the requisite number of times,
* try again to send it now. Note that when shm_mq_send() returns
* SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
* same message size and contents; that's not an issue here because
* we're sending the same message every time.
*/
if (send_count < loop_count)
{
res = shm_mq_send(outqh, message_size, message_contents, true);
if (res == SHM_MQ_SUCCESS)
{
++send_count;
wait = false;
}
else if (res == SHM_MQ_DETACHED)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not send message")));
}
/*
* If we haven't yet received the message the requisite number of
* times, try to receive it again now.
*/
if (receive_count < loop_count)
{
res = shm_mq_receive(inqh, &len, &data, true);
if (res == SHM_MQ_SUCCESS)
{
++receive_count;
/* Verifying every time is slow, so it's optional. */
if (verify)
verify_message(message_size, message_contents, len, data);
wait = false;
}
else if (res == SHM_MQ_DETACHED)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not receive message")));
}
else
{
/*
* Otherwise, we've received the message enough times. This
* shouldn't happen unless we've also sent it enough times.
*/
if (send_count != receive_count)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("message sent %d times, but received %d times",
send_count, receive_count)));
break;
}
if (wait)
{
/*
* If we made no progress, wait for one of the other processes
* to which we are connected to set our latch, indicating that
* they have read or written data and therefore there may now be
* work for us to do.
*/
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
CHECK_FOR_INTERRUPTS();
ResetLatch(&MyProc->procLatch);
}
}
/* Clean up. */
dsm_detach(seg);
PG_RETURN_VOID();
}
/*
* Verify that two messages are the same.
*/
static void
verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
{
Size i;
if (origlen != newlen)
ereport(ERROR,
(errmsg("message corrupted"),
errdetail("The original message was %zu bytes but the final message is %zu bytes.",
origlen, newlen)));
for (i = 0; i < origlen; ++i)
if (origdata[i] != newdata[i])
ereport(ERROR,
(errmsg("message corrupted"),
errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
}