mirror of
https://git.openldap.org/openldap/openldap.git
synced 2024-12-21 03:10:25 +08:00
ITS#5985 Only play one queued response at a time per psearch thread
This commit is contained in:
parent
f3014a235d
commit
75943bcd3f
@ -71,12 +71,11 @@ typedef struct syncops {
|
||||
#define PS_WROTE_BASE 0x04
|
||||
#define PS_FIND_BASE 0x08
|
||||
#define PS_FIX_FILTER 0x10
|
||||
#define PS_TASK_QUEUED 0x20
|
||||
|
||||
int s_inuse; /* reference count */
|
||||
struct syncres *s_res;
|
||||
struct syncres *s_restail;
|
||||
struct re_s *s_qtask; /* task for playing psearch responses */
|
||||
#define RUNQ_INTERVAL 36000 /* a long time */
|
||||
ldap_pvt_thread_mutex_t s_mutex;
|
||||
} syncops;
|
||||
|
||||
@ -746,13 +745,6 @@ syncprov_free_syncop( syncops *so )
|
||||
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
|
||||
return;
|
||||
}
|
||||
if ( so->s_qtask ) {
|
||||
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
|
||||
if ( ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) )
|
||||
ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask );
|
||||
ldap_pvt_runqueue_remove( &slapd_rq, so->s_qtask );
|
||||
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
|
||||
}
|
||||
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
|
||||
if ( so->s_flags & PS_IS_DETACHED ) {
|
||||
filter_free( so->s_op->ors_filter );
|
||||
@ -851,11 +843,13 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so,
|
||||
return rs.sr_err;
|
||||
}
|
||||
|
||||
static void
|
||||
syncprov_qstart( syncops *so );
|
||||
|
||||
/* Play back queued responses */
|
||||
static int
|
||||
syncprov_qplay( Operation *op, struct re_s *rtask )
|
||||
syncprov_qplay( Operation *op, syncops *so )
|
||||
{
|
||||
syncops *so = rtask->arg;
|
||||
slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key;
|
||||
syncres *sr;
|
||||
Entry *e;
|
||||
@ -864,7 +858,7 @@ syncprov_qplay( Operation *op, struct re_s *rtask )
|
||||
|
||||
opc.son = on;
|
||||
|
||||
for (;;) {
|
||||
do {
|
||||
ldap_pvt_thread_mutex_lock( &so->s_mutex );
|
||||
sr = so->s_res;
|
||||
if ( sr )
|
||||
@ -908,37 +902,31 @@ syncprov_qplay( Operation *op, struct re_s *rtask )
|
||||
|
||||
ch_free( sr );
|
||||
|
||||
if ( rc ) {
|
||||
/* Exit loop with mutex held */
|
||||
ldap_pvt_thread_mutex_lock( &so->s_mutex );
|
||||
break;
|
||||
}
|
||||
}
|
||||
/* Exit loop with mutex held */
|
||||
ldap_pvt_thread_mutex_lock( &so->s_mutex );
|
||||
|
||||
/* wait until we get explicitly scheduled again */
|
||||
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
|
||||
ldap_pvt_runqueue_stoptask( &slapd_rq, rtask );
|
||||
if ( rc == 0 ) {
|
||||
ldap_pvt_runqueue_resched( &slapd_rq, rtask, 1 );
|
||||
} while (0);
|
||||
|
||||
/* We now only send one change at a time, to prevent one
|
||||
* psearch from hogging all the CPU. Resubmit this
|
||||
* task if there are more responses queued.
|
||||
*/
|
||||
|
||||
if ( so->s_res ) {
|
||||
syncprov_qstart( so );
|
||||
} else {
|
||||
/* bail out on any error */
|
||||
ldap_pvt_runqueue_remove( &slapd_rq, rtask );
|
||||
|
||||
/* Prevent duplicate remove */
|
||||
if ( so->s_qtask == rtask )
|
||||
so->s_qtask = NULL;
|
||||
so->s_flags ^= PS_TASK_QUEUED;
|
||||
}
|
||||
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
|
||||
|
||||
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* runqueue task for playing back queued responses */
|
||||
/* task for playing back queued responses */
|
||||
static void *
|
||||
syncprov_qtask( void *ctx, void *arg )
|
||||
{
|
||||
struct re_s *rtask = arg;
|
||||
syncops *so = rtask->arg;
|
||||
syncops *so = arg;
|
||||
OperationBuffer opbuf;
|
||||
Operation *op;
|
||||
BackendDB be;
|
||||
@ -963,22 +951,11 @@ syncprov_qtask( void *ctx, void *arg )
|
||||
LDAP_SLIST_FIRST(&op->o_extra) = NULL;
|
||||
op->o_callback = NULL;
|
||||
|
||||
rc = syncprov_qplay( op, rtask );
|
||||
rc = syncprov_qplay( op, so );
|
||||
|
||||
/* decrement use count... */
|
||||
syncprov_free_syncop( so );
|
||||
|
||||
#if 0 /* FIXME: connection_close isn't exported from slapd.
|
||||
* should it be?
|
||||
*/
|
||||
if ( rc ) {
|
||||
ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );
|
||||
if ( connection_state_closing( op->o_conn )) {
|
||||
connection_close( op->o_conn );
|
||||
}
|
||||
ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
|
||||
}
|
||||
#endif
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -986,27 +963,10 @@ syncprov_qtask( void *ctx, void *arg )
|
||||
static void
|
||||
syncprov_qstart( syncops *so )
|
||||
{
|
||||
int wake=0;
|
||||
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
|
||||
if ( !so->s_qtask ) {
|
||||
so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL,
|
||||
syncprov_qtask, so, "syncprov_qtask",
|
||||
so->s_op->o_conn->c_peer_name.bv_val );
|
||||
++so->s_inuse;
|
||||
wake = 1;
|
||||
} else {
|
||||
if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) &&
|
||||
!so->s_qtask->next_sched.tv_sec ) {
|
||||
so->s_qtask->interval.tv_sec = 0;
|
||||
ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );
|
||||
so->s_qtask->interval.tv_sec = RUNQ_INTERVAL;
|
||||
++so->s_inuse;
|
||||
wake = 1;
|
||||
}
|
||||
}
|
||||
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
|
||||
if ( wake )
|
||||
slap_wake_listener();
|
||||
so->s_flags |= PS_TASK_QUEUED;
|
||||
so->s_inuse++;
|
||||
ldap_pvt_thread_pool_submit( &connection_pool,
|
||||
syncprov_qtask, so );
|
||||
}
|
||||
|
||||
/* Queue a persistent search response */
|
||||
@ -1071,7 +1031,7 @@ syncprov_qresp( opcookie *opc, syncops *so, int mode )
|
||||
so->s_flags ^= PS_WROTE_BASE;
|
||||
so->s_flags |= PS_FIND_BASE;
|
||||
}
|
||||
if ( so->s_flags & PS_IS_DETACHED ) {
|
||||
if (( so->s_flags & (PS_IS_DETACHED|PS_TASK_QUEUED)) == PS_IS_DETACHED ) {
|
||||
syncprov_qstart( so );
|
||||
}
|
||||
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
|
||||
|
Loading…
Reference in New Issue
Block a user