Mirror of https://git.postgresql.org/git/postgresql.git (synced 2025-01-30 19:00:29 +08:00)
Use atomic ops to hand out pages to scan in parallel scan.
With a lot of CPUs, the spinlock that protects the current scan location in
a parallel scan can become a bottleneck. Use an atomic fetch-and-add
instruction instead.

David Rowley

Discussion: https://www.postgresql.org/message-id/CAKJS1f9tgsPhqBcoPjv9_KUPZvTLCZ4jy%3DB%3DbhqgaKn7cYzm-w@mail.gmail.com
This commit is contained in:
parent
0c504a80cf
commit
3cda10f41b
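As a quick orientation before the diff: the commit replaces the spinlock-protected "current block" field with a shared counter that each worker advances using an atomic fetch-and-add, then maps the value it fetched onto a block number. Below is a minimal standalone sketch of that idea using C11 <stdatomic.h>; the demo_scan type, the next_page() helper, and the 10-block example are illustrative stand-ins rather than PostgreSQL code (the real code appears in the diff itself).

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define INVALID_BLOCK UINT32_MAX	/* stand-in for InvalidBlockNumber */

/* Simplified stand-in for the shared parallel-scan state. */
struct demo_scan
{
	uint32_t	nblocks;			/* # of blocks in the relation */
	uint32_t	startblock;			/* block where the scan starts */
	_Atomic uint64_t nallocated;	/* blocks handed out so far; 64 bits so
									 * that post-end increments cannot wrap
									 * even when nblocks approaches 2^32 */
};

/*
 * Hand out the next block, or INVALID_BLOCK when the relation is exhausted.
 * The fetch-and-add replaces the lock/advance/unlock sequence: each caller
 * atomically claims a unique counter value and maps it to a block number by
 * adding the start block modulo nblocks.
 */
static uint32_t
next_page(struct demo_scan *scan)
{
	uint64_t	nallocated = atomic_fetch_add(&scan->nallocated, 1);

	if (nallocated >= scan->nblocks)
		return INVALID_BLOCK;		/* all blocks already allocated */
	return (uint32_t) ((nallocated + scan->startblock) % scan->nblocks);
}

int
main(void)
{
	/* e.g. a 10-block relation whose synchronized scan starts at block 7 */
	struct demo_scan scan = {.nblocks = 10, .startblock = 7};
	uint32_t	page;

	atomic_init(&scan.nallocated, 0);
	while ((page = next_page(&scan)) != INVALID_BLOCK)
		printf("scan block %u\n", page);	/* 7, 8, 9, 0, 1, ..., 6 */
	return 0;
}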
src/backend/access/heap/heapam.c

@@ -58,6 +58,7 @@
 #include "catalog/namespace.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "port/atomics.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
@@ -89,6 +90,7 @@ static HeapScanDesc heap_beginscan_internal(Relation relation,
 					bool is_bitmapscan,
 					bool is_samplescan,
 					bool temp_snap);
+static void heap_parallelscan_startblock_init(HeapScanDesc scan);
 static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
 static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
 					TransactionId xid, CommandId cid, int options);
@@ -510,6 +512,8 @@ heapgettup(HeapScanDesc scan,
 		}
 		if (scan->rs_parallel != NULL)
 		{
+			heap_parallelscan_startblock_init(scan);
+
 			page = heap_parallelscan_nextpage(scan);

 			/* Other processes might have already finished the scan. */
@@ -812,6 +816,8 @@ heapgettup_pagemode(HeapScanDesc scan,
 		}
 		if (scan->rs_parallel != NULL)
 		{
+			heap_parallelscan_startblock_init(scan);
+
 			page = heap_parallelscan_nextpage(scan);

 			/* Other processes might have already finished the scan. */
@@ -1535,14 +1541,10 @@ heap_rescan(HeapScanDesc scan,

 		/*
 		 * Caller is responsible for making sure that all workers have
-		 * finished the scan before calling this, so it really shouldn't be
-		 * necessary to acquire the mutex at all.  We acquire it anyway, just
-		 * to be tidy.
+		 * finished the scan before calling this.
 		 */
 		parallel_scan = scan->rs_parallel;
-		SpinLockAcquire(&parallel_scan->phs_mutex);
-		parallel_scan->phs_cblock = parallel_scan->phs_startblock;
-		SpinLockRelease(&parallel_scan->phs_mutex);
+		pg_atomic_write_u64(&parallel_scan->phs_nallocated, 0);
 	}
 }

@@ -1635,8 +1637,8 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
 		!RelationUsesLocalBuffers(relation) &&
 		target->phs_nblocks > NBuffers / 4;
 	SpinLockInit(&target->phs_mutex);
-	target->phs_cblock = InvalidBlockNumber;
 	target->phs_startblock = InvalidBlockNumber;
+	pg_atomic_write_u64(&target->phs_nallocated, 0);
 	SerializeSnapshot(snapshot, target->phs_snapshot_data);
 }

@@ -1660,20 +1662,17 @@ heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
 }

 /* ----------------
- *		heap_parallelscan_nextpage - get the next page to scan
+ *		heap_parallelscan_startblock_init - find and set the scan's startblock
  *
- * Get the next page to scan.  Even if there are no pages left to scan,
- * another backend could have grabbed a page to scan and not yet finished
- * looking at it, so it doesn't follow that the scan is done when the
- * first backend gets an InvalidBlockNumber return.
+ * Determine where the parallel seq scan should start.  This function may
+ * be called many times, once by each parallel worker.  We must be careful
+ * only to set the startblock once.
  * ----------------
  */
-static BlockNumber
-heap_parallelscan_nextpage(HeapScanDesc scan)
+static void
+heap_parallelscan_startblock_init(HeapScanDesc scan)
 {
-	BlockNumber page = InvalidBlockNumber;
 	BlockNumber sync_startpage = InvalidBlockNumber;
-	BlockNumber report_page = InvalidBlockNumber;
 	ParallelHeapScanDesc parallel_scan;

 	Assert(scan->rs_parallel);
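The renamed heap_parallelscan_startblock_init keeps the spinlock, but only for the one-time job of choosing the start block. The following is a rough standalone sketch of that set-once pattern, assuming a pthread mutex in place of the spinlock and a stub in place of ss_get_location(); demo_start, startblock_init(), and get_sync_start() are illustrative names, and the real function differs in detail.

#include <pthread.h>
#include <stdint.h>

#define INVALID_BLOCK UINT32_MAX

/* Illustrative shared state; PostgreSQL uses a spinlock (slock_t) here. */
struct demo_start
{
	pthread_mutex_t mutex;
	uint32_t	startblock;		/* INVALID_BLOCK until someone sets it */
	int			syncscan;		/* start where concurrent scans already are? */
};

static uint32_t
get_sync_start(void)
{
	return 42;					/* stub for ss_get_location() */
}

/* Called by every worker; only the first caller actually sets startblock. */
static void
startblock_init(struct demo_start *s)
{
	uint32_t	sync_start = INVALID_BLOCK;

retry:
	pthread_mutex_lock(&s->mutex);
	if (s->startblock == INVALID_BLOCK)
	{
		if (!s->syncscan)
			s->startblock = 0;
		else if (sync_start != INVALID_BLOCK)
			s->startblock = sync_start;
		else
		{
			/* Can't call out while holding the lock: drop it, fetch, retry. */
			pthread_mutex_unlock(&s->mutex);
			sync_start = get_sync_start();
			goto retry;
		}
	}
	pthread_mutex_unlock(&s->mutex);
}

int
main(void)
{
	struct demo_start s = {PTHREAD_MUTEX_INITIALIZER, INVALID_BLOCK, 1};

	startblock_init(&s);		/* first caller fetches and sets the start */
	startblock_init(&s);		/* later callers find it already set */
	return s.startblock == 42 ? 0 : 1;
}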
@@ -1705,46 +1704,63 @@ retry:
 		sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
 		goto retry;
 	}
-	parallel_scan->phs_cblock = parallel_scan->phs_startblock;
 	}
+	SpinLockRelease(&parallel_scan->phs_mutex);
+}
+
+/* ----------------
+ *		heap_parallelscan_nextpage - get the next page to scan
+ *
+ * Get the next page to scan.  Even if there are no pages left to scan,
+ * another backend could have grabbed a page to scan and not yet finished
+ * looking at it, so it doesn't follow that the scan is done when the
+ * first backend gets an InvalidBlockNumber return.
+ * ----------------
+ */
+static BlockNumber
+heap_parallelscan_nextpage(HeapScanDesc scan)
+{
+	BlockNumber page;
+	ParallelHeapScanDesc parallel_scan;
+	uint64		nallocated;
+
+	Assert(scan->rs_parallel);
+	parallel_scan = scan->rs_parallel;

 	/*
-	 * The current block number is the next one that needs to be scanned,
-	 * unless it's InvalidBlockNumber already, in which case there are no more
-	 * blocks to scan.  After remembering the current value, we must advance
-	 * it so that the next call to this function returns the next block to be
-	 * scanned.
+	 * phs_nallocated tracks how many pages have been allocated to workers
+	 * already.  When phs_nallocated >= rs_nblocks, all blocks have been
+	 * allocated.
+	 *
+	 * Because we use an atomic fetch-and-add to fetch the current value, the
+	 * phs_nallocated counter will exceed rs_nblocks, because workers will
+	 * still increment the value, when they try to allocate the next block but
+	 * all blocks have been allocated already.  The counter must be 64 bits
+	 * wide because of that, to avoid wrapping around when rs_nblocks is close
+	 * to 2^32.
+	 *
+	 * The actual page to return is calculated by adding the counter to the
+	 * starting block number, modulo nblocks.
 	 */
-	page = parallel_scan->phs_cblock;
-	if (page != InvalidBlockNumber)
-	{
-		parallel_scan->phs_cblock++;
-		if (parallel_scan->phs_cblock >= scan->rs_nblocks)
-			parallel_scan->phs_cblock = 0;
-		if (parallel_scan->phs_cblock == parallel_scan->phs_startblock)
-		{
-			parallel_scan->phs_cblock = InvalidBlockNumber;
-			report_page = parallel_scan->phs_startblock;
-		}
-	}
-
-	/* Release the lock. */
-	SpinLockRelease(&parallel_scan->phs_mutex);
+	nallocated = pg_atomic_fetch_add_u64(&parallel_scan->phs_nallocated, 1);
+	if (nallocated >= scan->rs_nblocks)
+		page = InvalidBlockNumber;	/* all blocks have been allocated */
+	else
+		page = (nallocated + parallel_scan->phs_startblock) % scan->rs_nblocks;

 	/*
 	 * Report scan location.  Normally, we report the current page number.
 	 * When we reach the end of the scan, though, we report the starting page,
 	 * not the ending page, just so the starting positions for later scans
 	 * doesn't slew backwards.  We only report the position at the end of the
-	 * scan once, though: subsequent callers will have report nothing, since
-	 * they will have page == InvalidBlockNumber.
+	 * scan once, though: subsequent callers will report nothing.
 	 */
 	if (scan->rs_syncscan)
 	{
-		if (report_page == InvalidBlockNumber)
-			report_page = page;
-		if (report_page != InvalidBlockNumber)
-			ss_report_location(scan->rs_rd, report_page);
+		if (page != InvalidBlockNumber)
+			ss_report_location(scan->rs_rd, page);
+		else if (nallocated == scan->rs_nblocks)
+			ss_report_location(scan->rs_rd, parallel_scan->phs_startblock);
 	}

 	return page;
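The new comment above argues that the counter must be 64 bits wide: late callers keep incrementing it after the scan is exhausted, and with rs_nblocks close to 2^32 a 32-bit counter could wrap back below rs_nblocks and start handing blocks out a second time. A small self-contained illustration of that arithmetic (the block count and the number of extra calls are made-up demo values):

#include <stdint.h>
#include <stdio.h>

int
main(void)
{
	uint32_t	nblocks = UINT32_MAX - 2;	/* relation of nearly 2^32 blocks */
	uint32_t	counter32 = nblocks;		/* scan just finished */
	uint64_t	counter64 = nblocks;

	/* A handful of workers still call nextpage and keep incrementing... */
	for (int i = 0; i < 5; i++)
	{
		counter32++;
		counter64++;
	}

	/* ...the 32-bit counter wraps below nblocks, so a naive 32-bit scheme
	 * would start handing out blocks again; the 64-bit counter does not. */
	printf("32-bit counter: %u (wrapped: %s)\n",
		   counter32, counter32 < nblocks ? "yes" : "no");
	printf("64-bit counter: %llu (wrapped: %s)\n",
		   (unsigned long long) counter64,
		   counter64 < nblocks ? "yes" : "no");
	return 0;
}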
src/include/access/relscan.h

@@ -35,9 +35,10 @@ typedef struct ParallelHeapScanDescData
 	Oid			phs_relid;		/* OID of relation to scan */
 	bool		phs_syncscan;	/* report location to syncscan logic? */
 	BlockNumber phs_nblocks;	/* # blocks in relation at start of scan */
-	slock_t		phs_mutex;		/* mutual exclusion for block number fields */
+	slock_t		phs_mutex;		/* mutual exclusion for setting startblock */
 	BlockNumber phs_startblock; /* starting block number */
-	BlockNumber phs_cblock;		/* current block number */
+	pg_atomic_uint64 phs_nallocated;	/* number of blocks allocated to
+										 * workers so far. */
 	char		phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
 } ParallelHeapScanDescData;

|
Loading…
Reference in New Issue
Block a user