diff options
Diffstat (limited to 'contrib/pgbench/pgbench.c')
-rw-r--r-- | contrib/pgbench/pgbench.c | 186 |
1 files changed, 94 insertions, 92 deletions
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c index b38086490a4..b290b7477b1 100644 --- a/contrib/pgbench/pgbench.c +++ b/contrib/pgbench/pgbench.c @@ -4,7 +4,7 @@ * A simple benchmark program for PostgreSQL * Originally written by Tatsuo Ishii and enhanced by many contributors. * - * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.96 2010/01/06 01:30:03 itagaki Exp $ + * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.97 2010/02/26 02:00:32 momjian Exp $ * Copyright (c) 2000-2010, PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * @@ -28,7 +28,7 @@ */ #ifdef WIN32 -#define FD_SETSIZE 1024 /* set before winsock2.h is included */ +#define FD_SETSIZE 1024 /* set before winsock2.h is included */ #endif /* ! WIN32 */ #include "postgres_fe.h" @@ -66,16 +66,14 @@ #ifdef WIN32 /* Use native win32 threads on Windows */ -typedef struct win32_pthread *pthread_t; -typedef int pthread_attr_t; - -static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg); -static int pthread_join(pthread_t th, void **thread_return); +typedef struct win32_pthread *pthread_t; +typedef int pthread_attr_t; +static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); +static int pthread_join(pthread_t th, void **thread_return); #elif defined(ENABLE_THREAD_SAFETY) /* Use platform-dependent pthread capability */ #include <pthread.h> - #else /* Use emulation with fork. Rename pthread identifiers to avoid conflicts */ @@ -86,12 +84,11 @@ static int pthread_join(pthread_t th, void **thread_return); #define pthread_create pg_pthread_create #define pthread_join pg_pthread_join -typedef struct fork_pthread *pthread_t; -typedef int pthread_attr_t; - -static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg); -static int pthread_join(pthread_t th, void **thread_return); +typedef struct fork_pthread *pthread_t; +typedef int pthread_attr_t; +static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); +static int pthread_join(pthread_t th, void **thread_return); #endif extern char *optarg; @@ -129,7 +126,8 @@ int fillfactor = 100; * end of configurable parameters *********************************************************************/ -#define nbranches 1 /* Makes little sense to change this. Change -s instead */ +#define nbranches 1 /* Makes little sense to change this. Change + * -s instead */ #define ntellers 10 #define naccounts 100000 @@ -156,7 +154,7 @@ typedef struct } Variable; #define MAX_FILES 128 /* max number of SQL script files allowed */ -#define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */ +#define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */ /* * structures used in custom query mode @@ -185,18 +183,18 @@ typedef struct */ typedef struct { - pthread_t thread; /* thread handle */ - CState *state; /* array of CState */ - int nstate; /* length of state[] */ - instr_time start_time; /* thread start time */ + pthread_t thread; /* thread handle */ + CState *state; /* array of CState */ + int nstate; /* length of state[] */ + instr_time start_time; /* thread start time */ } TState; #define INVALID_THREAD ((pthread_t) 0) typedef struct { - instr_time conn_time; - int xacts; + instr_time conn_time; + int xacts; } TResult; /* @@ -224,9 +222,9 @@ typedef struct char *argv[MAX_ARGS]; /* command list */ } Command; -static Command **sql_files[MAX_FILES]; /* SQL script files */ -static int num_files; /* number of script files */ -static int debug = 0; /* debug flag */ +static Command **sql_files[MAX_FILES]; /* SQL script files */ +static int num_files; /* number of script files */ +static int debug = 0; /* debug flag */ /* default scenario */ static char *tpc_b = { @@ -271,7 +269,7 @@ static char *select_only = { /* Function prototypes */ static void setalarm(int seconds); -static void* threadRun(void *arg); +static void *threadRun(void *arg); static void usage(const char *progname) @@ -432,7 +430,7 @@ getVariable(CState *st, char *name) static bool isLegalVariableName(const char *name) { - int i; + int i; for (i = 0; name[i] != '\0'; i++) { @@ -624,29 +622,28 @@ getQueryParams(CState *st, const Command *command, const char **params) static bool runShellCommand(CState *st, char *variable, char **argv, int argc) { - char command[SHELL_COMMAND_SIZE]; - int i, - len = 0; - FILE *fp; - char res[64]; - char *endptr; - int retval; + char command[SHELL_COMMAND_SIZE]; + int i, + len = 0; + FILE *fp; + char res[64]; + char *endptr; + int retval; /* * Join arguments with whilespace separaters. Arguments starting with - * exactly one colon are treated as variables: - * name - append a string "name" - * :var - append a variable named 'var'. - * ::name - append a string ":name" + * exactly one colon are treated as variables: name - append a string + * "name" :var - append a variable named 'var'. ::name - append a string + * ":name" */ for (i = 0; i < argc; i++) { - char *arg; - int arglen; + char *arg; + int arglen; if (argv[i][0] != ':') { - arg = argv[i]; /* a string literal */ + arg = argv[i]; /* a string literal */ } else if (argv[i][1] == ':') { @@ -732,14 +729,14 @@ preparedStatementName(char *buffer, int file, int state) static bool clientDone(CState *st, bool ok) { - (void) ok; /* unused */ + (void) ok; /* unused */ if (st->con != NULL) { PQfinish(st->con); st->con = NULL; } - return false; /* always false */ + return false; /* always false */ } /* return false iff client should be disconnected */ @@ -811,10 +808,10 @@ top: { case PGRES_COMMAND_OK: case PGRES_TUPLES_OK: - break; /* OK */ + break; /* OK */ default: fprintf(stderr, "Client %d aborted in state %d: %s", - st->id, st->state, PQerrorMessage(st->con)); + st->id, st->state, PQerrorMessage(st->con)); PQclear(res); return clientDone(st, false); } @@ -847,7 +844,8 @@ top: if (st->con == NULL) { - instr_time start, end; + instr_time start, + end; INSTR_TIME_SET_CURRENT(start); if ((st->con = doConnect()) == NULL) @@ -1091,7 +1089,7 @@ top: { char *var; int usec; - instr_time now; + instr_time now; if (*argv[1] == ':') { @@ -1124,9 +1122,9 @@ top: } else if (pg_strcasecmp(argv[0], "setshell") == 0) { - bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2); + bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2); - if (timer_exceeded) /* timeout */ + if (timer_exceeded) /* timeout */ return clientDone(st, true); else if (!ret) /* on error */ { @@ -1138,9 +1136,9 @@ top: } else if (pg_strcasecmp(argv[0], "shell") == 0) { - bool ret = runShellCommand(st, NULL, argv + 1, argc - 1); + bool ret = runShellCommand(st, NULL, argv + 1, argc - 1); - if (timer_exceeded) /* timeout */ + if (timer_exceeded) /* timeout */ return clientDone(st, true); else if (!ret) /* on error */ { @@ -1442,7 +1440,7 @@ process_commands(char *buf) */ if (my_commands->argv[1][0] != ':') { - char *c = my_commands->argv[1]; + char *c = my_commands->argv[1]; while (isdigit((unsigned char) *c)) c++; @@ -1667,7 +1665,7 @@ printResults(int ttype, int normal_xacts, int nclients, int nthreads, time_include = INSTR_TIME_GET_DOUBLE(total_time); tps_include = normal_xacts / time_include; tps_exclude = normal_xacts / (time_include - - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads)); + (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads)); if (ttype == 0) s = "TPC-B (sort of)"; @@ -1704,8 +1702,8 @@ int main(int argc, char **argv) { int c; - int nclients = 1; /* default number of simulated clients */ - int nthreads = 1; /* default number of threads */ + int nclients = 1; /* default number of simulated clients */ + int nthreads = 1; /* default number of threads */ int is_init_mode = 0; /* initialize mode? */ int is_no_vacuum = 0; /* no vacuum at all before testing? */ int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */ @@ -1826,7 +1824,7 @@ main(int argc, char **argv) } #endif /* HAVE_GETRLIMIT */ break; - case 'j': /* jobs */ + case 'j': /* jobs */ nthreads = atoi(optarg); if (nthreads <= 0) { @@ -2120,7 +2118,8 @@ main(int argc, char **argv) /* the first thread (i = 0) is executed by main thread */ if (i > 0) { - int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]); + int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]); + if (err != 0 || threads[i].thread == INVALID_THREAD) { fprintf(stderr, "cannot create thread: %s\n", strerror(err)); @@ -2138,7 +2137,7 @@ main(int argc, char **argv) INSTR_TIME_SET_ZERO(conn_total_time); for (i = 0; i < nthreads; i++) { - void *ret = NULL; + void *ret = NULL; if (threads[i].thread == INVALID_THREAD) ret = threadRun(&threads[i]); @@ -2147,7 +2146,8 @@ main(int argc, char **argv) if (ret != NULL) { - TResult *r = (TResult *) ret; + TResult *r = (TResult *) ret; + total_xacts += r->xacts; INSTR_TIME_ADD(conn_total_time, r->conn_time); free(ret); @@ -2170,10 +2170,11 @@ threadRun(void *arg) { TState *thread = (TState *) arg; CState *state = thread->state; - TResult *result; - instr_time start, end; + TResult *result; + instr_time start, + end; int nstate = thread->nstate; - int remains = nstate; /* number of remaining clients */ + int remains = nstate; /* number of remaining clients */ int i; result = malloc(sizeof(TResult)); @@ -2202,7 +2203,7 @@ threadRun(void *arg) st->use_file = getrand(0, num_files - 1); if (!doCustom(st, &result->conn_time)) - remains--; /* I've aborted */ + remains--; /* I've aborted */ if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) { @@ -2215,10 +2216,10 @@ threadRun(void *arg) while (remains > 0) { - fd_set input_mask; - int maxsock; /* max socket number to be waited */ - int64 now_usec = 0; - int64 min_usec; + fd_set input_mask; + int maxsock; /* max socket number to be waited */ + int64 now_usec = 0; + int64 min_usec; FD_ZERO(&input_mask); @@ -2237,6 +2238,7 @@ threadRun(void *arg) if (min_usec == INT64_MAX) { instr_time now; + INSTR_TIME_SET_CURRENT(now); now_usec = INSTR_TIME_GET_MICROSEC(now); } @@ -2262,18 +2264,20 @@ threadRun(void *arg) goto done; } - FD_SET(sock, &input_mask); + FD_SET (sock, &input_mask); + if (maxsock < sock) maxsock = sock; } if (min_usec > 0 && maxsock != -1) { - int nsocks; /* return from select(2) */ + int nsocks; /* return from select(2) */ if (min_usec != INT64_MAX) { - struct timeval timeout; + struct timeval timeout; + timeout.tv_sec = min_usec / 1000000; timeout.tv_usec = min_usec % 1000000; nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout); @@ -2298,10 +2302,10 @@ threadRun(void *arg) int prev_ecnt = st->ecnt; if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask) - || commands[st->state]->type == META_COMMAND)) + || commands[st->state]->type == META_COMMAND)) { if (!doCustom(st, &result->conn_time)) - remains--; /* I've aborted */ + remains--; /* I've aborted */ } if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) @@ -2353,30 +2357,30 @@ setalarm(int seconds) typedef struct fork_pthread { - pid_t pid; - int pipes[2]; -} fork_pthread; + pid_t pid; + int pipes[2]; +} fork_pthread; static int pthread_create(pthread_t *thread, pthread_attr_t *attr, - void * (*start_routine)(void *), + void *(*start_routine) (void *), void *arg) { - fork_pthread *th; - void *ret; - instr_time start_time; + fork_pthread *th; + void *ret; + instr_time start_time; th = (fork_pthread *) malloc(sizeof(fork_pthread)); pipe(th->pipes); th->pid = fork(); - if (th->pid == -1) /* error */ + if (th->pid == -1) /* error */ { free(th); return errno; } - if (th->pid != 0) /* in parent process */ + if (th->pid != 0) /* in parent process */ { close(th->pipes[1]); *thread = th; @@ -2391,11 +2395,11 @@ pthread_create(pthread_t *thread, setalarm(duration); /* - * Set a different random seed in each child process. Otherwise they - * all inherit the parent's state and generate the same "random" - * sequence. (In the threaded case, the different threads will obtain - * subsets of the output of a single random() sequence, which should be - * okay for our purposes.) + * Set a different random seed in each child process. Otherwise they all + * inherit the parent's state and generate the same "random" sequence. + * (In the threaded case, the different threads will obtain subsets of the + * output of a single random() sequence, which should be okay for our + * purposes.) */ INSTR_TIME_SET_CURRENT(start_time); srandom(((unsigned int) INSTR_TIME_GET_MICROSEC(start_time)) + @@ -2411,7 +2415,7 @@ pthread_create(pthread_t *thread, static int pthread_join(pthread_t th, void **thread_return) { - int status; + int status; while (waitpid(th->pid, &status, 0) != th->pid) { @@ -2434,9 +2438,7 @@ pthread_join(pthread_t th, void **thread_return) free(th); return 0; } - #endif - #else /* WIN32 */ static VOID CALLBACK @@ -2468,7 +2470,7 @@ setalarm(int seconds) typedef struct win32_pthread { HANDLE handle; - void *(*routine)(void *); + void *(*routine) (void *); void *arg; void *result; } win32_pthread; @@ -2486,11 +2488,11 @@ win32_pthread_run(void *arg) static int pthread_create(pthread_t *thread, pthread_attr_t *attr, - void * (*start_routine)(void *), + void *(*start_routine) (void *), void *arg) { - int save_errno; - win32_pthread *th; + int save_errno; + win32_pthread *th; th = (win32_pthread *) malloc(sizeof(win32_pthread)); th->routine = start_routine; |