Multiple queues per threadpool

This commit is contained in:
Howard Chu 2013-08-15 06:54:35 -07:00
parent f230503b10
commit 34f832faee
5 changed files with 366 additions and 168 deletions

View File

@ -213,6 +213,13 @@ ldap_pvt_thread_pool_init LDAP_P((
int max_threads,
int max_pending ));
LDAP_F( int )
ldap_pvt_thread_pool_init_q LDAP_P((
ldap_pvt_thread_pool_t *pool_out,
int max_threads,
int max_pending,
int num_qs ));
LDAP_F( int )
ldap_pvt_thread_pool_submit LDAP_P((
ldap_pvt_thread_pool_t *pool,

View File

@ -55,6 +55,7 @@ enum { NOT_PAUSED = 0, WANT_PAUSE = 1, PAUSED = 2 };
/* Context: thread ID and thread-specific key/data pairs */
typedef struct ldap_int_thread_userctx_s {
struct ldap_int_thread_poolq_s *ltu_pq;
ldap_pvt_thread_t ltu_id;
ldap_int_tpool_key_t ltu_key[MAXKEYS];
} ldap_int_thread_userctx_t;
@ -91,8 +92,8 @@ typedef struct ldap_int_thread_task_s {
typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t;
struct ldap_int_thread_pool_s {
LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
struct ldap_int_thread_poolq_s {
struct ldap_int_thread_pool_s *ltp_pool;
/* protect members below, and protect thread_keys[] during pauses */
ldap_pvt_thread_mutex_t ltp_mutex;
@ -112,19 +113,6 @@ struct ldap_int_thread_pool_s {
ldap_int_tpool_plist_t ltp_pending_list;
LDAP_SLIST_HEAD(tcl, ldap_int_thread_task_s) ltp_free_list;
/* The pool is finishing, waiting for its threads to close.
* They close when ltp_pending_list is done. pool_submit()
* rejects new tasks. ltp_max_pending = -(its old value).
*/
int ltp_finishing;
/* Some active task needs to be the sole active task.
* Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
* Note: Pauses adjust ltp_<open_count/vary_open_count/work_list>,
* so pool_<submit/wrapper>() mostly can avoid testing ltp_pause.
*/
volatile sig_atomic_t ltp_pause;
/* Max number of threads in pool, or 0 for default (LDAP_MAXTHR) */
int ltp_max_count;
@ -134,20 +122,39 @@ struct ldap_int_thread_pool_s {
int ltp_pending_count; /* Pending + paused + idle tasks */
int ltp_active_count; /* Active, not paused/idle tasks */
int ltp_open_count; /* Number of threads, negated when ltp_pause */
int ltp_starting; /* Currenlty starting threads */
int ltp_starting; /* Currently starting threads */
};
/* >0 if paused or we may open a thread, <0 if we should close a thread.
* Updated when ltp_<finishing/pause/max_count/open_count> change.
* Maintained to reduce the time ltp_mutex must be locked in
* ldap_pvt_thread_pool_<submit/wrapper>().
struct ldap_int_thread_pool_s {
LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
struct ldap_int_thread_poolq_s *ltp_wqs;
/* number of poolqs */
int ltp_numqs;
/* protect members below, and protect thread_keys[] during pauses */
ldap_pvt_thread_mutex_t ltp_mutex;
/* The pool is finishing, waiting for its threads to close.
* They close when ltp_pending_list is done. pool_submit()
* rejects new tasks. ltp_max_pending = -(its old value).
*/
int ltp_vary_open_count;
# define SET_VARY_OPEN_COUNT(pool) \
((pool)->ltp_vary_open_count = \
(pool)->ltp_pause ? 1 : \
(pool)->ltp_finishing ? -1 : \
((pool)->ltp_max_count ? (pool)->ltp_max_count : LDAP_MAXTHR) \
- (pool)->ltp_open_count)
int ltp_finishing;
/* Some active task needs to be the sole active task.
* Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
*/
volatile sig_atomic_t ltp_pause;
/* Max number of threads in pool */
int ltp_max_count;
/* Configured max number of threads in pool, 0 for default (LDAP_MAXTHR) */
int ltp_conf_max_count;
/* Max pending + paused + idle tasks, negated when ltp_finishing */
int ltp_max_pending;
};
static ldap_int_tpool_plist_t empty_pending_list =
@ -191,13 +198,15 @@ ldap_int_thread_pool_shutdown ( void )
/* Create a thread pool */
int
ldap_pvt_thread_pool_init (
ldap_pvt_thread_pool_init_q (
ldap_pvt_thread_pool_t *tpool,
int max_threads,
int max_pending )
int max_pending,
int numqs )
{
ldap_pvt_thread_pool_t pool;
int rc;
struct ldap_int_thread_poolq_s *pq;
int i, rc, rem_thr, rem_pend;
/* multiple pools are currently not supported (ITS#4943) */
assert(!ldap_int_has_thread_pool);
@ -209,30 +218,56 @@ ldap_pvt_thread_pool_init (
*tpool = NULL;
pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
sizeof(struct ldap_int_thread_pool_s));
sizeof(struct ldap_int_thread_pool_s) +
numqs * sizeof(struct ldap_int_thread_poolq_s));
if (pool == NULL) return(-1);
pool->ltp_wqs = (struct ldap_int_thread_poolq_s *)(pool+1);
pool->ltp_numqs = numqs;
pool->ltp_conf_max_count = max_threads;
if ( !max_threads )
max_threads = LDAP_MAXTHR;
rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
if (rc != 0)
return(rc);
rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
if (rc != 0)
return(rc);
rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
if (rc != 0)
return(rc);
rem_thr = max_threads % numqs;
rem_pend = max_pending % numqs;
for ( i=0; i<numqs; i++ ) {
pq = &pool->ltp_wqs[i];
pq->ltp_pool = pool;
rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex);
if (rc != 0)
return(rc);
rc = ldap_pvt_thread_cond_init(&pq->ltp_cond);
if (rc != 0)
return(rc);
rc = ldap_pvt_thread_cond_init(&pq->ltp_pcond);
if (rc != 0)
return(rc);
LDAP_STAILQ_INIT(&pq->ltp_pending_list);
pq->ltp_work_list = &pq->ltp_pending_list;
LDAP_SLIST_INIT(&pq->ltp_free_list);
pq->ltp_max_count = max_threads / numqs;
if ( rem_thr ) {
pq->ltp_max_count++;
rem_thr--;
}
pq->ltp_max_pending = max_pending / numqs;
if ( rem_pend ) {
pq->ltp_max_pending++;
rem_pend--;
}
}
ldap_int_has_thread_pool = 1;
pool->ltp_max_count = max_threads;
SET_VARY_OPEN_COUNT(pool);
pool->ltp_max_pending = max_pending;
LDAP_STAILQ_INIT(&pool->ltp_pending_list);
pool->ltp_work_list = &pool->ltp_pending_list;
LDAP_SLIST_INIT(&pool->ltp_free_list);
ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
@ -248,6 +283,28 @@ ldap_pvt_thread_pool_init (
return(0);
}
int
ldap_pvt_thread_pool_init (
ldap_pvt_thread_pool_t *tpool,
int max_threads,
int max_pending )
{
return ldap_pvt_thread_pool_init_q( tpool, max_threads, max_pending, 1 );
}
static int
ldap_int_poolq_hash(
struct ldap_int_thread_pool_s *pool,
void *arg )
{
int i = 0, j;
unsigned char *ptr = (unsigned char *)&arg;
/* dumb hash of arg to choose a queue */
for (j=0; j<sizeof(arg); j++)
i += *ptr++;
i %= pool->ltp_numqs;
return i;
}
/* Submit a task to be performed by the thread pool */
int
@ -256,8 +313,10 @@ ldap_pvt_thread_pool_submit (
ldap_pvt_thread_start_t *start_routine, void *arg )
{
struct ldap_int_thread_pool_s *pool;
struct ldap_int_thread_poolq_s *pq;
ldap_int_thread_task_t *task;
ldap_pvt_thread_t thr;
int i, j;
if (tpool == NULL)
return(-1);
@ -267,14 +326,28 @@ ldap_pvt_thread_pool_submit (
if (pool == NULL)
return(-1);
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
if ( pool->ltp_numqs > 1 )
i = ldap_int_poolq_hash( pool, arg );
else
i = 0;
if (pool->ltp_pending_count >= pool->ltp_max_pending)
goto failed;
j = i;
while(1) {
ldap_pvt_thread_mutex_lock(&pool->ltp_wqs[i].ltp_mutex);
if (pool->ltp_wqs[i].ltp_pending_count < pool->ltp_wqs[i].ltp_max_pending) {
break;
}
ldap_pvt_thread_mutex_unlock(&pool->ltp_wqs[i].ltp_mutex);
i++;
i %= pool->ltp_numqs;
if ( i == j )
return -1;
}
task = LDAP_SLIST_FIRST(&pool->ltp_free_list);
pq = &pool->ltp_wqs[i];
task = LDAP_SLIST_FIRST(&pq->ltp_free_list);
if (task) {
LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltt_next.l);
LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l);
} else {
task = (ldap_int_thread_task_t *) LDAP_MALLOC(sizeof(*task));
if (task == NULL)
@ -284,49 +357,47 @@ ldap_pvt_thread_pool_submit (
task->ltt_start_routine = start_routine;
task->ltt_arg = arg;
pool->ltp_pending_count++;
LDAP_STAILQ_INSERT_TAIL(&pool->ltp_pending_list, task, ltt_next.q);
pq->ltp_pending_count++;
LDAP_STAILQ_INSERT_TAIL(&pq->ltp_pending_list, task, ltt_next.q);
/* true if ltp_pause != 0 or we should open (create) a thread */
if (pool->ltp_vary_open_count > 0 &&
pool->ltp_open_count < pool->ltp_active_count+pool->ltp_pending_count)
/* should we open (create) a thread? */
if (pq->ltp_open_count < pq->ltp_active_count+pq->ltp_pending_count &&
pq->ltp_open_count < pq->ltp_max_count)
{
if (pool->ltp_pause)
goto done;
pool->ltp_starting++;
pool->ltp_open_count++;
SET_VARY_OPEN_COUNT(pool);
pq->ltp_starting++;
pq->ltp_open_count++;
if (0 != ldap_pvt_thread_create(
&thr, 1, ldap_int_thread_pool_wrapper, pool))
&thr, 1, ldap_int_thread_pool_wrapper, pq))
{
/* couldn't create thread. back out of
* ltp_open_count and check for even worse things.
*/
pool->ltp_starting--;
pool->ltp_open_count--;
SET_VARY_OPEN_COUNT(pool);
pq->ltp_starting--;
pq->ltp_open_count--;
if (pool->ltp_open_count == 0) {
if (pq->ltp_open_count == 0) {
/* no open threads at all?!?
*/
ldap_int_thread_task_t *ptr;
/* let pool_destroy know there are no more threads */
ldap_pvt_thread_cond_signal(&pool->ltp_cond);
ldap_pvt_thread_cond_signal(&pq->ltp_cond);
LDAP_STAILQ_FOREACH(ptr, &pool->ltp_pending_list, ltt_next.q)
LDAP_STAILQ_FOREACH(ptr, &pq->ltp_pending_list, ltt_next.q)
if (ptr == task) break;
if (ptr == task) {
/* no open threads, task not handled, so
* back out of ltp_pending_count, free the task,
* report the error.
*/
pool->ltp_pending_count--;
LDAP_STAILQ_REMOVE(&pool->ltp_pending_list, task,
pq->ltp_pending_count--;
LDAP_STAILQ_REMOVE(&pq->ltp_pending_list, task,
ldap_int_thread_task_s, ltt_next.q);
LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, task,
LDAP_SLIST_INSERT_HEAD(&pq->ltp_free_list, task,
ltt_next.l);
goto failed;
}
@ -336,14 +407,14 @@ ldap_pvt_thread_pool_submit (
*/
}
}
ldap_pvt_thread_cond_signal(&pool->ltp_cond);
ldap_pvt_thread_cond_signal(&pq->ltp_cond);
done:
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
return(0);
failed:
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
return(-1);
}
@ -363,7 +434,9 @@ ldap_pvt_thread_pool_retract (
ldap_pvt_thread_start_t *start_routine, void *arg )
{
struct ldap_int_thread_pool_s *pool;
struct ldap_int_thread_poolq_s *pq;
ldap_int_thread_task_t *task;
int i;
if (tpool == NULL)
return(-1);
@ -373,8 +446,11 @@ ldap_pvt_thread_pool_retract (
if (pool == NULL)
return(-1);
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
LDAP_STAILQ_FOREACH(task, &pool->ltp_pending_list, ltt_next.q)
i = ldap_int_poolq_hash( pool, arg );
pq = &pool->ltp_wqs[i];
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q)
if (task->ltt_start_routine == start_routine &&
task->ltt_arg == arg) {
/* Could LDAP_STAILQ_REMOVE the task, but that
@ -384,7 +460,7 @@ ldap_pvt_thread_pool_retract (
task->ltt_arg = NULL;
break;
}
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
return task != NULL;
}
@ -395,6 +471,8 @@ ldap_pvt_thread_pool_maxthreads(
int max_threads )
{
struct ldap_int_thread_pool_s *pool;
struct ldap_int_thread_poolq_s *pq;
int remthr, i;
if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
max_threads = 0;
@ -407,12 +485,24 @@ ldap_pvt_thread_pool_maxthreads(
if (pool == NULL)
return(-1);
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
pool->ltp_conf_max_count = max_threads;
if ( !max_threads )
max_threads = LDAP_MAXTHR;
pool->ltp_max_count = max_threads;
SET_VARY_OPEN_COUNT(pool);
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
remthr = max_threads % pool->ltp_numqs;
max_threads /= pool->ltp_numqs;
for (i=0; i<pool->ltp_numqs; i++) {
pq = &pool->ltp_wqs[i];
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
pq->ltp_max_count = max_threads;
if (remthr) {
pq->ltp_max_count++;
remthr--;
}
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
}
return(0);
}
@ -436,10 +526,9 @@ ldap_pvt_thread_pool_query(
return 0;
}
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
switch ( param ) {
case LDAP_PVT_THREAD_POOL_PARAM_MAX:
count = pool->ltp_max_count;
count = pool->ltp_conf_max_count;
break;
case LDAP_PVT_THREAD_POOL_PARAM_MAX_PENDING:
@ -450,30 +539,45 @@ ldap_pvt_thread_pool_query(
count = 0;
break;
case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
count = pool->ltp_open_count;
if (count < 0)
count = -count;
break;
case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
count = pool->ltp_starting;
break;
case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
count = pool->ltp_active_count;
break;
case LDAP_PVT_THREAD_POOL_PARAM_PAUSING:
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
count = (pool->ltp_pause != 0);
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
break;
case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
count = pool->ltp_pending_count;
break;
case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
count = pool->ltp_pending_count + pool->ltp_active_count;
{
int i;
count = 0;
for (i=0; i<pool->ltp_numqs; i++) {
struct ldap_int_thread_poolq_s *pq = &pool->ltp_wqs[i];
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
switch(param) {
case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
count += pq->ltp_open_count;
break;
case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
count += pq->ltp_starting;
break;
case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
count += pq->ltp_active_count;
break;
case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
count += pq->ltp_pending_count;
break;
case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
count += pq->ltp_pending_count + pq->ltp_active_count;
break;
}
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
}
if (count < 0)
count = -count;
}
break;
case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE_MAX:
@ -486,16 +590,24 @@ ldap_pvt_thread_pool_query(
break;
case LDAP_PVT_THREAD_POOL_PARAM_STATE:
*((char **)value) =
pool->ltp_pause ? "pausing" :
!pool->ltp_finishing ? "running" :
pool->ltp_pending_count ? "finishing" : "stopping";
if (pool->ltp_pause)
*((char **)value) = "pausing";
else if (!pool->ltp_finishing)
*((char **)value) = "running";
else {
int i;
for (i=0; i<pool->ltp_numqs; i++)
if (pool->ltp_wqs[i].ltp_pending_count) break;
if (i<pool->ltp_numqs)
*((char **)value) = "finishing";
else
*((char **)value) = "stopping";
}
break;
case LDAP_PVT_THREAD_POOL_PARAM_UNKNOWN:
break;
}
ldap_pvt_thread_mutex_unlock( &pool->ltp_mutex );
if ( count > -1 ) {
*((int *)value) = count;
@ -545,7 +657,9 @@ int
ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
{
struct ldap_int_thread_pool_s *pool, *pptr;
struct ldap_int_thread_poolq_s *pq;
ldap_int_thread_task_t *task;
int i;
if (tpool == NULL)
return(-1);
@ -567,33 +681,40 @@ ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
pool->ltp_finishing = 1;
SET_VARY_OPEN_COUNT(pool);
if (pool->ltp_max_pending > 0)
pool->ltp_max_pending = -pool->ltp_max_pending;
if (!run_pending) {
while ((task = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL) {
LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltt_next.q);
for (i=0; i<pool->ltp_numqs; i++) {
pq = &pool->ltp_wqs[i];
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
if (pq->ltp_max_pending > 0)
pq->ltp_max_pending = -pq->ltp_max_pending;
if (!run_pending) {
while ((task = LDAP_STAILQ_FIRST(&pq->ltp_pending_list)) != NULL) {
LDAP_STAILQ_REMOVE_HEAD(&pq->ltp_pending_list, ltt_next.q);
LDAP_FREE(task);
}
pq->ltp_pending_count = 0;
}
while (pq->ltp_open_count) {
if (!pool->ltp_pause)
ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
}
while ((task = LDAP_SLIST_FIRST(&pq->ltp_free_list)) != NULL)
{
LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l);
LDAP_FREE(task);
}
pool->ltp_pending_count = 0;
}
while (pool->ltp_open_count) {
if (!pool->ltp_pause)
ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
}
while ((task = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL)
{
LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltt_next.l);
LDAP_FREE(task);
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
ldap_pvt_thread_cond_destroy(&pq->ltp_pcond);
ldap_pvt_thread_cond_destroy(&pq->ltp_cond);
ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex);
}
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
LDAP_FREE(pool);
*tpool = NULL;
@ -606,7 +727,8 @@ static void *
ldap_int_thread_pool_wrapper (
void *xpool )
{
struct ldap_int_thread_pool_s *pool = xpool;
struct ldap_int_thread_poolq_s *pq = xpool;
struct ldap_int_thread_pool_s *pool = pq->ltp_pool;
ldap_int_thread_task_t *task;
ldap_int_tpool_plist_t *work_list;
ldap_int_thread_userctx_t ctx, *kctx;
@ -618,16 +740,17 @@ ldap_int_thread_pool_wrapper (
ctx.ltu_key[i].ltk_key = NULL;
}
ctx.ltu_pq = pq;
ctx.ltu_id = ldap_pvt_thread_self();
TID_HASH(ctx.ltu_id, hash);
ldap_pvt_thread_key_setdata( ldap_tpool_key, &ctx );
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
/* thread_keys[] is read-only when paused */
while (pool->ltp_pause)
ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
/* find a key slot to give this thread ID and store a
* pointer to our keys there; start at the thread ID
@ -640,20 +763,20 @@ ldap_int_thread_pool_wrapper (
thread_keys[keyslot].ctx = &ctx;
ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
pool->ltp_starting--;
pool->ltp_active_count++;
pq->ltp_starting--;
pq->ltp_active_count++;
for (;;) {
work_list = pool->ltp_work_list; /* help the compiler a bit */
work_list = pq->ltp_work_list; /* help the compiler a bit */
task = LDAP_STAILQ_FIRST(work_list);
if (task == NULL) { /* paused or no pending tasks */
if (--(pool->ltp_active_count) < 2) {
if (--(pq->ltp_active_count) < 2) {
/* Notify pool_pause it is the sole active thread. */
ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
ldap_pvt_thread_cond_signal(&pq->ltp_pcond);
}
do {
if (pool->ltp_vary_open_count < 0) {
if (pool->ltp_finishing || pq->ltp_open_count > pq->ltp_max_count) {
/* Not paused, and either finishing or too many
* threads running (can happen if ltp_max_count
* was reduced). Let this thread die.
@ -672,23 +795,23 @@ ldap_int_thread_pool_wrapper (
* Just use pthread_cond_timedwait() if we want to
* check idle time.
*/
ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
work_list = pool->ltp_work_list;
work_list = pq->ltp_work_list;
task = LDAP_STAILQ_FIRST(work_list);
} while (task == NULL);
pool->ltp_active_count++;
pq->ltp_active_count++;
}
LDAP_STAILQ_REMOVE_HEAD(work_list, ltt_next.q);
pool->ltp_pending_count--;
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
pq->ltp_pending_count--;
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
task->ltt_start_routine(&ctx, task->ltt_arg);
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, task, ltt_next.l);
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
LDAP_SLIST_INSERT_HEAD(&pq->ltp_free_list, task, ltt_next.l);
}
done:
@ -702,13 +825,12 @@ ldap_int_thread_pool_wrapper (
thread_keys[keyslot].ctx = DELETED_THREAD_CTX;
ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
pool->ltp_open_count--;
SET_VARY_OPEN_COUNT(pool);
pq->ltp_open_count--;
/* let pool_destroy know we're all done */
if (pool->ltp_open_count == 0)
ldap_pvt_thread_cond_signal(&pool->ltp_cond);
if (pq->ltp_open_count == 0)
ldap_pvt_thread_cond_signal(&pq->ltp_cond);
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
ldap_pvt_thread_exit(NULL);
return(NULL);
@ -728,6 +850,7 @@ static int
handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
{
struct ldap_int_thread_pool_s *pool;
struct ldap_int_thread_poolq_s *pq;
int ret = 0, pause, max_ltp_pause;
if (tpool == NULL)
@ -741,6 +864,11 @@ handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
if (pause_type == CHECK_PAUSE && !pool->ltp_pause)
return(0);
{
ldap_int_thread_userctx_t *ctx = ldap_pvt_thread_pool_context();
pq = ctx->ltu_pq;
}
/* Let pool_unidle() ignore requests for new pauses */
max_ltp_pause = pause_type==PAUSE_ARG(GO_UNIDLE) ? WANT_PAUSE : NOT_PAUSED;
@ -752,46 +880,74 @@ handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
pause_type -= pause;
if (pause_type & GO_IDLE) {
pool->ltp_pending_count++;
pool->ltp_active_count--;
if (pause && pool->ltp_active_count < 2) {
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
pq->ltp_pending_count++;
pq->ltp_active_count--;
if (pause && pq->ltp_active_count < 1) {
/* Tell the task waiting to DO_PAUSE it can proceed */
ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
ldap_pvt_thread_cond_signal(&pq->ltp_pcond);
}
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
}
if (pause_type & GO_UNIDLE) {
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
/* Wait out pause if any, then cancel GO_IDLE */
if (pause > max_ltp_pause) {
ret = 1;
do {
ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
} while (pool->ltp_pause > max_ltp_pause);
}
pool->ltp_pending_count--;
pool->ltp_active_count++;
pq->ltp_pending_count--;
pq->ltp_active_count++;
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
}
if (pause_type & DO_PAUSE) {
int i, j;
/* Tell everyone else to pause or finish, then await that */
ret = 0;
assert(!pool->ltp_pause);
pool->ltp_pause = WANT_PAUSE;
/* Let ldap_pvt_thread_pool_submit() through to its ltp_pause test,
* and do not finish threads in ldap_pvt_thread_pool_wrapper() */
pool->ltp_open_count = -pool->ltp_open_count;
SET_VARY_OPEN_COUNT(pool);
/* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */
pool->ltp_work_list = &empty_pending_list;
/* Wait for this task to become the sole active task */
while (pool->ltp_active_count > 1) {
ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);
}
for (i=0; i<pool->ltp_numqs; i++)
if (&pool->ltp_wqs[i] == pq) break;
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
/* temporarily remove ourself from active count */
pq->ltp_active_count--;
j=i;
do {
if (j != i)
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
/* Let ldap_pvt_thread_pool_submit() through to its ltp_pause test,
* and do not finish threads in ldap_pvt_thread_pool_wrapper() */
pq->ltp_open_count = -pq->ltp_open_count;
/* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */
pq->ltp_work_list = &empty_pending_list;
/* Wait for this task to become the sole active task */
while (pq->ltp_active_count > 0) {
ldap_pvt_thread_cond_wait(&pq->ltp_pcond, &pq->ltp_mutex);
}
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
if (pool->ltp_numqs > 1) {
j++;
j %= pool->ltp_numqs;
}
} while (j != i);
/* restore us to active count */
pool->ltp_wqs[i].ltp_active_count++;
assert(pool->ltp_pause == WANT_PAUSE);
pool->ltp_pause = PAUSED;
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
}
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
return(ret);
}
@ -837,6 +993,8 @@ ldap_pvt_thread_pool_resume (
ldap_pvt_thread_pool_t *tpool )
{
struct ldap_int_thread_pool_s *pool;
struct ldap_int_thread_poolq_s *pq;
int i;
if (tpool == NULL)
return(-1);
@ -850,12 +1008,16 @@ ldap_pvt_thread_pool_resume (
assert(pool->ltp_pause == PAUSED);
pool->ltp_pause = 0;
if (pool->ltp_open_count <= 0) /* true when paused, but be paranoid */
pool->ltp_open_count = -pool->ltp_open_count;
SET_VARY_OPEN_COUNT(pool);
pool->ltp_work_list = &pool->ltp_pending_list;
for (i=0; i<pool->ltp_numqs; i++) {
pq = &pool->ltp_wqs[i];
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
if (pq->ltp_open_count <= 0) /* true when paused, but be paranoid */
pq->ltp_open_count = -pq->ltp_open_count;
pq->ltp_work_list = &pq->ltp_pending_list;
ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
}
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
return(0);

View File

@ -198,6 +198,7 @@ enum {
CFG_LTHREADS,
CFG_IX_HASH64,
CFG_DISABLED,
CFG_THREADQS,
CFG_LAST
};
@ -683,6 +684,14 @@ static ConfigTable config_back_cf_table[] = {
#endif
"( OLcfgGlAt:66 NAME 'olcThreads' "
"SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL },
{ "threadqueues", "count", 2, 2, 0,
#ifdef NO_THREADS
ARG_IGNORED, NULL,
#else
ARG_INT|ARG_MAGIC|CFG_THREADQS, &config_generic,
#endif
"( OLcfgGlAt:95 NAME 'olcThreadQueues' "
"SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL },
{ "timelimit", "limit", 2, 0, 0, ARG_MAY_DB|ARG_MAGIC,
&config_timelimit, "( OLcfgGlAt:67 NAME 'olcTimeLimit' "
"SYNTAX OMsDirectoryString )", NULL, NULL },
@ -835,7 +844,8 @@ static ConfigOCs cf_ocs[] = {
"olcDisallows $ olcGentleHUP $ olcIdleTimeout $ "
"olcIndexSubstrIfMaxLen $ olcIndexSubstrIfMinLen $ "
"olcIndexSubstrAnyLen $ olcIndexSubstrAnyStep $ olcIndexHash64 $ "
"olcIndexIntLen $ olcLocalSSF $ olcLogFile $ olcLogLevel $ "
"olcIndexIntLen $ "
"olcListenerThreads $ olcLocalSSF $ olcLogFile $ olcLogLevel $ "
"olcPasswordCryptSaltFormat $ olcPasswordHash $ olcPidFile $ "
"olcPluginLogFile $ olcReadOnly $ olcReferral $ "
"olcReplogFile $ olcRequires $ olcRestrict $ olcReverseLookup $ "
@ -845,7 +855,8 @@ static ConfigOCs cf_ocs[] = {
"olcSecurity $ olcServerID $ olcSizeLimit $ "
"olcSockbufMaxIncoming $ olcSockbufMaxIncomingAuth $ "
"olcTCPBuffer $ "
"olcThreads $ olcTimeLimit $ olcTLSCACertificateFile $ "
"olcThreads $ olcThreadQueues $ "
"olcTimeLimit $ olcTLSCACertificateFile $ "
"olcTLSCACertificatePath $ olcTLSCertificateFile $ "
"olcTLSCertificateKeyFile $ olcTLSCipherSuite $ olcTLSCRLCheck $ "
"olcTLSRandFile $ olcTLSVerifyClient $ olcTLSDHParamFile $ "
@ -947,6 +958,9 @@ config_generic(ConfigArgs *c) {
case CFG_THREADS:
c->value_int = connection_pool_max;
break;
case CFG_THREADQS:
c->value_int = connection_pool_queues;
break;
case CFG_TTHREADS:
c->value_int = slap_tool_thread_max;
break;
@ -1314,6 +1328,7 @@ config_generic(ConfigArgs *c) {
/* single-valued attrs, no-ops */
case CFG_CONCUR:
case CFG_THREADS:
case CFG_THREADQS:
case CFG_TTHREADS:
case CFG_LTHREADS:
case CFG_RO:
@ -1693,6 +1708,18 @@ config_generic(ConfigArgs *c) {
connection_pool_max = c->value_int; /* save for reference */
break;
case CFG_THREADQS:
if ( c->value_int < 1 ) {
snprintf( c->cr_msg, sizeof( c->cr_msg ),
"threadqueuess=%d smaller than minimum value 1",
c->value_int );
Debug(LDAP_DEBUG_ANY, "%s: %s.\n",
c->log, c->cr_msg, 0 );
return 1;
}
connection_pool_queues = c->value_int; /* save for reference */
break;
case CFG_TTHREADS:
if ( slapMode & SLAP_TOOL_MODE )
ldap_pvt_thread_pool_maxthreads(&connection_pool, c->value_int);

View File

@ -59,7 +59,8 @@ BerVarray default_referral = NULL;
* global variables that need mutex protection
*/
ldap_pvt_thread_pool_t connection_pool;
int connection_pool_max = SLAP_MAX_WORKER_THREADS;
int connection_pool_max = SLAP_MAX_WORKER_THREADS;
int connection_pool_queues = 1;
int slap_tool_thread_max = 1;
slap_counters_t slap_counters, *slap_counters_list;
@ -135,8 +136,8 @@ slap_init( int mode, const char *name )
slap_name = name;
ldap_pvt_thread_pool_init( &connection_pool,
connection_pool_max, 0);
ldap_pvt_thread_pool_init_q( &connection_pool,
connection_pool_max, 0, 4);
slap_counters_init( &slap_counters );

View File

@ -2066,6 +2066,7 @@ LDAP_SLAPD_V (time_t) starttime;
LDAP_SLAPD_V (ldap_pvt_thread_pool_t) connection_pool;
LDAP_SLAPD_V (int) connection_pool_max;
LDAP_SLAPD_V (int) connection_pool_queues;
LDAP_SLAPD_V (int) slap_tool_thread_max;
LDAP_SLAPD_V (ldap_pvt_thread_mutex_t) entry2str_mutex;