ITAGAKI Takahiro <itagaki.takahiro@oss.ntt.co.jp> added thread-safe

descriptor handling
This commit is contained in:
Michael Meskes 2007-10-02 09:50:00 +00:00
parent f1d37a9997
commit 0c2eb200d6
15 changed files with 345 additions and 84 deletions

View File

@ -2247,5 +2247,10 @@ Sun, 30 Sep 2007 13:37:31 +0200
- Applied another patch by ITAGAKI Takahiro <itagaki.takahiro@oss.ntt.co.jp>
to get memory allocation thread-safe. He also did some cleaning up.
Tue, 02 Oct 2007 11:32:25 +0200
- ITAGAKI Takahiro <itagaki.takahiro@oss.ntt.co.jp> added thread-safe
descriptor handling
- Set ecpg library version to 6.0.
- Set ecpg version to 4.4.

View File

@ -1,4 +1,4 @@
/* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/connect.c,v 1.44 2007/09/30 11:38:48 meskes Exp $ */
/* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/connect.c,v 1.45 2007/10/02 09:49:59 meskes Exp $ */
#define POSTGRES_ECPG_INTERNAL
#include "postgres_fe.h"
@ -447,6 +447,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p
this->cache_head = NULL;
this->prep_stmts = NULL;
this->descriptors = NULL;
if (all_connections == NULL)
this->next = NULL;

View File

@ -1,12 +1,13 @@
/* dynamic SQL support routines
*
* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/descriptor.c,v 1.23 2007/08/14 10:01:52 meskes Exp $
* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/descriptor.c,v 1.24 2007/10/02 09:49:59 meskes Exp $
*/
#define POSTGRES_ECPG_INTERNAL
#include "postgres_fe.h"
#include "pg_type.h"
#include "ecpg-pthread-win32.h"
#include "ecpgtype.h"
#include "ecpglib.h"
#include "ecpgerrno.h"
@ -14,18 +15,55 @@
#include "sqlca.h"
#include "sql3types.h"
struct descriptor *all_descriptors = NULL;
static void descriptor_free(struct descriptor *desc);
static void descriptor_deallocate_all(struct descriptor *list);
/* We manage descriptors separately for each thread. */
#ifdef ENABLE_THREAD_SAFETY
static pthread_key_t descriptor_key;
#ifndef WIN32
static pthread_once_t descriptor_once = PTHREAD_ONCE_INIT;
#endif
static void
descriptor_destructor(void *arg)
{
descriptor_deallocate_all(arg);
}
NON_EXEC_STATIC void
descriptor_key_init(void)
{
pthread_key_create(&descriptor_key, descriptor_destructor);
}
static struct descriptor *
get_descriptors(void)
{
pthread_once(&descriptor_once, descriptor_key_init);
return (struct descriptor *) pthread_getspecific(descriptor_key);
}
static void
set_descriptors(struct descriptor *value)
{
pthread_setspecific(descriptor_key, value);
}
#else
static struct descriptor *all_descriptors = NULL;
#define get_descriptors() (all_descriptors)
#define set_descriptors(value) do { all_descriptors = (value); } while(0)
#endif
/* old internal convenience function that might go away later */
static PGresult
*
static PGresult *
ECPGresultByDescriptor(int line, const char *name)
{
PGresult **resultpp = ECPGdescriptor_lvalue(line, name);
if (resultpp)
return *resultpp;
return NULL;
struct descriptor *desc = ECPGfind_desc(line, name);
if (desc == NULL)
return NULL;
return desc->result;
}
static unsigned int
@ -445,20 +483,9 @@ ECPGget_desc(int lineno, const char *desc_name, int index,...)
bool
ECPGset_desc_header(int lineno, const char *desc_name, int count)
{
struct descriptor *desc;
for (desc = all_descriptors; desc; desc = desc->next)
{
if (strcmp(desc_name, desc->name) == 0)
break;
}
struct descriptor *desc = ECPGfind_desc(lineno, desc_name);
if (desc == NULL)
{
ECPGraise(lineno, ECPG_UNKNOWN_DESCRIPTOR, ECPG_SQLSTATE_INVALID_SQL_DESCRIPTOR_NAME, desc_name);
return false;
}
desc->count = count;
return true;
}
@ -471,17 +498,9 @@ ECPGset_desc(int lineno, const char *desc_name, int index,...)
struct descriptor_item *desc_item;
struct variable *var;
for (desc = all_descriptors; desc; desc = desc->next)
{
if (strcmp(desc_name, desc->name) == 0)
break;
}
desc = ECPGfind_desc(lineno, desc_name);
if (desc == NULL)
{
ECPGraise(lineno, ECPG_UNKNOWN_DESCRIPTOR, ECPG_SQLSTATE_INVALID_SQL_DESCRIPTOR_NAME, desc_name);
return false;
}
for (desc_item = desc->items; desc_item; desc_item = desc_item->next)
{
@ -506,7 +525,7 @@ ECPGset_desc(int lineno, const char *desc_name, int index,...)
va_start(args, index);
do
for (;;)
{
enum ECPGdtype itemtype;
const char *tobeinserted = NULL;
@ -585,40 +604,50 @@ ECPGset_desc(int lineno, const char *desc_name, int index,...)
return false;
}
}
} while (true);
}
ECPGfree(var);
return true;
}
/* Free the descriptor and items in it. */
static void
descriptor_free(struct descriptor *desc)
{
struct descriptor_item *desc_item;
for (desc_item = desc->items; desc_item;)
{
struct descriptor_item *di;
ECPGfree(desc_item->data);
di = desc_item;
desc_item = desc_item->next;
ECPGfree(di);
}
ECPGfree(desc->name);
PQclear(desc->result);
ECPGfree(desc);
}
bool
ECPGdeallocate_desc(int line, const char *name)
{
struct descriptor *desc;
struct descriptor **lastptr = &all_descriptors;
struct descriptor *prev;
struct sqlca_t *sqlca = ECPGget_sqlca();
ECPGinit_sqlca(sqlca);
for (desc = all_descriptors; desc; lastptr = &desc->next, desc = desc->next)
for (desc = get_descriptors(), prev = NULL; desc; prev = desc, desc = desc->next)
{
if (!strcmp(name, desc->name))
{
struct descriptor_item *desc_item;
for (desc_item = desc->items; desc_item;)
{
struct descriptor_item *di;
ECPGfree(desc_item->data);
di = desc_item;
desc_item = desc_item->next;
ECPGfree(di);
}
*lastptr = desc->next;
ECPGfree(desc->name);
PQclear(desc->result);
ECPGfree(desc);
if (prev)
prev->next = desc->next;
else
set_descriptors(desc->next);
descriptor_free(desc);
return true;
}
}
@ -626,6 +655,18 @@ ECPGdeallocate_desc(int line, const char *name)
return false;
}
/* Deallocate all descriptors in the list */
static void
descriptor_deallocate_all(struct descriptor *list)
{
while (list)
{
struct descriptor *next = list->next;
descriptor_free(list);
list = next;
}
}
bool
ECPGallocate_desc(int line, const char *name)
{
@ -636,7 +677,7 @@ ECPGallocate_desc(int line, const char *name)
new = (struct descriptor *) ECPGalloc(sizeof(struct descriptor), line);
if (!new)
return false;
new->next = all_descriptors;
new->next = get_descriptors();
new->name = ECPGalloc(strlen(name) + 1, line);
if (!new->name)
{
@ -654,23 +695,24 @@ ECPGallocate_desc(int line, const char *name)
return false;
}
strcpy(new->name, name);
all_descriptors = new;
set_descriptors(new);
return true;
}
PGresult **
ECPGdescriptor_lvalue(int line, const char *descriptor)
/* Find descriptor with name in the connection. */
struct descriptor *
ECPGfind_desc(int line, const char *name)
{
struct descriptor *i;
struct descriptor *desc;
for (i = all_descriptors; i != NULL; i = i->next)
for (desc = get_descriptors(); desc; desc = desc->next)
{
if (!strcmp(descriptor, i->name))
return &i->result;
if (strcmp(name, desc->name) == 0)
return desc;
}
ECPGraise(line, ECPG_UNKNOWN_DESCRIPTOR, ECPG_SQLSTATE_INVALID_SQL_DESCRIPTOR_NAME, (char *) descriptor);
return NULL;
ECPGraise(line, ECPG_UNKNOWN_DESCRIPTOR, ECPG_SQLSTATE_INVALID_SQL_DESCRIPTOR_NAME, name);
return NULL; /* not found */
}
bool

View File

@ -1,4 +1,4 @@
/* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/execute.c,v 1.70 2007/09/26 10:57:00 meskes Exp $ */
/* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/execute.c,v 1.71 2007/10/02 09:49:59 meskes Exp $ */
/*
* The aim is to get a simpler inteface to the database routines.
@ -1088,17 +1088,9 @@ ECPGexecute(struct statement * stmt)
struct descriptor *desc;
struct descriptor_item *desc_item;
for (desc = all_descriptors; desc; desc = desc->next)
{
if (strcmp(var->pointer, desc->name) == 0)
break;
}
desc = ECPGfind_desc(stmt->lineno, var->pointer);
if (desc == NULL)
{
ECPGraise(stmt->lineno, ECPG_UNKNOWN_DESCRIPTOR, ECPG_SQLSTATE_INVALID_SQL_DESCRIPTOR_NAME, var->pointer);
return false;
}
desc_counter++;
for (desc_item = desc->items; desc_item; desc_item = desc_item->next)
@ -1334,16 +1326,15 @@ ECPGexecute(struct statement * stmt)
if (var != NULL && var->type == ECPGt_descriptor)
{
PGresult **resultpp = ECPGdescriptor_lvalue(stmt->lineno, (const char *) var->pointer);
if (resultpp == NULL)
struct descriptor *desc = ECPGfind_desc(stmt->lineno, var->pointer);
if (desc == NULL)
status = false;
else
{
if (*resultpp)
PQclear(*resultpp);
*resultpp = results;
clear_result = FALSE;
if (desc->result)
PQclear(desc->result);
desc->result = results;
clear_result = false;
ECPGlog("ECPGexecute putting result (%d tuples) into descriptor '%s'\n", PQntuples(results), (const char *) var->pointer);
}
var = var->next;

View File

@ -1,4 +1,4 @@
/* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/extern.h,v 1.28 2007/09/30 11:38:48 meskes Exp $ */
/* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/extern.h,v 1.29 2007/10/02 09:49:59 meskes Exp $ */
#ifndef _ECPG_LIB_EXTERN_H
#define _ECPG_LIB_EXTERN_H
@ -36,6 +36,8 @@ bool ECPGget_data(const PGresult *, int, int, int, enum ECPGttype type,
#ifdef ENABLE_THREAD_SAFETY
void ecpg_pthreads_init(void);
#else
#define ecpg_pthreads_init() ((void)0)
#endif
struct connection *ECPGget_connection(const char *);
char *ECPGalloc(long, int);
@ -92,6 +94,7 @@ struct connection
int autocommit;
struct ECPGtype_information_cache *cache_head;
struct prepared_statement *prep_stmts;
struct descriptor *descriptors;
struct connection *next;
};
@ -105,8 +108,6 @@ struct descriptor
struct descriptor_item *items;
};
extern struct descriptor *all_descriptors;
struct descriptor_item
{
int num;
@ -136,7 +137,7 @@ struct variable
struct variable *next;
};
PGresult **ECPGdescriptor_lvalue(int line, const char *descriptor);
struct descriptor *ECPGfind_desc(int line, const char *name);
bool ECPGstore_result(const PGresult *results, int act_field,
const struct statement * stmt, struct variable * var);

View File

@ -1,4 +1,4 @@
/* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/misc.c,v 1.37 2007/09/30 11:38:48 meskes Exp $ */
/* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/misc.c,v 1.38 2007/10/02 09:49:59 meskes Exp $ */
#define POSTGRES_ECPG_INTERNAL
#include "postgres_fe.h"
@ -431,6 +431,7 @@ DllMain(HANDLE module, DWORD reason, LPVOID reserved)
auto_mem_key_init();
ecpg_actual_connection_init();
ecpg_sqlca_key_init();
descriptor_key_init();
}
return TRUE;
}

View File

@ -1,4 +1,4 @@
/* $PostgreSQL: pgsql/src/interfaces/ecpg/include/ecpg-pthread-win32.h,v 1.2 2007/09/30 11:38:48 meskes Exp $ */
/* $PostgreSQL: pgsql/src/interfaces/ecpg/include/ecpg-pthread-win32.h,v 1.3 2007/10/02 09:49:59 meskes Exp $ */
/*
* pthread mapping macros for win32 native thread implementation
*/
@ -47,6 +47,7 @@ extern pthread_mutex_t debug_init_mutex;
extern void auto_mem_key_init(void);
extern void ecpg_actual_connection_init(void);
extern void ecpg_sqlca_key_init(void);
extern void descriptor_key_init(void);
extern BOOL WINAPI DllMain(HANDLE module, DWORD reason, LPVOID reserved);
#endif /* WIN32 */

View File

@ -43,3 +43,4 @@ test: thread/thread
test: thread/thread_implicit
test: thread/prep
test: thread/alloc
test: thread/descriptor

View File

@ -43,5 +43,6 @@ test: thread/thread
test: thread/thread_implicit
test: thread/prep
test: thread/alloc
test: thread/descriptor
test: connect/test1

View File

@ -0,0 +1,155 @@
/* Processed by ecpg (regression mode) */
/* These include files are added by the preprocessor */
#include <ecpgtype.h>
#include <ecpglib.h>
#include <ecpgerrno.h>
#include <sqlca.h>
/* End of automatic include section */
#define ECPGdebug(X,Y) ECPGdebug((X)+100,(Y))
#line 1 "descriptor.pgc"
#ifdef WIN32
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <process.h>
#else
#include <pthread.h>
#endif
#include <stdio.h>
#define THREADS 16
#define REPEATS 50000
#line 1 "sqlca.h"
#ifndef POSTGRES_SQLCA_H
#define POSTGRES_SQLCA_H
#ifndef PGDLLIMPORT
#if defined(WIN32) || defined(__CYGWIN__)
#define PGDLLIMPORT __declspec (dllimport)
#else
#define PGDLLIMPORT
#endif /* __CYGWIN__ */
#endif /* PGDLLIMPORT */
#define SQLERRMC_LEN 150
#ifdef __cplusplus
extern "C"
{
#endif
struct sqlca_t
{
char sqlcaid[8];
long sqlabc;
long sqlcode;
struct
{
int sqlerrml;
char sqlerrmc[SQLERRMC_LEN];
} sqlerrm;
char sqlerrp[8];
long sqlerrd[6];
/* Element 0: empty */
/* 1: OID of processed tuple if applicable */
/* 2: number of rows processed */
/* after an INSERT, UPDATE or */
/* DELETE statement */
/* 3: empty */
/* 4: empty */
/* 5: empty */
char sqlwarn[8];
/* Element 0: set to 'W' if at least one other is 'W' */
/* 1: if 'W' at least one character string */
/* value was truncated when it was */
/* stored into a host variable. */
/*
* 2: if 'W' a (hopefully) non-fatal notice occurred
*/ /* 3: empty */
/* 4: empty */
/* 5: empty */
/* 6: empty */
/* 7: empty */
char sqlstate[5];
};
struct sqlca_t *ECPGget_sqlca(void);
#ifndef POSTGRES_ECPG_INTERNAL
#define sqlca (*ECPGget_sqlca())
#endif
#ifdef __cplusplus
}
#endif
#endif
#line 13 "descriptor.pgc"
/* exec sql whenever sqlerror sqlprint ; */
#line 14 "descriptor.pgc"
/* exec sql whenever not found sqlprint ; */
#line 15 "descriptor.pgc"
#ifdef WIN32
static unsigned STDCALL fn(void* arg)
#else
void* fn(void* arg)
#endif
{
int i;
for (i = 1; i <= REPEATS; ++i)
{
ECPGallocate_desc(__LINE__, "mydesc");
#line 27 "descriptor.pgc"
if (sqlca.sqlcode < 0) sqlprint();
#line 27 "descriptor.pgc"
ECPGdeallocate_desc(__LINE__, "mydesc");
#line 28 "descriptor.pgc"
if (sqlca.sqlcode < 0) sqlprint();
#line 28 "descriptor.pgc"
}
return 0;
}
int main (int argc, char** argv)
{
int i;
#ifdef WIN32
HANDLE threads[THREADS];
#else
pthread_t threads[THREADS];
#endif
#ifdef WIN32
for (i = 0; i < THREADS; ++i)
{
unsigned id;
threads[i] = (HANDLE)_beginthreadex(NULL, 0, fn, NULL, 0, &id);
}
WaitForMultipleObjects(THREADS, threads, TRUE, INFINITE);
for (i = 0; i < THREADS; ++i)
CloseHandle(threads[i]);
#else
for (i = 0; i < THREADS; ++i)
pthread_create(&threads[i], NULL, fn, NULL);
for (i = 0; i < THREADS; ++i)
pthread_join(threads[i], NULL);
#endif
return 0;
}

View File

@ -7,6 +7,7 @@ include $(top_srcdir)/$(subdir)/../Makefile.regress
TESTS = thread_implicit thread_implicit.c \
thread thread.c \
prep prep.c \
descriptor descriptor.c \
alloc alloc.c
all: $(TESTS)

View File

@ -0,0 +1,61 @@
#ifdef WIN32
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <process.h>
#else
#include <pthread.h>
#endif
#include <stdio.h>
#define THREADS 16
#define REPEATS 50000
EXEC SQL include sqlca;
EXEC SQL whenever sqlerror sqlprint;
EXEC SQL whenever not found sqlprint;
#ifdef WIN32
static unsigned STDCALL fn(void* arg)
#else
void* fn(void* arg)
#endif
{
int i;
for (i = 1; i <= REPEATS; ++i)
{
EXEC SQL ALLOCATE DESCRIPTOR mydesc;
EXEC SQL DEALLOCATE DESCRIPTOR mydesc;
}
return 0;
}
int main (int argc, char** argv)
{
int i;
#ifdef WIN32
HANDLE threads[THREADS];
#else
pthread_t threads[THREADS];
#endif
#ifdef WIN32
for (i = 0; i < THREADS; ++i)
{
unsigned id;
threads[i] = (HANDLE)_beginthreadex(NULL, 0, fn, NULL, 0, &id);
}
WaitForMultipleObjects(THREADS, threads, TRUE, INFINITE);
for (i = 0; i < THREADS; ++i)
CloseHandle(threads[i]);
#else
for (i = 0; i < THREADS; ++i)
pthread_create(&threads[i], NULL, fn, NULL);
for (i = 0; i < THREADS; ++i)
pthread_join(threads[i], NULL);
#endif
return 0;
}