More for threadpool queues

Allow dynamic reconfig
This commit is contained in:
Howard Chu 2013-09-03 15:06:37 -07:00
parent c645a58569
commit 0ef9e6107b
2 changed files with 125 additions and 24 deletions

View File

@ -32,6 +32,10 @@
#ifndef LDAP_THREAD_HAVE_TPOOL
#ifndef CACHELINE
#define CACHELINE 64
#endif
/* Thread-specific key with data and optional free function */
typedef struct ldap_int_tpool_key_s {
void *ltk_key;
@ -93,6 +97,8 @@ typedef struct ldap_int_thread_task_s {
typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t;
struct ldap_int_thread_poolq_s {
void *ltp_free;
struct ldap_int_thread_pool_s *ltp_pool;
/* protect members below */
@ -129,7 +135,7 @@ struct ldap_int_thread_poolq_s {
struct ldap_int_thread_pool_s {
LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
struct ldap_int_thread_poolq_s *ltp_wqs;
struct ldap_int_thread_poolq_s **ltp_wqs;
/* number of poolqs */
int ltp_numqs;
@ -231,12 +237,29 @@ ldap_pvt_thread_pool_init_q (
*tpool = NULL;
pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
sizeof(struct ldap_int_thread_pool_s) +
numqs * sizeof(struct ldap_int_thread_poolq_s));
sizeof(struct ldap_int_thread_pool_s));
if (pool == NULL) return(-1);
pool->ltp_wqs = (struct ldap_int_thread_poolq_s *)(pool+1);
pool->ltp_wqs = LDAP_MALLOC(numqs * sizeof(struct ldap_int_thread_poolq_s *));
if (pool->ltp_wqs == NULL) {
LDAP_FREE(pool);
return(-1);
}
for (i=0; i<numqs; i++) {
char *ptr = LDAP_MALLOC(sizeof(struct ldap_int_thread_poolq_s) + CACHELINE-1);
if (ptr == NULL) {
for (--i; i>=0; i--)
LDAP_FREE(pool->ltp_wqs[i]->ltp_free);
LDAP_FREE(pool->ltp_wqs);
LDAP_FREE(pool);
return(-1);
}
pool->ltp_wqs[i] = (struct ldap_int_thread_poolq_s *)(((size_t)ptr + CACHELINE-1) & ~(CACHELINE-1));
pool->ltp_wqs[i]->ltp_free = ptr;
}
pool->ltp_numqs = numqs;
pool->ltp_conf_max_count = max_threads;
if ( !max_threads )
@ -257,7 +280,7 @@ ldap_pvt_thread_pool_init_q (
rem_thr = max_threads % numqs;
rem_pend = max_pending % numqs;
for ( i=0; i<numqs; i++ ) {
pq = &pool->ltp_wqs[i];
pq = pool->ltp_wqs[i];
pq->ltp_pool = pool;
rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex);
if (rc != 0)
@ -351,18 +374,18 @@ ldap_pvt_thread_pool_submit (
j = i;
while(1) {
ldap_pvt_thread_mutex_lock(&pool->ltp_wqs[i].ltp_mutex);
if (pool->ltp_wqs[i].ltp_pending_count < pool->ltp_wqs[i].ltp_max_pending) {
ldap_pvt_thread_mutex_lock(&pool->ltp_wqs[i]->ltp_mutex);
if (pool->ltp_wqs[i]->ltp_pending_count < pool->ltp_wqs[i]->ltp_max_pending) {
break;
}
ldap_pvt_thread_mutex_unlock(&pool->ltp_wqs[i].ltp_mutex);
ldap_pvt_thread_mutex_unlock(&pool->ltp_wqs[i]->ltp_mutex);
i++;
i %= pool->ltp_numqs;
if ( i == j )
return -1;
}
pq = &pool->ltp_wqs[i];
pq = pool->ltp_wqs[i];
task = LDAP_SLIST_FIRST(&pq->ltp_free_list);
if (task) {
LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l);
@ -465,7 +488,7 @@ ldap_pvt_thread_pool_retract (
return(-1);
i = ldap_int_poolq_hash( pool, arg );
pq = &pool->ltp_wqs[i];
pq = pool->ltp_wqs[i];
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q)
@ -482,6 +505,75 @@ ldap_pvt_thread_pool_retract (
return task != NULL;
}
/* Set number of work queues in this pool. Should not be
* more than the number of CPUs. */
int
ldap_pvt_thread_pool_queues(
ldap_pvt_thread_pool_t *tpool,
int numqs )
{
struct ldap_int_thread_pool_s *pool;
struct ldap_int_thread_poolq_s *pq;
int i, rc, rem_thr, rem_pend;
if (numqs < 1 || tpool == NULL)
return(-1);
pool = *tpool;
if (pool == NULL)
return(-1);
if (numqs < pool->ltp_numqs) {
for (i=numqs; i<pool->ltp_numqs; i++)
pool->ltp_wqs[i]->ltp_max_count = 0;
} else if (numqs > pool->ltp_numqs) {
struct ldap_int_thread_poolq_s **wqs;
wqs = LDAP_REALLOC(pool->ltp_wqs, numqs * sizeof(struct ldap_int_thread_poolq_s *));
if (wqs == NULL)
return(-1);
pool->ltp_wqs = wqs;
for (i=pool->ltp_numqs; i<numqs; i++) {
char *ptr = LDAP_MALLOC(sizeof(struct ldap_int_thread_poolq_s) + CACHELINE-1);
if (ptr == NULL) {
for (; i<numqs; i++)
pool->ltp_wqs[i] = NULL;
return(-1);
}
pq = (struct ldap_int_thread_poolq_s *)(((size_t)ptr + CACHELINE-1) & ~(CACHELINE-1));
pq->ltp_free = ptr;
pool->ltp_wqs[i] = pq;
pq->ltp_pool = pool;
rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex);
if (rc != 0)
return(rc);
rc = ldap_pvt_thread_cond_init(&pq->ltp_cond);
if (rc != 0)
return(rc);
LDAP_STAILQ_INIT(&pq->ltp_pending_list);
pq->ltp_work_list = &pq->ltp_pending_list;
LDAP_SLIST_INIT(&pq->ltp_free_list);
}
}
rem_thr = pool->ltp_max_count % numqs;
rem_pend = pool->ltp_max_pending % numqs;
for ( i=0; i<numqs; i++ ) {
pq = pool->ltp_wqs[i];
pq->ltp_max_count = pool->ltp_max_count / numqs;
if ( rem_thr ) {
pq->ltp_max_count++;
rem_thr--;
}
pq->ltp_max_pending = pool->ltp_max_pending / numqs;
if ( rem_pend ) {
pq->ltp_max_pending++;
rem_pend--;
}
}
pool->ltp_numqs = numqs;
return 0;
}
/* Set max #threads. value <= 0 means max supported #threads (LDAP_MAXTHR) */
int
ldap_pvt_thread_pool_maxthreads(
@ -512,14 +604,12 @@ ldap_pvt_thread_pool_maxthreads(
max_threads /= pool->ltp_numqs;
for (i=0; i<pool->ltp_numqs; i++) {
pq = &pool->ltp_wqs[i];
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
pq = pool->ltp_wqs[i];
pq->ltp_max_count = max_threads;
if (remthr) {
pq->ltp_max_count++;
remthr--;
}
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
}
return(0);
}
@ -572,7 +662,7 @@ ldap_pvt_thread_pool_query(
int i;
count = 0;
for (i=0; i<pool->ltp_numqs; i++) {
struct ldap_int_thread_poolq_s *pq = &pool->ltp_wqs[i];
struct ldap_int_thread_poolq_s *pq = pool->ltp_wqs[i];
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
switch(param) {
case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
@ -615,7 +705,7 @@ ldap_pvt_thread_pool_query(
else {
int i;
for (i=0; i<pool->ltp_numqs; i++)
if (pool->ltp_wqs[i].ltp_pending_count) break;
if (pool->ltp_wqs[i]->ltp_pending_count) break;
if (i<pool->ltp_numqs)
*((char **)value) = "finishing";
else
@ -706,7 +796,7 @@ ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
for (i=0; i<pool->ltp_numqs; i++) {
pq = &pool->ltp_wqs[i];
pq = pool->ltp_wqs[i];
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
if (pq->ltp_max_pending > 0)
pq->ltp_max_pending = -pq->ltp_max_pending;
@ -753,7 +843,7 @@ ldap_int_thread_pool_wrapper (
ldap_int_tpool_plist_t *work_list;
ldap_int_thread_userctx_t ctx, *kctx;
unsigned i, keyslot, hash;
int pool_lock = 0;
int pool_lock = 0, freeme = 0;
assert(pool != NULL);
@ -869,15 +959,24 @@ ldap_int_thread_pool_wrapper (
ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
pq->ltp_open_count--;
/* let pool_destroy know we're all done */
if (pq->ltp_open_count == 0)
ldap_pvt_thread_cond_signal(&pq->ltp_cond);
if (pq->ltp_open_count == 0) {
if (pool->ltp_finishing)
/* let pool_destroy know we're all done */
ldap_pvt_thread_cond_signal(&pq->ltp_cond);
else
freeme = 1;
}
if (pool_lock)
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
else
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
if (freeme) {
ldap_pvt_thread_cond_destroy(&pq->ltp_cond);
ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex);
LDAP_FREE(pq->ltp_free);
}
ldap_pvt_thread_exit(NULL);
return(NULL);
}
@ -965,7 +1064,7 @@ handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
pool->ltp_active_queues = 0;
for (i=0; i<pool->ltp_numqs; i++)
if (&pool->ltp_wqs[i] == pq) break;
if (pool->ltp_wqs[i] == pq) break;
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
/* temporarily remove ourself from active count */
@ -973,7 +1072,7 @@ handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
j=i;
do {
pq = &pool->ltp_wqs[j];
pq = pool->ltp_wqs[j];
if (j != i)
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
@ -998,7 +1097,7 @@ handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);
/* restore us to active count */
pool->ltp_wqs[i].ltp_active_count++;
pool->ltp_wqs[i]->ltp_active_count++;
assert(pool->ltp_pause == WANT_PAUSE);
pool->ltp_pause = PAUSED;
@ -1065,7 +1164,7 @@ ldap_pvt_thread_pool_resume (
assert(pool->ltp_pause == PAUSED);
pool->ltp_pause = 0;
for (i=0; i<pool->ltp_numqs; i++) {
pq = &pool->ltp_wqs[i];
pq = pool->ltp_wqs[i];
if (pq->ltp_open_count <= 0) /* true when paused, but be paranoid */
pq->ltp_open_count = -pq->ltp_open_count;
pq->ltp_work_list = &pq->ltp_pending_list;

View File

@ -1717,6 +1717,8 @@ config_generic(ConfigArgs *c) {
c->log, c->cr_msg, 0 );
return 1;
}
if ( slapMode & SLAP_SERVER_MODE )
ldap_pvt_thread_pool_queues(&connection_pool, c->value_int);
connection_pool_queues = c->value_int; /* save for reference */
break;