Flesh out txn support

Only in back-mdb; back-bdb needs work but it's deprecated;
not worth the effort. In particular txn, retry after deadlocks
makes the whole thing too messy.
This commit is contained in:
Howard Chu 2014-09-15 21:44:20 +01:00
parent 20407ec5da
commit 32f05e96f7
6 changed files with 163 additions and 16 deletions

View File

@ -526,6 +526,29 @@ ok:
return 0;
}
#ifdef LDAP_X_TXN
int mdb_txn( Operation *op, int txnop, OpExtra **ptr )
{
struct mdb_info *mdb = (struct mdb_info *) op->o_bd->be_private;
mdb_op_info **moip = (mdb_op_info **)ptr, *moi = *moip;
int rc;
switch( txnop ) {
case SLAP_TXN_BEGIN:
return mdb_opinfo_get( op, mdb, 0, moip );
case SLAP_TXN_COMMIT:
rc = mdb_txn_commit( moi->moi_txn );
op->o_tmpfree( op->o_tmpmemctx, moi );
return rc;
case SLAP_TXN_ABORT:
mdb_txn_abort( moi->moi_txn );
op->o_tmpfree( op->o_tmpmemctx, moi );
return 0;
}
return LDAP_OTHER;
}
#endif
/* Count up the sizes of the components of an entry */
static int mdb_entry_partsize(struct mdb_info *mdb, MDB_txn *txn, Entry *e,
Ecount *eh)

View File

@ -456,6 +456,7 @@ mdb_back_initialize(
bi->bi_op_search = mdb_search;
bi->bi_op_unbind = 0;
bi->bi_op_txn = mdb_txn;
bi->bi_extended = mdb_extended;

View File

@ -196,6 +196,7 @@ int mdb_id2edata(
int mdb_entry_return( Operation *op, Entry *e );
BI_entry_release_rw mdb_entry_release;
BI_entry_get_rw mdb_entry_get;
BI_op_txn mdb_txn;
int mdb_entry_decode( Operation *op, MDB_txn *txn, MDB_val *data, Entry **e );

View File

@ -1172,8 +1172,13 @@ operations_error:
ber_set_option( op->o_ber, LBER_OPT_BER_MEMCTX, &memctx_null );
LDAP_STAILQ_REMOVE( &conn->c_ops, op, Operation, o_next);
LDAP_STAILQ_NEXT(op, o_next) = NULL;
#ifdef LDAP_X_TXN
if ( rc != LDAP_X_TXN_SPECIFY_OKAY )
#endif
{
LDAP_STAILQ_REMOVE( &conn->c_ops, op, Operation, o_next);
LDAP_STAILQ_NEXT(op, o_next) = NULL;
}
conn->c_n_ops_executing--;
conn->c_n_ops_completed++;
@ -1188,7 +1193,12 @@ operations_error:
connection_resched( conn );
ldap_pvt_thread_mutex_unlock( &conn->c_mutex );
slap_op_free( op, ctx );
#ifdef LDAP_X_TXN
if ( rc != LDAP_X_TXN_SPECIFY_OKAY )
#endif
{
slap_op_free( op, ctx );
}
return NULL;
}

View File

@ -2183,6 +2183,13 @@ typedef int (BI_acl_group) LDAP_P(( Operation *op, Entry *target,
typedef int (BI_acl_attribute) LDAP_P(( Operation *op, Entry *target,
struct berval *entry_ndn, AttributeDescription *entry_at,
BerVarray *vals, slap_access_t access ));
#ifdef LDAP_X_TXN
struct OpExtra;
typedef int (BI_op_txn) LDAP_P(( Operation *op, int txnop, struct OpExtra **ptr ));
#define SLAP_TXN_BEGIN 1
#define SLAP_TXN_COMMIT 2
#define SLAP_TXN_ABORT 3
#endif
typedef int (BI_conn_func) LDAP_P(( BackendDB *bd, Connection *c ));
typedef BI_conn_func BI_connection_init;
@ -2279,6 +2286,9 @@ struct BackendInfo {
BI_operational *bi_operational;
BI_chk_referrals *bi_chk_referrals;
BI_chk_controls *bi_chk_controls;
#ifdef LDAP_X_TXN
BI_op_txn *bi_op_txn;
#endif
BI_entry_get_rw *bi_entry_get_rw;
BI_entry_release_rw *bi_entry_release_rw;
@ -2402,6 +2412,9 @@ typedef enum slap_operation_e {
op_aux_operational,
op_aux_chk_referrals,
op_aux_chk_controls,
#ifdef LDAP_X_TXN
op_txn,
#endif
op_last
} slap_operation_t;

View File

@ -111,6 +111,11 @@ int txn_spec_ctrl(
return LDAP_SUCCESS;
}
static int txn_result( Operation *op, SlapReply *rs )
{
return rs->sr_err;
}
int txn_end_extop(
Operation *op, SlapReply *rs )
{
@ -121,6 +126,8 @@ int txn_end_extop(
ber_len_t len;
ber_int_t commit=1;
struct berval txnid;
Operation *o, *p;
Connection *c = op->o_conn;
Statslog( LDAP_DEBUG_STATS, "%s TXN END\n",
op->o_log_prefix, 0, 0, 0, 0 );
@ -134,7 +141,7 @@ int txn_end_extop(
return LDAP_PROTOCOL_ERROR;
}
op->o_bd = op->o_conn->c_authz_backend;
op->o_bd = c->c_authz_backend;
if( backend_check_restrictions( op, rs,
(struct berval *)&slap_EXOP_TXN_END ) != LDAP_SUCCESS )
{
@ -170,48 +177,140 @@ int txn_end_extop(
}
/* acquire connection lock */
ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );
ldap_pvt_thread_mutex_lock( &c->c_mutex );
if( op->o_conn->c_txn != CONN_TXN_SPECIFY ) {
if( c->c_txn != CONN_TXN_SPECIFY ) {
rs->sr_text = "invalid transaction identifier";
rc = LDAP_X_TXN_ID_INVALID;
goto done;
}
op->o_conn->c_txn = CONN_TXN_SETTLE;
c->c_txn = CONN_TXN_SETTLE;
if( commit ) {
slap_callback cb = {0};
OpExtra *txn = NULL;
if ( op->o_abandon ) {
goto drain;
}
if( LDAP_STAILQ_EMPTY(&op->o_conn->c_txn_ops) ) {
if( LDAP_STAILQ_EMPTY(&c->c_txn_ops) ) {
/* no updates to commit */
rs->sr_text = "no updates to commit";
rc = LDAP_OPERATIONS_ERROR;
goto settled;
}
rs->sr_text = "not yet implemented";
rc = LDAP_UNWILLING_TO_PERFORM;
cb.sc_response = txn_result;
LDAP_STAILQ_FOREACH( o, &c->c_txn_ops, o_next ) {
o->o_bd = c->c_txn_backend;
p = o;
if ( !txn ) {
rc = o->o_bd->bd_info->bi_op_txn(o, SLAP_TXN_BEGIN, &txn );
if ( rc ) {
rs->sr_text = "couldn't start DB transaction";
rc = LDAP_OTHER;
goto drain;
}
} else {
LDAP_SLIST_INSERT_HEAD( &o->o_extra, txn, oe_next );
}
cb.sc_next = o->o_callback;
o->o_callback = &cb;
{
SlapReply rs = {REP_RESULT};
int opidx = slap_req2op( o->o_tag );
assert( opidx != SLAP_OP_LAST );
o->o_threadctx = op->o_threadctx;
o->o_tid = op->o_tid;
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
rc = (&o->o_bd->bd_info->bi_op_bind)[opidx]( o, &rs );
ldap_pvt_thread_mutex_lock( &c->c_mutex );
}
if ( rc ) {
struct berval *bv = NULL;
BerElementBuffer berbuf;
BerElement *ber = (BerElement *)&berbuf;
ber_init_w_nullc( ber, LBER_USE_DER );
ber_printf( ber, "{i}", o->o_msgid );
ber_flatten( ber, &bv );
ber_free_buf( ber );
rs->sr_rspdata = bv;
o->o_bd->bd_info->bi_op_txn(o, SLAP_TXN_ABORT, &txn );
goto drain;
}
}
o = p;
rc = o->o_bd->bd_info->bi_op_txn(o, SLAP_TXN_COMMIT, &txn );
if ( rc ) {
rs->sr_text = "transaction commit failed";
rc = LDAP_OTHER;
}
} else {
rs->sr_text = "transaction aborted";
rc = LDAP_SUCCESS;;
rc = LDAP_SUCCESS;
}
drain:
/* drain txn ops list */
while (( o = LDAP_STAILQ_FIRST( &c->c_txn_ops )) != NULL ) {
LDAP_STAILQ_REMOVE_HEAD( &c->c_txn_ops, o_next );
LDAP_STAILQ_NEXT( o, o_next ) = NULL;
slap_op_free( o, NULL );
}
settled:
assert( LDAP_STAILQ_EMPTY(&op->o_conn->c_txn_ops) );
assert( op->o_conn->c_txn == CONN_TXN_SETTLE );
op->o_conn->c_txn = CONN_TXN_INACTIVE;
op->o_conn->c_txn_backend = NULL;
assert( LDAP_STAILQ_EMPTY(&c->c_txn_ops) );
assert( c->c_txn == CONN_TXN_SETTLE );
c->c_txn = CONN_TXN_INACTIVE;
c->c_txn_backend = NULL;
done:
/* release connection lock */
ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
return rc;
}
int txn_preop( Operation *op, SlapReply *rs )
{
int settle = 0;
/* acquire connection lock */
ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );
if( op->o_conn->c_txn == CONN_TXN_INACTIVE ) {
rs->sr_text = "invalid transaction identifier";
rs->sr_err = LDAP_X_TXN_ID_INVALID;
goto txnReturn;
} else if( op->o_conn->c_txn == CONN_TXN_SETTLE ) {
settle=1;
goto txnReturn;
}
if( op->o_conn->c_txn_backend == NULL ) {
op->o_conn->c_txn_backend = op->o_bd;
} else if( op->o_conn->c_txn_backend != op->o_bd ) {
rs->sr_text = "transaction cannot span multiple database contexts";
rs->sr_err = LDAP_AFFECTS_MULTIPLE_DSAS;
goto txnReturn;
}
/* insert operation into transaction */
LDAP_STAILQ_REMOVE( &op->o_conn->c_ops, op, Operation, o_next );
LDAP_STAILQ_INSERT_TAIL( &op->o_conn->c_txn_ops, op, o_next );
txnReturn:
/* release connection lock */
ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
if( !settle ) {
send_ldap_result( op, rs );
if ( !rs->sr_err )
rs->sr_err = LDAP_X_TXN_SPECIFY_OKAY;
return rs->sr_err;
}
return LDAP_SUCCESS; /* proceed with operation */
}
#endif /* LDAP_X_TXN */