mirror of
https://git.openldap.org/openldap/openldap.git
synced 2025-01-18 11:05:48 +08:00
runqueue update
This commit is contained in:
parent
e64bb13148
commit
580ae073e1
@ -1,12 +1,62 @@
|
||||
/* $OpenLDAP$ */
|
||||
|
||||
#ifdef LDAP_SYNCREPL
|
||||
|
||||
typedef struct re_s {
|
||||
struct timeval next_sched;
|
||||
struct timeval interval;
|
||||
LDAP_STAILQ_ENTRY(re_s) next;
|
||||
LDAP_STAILQ_ENTRY(re_s) tnext; /* it includes running */
|
||||
LDAP_STAILQ_ENTRY(re_s) rnext;
|
||||
void *private;
|
||||
} re_t;
|
||||
|
||||
typedef struct runqueue_s {
|
||||
LDAP_STAILQ_HEAD(l, re_s) task_list;
|
||||
LDAP_STAILQ_HEAD(rl, re_s) run_list;
|
||||
ldap_pvt_thread_mutex_t rq_mutex;
|
||||
} runqueue_t;
|
||||
|
||||
LDAP_F( void )
|
||||
ldap_pvt_runqueue_insert(
|
||||
struct runqueue_s* rq,
|
||||
time_t interval,
|
||||
void *private
|
||||
);
|
||||
|
||||
LDAP_F( void )
|
||||
ldap_pvt_runqueue_remove(
|
||||
struct runqueue_s* rq,
|
||||
struct re_s* entry
|
||||
);
|
||||
|
||||
LDAP_F( struct re_s* )
|
||||
ldap_pvt_runqueue_next_sched(
|
||||
struct runqueue_s* rq,
|
||||
struct timeval** next_run
|
||||
);
|
||||
|
||||
LDAP_F( void )
|
||||
ldap_pvt_runqueue_runtask(
|
||||
struct runqueue_s* rq,
|
||||
struct re_s* entry
|
||||
);
|
||||
|
||||
LDAP_F( void )
|
||||
ldap_pvt_runqueue_stoptask(
|
||||
struct runqueue_s* rq,
|
||||
struct re_s* entry
|
||||
);
|
||||
|
||||
LDAP_F( int )
|
||||
ldap_pvt_runqueue_isrunning(
|
||||
struct runqueue_s* rq,
|
||||
struct re_s* entry
|
||||
);
|
||||
|
||||
LDAP_F( void )
|
||||
ldap_pvt_runqueue_resched(
|
||||
struct runqueue_s* rq,
|
||||
struct re_s* entry
|
||||
);
|
||||
|
||||
#endif
|
||||
|
@ -14,6 +14,8 @@
|
||||
#include "ldap_queue.h"
|
||||
#include "ldap_rq.h"
|
||||
|
||||
#ifdef LDAP_SYNCREPL
|
||||
|
||||
void
|
||||
ldap_pvt_runqueue_insert(
|
||||
struct runqueue_s* rq,
|
||||
@ -22,62 +24,123 @@ ldap_pvt_runqueue_insert(
|
||||
)
|
||||
{
|
||||
struct re_s* entry;
|
||||
|
||||
entry = (struct re_s *) ch_calloc( 1, sizeof( struct re_s ));
|
||||
entry->interval.tv_sec = interval;
|
||||
entry->interval.tv_usec = 0;
|
||||
entry->next_sched.tv_sec = time( NULL );
|
||||
entry->next_sched.tv_usec = 0;
|
||||
entry->private = private;
|
||||
LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
|
||||
LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext );
|
||||
}
|
||||
|
||||
void
|
||||
ldap_pvt_runqueue_remove(
|
||||
struct runqueue_s* rq,
|
||||
struct re_s* entry
|
||||
)
|
||||
{
|
||||
struct re_s* e;
|
||||
|
||||
LDAP_STAILQ_FOREACH( e, &rq->task_list, tnext ) {
|
||||
if ( e == entry)
|
||||
break;
|
||||
}
|
||||
|
||||
assert ( e == entry );
|
||||
|
||||
LDAP_STAILQ_REMOVE( &rq->task_list, entry, re_s, tnext );
|
||||
|
||||
ch_free( entry );
|
||||
|
||||
}
|
||||
|
||||
struct re_s*
|
||||
ldap_pvt_runqueue_next_sched(
|
||||
struct runqueue_s* rq,
|
||||
struct timeval** next_run,
|
||||
void **private
|
||||
struct timeval** next_run
|
||||
)
|
||||
{
|
||||
struct re_s* entry;
|
||||
entry = LDAP_STAILQ_FIRST( &rq->run_list );
|
||||
|
||||
entry = LDAP_STAILQ_FIRST( &rq->task_list );
|
||||
if ( entry == NULL ) {
|
||||
*next_run = NULL;
|
||||
*private = NULL;
|
||||
return NULL;
|
||||
} else {
|
||||
*next_run = &entry->next_sched;
|
||||
*private = entry->private;
|
||||
return entry;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
ldap_pvt_runqueue_runtask(
|
||||
struct runqueue_s* rq,
|
||||
struct re_s* entry
|
||||
)
|
||||
{
|
||||
LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, rnext );
|
||||
}
|
||||
|
||||
void
|
||||
ldap_pvt_runqueue_stoptask(
|
||||
struct runqueue_s* rq,
|
||||
struct re_s* entry
|
||||
)
|
||||
{
|
||||
LDAP_STAILQ_REMOVE( &rq->run_list, entry, re_s, rnext );
|
||||
}
|
||||
|
||||
int
|
||||
ldap_pvt_runqueue_isrunning(
|
||||
struct runqueue_s* rq,
|
||||
struct re_s* entry
|
||||
)
|
||||
{
|
||||
struct re_s* e;
|
||||
|
||||
LDAP_STAILQ_FOREACH( e, &rq->run_list, rnext ) {
|
||||
if ( e == entry ) {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void
|
||||
ldap_pvt_runqueue_resched(
|
||||
struct runqueue_s* rq
|
||||
struct runqueue_s* rq,
|
||||
struct re_s* entry
|
||||
)
|
||||
{
|
||||
struct re_s* entry;
|
||||
struct re_s* prev;
|
||||
struct re_s* e;
|
||||
|
||||
entry = LDAP_STAILQ_FIRST( &rq->run_list );
|
||||
if ( entry == NULL ) {
|
||||
return;
|
||||
LDAP_STAILQ_FOREACH( e, &rq->task_list, tnext ) {
|
||||
if ( e == entry )
|
||||
break;
|
||||
}
|
||||
|
||||
assert ( e == entry );
|
||||
|
||||
LDAP_STAILQ_REMOVE( &rq->task_list, entry, re_s, tnext );
|
||||
|
||||
entry->next_sched.tv_sec = time( NULL ) + entry->interval.tv_sec;
|
||||
if ( LDAP_STAILQ_EMPTY( &rq->task_list )) {
|
||||
LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext );
|
||||
} else {
|
||||
entry->next_sched.tv_sec = time( NULL ) + entry->interval.tv_sec;
|
||||
LDAP_STAILQ_REMOVE_HEAD( &rq->run_list, next );
|
||||
if ( LDAP_STAILQ_EMPTY( &rq->run_list )) {
|
||||
LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
|
||||
} else {
|
||||
prev = entry;
|
||||
LDAP_STAILQ_FOREACH( e, &rq->run_list, next ) {
|
||||
if ( e->next_sched.tv_sec > entry->next_sched.tv_sec ) {
|
||||
if ( prev == entry ) {
|
||||
LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
|
||||
} else {
|
||||
LDAP_STAILQ_INSERT_AFTER( &rq->run_list, prev, entry, next );
|
||||
}
|
||||
prev = NULL;
|
||||
LDAP_STAILQ_FOREACH( e, &rq->task_list, tnext ) {
|
||||
if ( e->next_sched.tv_sec > entry->next_sched.tv_sec ) {
|
||||
if ( prev == NULL ) {
|
||||
LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext );
|
||||
} else {
|
||||
LDAP_STAILQ_INSERT_AFTER( &rq->task_list, prev, entry, tnext );
|
||||
}
|
||||
prev = e;
|
||||
}
|
||||
prev = e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -18,6 +18,8 @@
|
||||
#include "lutil.h"
|
||||
#include "lber_pvt.h"
|
||||
|
||||
#include "ldap_rq.h"
|
||||
|
||||
#ifdef LDAP_SLAPI
|
||||
#include "slapi.h"
|
||||
#endif
|
||||
@ -331,6 +333,12 @@ int backend_startup(Backend *be)
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef LDAP_SYNCREPL
|
||||
ldap_pvt_thread_mutex_init( &syncrepl_rq.rq_mutex );
|
||||
LDAP_STAILQ_INIT( &syncrepl_rq.task_list );
|
||||
LDAP_STAILQ_INIT( &syncrepl_rq.run_list );
|
||||
#endif
|
||||
|
||||
/* open each backend database */
|
||||
for( i = 0; i < nBackendDB; i++ ) {
|
||||
/* append global access controls */
|
||||
@ -355,8 +363,10 @@ int backend_startup(Backend *be)
|
||||
#ifdef LDAP_SYNCREPL
|
||||
if ( backendDB[i].syncinfo != NULL ) {
|
||||
syncinfo_t *si = ( syncinfo_t * ) backendDB[i].syncinfo;
|
||||
ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
|
||||
ldap_pvt_runqueue_insert( &syncrepl_rq, si->interval,
|
||||
(void *) &backendDB[i] );
|
||||
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
@ -1219,8 +1219,8 @@ slapd_daemon_task(
|
||||
#ifdef LDAP_SYNCREPL
|
||||
struct timeval diff;
|
||||
struct timeval *cat;
|
||||
BackendDB *db;
|
||||
time_t tdelta = 1;
|
||||
struct re_s* rtask;
|
||||
#endif
|
||||
|
||||
now = slap_get_time();
|
||||
@ -1298,13 +1298,13 @@ slapd_daemon_task(
|
||||
|
||||
#ifdef LDAP_SYNCREPL
|
||||
/* cat is of struct timeval containing the earliest schedule */
|
||||
ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat, &db );
|
||||
ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
|
||||
rtask = ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat );
|
||||
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
|
||||
if ( cat != NULL ) {
|
||||
diff.tv_sec = cat->tv_sec - slap_get_time();
|
||||
if ( diff.tv_sec == 0 )
|
||||
diff.tv_sec = tdelta;
|
||||
} else {
|
||||
cat = NULL;
|
||||
}
|
||||
#endif
|
||||
|
||||
@ -1387,11 +1387,19 @@ slapd_daemon_task(
|
||||
#endif
|
||||
|
||||
#ifdef LDAP_SYNCREPL
|
||||
if ( cat && cat->tv_sec && cat->tv_sec <= slap_get_time() ) {
|
||||
ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
|
||||
rtask = ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat );
|
||||
if ( ldap_pvt_runqueue_isrunning( &syncrepl_rq, rtask )) {
|
||||
ldap_pvt_runqueue_resched( &syncrepl_rq, rtask );
|
||||
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
|
||||
} else if ( cat && cat->tv_sec && cat->tv_sec <= slap_get_time() ) {
|
||||
ldap_pvt_runqueue_runtask( &syncrepl_rq, rtask );
|
||||
ldap_pvt_runqueue_resched( &syncrepl_rq, rtask );
|
||||
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
|
||||
ldap_pvt_thread_pool_submit( &connection_pool,
|
||||
do_syncrepl, (void *) db );
|
||||
/* FIXME : reschedule upon do_syncrepl termination */
|
||||
ldap_pvt_runqueue_resched( &syncrepl_rq );
|
||||
do_syncrepl, (void *) rtask );
|
||||
} else {
|
||||
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
|
||||
}
|
||||
#endif
|
||||
ldap_pvt_thread_yield();
|
||||
|
@ -118,7 +118,8 @@ do_syncrepl(
|
||||
void *ctx,
|
||||
void *arg )
|
||||
{
|
||||
Backend *be = arg;
|
||||
struct re_s* rtask = arg;
|
||||
Backend *be = rtask->private;
|
||||
syncinfo_t *si = ( syncinfo_t * ) be->syncinfo;
|
||||
|
||||
SlapReply rs = {REP_RESULT};
|
||||
@ -634,7 +635,6 @@ do_syncrepl(
|
||||
Debug( LDAP_DEBUG_ANY,
|
||||
"do_syncrepl : unknown result\n", 0, 0, 0 );
|
||||
#endif
|
||||
return NULL;
|
||||
}
|
||||
|
||||
done:
|
||||
@ -646,6 +646,16 @@ done:
|
||||
if ( res )
|
||||
ldap_msgfree( res );
|
||||
ldap_unbind( ld );
|
||||
|
||||
ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
|
||||
ldap_pvt_runqueue_stoptask( &syncrepl_rq, rtask );
|
||||
if ( si->type == LDAP_SYNC_REFRESH_ONLY ) {
|
||||
ldap_pvt_runqueue_resched( &syncrepl_rq, rtask );
|
||||
} else {
|
||||
ldap_pvt_runqueue_remove( &syncrepl_rq, rtask );
|
||||
}
|
||||
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -256,11 +256,11 @@ struct runqueue_s syncrepl_rq;
|
||||
|
||||
void init_syncrepl( )
|
||||
{
|
||||
return -1;
|
||||
return;
|
||||
}
|
||||
|
||||
void* do_syncrepl( void *ctx, void *arg )
|
||||
{
|
||||
return -1;
|
||||
return NULL;
|
||||
}
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user