ITS#9642 Notify runner of any changes to the runqueue

This commit is contained in:
Ondřej Kuzník 2021-09-13 11:57:14 +01:00 committed by Quanah Gibson-Mount
parent fbd4f9abdd
commit 19d4a69882
3 changed files with 34 additions and 3 deletions

View File

@ -20,6 +20,9 @@
LDAP_BEGIN_DECL LDAP_BEGIN_DECL
struct runqueue_s;
typedef void (ldap_pvt_rq_notify_cb) LDAP_P(( struct runqueue_s *rq ));
typedef struct re_s { typedef struct re_s {
struct timeval next_sched; struct timeval next_sched;
struct timeval interval; struct timeval interval;
@ -36,6 +39,7 @@ typedef struct runqueue_s {
LDAP_STAILQ_HEAD(l, re_s) task_list; LDAP_STAILQ_HEAD(l, re_s) task_list;
LDAP_STAILQ_HEAD(rl, re_s) run_list; LDAP_STAILQ_HEAD(rl, re_s) run_list;
ldap_pvt_thread_mutex_t rq_mutex; ldap_pvt_thread_mutex_t rq_mutex;
ldap_pvt_rq_notify_cb *rq_notify_cb;
} runqueue_t; } runqueue_t;
LDAP_F( struct re_s* ) LDAP_F( struct re_s* )

View File

@ -59,6 +59,9 @@ ldap_pvt_runqueue_insert(
entry->tname = tname; entry->tname = tname;
entry->tspec = tspec; entry->tspec = tspec;
LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext ); LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext );
if ( rq->rq_notify_cb ) {
rq->rq_notify_cb( rq );
}
} }
return entry; return entry;
} }
@ -95,6 +98,9 @@ ldap_pvt_runqueue_remove(
assert( e == entry ); assert( e == entry );
LDAP_STAILQ_REMOVE( &rq->task_list, entry, re_s, tnext ); LDAP_STAILQ_REMOVE( &rq->task_list, entry, re_s, tnext );
if ( rq->rq_notify_cb ) {
rq->rq_notify_cb( rq );
}
LDAP_FREE( entry ); LDAP_FREE( entry );
} }
@ -123,6 +129,9 @@ ldap_pvt_runqueue_runtask(
) )
{ {
LDAP_STAILQ_INSERT_TAIL( &rq->run_list, entry, rnext ); LDAP_STAILQ_INSERT_TAIL( &rq->run_list, entry, rnext );
if ( rq->rq_notify_cb ) {
rq->rq_notify_cb( rq );
}
} }
void void
@ -132,6 +141,9 @@ ldap_pvt_runqueue_stoptask(
) )
{ {
LDAP_STAILQ_REMOVE( &rq->run_list, entry, re_s, rnext ); LDAP_STAILQ_REMOVE( &rq->run_list, entry, re_s, rnext );
if ( rq->rq_notify_cb ) {
rq->rq_notify_cb( rq );
}
} }
int int
@ -188,19 +200,24 @@ ldap_pvt_runqueue_resched(
} else { } else {
LDAP_STAILQ_INSERT_AFTER( &rq->task_list, prev, entry, tnext ); LDAP_STAILQ_INSERT_AFTER( &rq->task_list, prev, entry, tnext );
} }
return; goto done;
} else if ( e->next_sched.tv_sec > entry->next_sched.tv_sec ) { } else if ( e->next_sched.tv_sec > entry->next_sched.tv_sec ) {
if ( prev == NULL ) { if ( prev == NULL ) {
LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext ); LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext );
} else { } else {
LDAP_STAILQ_INSERT_AFTER( &rq->task_list, prev, entry, tnext ); LDAP_STAILQ_INSERT_AFTER( &rq->task_list, prev, entry, tnext );
} }
return; goto done;
} }
prev = e; prev = e;
} }
LDAP_STAILQ_INSERT_TAIL( &rq->task_list, entry, tnext ); LDAP_STAILQ_INSERT_TAIL( &rq->task_list, entry, tnext );
} }
done:
if ( rq->rq_notify_cb ) {
rq->rq_notify_cb( rq );
}
} }
int int

View File

@ -79,11 +79,15 @@ int slap_inet4or6 = AF_UNSPEC;
int slap_inet4or6 = AF_INET; int slap_inet4or6 = AF_INET;
#endif /* ! INETv6 */ #endif /* ! INETv6 */
void slap_runqueue_notify( runqueue_t *rq );
/* globals */ /* globals */
time_t starttime; time_t starttime;
ber_socket_t dtblsize; ber_socket_t dtblsize;
slap_ssf_t local_ssf = LDAP_PVT_SASL_LOCAL_SSF; slap_ssf_t local_ssf = LDAP_PVT_SASL_LOCAL_SSF;
struct runqueue_s slapd_rq; struct runqueue_s slapd_rq = {
.rq_notify_cb = slap_runqueue_notify,
};
int slapd_daemon_threads = 1; int slapd_daemon_threads = 1;
int slapd_daemon_mask; int slapd_daemon_mask;
@ -3572,6 +3576,12 @@ slap_wake_listener()
WAKE_LISTENER(0,1); WAKE_LISTENER(0,1);
} }
void
slap_runqueue_notify( runqueue_t *rq )
{
slap_wake_listener();
}
/* return 0 on timeout, 1 on writer ready /* return 0 on timeout, 1 on writer ready
* -1 on general error * -1 on general error
*/ */