Use runqueue for consistency checker

This commit is contained in:
Howard Chu 2003-12-06 23:32:56 +00:00
parent 74db966ebb
commit 11a4b3eb57

View File

@ -33,6 +33,7 @@
#include "slap.h"
#include "ldap_pvt.h"
#include "lutil.h"
#include "ldap_rq.h"
/* query cache structs */
/* query */
@ -120,8 +121,8 @@ typedef struct cache_manager_s {
int num_entries_limit; /* max # of entries in a cacheable query */
int cc_period; /* interval between successive consistency checks (sec) */
int cc_thread_started;
ldap_pvt_thread_t cc_thread;
int cc_paused;
void *cc_arg;
ldap_pvt_thread_mutex_t cache_mutex;
ldap_pvt_thread_mutex_t remove_mutex;
@ -136,6 +137,7 @@ static char *queryid_schema = "( 1.3.6.1.4.1.4203.666.1.12 NAME 'queryid' "
"SYNTAX 1.3.6.1.4.1.1466.115.121.1.40{64} "
"NO-USER-MODIFICATION USAGE directoryOperation )";
/* Return 1 for an added entry, else 0 */
static int
merge_entry(
Operation *op,
@ -179,19 +181,24 @@ merge_entry(
modlist->sml_op = LDAP_MOD_ADD;
op->o_tag = LDAP_REQ_MODIFY;
op->orm_modlist = modlist;
rc = op->o_bd->be_modify( op, &sreply );
op->o_bd->be_modify( op, &sreply );
slap_mods_free( modlist );
} else if ( rc == LDAP_REFERRAL ||
rc == LDAP_NO_SUCH_OBJECT ) {
syncrepl_add_glue( op, e );
e = NULL;
rc = 1;
}
if ( e ) {
entry_free( e );
rc = 0;
}
if ( e ) entry_free( e );
} else {
be_entry_release_w( op, e );
rc = 1;
}
return 0;
return rc;
}
/* compare base and scope of incoming and cached queries */
@ -960,11 +967,6 @@ is_temp_answerable(
strcasecmp(qt->querystr.bv_val, tempstr->bv_val) == 0);
}
static void*
consistency_check(
void *op
);
static int
filter2template(
Filter *f,
@ -1161,41 +1163,19 @@ cache_entries(
}
return_val = merge_entry(&op_tmp, e, query_uuid);
#ifdef NEW_LOGGING
LDAP_LOG( BACK_META, DETAIL1,
"ENTRY ADDED/MERGED, CACHE =%d entries\n",
cm->cur_entries, 0, 0 );
#else /* !NEW_LOGGING */
Debug( LDAP_DEBUG_ANY,
"ENTRY ADDED/MERGED, CACHE =%d entries\n",
cm->cur_entries, 0, 0 );
#endif /* !NEW_LOGGING */
#ifdef NEW_LOGGING
LDAP_LOG( BACK_META, DETAIL2, "UNLOCKING REMOVE MUTEX\n",
0, 0, 0 );
#else /* !NEW_LOGGING */
Debug( LDAP_DEBUG_NONE, "UNLOCKING REMOVE MUTEX\n", 0, 0, 0 );
#endif /* !NEW_LOGGING */
ldap_pvt_thread_mutex_unlock(&cm->remove_mutex);
#ifdef NEW_LOGGING
LDAP_LOG( BACK_META, DETAIL2, "UNLOCKED REMOVE MUTEX\n",
0, 0, 0 );
#else /* !NEW_LOGGING */
Debug( LDAP_DEBUG_NONE, "UNLOCKED REMOVE MUTEX\n", 0, 0, 0 );
#endif /* !NEW_LOGGING */
if (return_val)
return 0;
ldap_pvt_thread_mutex_lock(&cm->cache_mutex);
cm->cur_entries++;
cm->cur_entries += return_val;
#ifdef NEW_LOGGING
LDAP_LOG( BACK_META, DETAIL1,
"ENTRY ADDED/MERGED, SIZE=%d, CACHED ENTRIES=%d\n",
return_val, cm->cur_entries, 0 );
"ENTRY ADDED/MERGED, CACHED ENTRIES=%d\n",
cm->cur_entries, 0, 0 );
#else /* !NEW_LOGGING */
Debug( LDAP_DEBUG_ANY,
"ENTRY ADDED/MERGED, SIZE=%d, CACHED ENTRIES=%d\n",
return_val, cm->cur_entries, 0 );
"ENTRY ADDED/MERGED, CACHED ENTRIES=%d\n",
cm->cur_entries, 0, 0 );
#endif /* !NEW_LOGGING */
return_val = 0;
ldap_pvt_thread_mutex_unlock(&cm->cache_mutex);
}
ldap_pvt_thread_mutex_lock(&cm->cache_mutex);
@ -1248,8 +1228,20 @@ proxy_cache_response(
}
}
} else if ( rs->sr_type == REP_RESULT && !si->over ) {
if ( cache_entries( op, rs, &uuid ) == 0)
if ( cache_entries( op, rs, &uuid ) == 0) {
qm->addfunc(qm, &si->query, si->template_id, &uuid);
/* If the consistency checker suspended itself,
* wake it back up
*/
if ( cm->cc_paused ) {
ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
if ( cm->cc_paused ) {
cm->cc_paused = 0;
ldap_pvt_runqueue_resched( &syncrepl_rq, cm->cc_arg, 0 );
}
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
}
}
}
return SLAP_CB_CONTINUE;
}
@ -1467,10 +1459,10 @@ proxy_cache_search(
} else {
#ifdef NEW_LOGGING
LDAP_LOG( BACK_META, DETAIL1,
"QUERY NOT CACHEABLE no\n",
"QUERY NOT CACHEABLE\n",
0, 0, 0);
#else /* !NEW_LOGGING */
Debug( LDAP_DEBUG_ANY, "QUERY NOT CACHEABLE no\n",
Debug( LDAP_DEBUG_ANY, "QUERY NOT CACHEABLE\n",
0, 0, 0);
#endif /* !NEW_LOGGING */
}
@ -1525,101 +1517,118 @@ get_attr_set(
}
static void*
consistency_check(void* operation)
consistency_check(
void *ctx,
void *arg )
{
Operation op = *(Operation*)operation;
BackendDB be = *op.o_bd;
struct re_s *rtask = arg;
slap_overinst *on = rtask->arg;
cache_manager *cm = on->on_bi.bi_private;
query_manager *qm = cm->qm;
Operation op = {0};
BackendDB be = on->on_info->oi_bd;
SlapReply rs = {REP_RESULT};
slap_overinst *on = (slap_overinst *)op.o_bd->bd_info;
cache_manager* cm = on->on_bi.bi_private;
query_manager* qm = cm->qm;
CachedQuery* query, *query_prev;
time_t curr_time;
struct berval uuid;
int i, return_val;
int i, return_val, pause = 1;
QueryTemplate* templ;
op.o_bd = &be;
op.o_dn = be.be_rootdn;
op.o_ndn = be.be_rootndn;
op.o_threadctx = ctx;
op.o_tmpmemctx = sl_mem_create( SLMALLOC_SLAB_SIZE, ctx );
op.o_tmpmfuncs = &sl_mfuncs;
be.bd_info = cm->bi;
be.be_private = cm->be_private;
for(;;) {
ldap_pvt_thread_sleep(cm->cc_period);
for (i=0; qm->templates[i].querystr.bv_val; i++) {
templ = qm->templates + i;
query = templ->query_last;
curr_time = slap_get_time();
ldap_pvt_thread_mutex_lock(&cm->remove_mutex);
while (query && (query->expiry_time < curr_time)) {
ldap_pvt_thread_mutex_lock(&qm->lru_mutex);
remove_query(qm, query);
ldap_pvt_thread_mutex_unlock(&qm->lru_mutex);
cm->cc_arg = arg;
for (i=0; qm->templates[i].querystr.bv_val; i++) {
templ = qm->templates + i;
query = templ->query_last;
if ( query ) pause = 0;
op.o_time = slap_get_time();
ldap_pvt_thread_mutex_lock(&cm->remove_mutex);
while (query && (query->expiry_time < op.o_time)) {
ldap_pvt_thread_mutex_lock(&qm->lru_mutex);
remove_query(qm, query);
ldap_pvt_thread_mutex_unlock(&qm->lru_mutex);
#ifdef NEW_LOGGING
LDAP_LOG( BACK_META, DETAIL1, "Lock CR index = %d\n",
i, 0, 0 );
LDAP_LOG( BACK_META, DETAIL1, "Lock CR index = %d\n",
i, 0, 0 );
#else /* !NEW_LOGGING */
Debug( LDAP_DEBUG_ANY, "Lock CR index = %d\n",
i, 0, 0 );
Debug( LDAP_DEBUG_ANY, "Lock CR index = %d\n",
i, 0, 0 );
#endif /* !NEW_LOGGING */
ldap_pvt_thread_rdwr_wlock(&templ->t_rwlock);
remove_from_template(query, templ);
ldap_pvt_thread_rdwr_wlock(&templ->t_rwlock);
remove_from_template(query, templ);
#ifdef NEW_LOGGING
LDAP_LOG( BACK_META, DETAIL1,
"TEMPLATE %d QUERIES-- %d\n",
i, templ->no_of_queries, 0 );
LDAP_LOG( BACK_META, DETAIL1,
"TEMPLATE %d QUERIES-- %d\n",
i, templ->no_of_queries, 0 );
#else /* !NEW_LOGGING */
Debug( LDAP_DEBUG_ANY, "TEMPLATE %d QUERIES-- %d\n",
i, templ->no_of_queries, 0 );
Debug( LDAP_DEBUG_ANY, "TEMPLATE %d QUERIES-- %d\n",
i, templ->no_of_queries, 0 );
#endif /* !NEW_LOGGING */
#ifdef NEW_LOGGING
LDAP_LOG( BACK_META, DETAIL1, "Unlock CR index = %d\n",
i, 0, 0 );
LDAP_LOG( BACK_META, DETAIL1, "Unlock CR index = %d\n",
i, 0, 0 );
#else /* !NEW_LOGGING */
Debug( LDAP_DEBUG_ANY, "Unlock CR index = %d\n",
i, 0, 0 );
Debug( LDAP_DEBUG_ANY, "Unlock CR index = %d\n",
i, 0, 0 );
#endif /* !NEW_LOGGING */
ldap_pvt_thread_rdwr_wunlock(&templ->t_rwlock);
uuid = query->q_uuid;
return_val = remove_query_data(&op, &rs, &uuid);
ldap_pvt_thread_rdwr_wunlock(&templ->t_rwlock);
return_val = remove_query_data(&op, &rs, &query->q_uuid);
#ifdef NEW_LOGGING
LDAP_LOG( BACK_META, DETAIL1,
"STALE QUERY REMOVED, SIZE=%d\n",
LDAP_LOG( BACK_META, DETAIL1,
"STALE QUERY REMOVED, SIZE=%d\n",
return_val, 0, 0 );
#else /* !NEW_LOGGING */
Debug( LDAP_DEBUG_ANY, "STALE QUERY REMOVED, SIZE=%d\n",
return_val, 0, 0 );
#else /* !NEW_LOGGING */
Debug( LDAP_DEBUG_ANY, "STALE QUERY REMOVED, SIZE=%d\n",
return_val, 0, 0 );
#endif /* !NEW_LOGGING */
ldap_pvt_thread_mutex_lock(&cm->cache_mutex);
cm->cur_entries -= return_val;
cm->num_cached_queries--;
ldap_pvt_thread_mutex_lock(&cm->cache_mutex);
cm->cur_entries -= return_val;
cm->num_cached_queries--;
#ifdef NEW_LOGGING
LDAP_LOG( BACK_META, DETAIL1, "STORED QUERIES = %lu\n",
cm->num_cached_queries, 0, 0 );
LDAP_LOG( BACK_META, DETAIL1, "STORED QUERIES = %lu\n",
cm->num_cached_queries, 0, 0 );
#else /* !NEW_LOGGING */
Debug( LDAP_DEBUG_ANY, "STORED QUERIES = %lu\n",
cm->num_cached_queries, 0, 0 );
Debug( LDAP_DEBUG_ANY, "STORED QUERIES = %lu\n",
cm->num_cached_queries, 0, 0 );
#endif /* !NEW_LOGGING */
ldap_pvt_thread_mutex_unlock(&cm->cache_mutex);
ldap_pvt_thread_mutex_unlock(&cm->cache_mutex);
#ifdef NEW_LOGGING
LDAP_LOG( BACK_META, DETAIL1,
"STALE QUERY REMOVED, CACHE ="
"%d entries\n",
cm->cur_entries, 0, 0 );
LDAP_LOG( BACK_META, DETAIL1,
"STALE QUERY REMOVED, CACHE ="
"%d entries\n",
cm->cur_entries, 0, 0 );
#else /* !NEW_LOGGING */
Debug( LDAP_DEBUG_ANY,
"STALE QUERY REMOVED, CACHE ="
"%d entries\n",
cm->cur_entries, 0, 0 );
Debug( LDAP_DEBUG_ANY,
"STALE QUERY REMOVED, CACHE ="
"%d entries\n",
cm->cur_entries, 0, 0 );
#endif /* !NEW_LOGGING */
query_prev = query;
query = query->prev;
free_query(query_prev);
}
ldap_pvt_thread_mutex_unlock(&cm->remove_mutex);
query_prev = query;
query = query->prev;
free_query(query_prev);
}
ldap_pvt_thread_mutex_unlock(&cm->remove_mutex);
}
/* If there were no queries, defer processing for a while */
if ( pause ) {
ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
cm->cc_paused = 1;
if ( ldap_pvt_runqueue_isrunning( &syncrepl_rq, rtask )) {
ldap_pvt_runqueue_stoptask( &syncrepl_rq, rtask );
}
ldap_pvt_runqueue_resched( &syncrepl_rq, rtask, 1 );
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
}
return NULL;
}
@ -1842,8 +1851,8 @@ proxy_cache_init(
cm->max_entries = 0;
cm->cur_entries = 0;
cm->max_queries = 10000;
cm->cc_thread_started = 0;
cm->cc_period = 1000;
cm->cc_paused = 0;
qm->attr_sets = NULL;
qm->templates = NULL;
@ -1875,6 +1884,12 @@ proxy_cache_open(
rc = cm->bi->bi_db_open( be );
be->be_private = private;
}
ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
ldap_pvt_runqueue_insert( &syncrepl_rq, cm->cc_period,
consistency_check, on );
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
return rc;
}