From 11a4b3eb5742b74126de3905523421a2d29a52a1 Mon Sep 17 00:00:00 2001 From: Howard Chu Date: Sat, 6 Dec 2003 23:32:56 +0000 Subject: [PATCH] Use runqueue for consistency checker --- servers/slapd/overlays/pcache.c | 229 +++++++++++++++++--------------- 1 file changed, 122 insertions(+), 107 deletions(-) diff --git a/servers/slapd/overlays/pcache.c b/servers/slapd/overlays/pcache.c index 1f27391e47..bda597b379 100644 --- a/servers/slapd/overlays/pcache.c +++ b/servers/slapd/overlays/pcache.c @@ -33,6 +33,7 @@ #include "slap.h" #include "ldap_pvt.h" #include "lutil.h" +#include "ldap_rq.h" /* query cache structs */ /* query */ @@ -120,8 +121,8 @@ typedef struct cache_manager_s { int num_entries_limit; /* max # of entries in a cacheable query */ int cc_period; /* interval between successive consistency checks (sec) */ - int cc_thread_started; - ldap_pvt_thread_t cc_thread; + int cc_paused; + void *cc_arg; ldap_pvt_thread_mutex_t cache_mutex; ldap_pvt_thread_mutex_t remove_mutex; @@ -136,6 +137,7 @@ static char *queryid_schema = "( 1.3.6.1.4.1.4203.666.1.12 NAME 'queryid' " "SYNTAX 1.3.6.1.4.1.1466.115.121.1.40{64} " "NO-USER-MODIFICATION USAGE directoryOperation )"; +/* Return 1 for an added entry, else 0 */ static int merge_entry( Operation *op, @@ -179,19 +181,24 @@ merge_entry( modlist->sml_op = LDAP_MOD_ADD; op->o_tag = LDAP_REQ_MODIFY; op->orm_modlist = modlist; - rc = op->o_bd->be_modify( op, &sreply ); + op->o_bd->be_modify( op, &sreply ); slap_mods_free( modlist ); } else if ( rc == LDAP_REFERRAL || rc == LDAP_NO_SUCH_OBJECT ) { syncrepl_add_glue( op, e ); e = NULL; + rc = 1; + } + if ( e ) { + entry_free( e ); + rc = 0; } - if ( e ) entry_free( e ); } else { be_entry_release_w( op, e ); + rc = 1; } - return 0; + return rc; } /* compare base and scope of incoming and cached queries */ @@ -960,11 +967,6 @@ is_temp_answerable( strcasecmp(qt->querystr.bv_val, tempstr->bv_val) == 0); } -static void* -consistency_check( - void *op -); - static int filter2template( Filter *f, @@ -1161,41 +1163,19 @@ cache_entries( } return_val = merge_entry(&op_tmp, e, query_uuid); -#ifdef NEW_LOGGING - LDAP_LOG( BACK_META, DETAIL1, - "ENTRY ADDED/MERGED, CACHE =%d entries\n", - cm->cur_entries, 0, 0 ); -#else /* !NEW_LOGGING */ - Debug( LDAP_DEBUG_ANY, - "ENTRY ADDED/MERGED, CACHE =%d entries\n", - cm->cur_entries, 0, 0 ); -#endif /* !NEW_LOGGING */ -#ifdef NEW_LOGGING - LDAP_LOG( BACK_META, DETAIL2, "UNLOCKING REMOVE MUTEX\n", - 0, 0, 0 ); -#else /* !NEW_LOGGING */ - Debug( LDAP_DEBUG_NONE, "UNLOCKING REMOVE MUTEX\n", 0, 0, 0 ); -#endif /* !NEW_LOGGING */ ldap_pvt_thread_mutex_unlock(&cm->remove_mutex); -#ifdef NEW_LOGGING - LDAP_LOG( BACK_META, DETAIL2, "UNLOCKED REMOVE MUTEX\n", - 0, 0, 0 ); -#else /* !NEW_LOGGING */ - Debug( LDAP_DEBUG_NONE, "UNLOCKED REMOVE MUTEX\n", 0, 0, 0 ); -#endif /* !NEW_LOGGING */ - if (return_val) - return 0; ldap_pvt_thread_mutex_lock(&cm->cache_mutex); - cm->cur_entries++; + cm->cur_entries += return_val; #ifdef NEW_LOGGING LDAP_LOG( BACK_META, DETAIL1, - "ENTRY ADDED/MERGED, SIZE=%d, CACHED ENTRIES=%d\n", - return_val, cm->cur_entries, 0 ); + "ENTRY ADDED/MERGED, CACHED ENTRIES=%d\n", + cm->cur_entries, 0, 0 ); #else /* !NEW_LOGGING */ Debug( LDAP_DEBUG_ANY, - "ENTRY ADDED/MERGED, SIZE=%d, CACHED ENTRIES=%d\n", - return_val, cm->cur_entries, 0 ); + "ENTRY ADDED/MERGED, CACHED ENTRIES=%d\n", + cm->cur_entries, 0, 0 ); #endif /* !NEW_LOGGING */ + return_val = 0; ldap_pvt_thread_mutex_unlock(&cm->cache_mutex); } ldap_pvt_thread_mutex_lock(&cm->cache_mutex); @@ -1248,8 +1228,20 @@ proxy_cache_response( } } } else if ( rs->sr_type == REP_RESULT && !si->over ) { - if ( cache_entries( op, rs, &uuid ) == 0) + if ( cache_entries( op, rs, &uuid ) == 0) { qm->addfunc(qm, &si->query, si->template_id, &uuid); + /* If the consistency checker suspended itself, + * wake it back up + */ + if ( cm->cc_paused ) { + ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex ); + if ( cm->cc_paused ) { + cm->cc_paused = 0; + ldap_pvt_runqueue_resched( &syncrepl_rq, cm->cc_arg, 0 ); + } + ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex ); + } + } } return SLAP_CB_CONTINUE; } @@ -1467,10 +1459,10 @@ proxy_cache_search( } else { #ifdef NEW_LOGGING LDAP_LOG( BACK_META, DETAIL1, - "QUERY NOT CACHEABLE no\n", + "QUERY NOT CACHEABLE\n", 0, 0, 0); #else /* !NEW_LOGGING */ - Debug( LDAP_DEBUG_ANY, "QUERY NOT CACHEABLE no\n", + Debug( LDAP_DEBUG_ANY, "QUERY NOT CACHEABLE\n", 0, 0, 0); #endif /* !NEW_LOGGING */ } @@ -1525,101 +1517,118 @@ get_attr_set( } static void* -consistency_check(void* operation) +consistency_check( + void *ctx, + void *arg ) { - Operation op = *(Operation*)operation; - BackendDB be = *op.o_bd; + struct re_s *rtask = arg; + slap_overinst *on = rtask->arg; + cache_manager *cm = on->on_bi.bi_private; + query_manager *qm = cm->qm; + Operation op = {0}; + BackendDB be = on->on_info->oi_bd; SlapReply rs = {REP_RESULT}; - - slap_overinst *on = (slap_overinst *)op.o_bd->bd_info; - cache_manager* cm = on->on_bi.bi_private; - query_manager* qm = cm->qm; CachedQuery* query, *query_prev; - time_t curr_time; - struct berval uuid; - int i, return_val; + int i, return_val, pause = 1; QueryTemplate* templ; op.o_bd = &be; + op.o_dn = be.be_rootdn; + op.o_ndn = be.be_rootndn; + op.o_threadctx = ctx; + + op.o_tmpmemctx = sl_mem_create( SLMALLOC_SLAB_SIZE, ctx ); + op.o_tmpmfuncs = &sl_mfuncs; + be.bd_info = cm->bi; be.be_private = cm->be_private; - for(;;) { - ldap_pvt_thread_sleep(cm->cc_period); - for (i=0; qm->templates[i].querystr.bv_val; i++) { - templ = qm->templates + i; - query = templ->query_last; - curr_time = slap_get_time(); - ldap_pvt_thread_mutex_lock(&cm->remove_mutex); - while (query && (query->expiry_time < curr_time)) { - ldap_pvt_thread_mutex_lock(&qm->lru_mutex); - remove_query(qm, query); - ldap_pvt_thread_mutex_unlock(&qm->lru_mutex); + cm->cc_arg = arg; + + for (i=0; qm->templates[i].querystr.bv_val; i++) { + templ = qm->templates + i; + query = templ->query_last; + if ( query ) pause = 0; + op.o_time = slap_get_time(); + ldap_pvt_thread_mutex_lock(&cm->remove_mutex); + while (query && (query->expiry_time < op.o_time)) { + ldap_pvt_thread_mutex_lock(&qm->lru_mutex); + remove_query(qm, query); + ldap_pvt_thread_mutex_unlock(&qm->lru_mutex); #ifdef NEW_LOGGING - LDAP_LOG( BACK_META, DETAIL1, "Lock CR index = %d\n", - i, 0, 0 ); + LDAP_LOG( BACK_META, DETAIL1, "Lock CR index = %d\n", + i, 0, 0 ); #else /* !NEW_LOGGING */ - Debug( LDAP_DEBUG_ANY, "Lock CR index = %d\n", - i, 0, 0 ); + Debug( LDAP_DEBUG_ANY, "Lock CR index = %d\n", + i, 0, 0 ); #endif /* !NEW_LOGGING */ - ldap_pvt_thread_rdwr_wlock(&templ->t_rwlock); - remove_from_template(query, templ); + ldap_pvt_thread_rdwr_wlock(&templ->t_rwlock); + remove_from_template(query, templ); #ifdef NEW_LOGGING - LDAP_LOG( BACK_META, DETAIL1, - "TEMPLATE %d QUERIES-- %d\n", - i, templ->no_of_queries, 0 ); + LDAP_LOG( BACK_META, DETAIL1, + "TEMPLATE %d QUERIES-- %d\n", + i, templ->no_of_queries, 0 ); #else /* !NEW_LOGGING */ - Debug( LDAP_DEBUG_ANY, "TEMPLATE %d QUERIES-- %d\n", - i, templ->no_of_queries, 0 ); + Debug( LDAP_DEBUG_ANY, "TEMPLATE %d QUERIES-- %d\n", + i, templ->no_of_queries, 0 ); #endif /* !NEW_LOGGING */ #ifdef NEW_LOGGING - LDAP_LOG( BACK_META, DETAIL1, "Unlock CR index = %d\n", - i, 0, 0 ); + LDAP_LOG( BACK_META, DETAIL1, "Unlock CR index = %d\n", + i, 0, 0 ); #else /* !NEW_LOGGING */ - Debug( LDAP_DEBUG_ANY, "Unlock CR index = %d\n", - i, 0, 0 ); + Debug( LDAP_DEBUG_ANY, "Unlock CR index = %d\n", + i, 0, 0 ); #endif /* !NEW_LOGGING */ - ldap_pvt_thread_rdwr_wunlock(&templ->t_rwlock); - uuid = query->q_uuid; - return_val = remove_query_data(&op, &rs, &uuid); + ldap_pvt_thread_rdwr_wunlock(&templ->t_rwlock); + return_val = remove_query_data(&op, &rs, &query->q_uuid); #ifdef NEW_LOGGING - LDAP_LOG( BACK_META, DETAIL1, - "STALE QUERY REMOVED, SIZE=%d\n", + LDAP_LOG( BACK_META, DETAIL1, + "STALE QUERY REMOVED, SIZE=%d\n", + return_val, 0, 0 ); +#else /* !NEW_LOGGING */ + Debug( LDAP_DEBUG_ANY, "STALE QUERY REMOVED, SIZE=%d\n", return_val, 0, 0 ); -#else /* !NEW_LOGGING */ - Debug( LDAP_DEBUG_ANY, "STALE QUERY REMOVED, SIZE=%d\n", - return_val, 0, 0 ); #endif /* !NEW_LOGGING */ - ldap_pvt_thread_mutex_lock(&cm->cache_mutex); - cm->cur_entries -= return_val; - cm->num_cached_queries--; + ldap_pvt_thread_mutex_lock(&cm->cache_mutex); + cm->cur_entries -= return_val; + cm->num_cached_queries--; #ifdef NEW_LOGGING - LDAP_LOG( BACK_META, DETAIL1, "STORED QUERIES = %lu\n", - cm->num_cached_queries, 0, 0 ); + LDAP_LOG( BACK_META, DETAIL1, "STORED QUERIES = %lu\n", + cm->num_cached_queries, 0, 0 ); #else /* !NEW_LOGGING */ - Debug( LDAP_DEBUG_ANY, "STORED QUERIES = %lu\n", - cm->num_cached_queries, 0, 0 ); + Debug( LDAP_DEBUG_ANY, "STORED QUERIES = %lu\n", + cm->num_cached_queries, 0, 0 ); #endif /* !NEW_LOGGING */ - ldap_pvt_thread_mutex_unlock(&cm->cache_mutex); + ldap_pvt_thread_mutex_unlock(&cm->cache_mutex); #ifdef NEW_LOGGING - LDAP_LOG( BACK_META, DETAIL1, - "STALE QUERY REMOVED, CACHE =" - "%d entries\n", - cm->cur_entries, 0, 0 ); + LDAP_LOG( BACK_META, DETAIL1, + "STALE QUERY REMOVED, CACHE =" + "%d entries\n", + cm->cur_entries, 0, 0 ); #else /* !NEW_LOGGING */ - Debug( LDAP_DEBUG_ANY, - "STALE QUERY REMOVED, CACHE =" - "%d entries\n", - cm->cur_entries, 0, 0 ); + Debug( LDAP_DEBUG_ANY, + "STALE QUERY REMOVED, CACHE =" + "%d entries\n", + cm->cur_entries, 0, 0 ); #endif /* !NEW_LOGGING */ - query_prev = query; - query = query->prev; - free_query(query_prev); - } - ldap_pvt_thread_mutex_unlock(&cm->remove_mutex); + query_prev = query; + query = query->prev; + free_query(query_prev); } + ldap_pvt_thread_mutex_unlock(&cm->remove_mutex); } + /* If there were no queries, defer processing for a while */ + if ( pause ) { + ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex ); + cm->cc_paused = 1; + if ( ldap_pvt_runqueue_isrunning( &syncrepl_rq, rtask )) { + ldap_pvt_runqueue_stoptask( &syncrepl_rq, rtask ); + } + ldap_pvt_runqueue_resched( &syncrepl_rq, rtask, 1 ); + ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex ); + } + return NULL; } @@ -1842,8 +1851,8 @@ proxy_cache_init( cm->max_entries = 0; cm->cur_entries = 0; cm->max_queries = 10000; - cm->cc_thread_started = 0; cm->cc_period = 1000; + cm->cc_paused = 0; qm->attr_sets = NULL; qm->templates = NULL; @@ -1875,6 +1884,12 @@ proxy_cache_open( rc = cm->bi->bi_db_open( be ); be->be_private = private; } + + ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex ); + ldap_pvt_runqueue_insert( &syncrepl_rq, cm->cc_period, + consistency_check, on ); + ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex ); + return rc; }