Cache-thrashing protection: if the DB has more entries than the cache,

don't allow large search operations that touch more entries to churn the
existing cached entries.
Howard Chu 2007-11-12 11:16:30 +00:00
parent eea5e60bf0
commit b4bcc325ee
5 changed files with 83 additions and 37 deletions
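
In outline, the protection has two halves, both visible in the hunks below: the search loop counts how many entries it has examined and, once the count exceeds the configured cache size, passes ID_NOCACHE into the cache lookup; the cache layer then flags such entries CACHE_ENTRY_NOT_CACHED and frees them as soon as the reader releases them, rather than letting them evict warmer entries. A condensed sketch of the caller side, using only names introduced by this commit:

/* search.c side: once more entries than bi_cache.c_maxsize have been
 * inspected, ask the cache not to retain any further ones.
 */
nentries++;
if ( nentries > bdb->bi_cache.c_maxsize && !idflag )
	idflag = ID_NOCACHE;

rs->sr_err = bdb_cache_find_id( op, ltid, id, &ei, idflag, locker, &lock );

/* cache.c side: an entry fetched with ID_NOCACHE is marked
 * CACHE_ENTRY_NOT_CACHED and bdb_cache_return_entry_rw() frees it
 * when the reader is done.
 */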

View File

@@ -117,6 +117,7 @@ typedef struct bdb_entry_info {
#define CACHE_ENTRY_WALKING 0x20
#define CACHE_ENTRY_ONELEVEL 0x40
#define CACHE_ENTRY_REFERENCED 0x80
#define CACHE_ENTRY_NOT_CACHED 0x100
int bei_finders;
/*
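
CACHE_ENTRY_NOT_CACHED takes the next free bit (0x100) in the existing bei_state bitmask. The rest of the patch manipulates it with a test-then-XOR idiom; the XOR is only safe for clearing because the flag has just been tested as set, and every access happens under the entryinfo lock. A hypothetical illustration of the idiom:

bdb_cache_entryinfo_lock( ei );
if ( ei->bei_state & CACHE_ENTRY_NOT_CACHED )
	ei->bei_state ^= CACHE_ENTRY_NOT_CACHED;	/* known set, so XOR clears */
bdb_cache_entryinfo_unlock( ei );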

View File

@@ -119,13 +119,13 @@ static void
bdb_cache_lru_link( struct bdb_info *bdb, EntryInfo *ei )
{
/* Already linked, ignore */
if ( ei->bei_lruprev )
return;
/* Insert into circular LRU list */
ldap_pvt_thread_mutex_lock( &bdb->bi_cache.c_lru_mutex );
/* Still linked, remove */
if ( ei->bei_lruprev ) {
LRU_DEL( &bdb->bi_cache, ei );
}
ei->bei_lruprev = bdb->bi_cache.c_lrutail;
if ( bdb->bi_cache.c_lrutail ) {
ei->bei_lrunext = bdb->bi_cache.c_lrutail->bei_lrunext;
@@ -250,6 +250,28 @@ bdb_cache_entry_db_unlock ( struct bdb_info *bdb, DB_LOCK *lock )
#endif
}
void
bdb_cache_return_entry_rw( struct bdb_info *bdb, Entry *e,
int rw, DB_LOCK *lock )
{
EntryInfo *ei;
int free = 0;
ei = e->e_private;
bdb_cache_entryinfo_lock( ei );
if ( ei->bei_state & CACHE_ENTRY_NOT_CACHED ) {
ei->bei_e = NULL;
ei->bei_state ^= CACHE_ENTRY_NOT_CACHED;
free = 1;
}
bdb_cache_entryinfo_unlock( ei );
bdb_cache_entry_db_unlock( bdb, lock );
if ( free ) {
e->e_private = NULL;
bdb_entry_return( e );
}
}
static int
bdb_cache_entryinfo_destroy( EntryInfo *e )
{
@@ -541,6 +563,7 @@ hdb_cache_find_parent(
/* Got the parent, link in and we're done. */
if ( ei2 ) {
bdb_cache_entryinfo_lock( eir );
bdb_cache_entryinfo_lock( ei2 );
ein->bei_parent = ei2;
@@ -553,7 +576,6 @@ hdb_cache_find_parent(
ein->bei_state &= ~CACHE_ENTRY_NOT_LINKED;
bdb_cache_entryinfo_unlock( ei2 );
bdb_cache_entryinfo_lock( eir );
*res = eir;
break;
@@ -737,7 +759,7 @@ bdb_cache_find_info(
/*
* cache_find_id - find an entry in the cache, given id.
* The entry is locked for Read upon return. Call with islocked TRUE if
* The entry is locked for Read upon return. Call with flag ID_LOCKED if
* the supplied *eip was already locked.
*/
@@ -747,7 +769,7 @@ bdb_cache_find_id(
DB_TXN *tid,
ID id,
EntryInfo **eip,
int islocked,
int flag,
BDB_LOCKER locker,
DB_LOCK *lock )
{
@@ -789,7 +811,7 @@ again: ldap_pvt_thread_rdwr_rlock( &bdb->bi_cache.c_rwlock );
ldap_pvt_thread_yield();
goto again;
}
islocked = 1;
flag |= ID_LOCKED;
}
ldap_pvt_thread_rdwr_runlock( &bdb->bi_cache.c_rwlock );
}
@@ -801,7 +823,7 @@ again: ldap_pvt_thread_rdwr_rlock( &bdb->bi_cache.c_rwlock );
if ( rc == 0 ) {
rc = bdb_cache_find_ndn( op, tid,
&ep->e_nname, eip );
if ( *eip ) islocked = 1;
if ( *eip ) flag |= ID_LOCKED;
if ( rc ) {
ep->e_private = NULL;
#ifdef SLAP_ZONE_ALLOC
@@ -814,7 +836,7 @@ again: ldap_pvt_thread_rdwr_rlock( &bdb->bi_cache.c_rwlock );
}
#else
rc = hdb_cache_find_parent(op, tid, locker, id, eip );
if ( rc == 0 ) islocked = 1;
if ( rc == 0 ) flag |= ID_LOCKED;
#endif
}
@@ -839,9 +861,18 @@ load1:
(*eip)->bei_state |= CACHE_ENTRY_LOADING;
}
if ( islocked ) {
/* If the entry was loaded before but uncached, and we need
* it again, clear the uncached state
*/
if ( (*eip)->bei_state & CACHE_ENTRY_NOT_CACHED ) {
(*eip)->bei_state ^= CACHE_ENTRY_NOT_CACHED;
if ( flag & ID_NOCACHE )
flag ^= ID_NOCACHE;
}
if ( flag & ID_LOCKED ) {
bdb_cache_entryinfo_unlock( *eip );
islocked = 0;
flag ^= ID_LOCKED;
}
rc = bdb_cache_entry_db_lock( bdb, locker, *eip, load, 0, lock );
if ( (*eip)->bei_state & CACHE_ENTRY_DELETED ) {
@@ -863,6 +894,11 @@ load1:
#endif
ep = NULL;
bdb_cache_lru_link( bdb, *eip );
if ( flag & ID_NOCACHE ) {
bdb_cache_entryinfo_lock( *eip );
(*eip)->bei_state |= CACHE_ENTRY_NOT_CACHED;
bdb_cache_entryinfo_unlock( *eip );
}
}
if ( rc == 0 ) {
/* If we succeeded, downgrade back to a readlock. */
@@ -878,7 +914,7 @@ load1:
*/
bdb_cache_entry_db_unlock( bdb, lock );
bdb_cache_entryinfo_lock( *eip );
islocked = 1;
flag |= ID_LOCKED;
goto load1;
#ifdef BDB_HIER
} else {
@@ -904,7 +940,7 @@ load1:
}
}
}
if ( islocked ) {
if ( flag & ID_LOCKED ) {
bdb_cache_entryinfo_unlock( *eip );
}
if ( ep ) {
@@ -919,14 +955,16 @@ load1:
int purge = 0;
if ( load ) {
ldap_pvt_thread_mutex_lock( &bdb->bi_cache.c_count_mutex );
bdb->bi_cache.c_cursize++;
if ( bdb->bi_cache.c_cursize > bdb->bi_cache.c_maxsize &&
!bdb->bi_cache.c_purging ) {
purge = 1;
bdb->bi_cache.c_purging = 1;
if ( !( flag & ID_NOCACHE )) {
ldap_pvt_thread_mutex_lock( &bdb->bi_cache.c_count_mutex );
bdb->bi_cache.c_cursize++;
if ( bdb->bi_cache.c_cursize > bdb->bi_cache.c_maxsize &&
!bdb->bi_cache.c_purging ) {
purge = 1;
bdb->bi_cache.c_purging = 1;
}
ldap_pvt_thread_mutex_unlock( &bdb->bi_cache.c_count_mutex );
}
ldap_pvt_thread_mutex_unlock( &bdb->bi_cache.c_count_mutex );
}
if ( purge )
bdb_cache_lru_purge( bdb );
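
With bdb_cache_return_entry_rw() promoted from a macro to a real function, returning an entry is now also the point where an uncached one is destroyed. A sketch of the read-path call pattern this enables (the fetch and use lines are illustrative, not part of this commit):

ei = NULL;
rc = bdb_cache_find_id( op, tid, id, &ei, idflag, locker, &lock );
if ( rc == 0 ) {
	Entry *e = ei->bei_e;
	/* ... read from e under the DB read lock ... */

	/* drops the DB lock; if CACHE_ENTRY_NOT_CACHED was set, this
	 * also detaches e and frees it via bdb_entry_return() */
	bdb_cache_return_entry_r( bdb, e, &lock );
}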

View File

@@ -54,7 +54,7 @@ bdb_dn2entry(
*e = ei;
if ( ei && ei->bei_id ) {
rc2 = bdb_cache_find_id( op, tid, ei->bei_id,
&ei, 1, locker, lock );
&ei, ID_LOCKED, locker, lock );
if ( rc2 ) rc = rc2;
} else if ( ei ) {
bdb_cache_entryinfo_unlock( ei );
@@ -65,7 +65,7 @@ bdb_dn2entry(
bdb_cache_entryinfo_unlock( ei );
}
} else {
rc = bdb_cache_find_id( op, tid, ei->bei_id, &ei, 1,
rc = bdb_cache_find_id( op, tid, ei->bei_id, &ei, ID_LOCKED,
locker, lock );
if ( rc == 0 ) {
*e = ei;
@@ -73,7 +73,7 @@ bdb_dn2entry(
/* always return EntryInfo */
if ( ei->bei_parent ) {
ei = ei->bei_parent;
rc2 = bdb_cache_find_id( op, tid, ei->bei_id, &ei, 1,
rc2 = bdb_cache_find_id( op, tid, ei->bei_id, &ei, ID_LOCKED,
locker, lock );
if ( rc2 ) rc = rc2;
}

View File

@@ -456,6 +456,7 @@ bdb_monitor_idx_add(
* cache.c
*/
#define bdb_cache_entry_db_unlock BDB_SYMBOL(cache_entry_db_unlock)
#define bdb_cache_return_entry_rw BDB_SYMBOL(cache_return_entry_rw)
#define bdb_cache_entryinfo_lock(e) \
ldap_pvt_thread_mutex_lock( &(e)->bei_kids_mutex )
@@ -467,15 +468,8 @@ bdb_monitor_idx_add(
/* What a mess. Hopefully the current cache scheme will stabilize
* and we can trim out all of this stuff.
*/
#if 0
void bdb_cache_return_entry_rw( struct bdb_info *bdb, Entry *e,
int rw, DB_LOCK *lock );
#else
#define bdb_cache_return_entry_rw( bdb, e, rw, lock ) \
bdb_cache_entry_db_unlock( bdb, lock )
#define bdb_cache_return_entry( bdb, lock ) \
bdb_cache_entry_db_unlock( bdb, lock )
#endif
#define bdb_cache_return_entry_r(bdb, e, l) \
bdb_cache_return_entry_rw((bdb), (e), 0, (l))
#define bdb_cache_return_entry_w(bdb, e, l) \
@@ -542,12 +536,15 @@ EntryInfo * bdb_cache_find_info(
struct bdb_info *bdb,
ID id
);
#define ID_LOCKED 1
#define ID_NOCACHE 2
int bdb_cache_find_id(
Operation *op,
DB_TXN *tid,
ID id,
EntryInfo **eip,
int islocked,
int flag,
BDB_LOCKER locker,
DB_LOCK *lock
);
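
ID_LOCKED and ID_NOCACHE occupy distinct bits, so the new flag argument can carry both at once. No caller in this commit combines them, but the encoding allows it, e.g.:

rc = bdb_cache_find_id( op, tid, ei->bei_id, &ei,
	ID_LOCKED | ID_NOCACHE, locker, lock );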

View File

@@ -303,19 +303,20 @@ int
bdb_search( Operation *op, SlapReply *rs )
{
struct bdb_info *bdb = (struct bdb_info *) op->o_bd->be_private;
time_t stoptime;
ID id, cursor;
ID lastid = NOID;
ID candidates[BDB_IDL_UM_SIZE];
ID scopes[BDB_IDL_DB_SIZE];
Entry *e = NULL, base, *e_root;
Entry *matched = NULL;
EntryInfo *ei;
AttributeName *attrs;
struct berval realbase = BER_BVNULL;
slap_mask_t mask;
time_t stoptime;
int manageDSAit;
int tentries = 0;
ID lastid = NOID;
AttributeName *attrs;
int tentries = 0, nentries = 0;
int idflag = 0;
BDB_LOCKER locker = 0;
DB_LOCK lock;
@@ -640,6 +641,7 @@ dn2entry_retry:
rs->sr_err = LDAP_OTHER;
goto done;
}
nentries = ps->ps_count;
goto loop_begin;
}
@@ -676,11 +678,19 @@ loop_begin:
goto done;
}
/* If we inspect more entries than will
* fit into the entry cache, stop caching
* any subsequent entries
*/
nentries++;
if ( nentries > bdb->bi_cache.c_maxsize && !idflag )
idflag = ID_NOCACHE;
fetch_entry_retry:
/* get the entry with reader lock */
ei = NULL;
rs->sr_err = bdb_cache_find_id( op, ltid,
id, &ei, 0, locker, &lock );
id, &ei, idflag, locker, &lock );
if (rs->sr_err == LDAP_BUSY) {
rs->sr_text = "ldap server busy";