Use a thread for LDIF parsing in slapadd -q

This commit is contained in:
Howard Chu 2011-10-04 19:19:25 -07:00
parent 1adc0b9b1c
commit 34adb86929
2 changed files with 223 additions and 131 deletions

View File

@ -101,6 +101,8 @@ static void * bdb_tool_index_task( void *ctx, void *ptr );
static int
bdb_tool_entry_get_int( BackendDB *be, ID id, Entry **ep );
static int bdb_tool_threads;
int bdb_tool_entry_open(
BackendDB *be, int mode )
{
@ -138,10 +140,11 @@ int bdb_tool_entry_open(
ldap_pvt_thread_cond_init( &bdb_tool_index_cond_work );
if ( bdb->bi_nattrs ) {
int i;
bdb_tool_index_threads = ch_malloc( slap_tool_thread_max * sizeof( int ));
bdb_tool_threads = slap_tool_thread_max - 1;
bdb_tool_index_threads = ch_malloc( bdb_tool_threads * sizeof( int ));
bdb_tool_index_rec = ch_malloc( bdb->bi_nattrs * sizeof( IndexRec ));
bdb_tool_index_tcount = slap_tool_thread_max - 1;
for (i=1; i<slap_tool_thread_max; i++) {
bdb_tool_index_tcount = bdb_tool_threads - 1;
for (i=1; i<bdb_tool_threads; i++) {
int *ptr = ch_malloc( sizeof( int ));
*ptr = i;
ldap_pvt_thread_pool_submit( &connection_pool,
@ -182,7 +185,7 @@ int bdb_tool_entry_close(
&bdb_tool_index_mutex );
}
bdb_tool_index_tcount = slap_tool_thread_max - 1;
bdb_tool_index_tcount = bdb_tool_threads - 1;
ldap_pvt_thread_cond_broadcast( &bdb_tool_index_cond_work );
/* Make sure all threads are stopped */
@ -196,7 +199,7 @@ int bdb_tool_entry_close(
slapd_shutdown = 0;
ch_free( bdb_tool_index_threads );
ch_free( bdb_tool_index_rec );
bdb_tool_index_tcount = slap_tool_thread_max - 1;
bdb_tool_index_tcount = bdb_tool_threads - 1;
}
if( eh.bv.bv_val ) {
@ -602,16 +605,16 @@ bdb_tool_index_add(
ldap_pvt_thread_cond_wait( &bdb_tool_index_cond_main,
&bdb_tool_index_mutex );
}
for ( i=1; i<slap_tool_thread_max; i++ )
for ( i=1; i<bdb_tool_threads; i++ )
bdb_tool_index_threads[i] = LDAP_BUSY;
bdb_tool_index_tcount = slap_tool_thread_max - 1;
bdb_tool_index_tcount = bdb_tool_threads - 1;
ldap_pvt_thread_cond_broadcast( &bdb_tool_index_cond_work );
ldap_pvt_thread_mutex_unlock( &bdb_tool_index_mutex );
rc = bdb_index_recrun( op, bdb, ir, e->e_id, 0 );
if ( rc )
return rc;
ldap_pvt_thread_mutex_lock( &bdb_tool_index_mutex );
for ( i=1; i<slap_tool_thread_max; i++ ) {
for ( i=1; i<bdb_tool_threads; i++ ) {
if ( bdb_tool_index_threads[i] == LDAP_BUSY ) {
ldap_pvt_thread_cond_wait( &bdb_tool_index_cond_main,
&bdb_tool_index_mutex );

View File

@ -42,110 +42,62 @@
static char csnbuf[ LDAP_PVT_CSNSTR_BUFSIZE ];
int
slapadd( int argc, char **argv )
typedef struct Erec {
Entry *e;
int lineno;
int nextline;
} Erec;
typedef struct Trec {
Entry *e;
int lineno;
int nextline;
int rc;
int ready;
} Trec;
static Trec trec;
static unsigned long sid = SLAP_SYNC_SID_MAX + 1;
static int checkvals;
static int enable_meter;
static lutil_meter_t meter;
static const char *progname = "slapadd";
static OperationBuffer opbuf;
static char *buf;
static ldap_pvt_thread_mutex_t add_mutex;
static ldap_pvt_thread_cond_t add_cond_r, add_cond_w;
static int add_stop;
/* returns:
* 1: got a record
* 0: EOF
* -1: read failure
* -2: parse failure
*/
static int
getrec0(Erec *erec)
{
char *buf = NULL;
const char *text;
int ldifrc, lmax = 0;
char textbuf[SLAP_TEXT_BUFLEN] = { '\0' };
size_t textlen = sizeof textbuf;
const char *progname = "slapadd";
struct berval csn;
unsigned long sid = SLAP_SYNC_SID_MAX + 1;
struct berval bvtext;
ID id;
OperationBuffer opbuf;
Operation *op;
int checkvals;
int lineno, nextline, ldifrc;
int lmax;
int rc = EXIT_SUCCESS;
int enable_meter = 0;
lutil_meter_t meter;
struct stat stat_buf;
/* default "000" */
csnsid = 0;
if ( isatty (2) ) enable_meter = 1;
slap_tool_init( progname, SLAPADD, argc, argv );
memset( &opbuf, 0, sizeof(opbuf) );
op = &opbuf.ob_op;
Operation *op = &opbuf.ob_op;
op->o_hdr = &opbuf.ob_hdr;
if( !be->be_entry_open ||
!be->be_entry_close ||
!be->be_entry_put ||
(update_ctxcsn &&
(!be->be_dn2id_get ||
!be->be_entry_get ||
!be->be_entry_modify)) )
{
fprintf( stderr, "%s: database doesn't support necessary operations.\n",
progname );
if ( dryrun ) {
fprintf( stderr, "\t(dry) continuing...\n" );
} else {
exit( EXIT_FAILURE );
}
}
checkvals = (slapMode & SLAP_TOOL_QUICK) ? 0 : 1;
/* do not check values in quick mode */
if ( slapMode & SLAP_TOOL_QUICK ) {
if ( slapMode & SLAP_TOOL_VALUE_CHECK ) {
fprintf( stderr, "%s: value-check incompatible with quick mode; disabled.\n", progname );
slapMode &= ~SLAP_TOOL_VALUE_CHECK;
}
}
lmax = 0;
nextline = 0;
/* enforce schema checking unless not disabled */
if ( (slapMode & SLAP_TOOL_NO_SCHEMA_CHECK) == 0) {
SLAP_DBFLAGS(be) &= ~(SLAP_DBFLAG_NO_SCHEMA_CHECK);
}
if( !dryrun && be->be_entry_open( be, 1 ) != 0 ) {
fprintf( stderr, "%s: could not open database.\n",
progname );
exit( EXIT_FAILURE );
}
(void)slap_tool_update_ctxcsn_init();
if ( enable_meter
#ifdef LDAP_DEBUG
/* tools default to "none" */
&& slap_debug == LDAP_DEBUG_NONE
#endif
&& !fstat ( fileno ( ldiffp->fp ), &stat_buf )
&& S_ISREG(stat_buf.st_mode) ) {
enable_meter = !lutil_meter_open(
&meter,
&lutil_meter_text_display,
&lutil_meter_linear_estimator,
stat_buf.st_size);
} else {
enable_meter = 0;
}
again:
erec->lineno = erec->nextline+1;
/* nextline is the line number of the end of the current entry */
for( lineno=1; ( ldifrc = ldif_read_record( ldiffp, &nextline, &buf, &lmax )) > 0;
lineno=nextline+1 )
ldifrc = ldif_read_record( ldiffp, &erec->nextline, &buf, &lmax );
if (ldifrc < 1)
return ldifrc < 0 ? -1 : 0;
{
BackendDB *bd;
Entry *e;
if ( lineno < jumpline )
continue;
if ( erec->lineno < jumpline )
goto again;
e = str2entry2( buf, checkvals );
@ -154,19 +106,10 @@ slapadd( int argc, char **argv )
ftell( ldiffp->fp ),
0);
/*
* Initialize text buffer
*/
bvtext.bv_len = textlen;
bvtext.bv_val = textbuf;
bvtext.bv_val[0] = '\0';
if( e == NULL ) {
fprintf( stderr, "%s: could not parse entry (line=%d)\n",
progname, lineno );
rc = EXIT_FAILURE;
if( continuemode ) continue;
break;
progname, erec->lineno );
return -2;
}
/* make sure the DN is not empty */
@ -175,7 +118,7 @@ slapadd( int argc, char **argv )
{
fprintf( stderr, "%s: line %d: "
"cannot add entry with empty dn=\"%s\"",
progname, lineno, e->e_dn );
progname, erec->lineno, e->e_dn );
bd = select_backend( &e->e_nname, nosubordinates );
if ( bd ) {
BackendDB *bdtmp;
@ -193,10 +136,8 @@ slapadd( int argc, char **argv )
}
fprintf( stderr, "\n" );
rc = EXIT_FAILURE;
entry_free( e );
if( continuemode ) continue;
break;
return -2;
}
/* check backend */
@ -204,7 +145,7 @@ slapadd( int argc, char **argv )
if ( bd != be ) {
fprintf( stderr, "%s: line %d: "
"database #%d (%s) not configured to hold \"%s\"",
progname, lineno,
progname, erec->lineno,
dbnum,
be->be_suffix[0].bv_val,
e->e_dn );
@ -226,18 +167,14 @@ slapadd( int argc, char **argv )
fprintf( stderr, "; no database configured for that naming context" );
}
fprintf( stderr, "\n" );
rc = EXIT_FAILURE;
entry_free( e );
if( continuemode ) continue;
break;
return -2;
}
rc = slap_tool_entry_check( progname, op, e, lineno, &text, textbuf, textlen );
if ( rc != LDAP_SUCCESS ) {
rc = EXIT_FAILURE;
if ( slap_tool_entry_check( progname, op, e, erec->lineno, &text, textbuf, textlen ) !=
LDAP_SUCCESS ) {
entry_free( e );
if( continuemode ) continue;
break;
return -2;
}
if ( SLAP_LASTMOD(be) ) {
@ -342,28 +279,180 @@ slapadd( int argc, char **argv )
sid = slap_tool_update_ctxcsn_check( progname, e );
}
erec->e = e;
}
return 1;
}
static void *
getrec_thr(void *ctx)
{
ldap_pvt_thread_mutex_lock( &add_mutex );
while (!add_stop) {
trec.rc = getrec0((Erec *)&trec);
trec.ready = 1;
ldap_pvt_thread_cond_signal( &add_cond_w );
while (trec.ready)
ldap_pvt_thread_cond_wait( &add_cond_r, &add_mutex );
/* eof or read failure */
if ( trec.rc == 0 || trec.rc == -1 )
break;
}
ldap_pvt_thread_mutex_unlock( &add_mutex );
return NULL;
}
static int
getrec(Erec *erec)
{
int rc;
if ( slap_tool_thread_max < 2 )
return getrec0(erec);
ldap_pvt_thread_mutex_lock( &add_mutex );
while (!trec.ready)
ldap_pvt_thread_cond_wait( &add_cond_w, &add_mutex );
erec->e = trec.e;
erec->lineno = trec.lineno;
erec->nextline = trec.nextline;
trec.ready = 0;
rc = trec.rc;
ldap_pvt_thread_mutex_unlock( &add_mutex );
ldap_pvt_thread_cond_signal( &add_cond_r );
return rc;
}
int
slapadd( int argc, char **argv )
{
char textbuf[SLAP_TEXT_BUFLEN] = { '\0' };
size_t textlen = sizeof textbuf;
Erec erec;
struct berval bvtext;
ldap_pvt_thread_t thr;
ID id;
int ldifrc;
int rc = EXIT_SUCCESS;
struct stat stat_buf;
/* default "000" */
csnsid = 0;
if ( isatty (2) ) enable_meter = 1;
slap_tool_init( progname, SLAPADD, argc, argv );
if( !be->be_entry_open ||
!be->be_entry_close ||
!be->be_entry_put ||
(update_ctxcsn &&
(!be->be_dn2id_get ||
!be->be_entry_get ||
!be->be_entry_modify)) )
{
fprintf( stderr, "%s: database doesn't support necessary operations.\n",
progname );
if ( dryrun ) {
fprintf( stderr, "\t(dry) continuing...\n" );
} else {
exit( EXIT_FAILURE );
}
}
checkvals = (slapMode & SLAP_TOOL_QUICK) ? 0 : 1;
/* do not check values in quick mode */
if ( slapMode & SLAP_TOOL_QUICK ) {
if ( slapMode & SLAP_TOOL_VALUE_CHECK ) {
fprintf( stderr, "%s: value-check incompatible with quick mode; disabled.\n", progname );
slapMode &= ~SLAP_TOOL_VALUE_CHECK;
}
}
/* enforce schema checking unless not disabled */
if ( (slapMode & SLAP_TOOL_NO_SCHEMA_CHECK) == 0) {
SLAP_DBFLAGS(be) &= ~(SLAP_DBFLAG_NO_SCHEMA_CHECK);
}
if( !dryrun && be->be_entry_open( be, 1 ) != 0 ) {
fprintf( stderr, "%s: could not open database.\n",
progname );
exit( EXIT_FAILURE );
}
(void)slap_tool_update_ctxcsn_init();
if ( enable_meter
#ifdef LDAP_DEBUG
/* tools default to "none" */
&& slap_debug == LDAP_DEBUG_NONE
#endif
&& !fstat ( fileno ( ldiffp->fp ), &stat_buf )
&& S_ISREG(stat_buf.st_mode) ) {
enable_meter = !lutil_meter_open(
&meter,
&lutil_meter_text_display,
&lutil_meter_linear_estimator,
stat_buf.st_size);
} else {
enable_meter = 0;
}
if ( slap_tool_thread_max > 1 ) {
ldap_pvt_thread_mutex_init( &add_mutex );
ldap_pvt_thread_cond_init( &add_cond_r );
ldap_pvt_thread_cond_init( &add_cond_w );
ldap_pvt_thread_create( &thr, 0, getrec_thr, NULL );
}
erec.nextline = 0;
erec.e = NULL;
for (;;) {
ldifrc = getrec( &erec );
if ( ldifrc < 1 ) {
if ( ldifrc == -2 && continuemode )
continue;
break;
}
if ( !dryrun ) {
id = be->be_entry_put( be, e, &bvtext );
/*
* Initialize text buffer
*/
bvtext.bv_len = textlen;
bvtext.bv_val = textbuf;
bvtext.bv_val[0] = '\0';
id = be->be_entry_put( be, erec.e, &bvtext );
if( id == NOID ) {
fprintf( stderr, "%s: could not add entry dn=\"%s\" "
"(line=%d): %s\n", progname, e->e_dn,
lineno, bvtext.bv_val );
"(line=%d): %s\n", progname, erec.e->e_dn,
erec.lineno, bvtext.bv_val );
rc = EXIT_FAILURE;
entry_free( e );
entry_free( erec.e );
if( continuemode ) continue;
break;
}
if ( verbose )
fprintf( stderr, "added: \"%s\" (%08lx)\n",
e->e_dn, (long) id );
erec.e->e_dn, (long) id );
} else {
if ( verbose )
fprintf( stderr, "added: \"%s\"\n",
e->e_dn );
erec.e->e_dn );
}
entry_free( e );
entry_free( erec.e );
}
if ( slap_tool_thread_max > 1 ) {
add_stop = 1;
trec.ready = 0;
ldap_pvt_thread_cond_signal( &add_cond_r );
ldap_pvt_thread_join( thr, NULL );
}
if ( ldifrc < 0 )