a runqueue for periodic thread execution (for syncrepl)

This commit is contained in:
Jong Hyuk Choi 2003-05-07 02:06:01 +00:00
parent 2ed0725491
commit 45776bff04
9 changed files with 153 additions and 17 deletions

12
include/ldap_rq.h Normal file
View File

@ -0,0 +1,12 @@
/* $OpenLDAP$ */
typedef struct re_s {
struct timeval next_sched;
struct timeval interval;
LDAP_STAILQ_ENTRY(re_s) next;
void *private;
} re_t;
typedef struct runqueue_s {
LDAP_STAILQ_HEAD(rl, re_s) run_list;
} runqueue_t;

View File

@ -19,10 +19,10 @@ XXSRCS = apitest.c test.c \
request.c os-ip.c url.c sortctrl.c vlvctrl.c \
init.c options.c print.c string.c util-int.c schema.c \
charray.c tls.c os-local.c dnssrv.c utf-8.c utf-8-conv.c
SRCS = threads.c rdwr.c tpool.c \
SRCS = threads.c rdwr.c tpool.c rq.c \
thr_posix.c thr_cthreads.c thr_thr.c thr_lwp.c thr_nt.c \
thr_pth.c thr_stub.c
OBJS = threads.lo rdwr.lo tpool.lo \
OBJS = threads.lo rdwr.lo tpool.lo rq.lo \
thr_posix.lo thr_cthreads.lo thr_thr.lo thr_lwp.lo thr_nt.lo \
thr_pth.lo thr_stub.lo \
bind.lo open.lo result.lo error.lo compare.lo search.lo \

83
libraries/libldap_r/rq.c Normal file
View File

@ -0,0 +1,83 @@
/* $OpenLDAP$ */
#include "portable.h"
#include <stdio.h>
#include <ac/stdarg.h>
#include <ac/stdlib.h>
#include <ac/string.h>
#include <ac/time.h>
#include <ac/errno.h>
#include "ldap-int.h"
#include "ldap_pvt_thread.h"
#include "ldap_queue.h"
#include "ldap_rq.h"
void
ldap_pvt_runqueue_insert(
struct runqueue_s* rq,
time_t interval,
void *private
)
{
struct re_s* entry;
entry = (struct re_s *) ch_calloc( 1, sizeof( struct re_s ));
entry->interval.tv_sec = interval;
entry->interval.tv_usec = 0;
entry->next_sched.tv_sec = time( NULL );
entry->next_sched.tv_usec = 0;
entry->private = private;
LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
}
void
ldap_pvt_runqueue_next_sched(
struct runqueue_s* rq,
struct timeval** next_run,
void **private
)
{
struct re_s* entry;
entry = LDAP_STAILQ_FIRST( &rq->run_list );
if ( entry == NULL ) {
*next_run = NULL;
*private = NULL;
} else {
*next_run = &entry->next_sched;
*private = entry->private;
}
}
void
ldap_pvt_runqueue_resched(
struct runqueue_s* rq
)
{
struct re_s* entry;
struct re_s* prev;
struct re_s* e;
entry = LDAP_STAILQ_FIRST( &rq->run_list );
if ( entry == NULL ) {
return;
} else {
entry->next_sched.tv_sec = time( NULL ) + entry->interval.tv_sec;
LDAP_STAILQ_REMOVE_HEAD( &rq->run_list, next );
if ( LDAP_STAILQ_EMPTY( &rq->run_list )) {
LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
} else {
prev = entry;
LDAP_STAILQ_FOREACH( e, &rq->run_list, next ) {
if ( e->next_sched.tv_sec > entry->next_sched.tv_sec ) {
if ( prev == entry ) {
LDAP_STAILQ_INSERT_HEAD( &rq->run_list, entry, next );
} else {
LDAP_STAILQ_INSERT_AFTER( &rq->run_list, prev, entry, next );
}
}
prev = e;
}
}
}
}

View File

@ -559,4 +559,4 @@ void *ldap_pvt_thread_pool_context( )
return thread_keys[i].ctx;
}
#endif /* LDAP_HAVE_THREAD_POOL */
#endif /* LDAP_THREAD_HAVE_TPOOL */

View File

@ -354,20 +354,9 @@ int backend_startup(Backend *be)
#ifdef LDAP_SYNCREPL
if ( backendDB[i].syncinfo != NULL ) {
int ret;
ret = ldap_pvt_thread_pool_submit( &syncrepl_pool,
do_syncrepl, (void *) &backendDB[i] );
if ( ret != 0 ) {
#ifdef NEW_LOGGING
LDAP_LOG( BACKEND, CRIT,
"syncrepl thread pool submit failed (%d)\n",
ret, 0, 0 );
#else
Debug( LDAP_DEBUG_ANY,
"ldap_pvt_thread_pool_submit failed (%d) \n",
ret, 0, 0 );
#endif
}
syncinfo_t *si = ( syncinfo_t * ) backendDB[i].syncinfo;
ldap_pvt_runqueue_insert( &syncrepl_rq, si->interval,
(void *) &backendDB[i] );
}
#endif
}

View File

@ -20,6 +20,10 @@
#include "lutil.h"
#include "slap.h"
#ifdef LDAP_SYNCREPL
#include "ldap_rq.h"
#endif
#ifdef HAVE_TCPD
#include <tcpd.h>
#define SLAP_STRING_UNKNOWN STRING_UNKNOWN
@ -1212,6 +1216,13 @@ slapd_daemon_task(
struct timeval zero;
struct timeval *tvp;
#ifdef LDAP_SYNCREPL
struct timeval diff;
struct timeval *cat;
BackendDB *db;
time_t tdelta = 1;
#endif
if( emfile ) {
now = slap_get_time();
connections_timeout_idle( now );
@ -1285,12 +1296,32 @@ slapd_daemon_task(
at = ldap_pvt_thread_pool_backload(&connection_pool);
#ifdef LDAP_SYNCREPL
/* cat is of struct timeval containing the earliest schedule */
ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat, &db );
if ( cat != NULL ) {
diff.tv_sec = cat->tv_sec - slap_get_time();
if ( diff.tv_sec == 0 )
diff.tv_sec = tdelta;
} else {
cat = NULL;
}
#endif
#if defined( HAVE_YIELDING_SELECT ) || defined( NO_THREADS )
tvp = NULL;
#else
tvp = at ? &zero : NULL;
#endif
#ifdef LDAP_SYNCREPL
if ( cat != NULL ) {
if ( tvp == NULL ) {
tvp = &diff;
}
}
#endif
for ( l = 0; slap_listeners[l] != NULL; l++ ) {
if ( slap_listeners[l]->sl_sd == AC_SOCKET_INVALID )
continue;
@ -1354,6 +1385,15 @@ slapd_daemon_task(
Debug( LDAP_DEBUG_CONNS, "daemon: select timeout - yielding\n",
0, 0, 0 );
#endif
#ifdef LDAP_SYNCREPL
if ( cat && cat->tv_sec && cat->tv_sec <= slap_get_time() ) {
ldap_pvt_thread_pool_submit( &connection_pool,
do_syncrepl, (void *) db );
/* FIXME : reschedule upon do_syncrepl termination */
ldap_pvt_runqueue_resched( &syncrepl_rq );
}
#endif
ldap_pvt_thread_yield();
continue;

View File

@ -1123,6 +1123,8 @@ LDAP_SLAPD_F (int) do_extended LDAP_P((Operation *op, SlapReply *rs));
*/
#ifdef LDAP_SYNCREPL
LDAP_SLAPD_V (struct runqueue_s) syncrepl_rq;
LDAP_SLAPD_F (void) init_syncrepl LDAP_P(());
LDAP_SLAPD_F (void*) do_syncrepl LDAP_P((void *, void *));
#endif

View File

@ -35,6 +35,8 @@
#ifdef LDAP_SYNCREPL
#include "ldap_rq.h"
static Entry*
syncrepl_message_to_entry ( LDAP *, Operation *, LDAPMessage *,
Modifications **, int*, struct berval *, struct berval * );
@ -75,6 +77,8 @@ static AttributeDescription **add_descs_lastmod;
static AttributeDescription **del_descs;
static AttributeDescription **del_descs_lastmod;
struct runqueue_s syncrepl_rq;
void
init_syncrepl()
{

View File

@ -13,6 +13,10 @@
#include "../slap.h"
#ifdef LDAP_SYNCREPL
#include "ldap_rq.h"
#endif
/* needed by WIN32 and back-monitor */
time_t starttime;
@ -248,6 +252,8 @@ int root_dse_info( Connection *conn, Entry **entry, const char **text )
}
#ifdef LDAP_SYNCREPL
struct runqueue_s syncrepl_rq;
void init_syncrepl( )
{
return -1;