mirror of
https://git.openldap.org/openldap/openldap.git
synced 2025-01-18 11:05:48 +08:00
Add locking support
This commit is contained in:
parent
a4c3626267
commit
f367441b69
@ -19,6 +19,7 @@
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#include <openssl/sha.h>
|
||||
|
||||
@ -47,6 +48,46 @@
|
||||
typedef ulong pgno_t;
|
||||
typedef uint16_t indx_t;
|
||||
|
||||
#define DEFAULT_READERS 126
|
||||
#define DEFAULT_MAPSIZE 1048576
|
||||
|
||||
/* Lock descriptor stuff */
|
||||
#define RXBODY \
|
||||
ulong mr_txnid; \
|
||||
pid_t mr_pid; \
|
||||
pthread_t mr_tid
|
||||
typedef struct MDB_rxbody {
|
||||
RXBODY;
|
||||
} MDB_rxbody;
|
||||
|
||||
#ifndef CACHELINE
|
||||
#define CACHELINE 64 /* most CPUs. Itanium uses 128 */
|
||||
#endif
|
||||
|
||||
typedef struct MDB_reader {
|
||||
RXBODY;
|
||||
/* cache line alignment */
|
||||
char pad[CACHELINE-sizeof(MDB_rxbody)];
|
||||
} MDB_reader;
|
||||
|
||||
#define TXBODY \
|
||||
uint32_t mt_magic; \
|
||||
uint32_t mt_version; \
|
||||
pthread_mutex_t mt_mutex; \
|
||||
ulong mt_txnid; \
|
||||
uint32_t mt_numreaders
|
||||
typedef struct MDB_txbody {
|
||||
TXBODY;
|
||||
} MDB_txbody;
|
||||
|
||||
typedef struct MDB_txninfo {
|
||||
TXBODY;
|
||||
char pad[CACHELINE-sizeof(MDB_txbody)];
|
||||
pthread_mutex_t mt_wmutex;
|
||||
char pad2[CACHELINE-sizeof(pthread_mutex_t)];
|
||||
MDB_reader mt_readers[1];
|
||||
} MDB_txninfo;
|
||||
|
||||
/* Common header for all page types. Overflow pages
|
||||
* occupy a number of contiguous pages with no
|
||||
* headers on any page after the first.
|
||||
@ -101,6 +142,7 @@ typedef struct MDB_meta { /* meta (footer) page content */
|
||||
pgno_t mm_root; /* page number of root page */
|
||||
MDB_stat mm_stat;
|
||||
pgno_t mm_prev_meta; /* previous meta page number */
|
||||
ulong mm_txnid;
|
||||
#define MDB_TOMBSTONE 0x01 /* file is replaced */
|
||||
uint32_t mm_flags;
|
||||
#define mm_revisions mm_stat.ms_revisions
|
||||
@ -171,8 +213,12 @@ struct MDB_txn {
|
||||
pgno_t mt_root; /* current / new root page */
|
||||
pgno_t mt_next_pgno; /* next unallocated page */
|
||||
pgno_t mt_first_pgno;
|
||||
ulong mt_txnid;
|
||||
MDB_env *mt_env;
|
||||
struct dirty_queue *mt_dirty_queue; /* modified pages */
|
||||
union {
|
||||
struct dirty_queue *dirty_queue; /* modified pages */
|
||||
MDB_reader *reader;
|
||||
} mt_u;
|
||||
#define MDB_TXN_RDONLY 0x01 /* read-only transaction */
|
||||
#define MDB_TXN_ERROR 0x02 /* an error has occurred */
|
||||
unsigned int mt_flags;
|
||||
@ -199,16 +245,20 @@ struct MDB_db {
|
||||
|
||||
struct MDB_env {
|
||||
int me_fd;
|
||||
int me_lfd;
|
||||
#define MDB_FIXPADDING 0x01 /* internal */
|
||||
uint32_t me_flags;
|
||||
int me_maxreaders;
|
||||
char *me_path;
|
||||
char *me_map;
|
||||
MDB_txninfo *me_txns;
|
||||
MDB_head me_head;
|
||||
MDB_db0 me_db; /* first DB, overlaps with meta */
|
||||
MDB_meta me_meta;
|
||||
MDB_txn *me_txn; /* current write transaction */
|
||||
size_t me_mapsize;
|
||||
off_t me_size; /* current file size */
|
||||
pthread_key_t me_txkey; /* thread-key for readers */
|
||||
};
|
||||
|
||||
#define NODESIZE offsetof(MDB_node, mn_data)
|
||||
@ -356,7 +406,7 @@ mdb_newpage(MDB_txn *txn, MDB_page *parent, int parent_idx, int num)
|
||||
dp->h.md_num = num;
|
||||
dp->h.md_parent = parent;
|
||||
dp->h.md_pi = parent_idx;
|
||||
SIMPLEQ_INSERT_TAIL(txn->mt_dirty_queue, dp, h.md_next);
|
||||
SIMPLEQ_INSERT_TAIL(txn->mt_u.dirty_queue, dp, h.md_next);
|
||||
dp->p.mp_pgno = txn->mt_next_pgno;
|
||||
txn->mt_next_pgno += num;
|
||||
|
||||
@ -410,11 +460,6 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
|
||||
MDB_txn *txn;
|
||||
int rc;
|
||||
|
||||
if (!rdonly && env->me_txn != NULL) {
|
||||
DPRINTF("write transaction already begun");
|
||||
return EBUSY;
|
||||
}
|
||||
|
||||
if ((txn = calloc(1, sizeof(*txn))) == NULL) {
|
||||
DPRINTF("calloc: %s", strerror(errno));
|
||||
return ENOMEM;
|
||||
@ -423,23 +468,40 @@ mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
|
||||
if (rdonly) {
|
||||
txn->mt_flags |= MDB_TXN_RDONLY;
|
||||
} else {
|
||||
txn->mt_dirty_queue = calloc(1, sizeof(*txn->mt_dirty_queue));
|
||||
if (txn->mt_dirty_queue == NULL) {
|
||||
txn->mt_u.dirty_queue = calloc(1, sizeof(*txn->mt_u.dirty_queue));
|
||||
if (txn->mt_u.dirty_queue == NULL) {
|
||||
free(txn);
|
||||
return ENOMEM;
|
||||
}
|
||||
SIMPLEQ_INIT(txn->mt_dirty_queue);
|
||||
SIMPLEQ_INIT(txn->mt_u.dirty_queue);
|
||||
|
||||
#if 0
|
||||
DPRINTF("taking write lock on txn %p", txn);
|
||||
if (flock(bt->fd, LOCK_EX | LOCK_NB) != 0) {
|
||||
DPRINTF("flock: %s", strerror(errno));
|
||||
errno = EBUSY;
|
||||
free(txn->dirty_queue);
|
||||
free(txn);
|
||||
return NULL;
|
||||
pthread_mutex_lock(&env->me_txns->mt_wmutex);
|
||||
env->me_txns->mt_txnid++;
|
||||
}
|
||||
txn->mt_txnid = env->me_txns->mt_txnid;
|
||||
if (rdonly) {
|
||||
MDB_reader *r = pthread_getspecific(env->me_txkey);
|
||||
if (!r) {
|
||||
int i;
|
||||
pthread_mutex_lock(&env->me_txns->mt_mutex);
|
||||
for (i=0; i<env->me_maxreaders; i++) {
|
||||
if (env->me_txns->mt_readers[i].mr_pid == 0) {
|
||||
env->me_txns->mt_readers[i].mr_pid = getpid();
|
||||
env->me_txns->mt_readers[i].mr_tid = pthread_self();
|
||||
pthread_setspecific(env->me_txkey, &env->me_txns->mt_readers[i]);
|
||||
if (i >= env->me_txns->mt_numreaders)
|
||||
env->me_txns->mt_numreaders = i+1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&env->me_txns->mt_mutex);
|
||||
if (i == env->me_maxreaders) {
|
||||
return ENOSPC;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
r->mr_txnid = txn->mt_txnid;
|
||||
txn->mt_u.reader = r;
|
||||
} else {
|
||||
env->me_txn = txn;
|
||||
}
|
||||
|
||||
@ -470,12 +532,14 @@ mdb_txn_abort(MDB_txn *txn)
|
||||
env = txn->mt_env;
|
||||
DPRINTF("abort transaction on mdbenv %p, root page %lu", env, txn->mt_root);
|
||||
|
||||
if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
|
||||
if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
|
||||
txn->mt_u.reader->mr_txnid = 0;
|
||||
} else {
|
||||
/* Discard all dirty pages.
|
||||
*/
|
||||
while (!SIMPLEQ_EMPTY(txn->mt_dirty_queue)) {
|
||||
dp = SIMPLEQ_FIRST(txn->mt_dirty_queue);
|
||||
SIMPLEQ_REMOVE_HEAD(txn->mt_dirty_queue, h.md_next);
|
||||
while (!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
|
||||
dp = SIMPLEQ_FIRST(txn->mt_u.dirty_queue);
|
||||
SIMPLEQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next);
|
||||
free(dp);
|
||||
}
|
||||
|
||||
@ -487,11 +551,11 @@ mdb_txn_abort(MDB_txn *txn)
|
||||
txn->bt->fd, strerror(errno));
|
||||
}
|
||||
#endif
|
||||
free(txn->mt_dirty_queue);
|
||||
free(txn->mt_u.dirty_queue);
|
||||
env->me_txn = NULL;
|
||||
pthread_mutex_unlock(&env->me_txns->mt_wmutex);
|
||||
}
|
||||
|
||||
if (txn == env->me_txn)
|
||||
env->me_txn = NULL;
|
||||
free(txn);
|
||||
}
|
||||
|
||||
@ -528,7 +592,7 @@ mdb_txn_commit(MDB_txn *txn)
|
||||
return EINVAL;
|
||||
}
|
||||
|
||||
if (SIMPLEQ_EMPTY(txn->mt_dirty_queue))
|
||||
if (SIMPLEQ_EMPTY(txn->mt_u.dirty_queue))
|
||||
goto done;
|
||||
|
||||
if (F_ISSET(env->me_flags, MDB_FIXPADDING)) {
|
||||
@ -552,7 +616,7 @@ mdb_txn_commit(MDB_txn *txn)
|
||||
do {
|
||||
n = 0;
|
||||
done = 1;
|
||||
SIMPLEQ_FOREACH(dp, txn->mt_dirty_queue, h.md_next) {
|
||||
SIMPLEQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) {
|
||||
DPRINTF("committing page %lu", dp->p.mp_pgno);
|
||||
iov[n].iov_len = env->me_head.mh_psize * dp->h.md_num;
|
||||
iov[n].iov_base = &dp->p;
|
||||
@ -581,9 +645,9 @@ mdb_txn_commit(MDB_txn *txn)
|
||||
|
||||
/* Drop the dirty pages.
|
||||
*/
|
||||
while (!SIMPLEQ_EMPTY(txn->mt_dirty_queue)) {
|
||||
dp = SIMPLEQ_FIRST(txn->mt_dirty_queue);
|
||||
SIMPLEQ_REMOVE_HEAD(txn->mt_dirty_queue, h.md_next);
|
||||
while (!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
|
||||
dp = SIMPLEQ_FIRST(txn->mt_u.dirty_queue);
|
||||
SIMPLEQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next);
|
||||
free(dp);
|
||||
if (--n == 0)
|
||||
break;
|
||||
@ -596,8 +660,9 @@ mdb_txn_commit(MDB_txn *txn)
|
||||
mdb_txn_abort(txn);
|
||||
return n;
|
||||
}
|
||||
txn = NULL;
|
||||
env->me_txn = NULL;
|
||||
pthread_mutex_unlock(&env->me_txns->mt_wmutex);
|
||||
txn = NULL;
|
||||
|
||||
done:
|
||||
mdb_txn_abort(txn);
|
||||
@ -713,7 +778,7 @@ mdbenv_write_meta(MDB_env *env, pgno_t root, unsigned int flags)
|
||||
bcopy(&env->me_meta, meta, sizeof(*meta));
|
||||
|
||||
rc = write(env->me_fd, &dp->p, env->me_head.mh_psize);
|
||||
SIMPLEQ_REMOVE_HEAD(env->me_txn->mt_dirty_queue, h.md_next);
|
||||
SIMPLEQ_REMOVE_HEAD(env->me_txn->mt_u.dirty_queue, h.md_next);
|
||||
free(dp);
|
||||
if (rc != (ssize_t)env->me_head.mh_psize) {
|
||||
int err = errno;
|
||||
@ -852,12 +917,38 @@ mdbenv_create(MDB_env **env, size_t size)
|
||||
|
||||
e->me_head.mh_magic = MDB_MAGIC;
|
||||
e->me_head.mh_version = MDB_VERSION;
|
||||
e->me_mapsize = e->me_head.mh_mapsize = size;
|
||||
e->me_head.mh_mapsize = DEFAULT_MAPSIZE;
|
||||
e->me_maxreaders = DEFAULT_READERS;
|
||||
e->me_db.md_env = e;
|
||||
*env = e;
|
||||
return MDB_SUCCESS;
|
||||
}
|
||||
|
||||
int
|
||||
mdbenv_set_mapsize(MDB_env *env, size_t size)
|
||||
{
|
||||
if (env->me_map)
|
||||
return EINVAL;
|
||||
env->me_mapsize = env->me_head.mh_mapsize = size;
|
||||
return MDB_SUCCESS;
|
||||
}
|
||||
|
||||
int
|
||||
mdbenv_set_maxreaders(MDB_env *env, int readers)
|
||||
{
|
||||
env->me_maxreaders = readers;
|
||||
return MDB_SUCCESS;
|
||||
}
|
||||
|
||||
int
|
||||
mdbenv_get_maxreaders(MDB_env *env, int *readers)
|
||||
{
|
||||
if (!env || !readers)
|
||||
return EINVAL;
|
||||
*readers = env->me_maxreaders;
|
||||
return MDB_SUCCESS;
|
||||
}
|
||||
|
||||
int
|
||||
mdbenv_open2(MDB_env *env, unsigned int flags)
|
||||
{
|
||||
@ -918,10 +1009,54 @@ mdbenv_open2(MDB_env *env, unsigned int flags)
|
||||
return MDB_SUCCESS;
|
||||
}
|
||||
|
||||
static void
|
||||
mdbenv_reader_dest(void *ptr)
|
||||
{
|
||||
MDB_reader *reader = ptr;
|
||||
|
||||
reader->mr_txnid = 0;
|
||||
reader->mr_pid = 0;
|
||||
reader->mr_tid = 0;
|
||||
}
|
||||
|
||||
int
|
||||
mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
|
||||
{
|
||||
int oflags, rc;
|
||||
int oflags, rc, len;
|
||||
char *lpath;
|
||||
off_t size, rsize;
|
||||
|
||||
len = strlen(path);
|
||||
lpath = malloc(len + sizeof(".lock"));
|
||||
sprintf(lpath, "%s.lock", path);
|
||||
if ((env->me_lfd = open(lpath, O_RDWR | O_CREAT, mode)) == -1)
|
||||
return errno;
|
||||
|
||||
size = lseek(env->me_lfd, 0, SEEK_END);
|
||||
rsize = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
|
||||
if (size < rsize) {
|
||||
if (ftruncate(env->me_lfd, rsize) != 0) {
|
||||
rc = errno;
|
||||
close(env->me_lfd);
|
||||
return rc;
|
||||
}
|
||||
} else {
|
||||
rsize = size;
|
||||
size = rsize - sizeof(MDB_txninfo);
|
||||
env->me_maxreaders = size/sizeof(MDB_reader) + 1;
|
||||
}
|
||||
env->me_txns = mmap(0, rsize, PROT_READ|PROT_WRITE, MAP_SHARED,
|
||||
env->me_lfd, 0);
|
||||
if (env->me_txns == MAP_FAILED)
|
||||
return errno;
|
||||
if (size == 0) {
|
||||
pthread_mutexattr_t mattr;
|
||||
|
||||
pthread_mutexattr_init(&mattr);
|
||||
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
|
||||
pthread_mutex_init(&env->me_txns->mt_mutex, &mattr);
|
||||
pthread_mutex_init(&env->me_txns->mt_wmutex, &mattr);
|
||||
}
|
||||
|
||||
if (F_ISSET(flags, MDB_RDONLY))
|
||||
oflags = O_RDONLY;
|
||||
@ -939,6 +1074,8 @@ mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
|
||||
DPRINTF("opened dbenv %p", env);
|
||||
}
|
||||
|
||||
pthread_key_create(&env->me_txkey, mdbenv_reader_dest);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -954,6 +1091,12 @@ mdbenv_close(MDB_env *env)
|
||||
munmap(env->me_map, env->me_mapsize);
|
||||
}
|
||||
close(env->me_fd);
|
||||
if (env->me_txns) {
|
||||
size_t size = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
|
||||
munmap(env->me_txns, size);
|
||||
}
|
||||
close(env->me_lfd);
|
||||
free(env);
|
||||
}
|
||||
|
||||
/* Search for key within a leaf page, using binary search.
|
||||
@ -1060,9 +1203,9 @@ mdbenv_get_page(MDB_env *env, pgno_t pgno)
|
||||
MDB_txn *txn = env->me_txn;
|
||||
|
||||
if (txn && pgno >= txn->mt_first_pgno &&
|
||||
!SIMPLEQ_EMPTY(txn->mt_dirty_queue)) {
|
||||
!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
|
||||
MDB_dpage *dp;
|
||||
SIMPLEQ_FOREACH(dp, txn->mt_dirty_queue, h.md_next) {
|
||||
SIMPLEQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) {
|
||||
if (dp->p.mp_pgno == pgno) {
|
||||
p = &dp->p;
|
||||
break;
|
||||
|
@ -55,12 +55,15 @@ typedef struct MDB_stat {
|
||||
unsigned long ms_entries;
|
||||
} MDB_stat;
|
||||
|
||||
int mdbenv_create(MDB_env **env, size_t size);
|
||||
int mdbenv_create(MDB_env **env);
|
||||
int mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode);
|
||||
int mdbenv_stat(MDB_env *env, MDB_stat **stat);
|
||||
void mdbenv_close(MDB_env *env);
|
||||
int mdbenv_get_flags(MDB_env *env, unsigned int *flags);
|
||||
int mdbenv_get_path(MDB_env *env, const char **path);
|
||||
int mdbenv_set_mapsize(MDB_env *env, size_t size);
|
||||
int mdbenv_set_maxreaders(MDB_env *env, int readers);
|
||||
int mdbenv_get_maxreaders(MDB_env *env, int *readers);
|
||||
int mdbenv_sync(MDB_env *env);
|
||||
int mdbenv_compact(MDB_env *env);
|
||||
|
||||
|
@ -25,7 +25,8 @@ int main(int argc,char * argv[])
|
||||
values[i] = random()%1024;
|
||||
}
|
||||
|
||||
rc = mdbenv_create(&env, 10485760);
|
||||
rc = mdbenv_create(&env);
|
||||
rc = mdbenv_set_mapsize(env, 10485760);
|
||||
rc = mdbenv_open(env, "./testdb", MDB_FIXEDMAP|MDB_NOSYNC, 0664);
|
||||
rc = mdb_txn_begin(env, 0, &txn);
|
||||
rc = mdb_open(env, txn, NULL, 0, &db);
|
||||
|
Loading…
Reference in New Issue
Block a user