diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c index d96627a774..90df5930e1 100644 --- a/src/backend/storage/ipc/shm_mq.c +++ b/src/backend/storage/ipc/shm_mq.c @@ -139,7 +139,7 @@ struct shm_mq_handle }; static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes, - void *data, bool nowait, Size *bytes_written); + const void *data, bool nowait, Size *bytes_written); static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait, Size *nbytesp, void **datap); static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr, @@ -300,8 +300,34 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) return mqh; } +/* + * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had + * been passed to shm_mq_attach. + */ +void +shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle) +{ + Assert(mqh->mqh_handle == NULL); + mqh->mqh_handle = handle; +} + /* * Write a message into a shared message queue. + */ +shm_mq_result +shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait) +{ + shm_mq_iovec iov; + + iov.data = data; + iov.len = nbytes; + + return shm_mq_sendv(mqh, &iov, 1, nowait); +} + +/* + * Write a message into a shared message queue, gathered from multiple + * addresses. * * When nowait = false, we'll wait on our process latch when the ring buffer * fills up, and then continue writing once the receiver has drained some data. @@ -315,14 +341,22 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) * the length or payload will corrupt the queue.) */ shm_mq_result -shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait) +shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait) { shm_mq_result res; shm_mq *mq = mqh->mqh_queue; + Size nbytes = 0; Size bytes_written; + int i; + int which_iov = 0; + Size offset; Assert(mq->mq_sender == MyProc); + /* Compute total size of write. */ + for (i = 0; i < iovcnt; ++i) + nbytes += iov[i].len; + /* Try to write, or finish writing, the length word into the buffer. */ while (!mqh->mqh_length_word_complete) { @@ -348,18 +382,80 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait) /* Write the actual data bytes into the buffer. */ Assert(mqh->mqh_partial_bytes <= nbytes); - res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_bytes, - ((char *) data) + mqh->mqh_partial_bytes, - nowait, &bytes_written); - if (res == SHM_MQ_WOULD_BLOCK) - mqh->mqh_partial_bytes += bytes_written; - else + offset = mqh->mqh_partial_bytes; + do { - mqh->mqh_partial_bytes = 0; - mqh->mqh_length_word_complete = false; - } - if (res != SHM_MQ_SUCCESS) - return res; + Size chunksize; + + /* Figure out which bytes need to be sent next. */ + if (offset >= iov[which_iov].len) + { + offset -= iov[which_iov].len; + ++which_iov; + if (which_iov >= iovcnt) + break; + continue; + } + + /* + * We want to avoid copying the data if at all possible, but every + * chunk of bytes we write into the queue has to be MAXALIGN'd, + * except the last. Thus, if a chunk other than the last one ends + * on a non-MAXALIGN'd boundary, we have to combine the tail end of + * its data with data from one or more following chunks until we + * either reach the last chunk or accumulate a number of bytes which + * is MAXALIGN'd. + */ + if (which_iov + 1 < iovcnt && + offset + MAXIMUM_ALIGNOF > iov[which_iov].len) + { + char tmpbuf[MAXIMUM_ALIGNOF]; + int j = 0; + + for (;;) + { + if (offset < iov[which_iov].len) + { + tmpbuf[j] = iov[which_iov].data[offset]; + j++; + offset++; + if (j == MAXIMUM_ALIGNOF) + break; + } + else + { + offset -= iov[which_iov].len; + which_iov++; + if (which_iov >= iovcnt) + break; + } + } + res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written); + mqh->mqh_partial_bytes += bytes_written; + if (res != SHM_MQ_SUCCESS) + return res; + continue; + } + + /* + * If this is the last chunk, we can write all the data, even if it + * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to + * MAXALIGN_DOWN the write size. + */ + chunksize = iov[which_iov].len - offset; + if (which_iov + 1 < iovcnt) + chunksize = MAXALIGN_DOWN(chunksize); + res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset], + nowait, &bytes_written); + mqh->mqh_partial_bytes += bytes_written; + offset += bytes_written; + if (res != SHM_MQ_SUCCESS) + return res; + } while (mqh->mqh_partial_bytes < nbytes); + + /* Reset for next message. */ + mqh->mqh_partial_bytes = 0; + mqh->mqh_length_word_complete = false; /* Notify receiver of the newly-written data, and return. */ return shm_mq_notify_receiver(mq); @@ -653,8 +749,8 @@ shm_mq_detach(shm_mq *mq) * Write bytes into a shared message queue. */ static shm_mq_result -shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait, - Size *bytes_written) +shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, + bool nowait, Size *bytes_written) { shm_mq *mq = mqh->mqh_queue; Size sent = 0; diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h index 5bae3807af..063400ae28 100644 --- a/src/include/storage/shm_mq.h +++ b/src/include/storage/shm_mq.h @@ -25,6 +25,13 @@ typedef struct shm_mq shm_mq; struct shm_mq_handle; typedef struct shm_mq_handle shm_mq_handle; +/* Descriptors for a single write spanning multiple locations. */ +typedef struct +{ + const char *data; + Size len; +} shm_mq_iovec; + /* Possible results of a send or receive operation. */ typedef enum { @@ -52,12 +59,17 @@ extern PGPROC *shm_mq_get_sender(shm_mq *); extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle); +/* Associate worker handle with shm_mq. */ +extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *); + /* Break connection. */ extern void shm_mq_detach(shm_mq *); /* Send or receive messages. */ extern shm_mq_result shm_mq_send(shm_mq_handle *mqh, - Size nbytes, void *data, bool nowait); + Size nbytes, const void *data, bool nowait); +extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, + shm_mq_iovec *iov, int iovcnt, bool nowait); extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait);