postgresql/contrib/pgbench/pgbench.c
Tom Lane 69525fc0ef Remove incorrect increment of lineno, per David Fetter.
Sync HEAD and 8.1 branches of pgbench.
2005-12-10 01:09:07 +00:00

1436 lines
30 KiB
C

/*
* $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.49 2005/12/10 01:09:07 tgl Exp $
*
* pgbench: a simple benchmark program for PostgreSQL
* written by Tatsuo Ishii
*
* Copyright (c) 2000-2005 Tatsuo Ishii
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby
* granted, provided that the above copyright notice appear in all
* copies and that both that copyright notice and this permission
* notice appear in supporting documentation, and that the name of the
* author not be used in advertising or publicity pertaining to
* distribution of the software without specific, written prior
* permission. The author makes no representations about the
* suitability of this software for any purpose. It is provided "as
* is" without express or implied warranty.
*/
#include "postgres_fe.h"
#include "libpq-fe.h"
#include <ctype.h>
#ifdef WIN32
#include "win32.h"
#else
#include <sys/time.h>
#include <unistd.h>
#ifdef HAVE_GETOPT_H
#include <getopt.h>
#endif
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
/* for getrlimit */
#include <sys/resource.h>
#endif /* ! WIN32 */
extern char *optarg;
extern int optind;
#ifdef WIN32
#undef select
#endif
/********************************************************************
* some configurable parameters */
#define MAXCLIENTS 1024 /* max number of clients allowed */
int nclients = 1; /* default number of simulated clients (-c) */
int nxacts = 10; /* default number of transactions per client (-t) */
/*
* scaling factor (-s). for example, tps = 10 will make 1000000 tuples in
* the accounts table (naccounts * tps).  Outside initialize mode main()
* overwrites it with count(*) of branches for the built-in scripts.
*/
int tps = 1;
/*
* end of configurable parameters
*********************************************************************/
/* rows created by init() per unit of scaling factor */
#define nbranches 1
#define ntellers 10
#define naccounts 100000
FILE *LOGFILE = NULL; /* per-transaction latency log, opened by main() when -l is given */
bool use_log; /* log transaction latencies to a file */
int remains; /* number of remaining clients; decremented when a client finishes or aborts */
int is_connect; /* establish connection for each transaction (-C) */
/* connection parameters handed to PQsetdbLogin() by doConnect() */
char *pghost = "";
char *pgport = NULL;
char *pgoptions = NULL;
char *pgtty = NULL;
char *login = NULL;
char *pwd = NULL;
char *dbName;
/*
 * A script variable, as set by \setrandom and substituted for ":name" in
 * SQL text.  Both strings are heap copies made by putVariable().
 */
typedef struct
{
char *name; /* variable name (sort/search key, see compareVariables) */
char *value; /* its value, stored as text */
} Variable;
/*
* structures used in custom query mode
*
* One CState exists per simulated client; doCustom() advances it through
* the commands of the script selected by use_file.
*/
typedef struct
{
PGconn *con; /* connection handle to DB; NULL when not connected */
int id; /* client No. */
int state; /* state No.: index of the current command in the script */
int cnt; /* xacts count: scripts run to completion */
int ecnt; /* error count */
int listen; /* set to 1 by doCustom() once a command has
* been issued and its completion must be
* handled on the next call; 0 otherwise */
Variable *variables; /* array of variable definitions, kept sorted by name */
int nvariables; /* number of entries in variables */
struct timeval txn_begin; /* used for measuring latencies */
int use_file; /* index in sql_files for this client */
} CState;
/*
* queries read from files
*
* Each script line is parsed into one Command by process_commands().
*/
#define SQL_COMMAND 1
#define META_COMMAND 2
#define MAX_ARGS 10
typedef struct
{
int type; /* command type (SQL_COMMAND or META_COMMAND) */
int argc; /* number of entries used in argv */
char *argv[MAX_ARGS]; /* SQL_COMMAND: argv[0] is the whole statement;
* META_COMMAND: the whitespace-separated
* words following the backslash */
} Command;
#define MAX_FILES 128 /* max number of SQL script files allowed */
Command **sql_files[MAX_FILES]; /* SQL script files, each a NULL-terminated array of commands */
int num_files; /* its number */
/*
 * Built-in scripts, parsed at startup by process_builtin().  The upper
 * bounds written in the \setrandom lines are placeholders: main() replaces
 * argv[3] of those commands with values multiplied by the scaling factor.
 */
/* default scenario */
static char *tpc_b = {
"\\setrandom aid 1 100000\n"
"\\setrandom bid 1 1\n"
"\\setrandom tid 1 10\n"
"\\setrandom delta 1 10000\n"
"BEGIN;\n"
"UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
"SELECT abalance FROM accounts WHERE aid = :aid;\n"
"UPDATE tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
"UPDATE branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
"INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
"END;\n"
};
/* -N case: same as the default but without the tellers/branches updates */
static char *simple_update = {
"\\setrandom aid 1 100000\n"
"\\setrandom bid 1 1\n"
"\\setrandom tid 1 10\n"
"\\setrandom delta 1 10000\n"
"BEGIN;\n"
"UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
"SELECT abalance FROM accounts WHERE aid = :aid;\n"
"INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
"END;\n"
};
/* -S case: a single SELECT per transaction */
static char *select_only = {
"\\setrandom aid 1 100000\n"
"SELECT abalance FROM accounts WHERE aid = :aid;\n"
};
/* Print the command-line synopsis for both run mode and initialize mode. */
static void
usage(void)
{
    static const char *const synopsis[] = {
        "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-n][-C][-v][-S][-N][-f filename][-l][-U login][-P password][-d][dbname]\n",
        "(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor][-U login][-P password][-d][dbname]\n"
    };
    unsigned int line;

    for (line = 0; line < sizeof(synopsis) / sizeof(synopsis[0]); line++)
        fputs(synopsis[line], stderr);
}
/*
 * random number generator
 *
 * Returns an integer drawn uniformly from the closed range [min, max].
 *
 * The range is split into (max - min + 1) equally wide buckets and the
 * raw random() value, scaled into [0, 1), selects one of them.  (Rounding
 * a value scaled to [0, max - min] instead would give the two end points
 * only half the probability of every interior value.)  The arithmetic is
 * done in double so that max - min + 1 cannot overflow an int.
 */
static int
getrand(int min, int max)
{
    double      buckets = (double) max - (double) min + 1.0;
    double      unit = (double) random() / ((double) MAX_RANDOM_VALUE + 1.0);

    return min + (int) (buckets * unit);
}
/*
 * set up a connection to the backend
 *
 * Connects with the global connection parameters and sets search_path to
 * "public".  Returns the new connection, or NULL (after printing a
 * message) when the connection could not be established.  A failure of
 * the SET command terminates the program.
 */
static PGconn *
doConnect(void)
{
    PGconn     *con;
    PGresult   *res;

    con = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName,
                       login, pwd);
    if (con == NULL)
    {
        fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
        fprintf(stderr, "Memory allocation problem?\n");
        return (NULL);
    }

    if (PQstatus(con) == CONNECTION_BAD)
    {
        fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
        if (PQerrorMessage(con))
            fprintf(stderr, "%s", PQerrorMessage(con));
        else
            fprintf(stderr, "No explanation from the backend\n");
        /* the failed handle still owns memory; release it before giving up */
        PQfinish(con);
        return (NULL);
    }

    res = PQexec(con, "SET search_path = public");
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        fprintf(stderr, "%s", PQerrorMessage(con));
        exit(1);
    }
    PQclear(res);

    return (con);
}
/* drain and free every result still pending on the client's connection */
static void
discard_response(CState * state)
{
    PGresult   *pending;

    while ((pending = PQgetResult(state->con)) != NULL)
        PQclear(pending);
}
/*
 * check to see if the SQL result was good
 *
 * state/n identify the client, res is the result to inspect and good is
 * the status it is expected to carry.  Returns 0 when res is NULL or has
 * the expected status.  Otherwise the client is aborted: the error is
 * reported, remains is decremented, res is freed, the connection is
 * closed, and -1 is returned.
 */
static int
check(CState * state, PGresult *res, int n, int good)
{
    CState     *st = &state[n];

    if (res && PQresultStatus(res) != good)
    {
        fprintf(stderr, "Client %d aborted in state %d: %s", n, st->state, PQerrorMessage(st->con));
        remains--;              /* I've aborted */
        /* callers return immediately on failure, so the result is freed here */
        PQclear(res);
        PQfinish(st->con);
        st->con = NULL;
        return (-1);
    }
    return (0);                 /* OK */
}
static int
compareVariables(const void *v1, const void *v2)
{
return strcmp(((const Variable *) v1)->name,
((const Variable *) v2)->name);
}
/*
 * Look up a client's variable by name.
 *
 * Returns the stored value string (owned by the client state), or NULL
 * when no variable of that name exists.
 */
static char *
getVariable(CState * st, char *name)
{
    Variable    probe;
    Variable   *hit;

    /* On some versions of Solaris, bsearch of zero items dumps core */
    if (st->nvariables <= 0)
        return NULL;

    probe.name = name;
    hit = (Variable *) bsearch((void *) &probe,
                               (void *) st->variables,
                               st->nvariables,
                               sizeof(Variable),
                               compareVariables);

    return (hit != NULL) ? hit->value : NULL;
}
/*
 * Set a client's variable, creating it if necessary.
 *
 * Both name and value are copied.  Returns true on success and false when
 * memory could not be allocated; an existing variable keeps its old value
 * in that case.
 */
static int
putVariable(CState * st, char *name, char *value)
{
    Variable    probe;
    Variable   *slot = NULL;
    Variable   *grown;

    probe.name = name;

    /* On some versions of Solaris, bsearch of zero items dumps core */
    if (st->nvariables > 0)
        slot = (Variable *) bsearch((void *) &probe,
                                    (void *) st->variables,
                                    st->nvariables,
                                    sizeof(Variable),
                                    compareVariables);

    if (slot != NULL)
    {
        /* existing variable: swap in a fresh copy of the value */
        char       *copy = strdup(value);

        if (copy == NULL)
            return false;
        free(slot->value);
        slot->value = copy;
        return true;
    }

    /* new variable: make room for one more entry at the end */
    if (st->variables)
        grown = (Variable *) realloc(st->variables,
                                     (st->nvariables + 1) * sizeof(Variable));
    else
        grown = (Variable *) malloc(sizeof(Variable));
    if (grown == NULL)
        return false;
    st->variables = grown;

    slot = &grown[st->nvariables];
    slot->name = NULL;
    slot->value = NULL;

    slot->name = strdup(name);
    if (slot->name != NULL)
        slot->value = strdup(value);
    if (slot->name == NULL || slot->value == NULL)
    {
        /* the entry is not counted yet, so only the copies need releasing */
        free(slot->name);
        free(slot->value);
        return false;
    }

    st->nvariables++;

    /* keep the array ordered by name so that bsearch keeps working */
    qsort((void *) st->variables, st->nvariables, sizeof(Variable),
          compareVariables);

    return true;
}
/*
 * Substitute ":name" references in an SQL string with variable values.
 *
 * sql must be a heap-allocated string; ownership passes to this function.
 * Every ':' followed by at least one alphanumeric or '_' character is
 * treated as a reference.  References to variables the client has not
 * defined are left in place.
 *
 * Returns the (possibly reallocated) string, or NULL on out of memory, in
 * which case sql has been freed.
 */
static char *
assignVariables(CState * st, char *sql)
{
    int         i,
                j;
    char       *p,
               *name,
               *val;
    void       *tmp;

    i = 0;
    while ((p = strchr(&sql[i], ':')) != NULL)
    {
        size_t      reflen;     /* length of ":name" in sql */
        size_t      vallen;     /* length of the replacement text */

        /* j indexes the colon, i ends up one past the last name character */
        i = j = p - sql;
        do
        {
            i++;
        } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');

        /* a lone colon is not a reference; resume scanning right after it */
        if (i == j + 1)
            continue;
        reflen = (size_t) (i - j);

        /* reflen counts the colon, which leaves room for the terminator */
        name = malloc(reflen);
        if (name == NULL)
        {
            /* callers lose their pointer on failure, so release it here */
            free(sql);
            return NULL;
        }
        memcpy(name, &sql[j + 1], reflen - 1);
        name[reflen - 1] = '\0';
        val = getVariable(st, name);
        free(name);
        if (val == NULL)
            continue;
        vallen = strlen(val);

        /* grow the buffer first when the value is longer than the reference */
        if (vallen > reflen)
        {
            tmp = realloc(sql, strlen(sql) - reflen + vallen + 1);
            if (tmp == NULL)
            {
                free(sql);
                return NULL;
            }
            sql = tmp;
        }

        /* shift the tail (including its terminator) to its new position */
        if (vallen != reflen)
            memmove(&sql[j + vallen], &sql[i], strlen(&sql[i]) + 1);

        /* drop the value in; the tail already supplies the terminator */
        memcpy(&sql[j], val, vallen);

        /* give back the surplus; failing to shrink is harmless */
        if (vallen < reflen)
        {
            tmp = realloc(sql, strlen(sql) + 1);
            if (tmp != NULL)
                sql = tmp;
        }

        /* continue after the inserted value so it is never rescanned */
        i = j + (int) vallen;
    }

    return sql;
}
/*
 * Advance client n of the state array by one step of its script.
 *
 * When st->listen is set, the previously issued command is completed
 * first: for an SQL command the result is collected (returning early if
 * it has not fully arrived), the latency is logged after the last command
 * of the script, the transaction counter is updated, and the state moves
 * on to the next command, wrapping to a newly chosen script at the end.
 * Afterwards the current command is issued: an SQL command is sent
 * asynchronously with variables substituted, a meta command is executed
 * locally and the function loops to handle the command that follows it.
 *
 * A client that finishes its nxacts transactions, or that is aborted,
 * decrements the global "remains" and has its connection closed.
 */
static void
doCustom(CState * state, int n, int debug)
{
PGresult *res;
CState *st = &state[n];
Command **commands;
top:
commands = sql_files[st->use_file];
if (st->listen)
{ /* are we receiver? */
if (commands[st->state]->type == SQL_COMMAND)
{
if (debug)
fprintf(stderr, "client %d receiving\n", n);
if (!PQconsumeInput(st->con))
{ /* there's something wrong */
fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
remains--; /* I've aborted */
PQfinish(st->con);
st->con = NULL;
return;
}
if (PQisBusy(st->con))
return; /* don't have the whole result yet */
}
/*
* transaction finished: record the time it took in the log
* (client id, transaction number, elapsed microseconds)
*/
if (use_log && commands[st->state + 1] == NULL)
{
double diff;
struct timeval now;
gettimeofday(&now, NULL);
diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 +
(int) (now.tv_usec - st->txn_begin.tv_usec);
fprintf(LOGFILE, "%d %d %.0f\n", st->id, st->cnt, diff);
}
if (commands[st->state]->type == SQL_COMMAND)
{
res = PQgetResult(st->con);
/* statements starting with "select" must return tuples, others a command status */
if (strncasecmp(commands[st->state]->argv[0], "select", 6) != 0)
{
if (check(state, res, n, PGRES_COMMAND_OK))
return;
}
else
{
if (check(state, res, n, PGRES_TUPLES_OK))
return;
}
PQclear(res);
discard_response(st);
}
/* the command just completed was the last one of the script */
if (commands[st->state + 1] == NULL)
{
if (is_connect)
{
PQfinish(st->con);
st->con = NULL;
}
if (++st->cnt >= nxacts)
{
remains--; /* I've done */
if (st->con != NULL)
{
PQfinish(st->con);
st->con = NULL;
}
return;
}
}
/* increment state counter */
st->state++;
if (commands[st->state] == NULL)
{
/* end of script: start over with a randomly chosen script */
st->state = 0;
st->use_file = getrand(0, num_files - 1);
commands = sql_files[st->use_file];
}
}
/* (re)connect when no connection is open, e.g. after each transaction with is_connect */
if (st->con == NULL)
{
if ((st->con = doConnect()) == NULL)
{
fprintf(stderr, "Client %d aborted in establishing connection.\n",
n);
remains--; /* I've aborted */
/* st->con is NULL at this point */
PQfinish(st->con);
st->con = NULL;
return;
}
}
/* remember when the transaction (first command of the script) started */
if (use_log && st->state == 0)
gettimeofday(&(st->txn_begin), NULL);
if (commands[st->state]->type == SQL_COMMAND)
{
char *sql;
/* work on a private copy so the stored command text stays untouched */
if ((sql = strdup(commands[st->state]->argv[0])) == NULL
|| (sql = assignVariables(st, sql)) == NULL)
{
fprintf(stderr, "out of memory\n");
st->ecnt++;
return;
}
if (debug)
fprintf(stderr, "client %d sending %s\n", n, sql);
if (PQsendQuery(st->con, sql) == 0)
{
if (debug)
fprintf(stderr, "PQsendQuery(%s)failed\n", sql);
st->ecnt++;
}
else
{
st->listen = 1; /* flags that should be listened */
}
free(sql);
}
else if (commands[st->state]->type == META_COMMAND)
{
int argc = commands[st->state]->argc,
i;
char **argv = commands[st->state]->argv;
if (debug)
{
fprintf(stderr, "client %d executing \\%s", n, argv[0]);
for (i = 1; i < argc; i++)
fprintf(stderr, " %s", argv[i]);
fprintf(stderr, "\n");
}
if (strcasecmp(argv[0], "setrandom") == 0)
{
char *val;
/* the buffer is sized from the text of the upper bound argv[3] */
if ((val = malloc(strlen(argv[3]) + 1)) == NULL)
{
fprintf(stderr, "%s: out of memory\n", argv[0]);
st->ecnt++;
return;
}
sprintf(val, "%d", getrand(atoi(argv[2]), atoi(argv[3])));
if (putVariable(st, argv[1], val) == false)
{
fprintf(stderr, "%s: out of memory\n", argv[0]);
free(val);
st->ecnt++;
return;
}
free(val);
/* mark the meta command as done so the next pass advances the state */
st->listen = 1;
}
/* meta commands complete immediately: go on with the next command */
goto top;
}
}
/* close the connection of every client that still has one open */
static void
disconnect_all(CState * state)
{
    CState     *client = state;
    CState     *end = state + nclients;

    while (client < end)
    {
        if (client->con)
            PQfinish(client->con);
        client++;
    }
}
/*
 * Helper for init(): verify that res carries the wanted status and free
 * it.  On any other status the connection's error message is printed and
 * the program exits with status 1.
 */
static void
init_expect(PGconn *con, PGresult *res, int wanted)
{
    if (PQresultStatus(res) != wanted)
    {
        fprintf(stderr, "%s", PQerrorMessage(con));
        exit(1);
    }
    PQclear(res);
}

/*
 * create tables and setup data
 *
 * Drops and recreates the four benchmark tables, fills branches and
 * tellers with INSERTs inside one transaction, loads accounts through
 * COPY in batches of 10000 rows, adds the primary keys and finally runs
 * "vacuum analyze".  Any failure other than a failing DROP terminates
 * the program.
 */
static void
init(void)
{
    static char *DDLs[] = {
        "drop table branches",
        "create table branches(bid int not null,bbalance int,filler char(88))",
        "drop table tellers",
        "create table tellers(tid int not null,bid int,tbalance int,filler char(84))",
        "drop table accounts",
        "create table accounts(aid int not null,bid int,abalance int,filler char(84))",
        "drop table history",
    "create table history(tid int,bid int,aid int,delta int,mtime timestamp,filler char(22))"};
    static char *DDLAFTERs[] = {
        "alter table branches add primary key (bid)",
        "alter table tellers add primary key (tid)",
    "alter table accounts add primary key (aid)"};

    PGconn     *con;
    PGresult   *res;
    char        sql[256];
    int         i;
    int         bid;
    int         tid;
    int         aid;

    con = doConnect();
    if (con == NULL)
        exit(1);

    /* (re)create the schema; a failing DROP just means the table was absent */
    for (i = 0; i < (sizeof(DDLs) / sizeof(char *)); i++)
    {
        int         is_drop = (strncmp(DDLs[i], "drop", 4) == 0);

        res = PQexec(con, DDLs[i]);
        if (is_drop)
            PQclear(res);
        else
            init_expect(con, res, PGRES_COMMAND_OK);
    }

    /* branches and tellers are small: plain INSERTs in one transaction */
    init_expect(con, PQexec(con, "begin"), PGRES_COMMAND_OK);

    for (bid = 1; bid <= nbranches * tps; bid++)
    {
        snprintf(sql, 256, "insert into branches(bid,bbalance) values(%d,0)", bid);
        init_expect(con, PQexec(con, sql), PGRES_COMMAND_OK);
    }

    for (tid = 1; tid <= ntellers * tps; tid++)
    {
        snprintf(sql, 256, "insert into tellers(tid,bid,tbalance) values (%d,%d,0)"
                 ,tid, (tid - 1) / ntellers + 1);
        init_expect(con, PQexec(con, sql), PGRES_COMMAND_OK);
    }

    init_expect(con, PQexec(con, "end"), PGRES_COMMAND_OK);

    /*
     * occupy accounts table with some data
     */
    fprintf(stderr, "creating tables...\n");
    for (aid = 1; aid <= naccounts * tps; aid++)
    {
        /* first row of a batch: open a new COPY */
        if (aid % 10000 == 1)
            init_expect(con, PQexec(con, "copy accounts from stdin"), PGRES_COPY_IN);

        snprintf(sql, 256, "%d\t%d\t%d\t\n", aid, (aid - 1) / naccounts + 1, 0);
        if (PQputline(con, sql))
        {
            fprintf(stderr, "PQputline failed\n");
            exit(1);
        }

        if (aid % 10000 != 0)
            continue;

        /*
         * every 10000 tuples, we commit the copy command. this should
         * avoid generating too much WAL logs
         */
        fprintf(stderr, "%d tuples done.\n", aid);
        if (PQputline(con, "\\.\n"))
        {
            fprintf(stderr, "very last PQputline failed\n");
            exit(1);
        }
        if (PQendcopy(con))
        {
            fprintf(stderr, "PQendcopy failed\n");
            exit(1);
        }
#ifdef NOT_USED
        /*
         * do a checkpoint to purge the old WAL logs
         */
        res = PQexec(con, "checkpoint");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
            fprintf(stderr, "%s", PQerrorMessage(con));
            exit(1);
        }
        PQclear(res);
#endif   /* NOT_USED */
    }

    /* add the primary keys only after the tables have been loaded */
    fprintf(stderr, "set primary key...\n");
    for (i = 0; i < (sizeof(DDLAFTERs) / sizeof(char *)); i++)
        init_expect(con, PQexec(con, DDLAFTERs[i]), PGRES_COMMAND_OK);

    /* vacuum */
    fprintf(stderr, "vacuum...");
    init_expect(con, PQexec(con, "vacuum analyze"), PGRES_COMMAND_OK);
    fprintf(stderr, "done.\n");

    PQfinish(con);
}
static Command *
process_commands(char *buf)
{
const char delim[] = " \f\n\r\t\v";
Command *my_commands;
int j;
char *p,
*tok;
if ((p = strchr(buf, '\n')) != NULL)
*p = '\0';
p = buf;
while (isspace((unsigned char) *p))
p++;
if (*p == '\0' || strncmp(p, "--", 2) == 0)
{
return NULL;
}
my_commands = (Command *) malloc(sizeof(Command));
if (my_commands == NULL)
{
return NULL;
}
my_commands->argc = 0;
if (*p == '\\')
{
my_commands->type = META_COMMAND;
j = 0;
tok = strtok(++p, delim);
while (tok != NULL)
{
if ((my_commands->argv[j] = strdup(tok)) == NULL)
return NULL;
my_commands->argc++;
j++;
tok = strtok(NULL, delim);
}
if (strcasecmp(my_commands->argv[0], "setrandom") == 0)
{
int min,
max;
if (my_commands->argc < 4)
{
fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
return NULL;
}
for (j = 4; j < my_commands->argc; j++)
fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
my_commands->argv[0], my_commands->argv[j]);
if ((min = atoi(my_commands->argv[2])) < 0)
{
fprintf(stderr, "%s: invalid minimum number %s\n",
my_commands->argv[0], my_commands->argv[2]);
return NULL;
}
if ((max = atoi(my_commands->argv[3])) < min || max > MAX_RANDOM_VALUE)
{
fprintf(stderr, "%s: invalid maximum number %s\n",
my_commands->argv[0], my_commands->argv[3]);
return NULL;
}
}
else
{
fprintf(stderr, "invalid command %s\n", my_commands->argv[0]);
return NULL;
}
}
else
{
my_commands->type = SQL_COMMAND;
if ((my_commands->argv[0] = strdup(p)) == NULL)
return NULL;
my_commands->argc++;
}
return my_commands;
}
static int
process_file(char *filename)
{
#define COMMANDS_ALLOC_NUM 128
Command **my_commands;
FILE *fd;
int lineno;
char buf[BUFSIZ];
int alloc_num;
if (num_files >= MAX_FILES)
{
fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
exit(1);
}
alloc_num = COMMANDS_ALLOC_NUM;
my_commands = (Command **) malloc(sizeof(Command *) * alloc_num);
if (my_commands == NULL)
return false;
if (strcmp(filename, "-") == 0)
fd = stdin;
else if ((fd = fopen(filename, "r")) == NULL)
{
fprintf(stderr, "%s: %s\n", filename, strerror(errno));
return false;
}
lineno = 0;
while (fgets(buf, sizeof(buf), fd) != NULL)
{
Command *commands;
if (strncmp(buf, "\n", 1) != 0) {
commands = process_commands(buf);
if (commands == NULL)
{
fclose(fd);
return false;
}
} else
continue;
my_commands[lineno] = commands;
lineno++;
if (lineno >= alloc_num)
{
alloc_num += COMMANDS_ALLOC_NUM;
my_commands = realloc(my_commands, sizeof(Command *) * alloc_num);
if (my_commands == NULL)
{
fclose(fd);
return false;
}
}
}
fclose(fd);
my_commands[lineno] = NULL;
sql_files[num_files++] = my_commands;
return true;
}
static Command **
process_builtin(char *tb)
{
#define COMMANDS_ALLOC_NUM 128
Command **my_commands;
int lineno;
char buf[BUFSIZ];
int alloc_num;
if (*tb == '\0')
return NULL;
alloc_num = COMMANDS_ALLOC_NUM;
my_commands = (Command **) malloc(sizeof(Command *) * alloc_num);
if (my_commands == NULL)
return NULL;
lineno = 0;
for (;;)
{
char *p;
Command *commands;
p = buf;
while (*tb && *tb != '\n')
*p++ = *tb++;
if (*tb == '\0')
break;
if (*tb == '\n')
tb++;
*p = '\0';
commands = process_commands(buf);
if (commands == NULL)
{
return NULL;
}
my_commands[lineno] = commands;
lineno++;
if (lineno >= alloc_num)
{
alloc_num += COMMANDS_ALLOC_NUM;
my_commands = realloc(my_commands, sizeof(Command *) * alloc_num);
if (my_commands == NULL)
{
return NULL;
}
}
}
my_commands[lineno] = NULL;
return my_commands;
}
/*
 * print out results
 *
 * ttype selects the label of the transaction type.  tv1 is the start-up
 * time, tv2 the time after the initial connections were made and tv3 the
 * end time; the two throughput figures are computed over tv1..tv3 and
 * tv2..tv3 respectively from the transactions counted in state[].
 */
static void
printResults(
             int ttype, CState * state,
             struct timeval * tv1, struct timeval * tv2,
             struct timeval * tv3)
{
    double      rate_incl,
                rate_excl;
    int         client;
    int         completed = 0;
    char       *label;

    /* total of transactions completed by all clients */
    for (client = 0; client < nclients; client++)
        completed += state[client].cnt;

    /* elapsed microseconds, then transactions per second */
    rate_incl = (tv3->tv_sec - tv1->tv_sec) * 1000000.0 + (tv3->tv_usec - tv1->tv_usec);
    rate_incl = completed * 1000000.0 / rate_incl;

    rate_excl = (tv3->tv_sec - tv2->tv_sec) * 1000000.0 + (tv3->tv_usec - tv2->tv_usec);
    rate_excl = completed * 1000000.0 / rate_excl;

    switch (ttype)
    {
        case 0:
            label = "TPC-B (sort of)";
            break;
        case 2:
            label = "Update only accounts";
            break;
        case 1:
            label = "SELECT only";
            break;
        default:
            label = "Custom query";
            break;
    }

    printf("transaction type: %s\n", label);
    printf("scaling factor: %d\n", tps);
    printf("number of clients: %d\n", nclients);
    printf("number of transactions per client: %d\n", nxacts);
    printf("number of transactions actually processed: %d/%d\n", completed, nxacts * nclients);
    printf("tps = %f (including connections establishing)\n", rate_incl);
    printf("tps = %f (excluding connections establishing)\n", rate_excl);
}
int
main(int argc, char **argv)
{
int c;
int is_init_mode = 0; /* initialize mode? */
int is_no_vacuum = 0; /* no vacuum at all before testing? */
int is_full_vacuum = 0; /* do full vacuum before testing? */
int debug = 0; /* debug flag */
int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only,
* 2: skip update of branches and tellers */
char *filename = NULL;
CState *state; /* status of clients */
struct timeval tv1; /* start up time */
struct timeval tv2; /* after establishing all connections to the
* backend */
struct timeval tv3; /* end time */
int i;
fd_set input_mask;
int nsocks; /* return from select(2) */
int maxsock; /* max socket number to be waited */
#if !(defined(__CYGWIN__) || defined(__MINGW32__))
struct rlimit rlim;
#endif
PGconn *con;
PGresult *res;
char *env;
if ((env = getenv("PGHOST")) != NULL && *env != '\0')
pghost = env;
if ((env = getenv("PGPORT")) != NULL && *env != '\0')
pgport = env;
else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
login = env;
while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CNSlf:")) != -1)
{
switch (c)
{
case 'i':
is_init_mode++;
break;
case 'h':
pghost = optarg;
break;
case 'n':
is_no_vacuum++;
break;
case 'v':
is_full_vacuum++;
break;
case 'p':
pgport = optarg;
break;
case 'd':
debug++;
break;
case 'S':
ttype = 1;
break;
case 'N':
ttype = 2;
break;
case 'c':
nclients = atoi(optarg);
if (nclients <= 0 || nclients > MAXCLIENTS)
{
fprintf(stderr, "invalid number of clients: %d\n", nclients);
exit(1);
}
#if !(defined(__CYGWIN__) || defined(__MINGW32__))
#ifdef RLIMIT_NOFILE /* most platform uses RLIMIT_NOFILE */
if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
#else /* but BSD doesn't ... */
if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
#endif /* RLIMIT_NOFILE */
{
fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
exit(1);
}
if (rlim.rlim_cur <= (nclients + 2))
{
fprintf(stderr, "You need at least %d open files resource but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
fprintf(stderr, "Use limit/ulimt to increase the limit before using pgbench.\n");
exit(1);
}
#endif
break;
case 'C':
is_connect = 1;
break;
case 's':
tps = atoi(optarg);
if (tps <= 0)
{
fprintf(stderr, "invalid scaling factor: %d\n", tps);
exit(1);
}
break;
case 't':
nxacts = atoi(optarg);
if (nxacts <= 0)
{
fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
exit(1);
}
break;
case 'U':
login = optarg;
break;
case 'P':
pwd = optarg;
break;
case 'l':
use_log = true;
break;
case 'f':
ttype = 3;
filename = optarg;
if (process_file(filename) == false)
exit(1);
break;
default:
usage();
exit(1);
break;
}
}
if (argc > optind)
dbName = argv[optind];
else
{
if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
dbName = env;
else if (login != NULL && *login != '\0')
dbName = login;
else
dbName = "";
}
if (is_init_mode)
{
init();
exit(0);
}
remains = nclients;
state = (CState *) malloc(sizeof(CState) * nclients);
if (state == NULL)
{
fprintf(stderr, "Couldn't allocate memory for state\n");
exit(1);
}
memset(state, 0, sizeof(*state) * nclients);
if (use_log)
{
char logpath[64];
snprintf(logpath, 64, "pgbench_log.%d", getpid());
LOGFILE = fopen(logpath, "w");
if (LOGFILE == NULL)
{
fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
exit(1);
}
}
if (debug)
{
printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
pghost, pgport, nclients, nxacts, dbName);
}
/* opening connection... */
con = doConnect();
if (con == NULL)
exit(1);
if (PQstatus(con) == CONNECTION_BAD)
{
fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
if (ttype != 3)
{
/*
* get the scaling factor that should be same as count(*) from
* branches if this is not a custom query
*/
res = PQexec(con, "select count(*) from branches");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
tps = atoi(PQgetvalue(res, 0, 0));
if (tps < 0)
{
fprintf(stderr, "count(*) from branches invalid (%d)\n", tps);
exit(1);
}
PQclear(res);
}
if (!is_no_vacuum)
{
fprintf(stderr, "starting vacuum...");
res = PQexec(con, "vacuum branches");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
PQclear(res);
res = PQexec(con, "vacuum tellers");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
PQclear(res);
res = PQexec(con, "delete from history");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
PQclear(res);
res = PQexec(con, "vacuum history");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
PQclear(res);
fprintf(stderr, "end.\n");
if (is_full_vacuum)
{
fprintf(stderr, "starting full vacuum...");
res = PQexec(con, "vacuum analyze accounts");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
PQclear(res);
fprintf(stderr, "end.\n");
}
}
PQfinish(con);
/* set random seed */
gettimeofday(&tv1, NULL);
srand((unsigned int) tv1.tv_usec);
/* get start up time */
gettimeofday(&tv1, NULL);
if (is_connect == 0)
{
/* make connections to the database */
for (i = 0; i < nclients; i++)
{
state[i].id = i;
if ((state[i].con = doConnect()) == NULL)
exit(1);
}
}
/* time after connections set up */
gettimeofday(&tv2, NULL);
/* process bultin SQL scripts */
switch (ttype)
{
char buf[128];
case 0:
sql_files[0] = process_builtin(tpc_b);
snprintf(buf, sizeof(buf), "%d", 100000 * tps);
sql_files[0][0]->argv[3] = strdup(buf);
snprintf(buf, sizeof(buf), "%d", 1 * tps);
sql_files[0][1]->argv[3] = strdup(buf);
snprintf(buf, sizeof(buf), "%d", 10 * tps);
sql_files[0][2]->argv[3] = strdup(buf);
snprintf(buf, sizeof(buf), "%d", 10000 * tps);
sql_files[0][3]->argv[3] = strdup(buf);
num_files = 1;
break;
case 1:
sql_files[0] = process_builtin(select_only);
snprintf(buf, sizeof(buf), "%d", 100000 * tps);
sql_files[0][0]->argv[3] = strdup(buf);
num_files = 1;
break;
case 2:
sql_files[0] = process_builtin(simple_update);
snprintf(buf, sizeof(buf), "%d", 100000 * tps);
sql_files[0][0]->argv[3] = strdup(buf);
snprintf(buf, sizeof(buf), "%d", 1 * tps);
sql_files[0][1]->argv[3] = strdup(buf);
snprintf(buf, sizeof(buf), "%d", 10 * tps);
sql_files[0][2]->argv[3] = strdup(buf);
snprintf(buf, sizeof(buf), "%d", 10000 * tps);
sql_files[0][3]->argv[3] = strdup(buf);
num_files = 1;
break;
default:
break;
}
/* send start up queries in async manner */
for (i = 0; i < nclients; i++)
{
state[i].use_file = getrand(0, num_files - 1);
doCustom(state, i, debug);
}
for (;;)
{
if (remains <= 0)
{ /* all done ? */
disconnect_all(state);
/* get end time */
gettimeofday(&tv3, NULL);
printResults(ttype, state, &tv1, &tv2, &tv3);
if (LOGFILE)
fclose(LOGFILE);
exit(0);
}
FD_ZERO(&input_mask);
maxsock = -1;
for (i = 0; i < nclients; i++)
{
Command **commands = sql_files[state[i].use_file];
if (state[i].con && commands[state[i].state]->type != META_COMMAND)
{
int sock = PQsocket(state[i].con);
if (sock < 0)
{
disconnect_all(state);
exit(1);
}
FD_SET(sock, &input_mask);
if (maxsock < sock)
maxsock = sock;
}
}
if (maxsock != -1)
{
if ((nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
(fd_set *) NULL, (struct timeval *) NULL)) < 0)
{
if (errno == EINTR)
continue;
/* must be something wrong */
disconnect_all(state);
fprintf(stderr, "select failed: %s\n", strerror(errno));
exit(1);
}
else if (nsocks == 0)
{ /* timeout */
fprintf(stderr, "select timeout\n");
for (i = 0; i < nclients; i++)
{
fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
}
exit(0);
}
}
/* ok, backend returns reply */
for (i = 0; i < nclients; i++)
{
Command **commands = sql_files[state[i].use_file];
if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask)
|| commands[state[i].state]->type == META_COMMAND))
{
doCustom(state, i, debug);
}
}
}
}