From 75943bcd3fbabb8195f92acf288342bf35bc1ae6 Mon Sep 17 00:00:00 2001 From: Howard Chu Date: Thu, 5 Mar 2009 11:21:07 +0000 Subject: [PATCH] ITS#5985 Only play one queued response at a time per psearch thread --- servers/slapd/overlays/syncprov.c | 94 +++++++++---------------------- 1 file changed, 27 insertions(+), 67 deletions(-) diff --git a/servers/slapd/overlays/syncprov.c b/servers/slapd/overlays/syncprov.c index 7520656cf7..a890f5be6f 100644 --- a/servers/slapd/overlays/syncprov.c +++ b/servers/slapd/overlays/syncprov.c @@ -71,12 +71,11 @@ typedef struct syncops { #define PS_WROTE_BASE 0x04 #define PS_FIND_BASE 0x08 #define PS_FIX_FILTER 0x10 +#define PS_TASK_QUEUED 0x20 int s_inuse; /* reference count */ struct syncres *s_res; struct syncres *s_restail; - struct re_s *s_qtask; /* task for playing psearch responses */ -#define RUNQ_INTERVAL 36000 /* a long time */ ldap_pvt_thread_mutex_t s_mutex; } syncops; @@ -746,13 +745,6 @@ syncprov_free_syncop( syncops *so ) ldap_pvt_thread_mutex_unlock( &so->s_mutex ); return; } - if ( so->s_qtask ) { - ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); - if ( ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) ) - ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask ); - ldap_pvt_runqueue_remove( &slapd_rq, so->s_qtask ); - ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); - } ldap_pvt_thread_mutex_unlock( &so->s_mutex ); if ( so->s_flags & PS_IS_DETACHED ) { filter_free( so->s_op->ors_filter ); @@ -851,11 +843,13 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, return rs.sr_err; } +static void +syncprov_qstart( syncops *so ); + /* Play back queued responses */ static int -syncprov_qplay( Operation *op, struct re_s *rtask ) +syncprov_qplay( Operation *op, syncops *so ) { - syncops *so = rtask->arg; slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key; syncres *sr; Entry *e; @@ -864,7 +858,7 @@ syncprov_qplay( Operation *op, struct re_s *rtask ) opc.son = on; - for (;;) { + do { ldap_pvt_thread_mutex_lock( &so->s_mutex ); sr = so->s_res; if ( sr ) @@ -908,37 +902,31 @@ syncprov_qplay( Operation *op, struct re_s *rtask ) ch_free( sr ); - if ( rc ) { - /* Exit loop with mutex held */ - ldap_pvt_thread_mutex_lock( &so->s_mutex ); - break; - } - } + /* Exit loop with mutex held */ + ldap_pvt_thread_mutex_lock( &so->s_mutex ); - /* wait until we get explicitly scheduled again */ - ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); - ldap_pvt_runqueue_stoptask( &slapd_rq, rtask ); - if ( rc == 0 ) { - ldap_pvt_runqueue_resched( &slapd_rq, rtask, 1 ); + } while (0); + + /* We now only send one change at a time, to prevent one + * psearch from hogging all the CPU. Resubmit this + * task if there are more responses queued. + */ + + if ( so->s_res ) { + syncprov_qstart( so ); } else { - /* bail out on any error */ - ldap_pvt_runqueue_remove( &slapd_rq, rtask ); - - /* Prevent duplicate remove */ - if ( so->s_qtask == rtask ) - so->s_qtask = NULL; + so->s_flags ^= PS_TASK_QUEUED; } - ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); + ldap_pvt_thread_mutex_unlock( &so->s_mutex ); return rc; } -/* runqueue task for playing back queued responses */ +/* task for playing back queued responses */ static void * syncprov_qtask( void *ctx, void *arg ) { - struct re_s *rtask = arg; - syncops *so = rtask->arg; + syncops *so = arg; OperationBuffer opbuf; Operation *op; BackendDB be; @@ -963,22 +951,11 @@ syncprov_qtask( void *ctx, void *arg ) LDAP_SLIST_FIRST(&op->o_extra) = NULL; op->o_callback = NULL; - rc = syncprov_qplay( op, rtask ); + rc = syncprov_qplay( op, so ); /* decrement use count... */ syncprov_free_syncop( so ); -#if 0 /* FIXME: connection_close isn't exported from slapd. - * should it be? - */ - if ( rc ) { - ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex ); - if ( connection_state_closing( op->o_conn )) { - connection_close( op->o_conn ); - } - ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex ); - } -#endif return NULL; } @@ -986,27 +963,10 @@ syncprov_qtask( void *ctx, void *arg ) static void syncprov_qstart( syncops *so ) { - int wake=0; - ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); - if ( !so->s_qtask ) { - so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL, - syncprov_qtask, so, "syncprov_qtask", - so->s_op->o_conn->c_peer_name.bv_val ); - ++so->s_inuse; - wake = 1; - } else { - if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) && - !so->s_qtask->next_sched.tv_sec ) { - so->s_qtask->interval.tv_sec = 0; - ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 ); - so->s_qtask->interval.tv_sec = RUNQ_INTERVAL; - ++so->s_inuse; - wake = 1; - } - } - ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); - if ( wake ) - slap_wake_listener(); + so->s_flags |= PS_TASK_QUEUED; + so->s_inuse++; + ldap_pvt_thread_pool_submit( &connection_pool, + syncprov_qtask, so ); } /* Queue a persistent search response */ @@ -1071,7 +1031,7 @@ syncprov_qresp( opcookie *opc, syncops *so, int mode ) so->s_flags ^= PS_WROTE_BASE; so->s_flags |= PS_FIND_BASE; } - if ( so->s_flags & PS_IS_DETACHED ) { + if (( so->s_flags & (PS_IS_DETACHED|PS_TASK_QUEUED)) == PS_IS_DETACHED ) { syncprov_qstart( so ); } ldap_pvt_thread_mutex_unlock( &so->s_mutex );