From c393e68c8da0c691fdccdfd4a584d035b18f982e Mon Sep 17 00:00:00 2001
From: Marina Polyakova <m.polyakova@postgrespro.ru>
Date: Fri, 9 Jun 2017 14:42:30 +0300
Subject: [PATCH v1 1/4] Pgbench Serialization and deadlock errors

Now session is not disconnected because of serialization or deadlock errors.
If there were such errors during script execution this "transaction" is marked
appropriately in logs. Numbers of "transactions" with such errors are printed in
progress, in aggregation logs and in the end with other results (all and for
each script).
---
 src/bin/pgbench/pgbench.c                     | 169 +++++++++++++++++++++-----
 src/bin/pgbench/t/002_serialization_errors.pl |  75 ++++++++++++
 src/bin/pgbench/t/003_deadlock_errors.pl      |  93 ++++++++++++++
 3 files changed, 308 insertions(+), 29 deletions(-)
 create mode 100644 src/bin/pgbench/t/002_serialization_errors.pl
 create mode 100644 src/bin/pgbench/t/003_deadlock_errors.pl

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index ae36247..bbf444b 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -58,6 +58,9 @@
 
 #include "pgbench.h"
 
+#define ERRCODE_IN_FAILED_SQL_TRANSACTION  "25P02"
+#define ERRCODE_T_R_SERIALIZATION_FAILURE  "40001"
+#define ERRCODE_T_R_DEADLOCK_DETECTED  "40P01"
 #define ERRCODE_UNDEFINED_TABLE  "42P01"
 
 /*
@@ -232,6 +235,10 @@ typedef struct StatsData
 	int64		cnt;			/* number of transactions */
 	int64		skipped;		/* number of transactions skipped under --rate
 								 * and --latency-limit */
+	int64		serialization_failures;	/* number of transactions with
+										 * serialization failures */
+	int64		deadlock_failures; /* number of transactions with deadlock
+									* failures */
 	SimpleStats latency;
 	SimpleStats lag;
 } StatsData;
@@ -330,6 +337,10 @@ typedef struct
 
 	/* per client collected stats */
 	int64		cnt;			/* transaction count */
+	bool		serialization_failure;	/* if there was serialization failure
+										 * during script execution */
+	bool		deadlock_failure;	/* if there was deadlock failure during
+									 * script execution */
 	int			ecnt;			/* error count */
 } CState;
 
@@ -786,6 +797,8 @@ initStats(StatsData *sd, time_t start_time)
 	sd->start_time = start_time;
 	sd->cnt = 0;
 	sd->skipped = 0;
+	sd->serialization_failures = 0;
+	sd->deadlock_failures = 0;
 	initSimpleStats(&sd->latency);
 	initSimpleStats(&sd->lag);
 }
@@ -794,14 +807,20 @@ initStats(StatsData *sd, time_t start_time)
  * Accumulate one additional item into the given stats object.
  */
 static void
-accumStats(StatsData *stats, bool skipped, double lat, double lag)
+accumStats(StatsData *stats, bool skipped, bool serialization_failure,
+		   bool deadlock_failure, double lat, double lag)
 {
 	stats->cnt++;
 
-	if (skipped)
+	if (skipped || serialization_failure || deadlock_failure)
 	{
-		/* no latency to record on skipped transactions */
-		stats->skipped++;
+		/* no latency to record on such transactions */
+		if (skipped)
+			stats->skipped++;
+		if (serialization_failure)
+			stats->serialization_failures++;
+		if (deadlock_failure)
+			stats->deadlock_failures++;
 	}
 	else
 	{
@@ -1962,6 +1981,11 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 	instr_time	now;
 	bool		end_tx_processed = false;
 	int64		wait;
+	bool		serialization_failure = false;
+	bool		deadlock_failure = false;
+	bool		in_failed_transaction = false;
+	ExecStatusType result_status;
+	char	   *sqlState;
 
 	/*
 	 * gettimeofday() isn't free, so we get the current timestamp lazily the
@@ -2121,6 +2145,10 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 						st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
 				}
 
+				/* reset transaction variables to default values */
+				st->serialization_failure = false;
+				st->deadlock_failure = false;
+
 				/* Begin with the first command */
 				st->command = 0;
 				st->state = CSTATE_START_COMMAND;
@@ -2142,6 +2170,11 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 					break;
 				}
 
+				/* reset command result variables to default values */
+				serialization_failure = false;
+				deadlock_failure = false;
+				in_failed_transaction = false;
+
 				/*
 				 * Record statement start time if per-command latencies are
 				 * requested
@@ -2299,21 +2332,34 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 				 * Read and discard the query result;
 				 */
 				res = PQgetResult(st->con);
-				switch (PQresultStatus(res))
+				result_status = PQresultStatus(res);
+				sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+				if (sqlState) {
+					serialization_failure =
+						strcmp(sqlState, ERRCODE_T_R_SERIALIZATION_FAILURE) == 0;
+					deadlock_failure =
+						strcmp(sqlState, ERRCODE_T_R_DEADLOCK_DETECTED) == 0;
+					in_failed_transaction =
+						strcmp(sqlState, ERRCODE_IN_FAILED_SQL_TRANSACTION) == 0;
+				}
+
+				if (result_status == PGRES_COMMAND_OK ||
+					result_status == PGRES_TUPLES_OK ||
+					result_status == PGRES_EMPTY_QUERY ||
+					serialization_failure ||
+					deadlock_failure ||
+					in_failed_transaction)
 				{
-					case PGRES_COMMAND_OK:
-					case PGRES_TUPLES_OK:
-					case PGRES_EMPTY_QUERY:
-						/* OK */
-						PQclear(res);
-						discard_response(st);
-						st->state = CSTATE_END_COMMAND;
-						break;
-					default:
-						commandFailed(st, PQerrorMessage(st->con));
-						PQclear(res);
-						st->state = CSTATE_ABORTED;
-						break;
+					/* OK */
+					PQclear(res);
+					discard_response(st);
+					st->state = CSTATE_END_COMMAND;
+				}
+				else
+				{
+					commandFailed(st, PQerrorMessage(st->con));
+					PQclear(res);
+					st->state = CSTATE_ABORTED;
 				}
 				break;
 
@@ -2342,7 +2388,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 				 * in thread-local data structure, if per-command latencies
 				 * are requested.
 				 */
-				if (is_latencies)
+				if (is_latencies && !serialization_failure && !deadlock_failure)
 				{
 					if (INSTR_TIME_IS_ZERO(now))
 						INSTR_TIME_SET_CURRENT(now);
@@ -2354,6 +2400,12 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 									 INSTR_TIME_GET_DOUBLE(st->stmt_begin));
 				}
 
+				/* remember for transaction if there were failures */
+				if (serialization_failure)
+					st->serialization_failure = true;
+				if (deadlock_failure)
+					st->deadlock_failure = true;
+
 				/* Go ahead with next command */
 				st->command++;
 				st->state = CSTATE_START_COMMAND;
@@ -2370,9 +2422,17 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 				 */
 				if (progress || throttle_delay || latency_limit ||
 					per_script_stats || use_log)
+				{
 					processXactStats(thread, st, &now, false, agg);
+				}
 				else
+				{
 					thread->stats.cnt++;
+					if (st->serialization_failure)
+						thread->stats.serialization_failures++;
+					if (st->deadlock_failure)
+						thread->stats.deadlock_failures++;
+				}
 
 				if (is_connect)
 				{
@@ -2462,9 +2522,11 @@ doLog(TState *thread, CState *st,
 		while (agg->start_time + agg_interval <= now)
 		{
 			/* print aggregated report to logfile */
-			fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
+			fprintf(logfile, "%ld " INT64_FORMAT " " INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
 					(long) agg->start_time,
 					agg->cnt,
+					agg->serialization_failures,
+					agg->deadlock_failures,
 					agg->latency.sum,
 					agg->latency.sum2,
 					agg->latency.min,
@@ -2486,17 +2548,28 @@ doLog(TState *thread, CState *st,
 		}
 
 		/* accumulate the current transaction */
-		accumStats(agg, skipped, latency, lag);
+		accumStats(agg, skipped, st->serialization_failure,
+				   st->deadlock_failure, latency, lag);
 	}
 	else
 	{
 		/* no, print raw transactions */
 		struct timeval tv;
+		char		transaction_label[256];
 
-		gettimeofday(&tv, NULL);
 		if (skipped)
-			fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
-					st->id, st->cnt, st->use_file,
+			snprintf(transaction_label, sizeof(transaction_label), "skipped");
+		else if (st->serialization_failure && st->deadlock_failure)
+			snprintf(transaction_label, sizeof(transaction_label),
+					 "serialization and deadlock failures");
+		else if (st->serialization_failure || st->deadlock_failure)
+			snprintf(transaction_label, sizeof(transaction_label), "%s failure",
+					 st->serialization_failure ? "serialization" : "deadlock");
+
+		gettimeofday(&tv, NULL);
+		if (skipped || st->serialization_failure || st->deadlock_failure)
+			fprintf(logfile, "%d " INT64_FORMAT " %s %d %ld %ld",
+					st->id, st->cnt, transaction_label, st->use_file,
 					(long) tv.tv_sec, (long) tv.tv_usec);
 		else
 			fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
@@ -2523,7 +2596,7 @@ processXactStats(TState *thread, CState *st, instr_time *now,
 	if ((!skipped) && INSTR_TIME_IS_ZERO(*now))
 		INSTR_TIME_SET_CURRENT(*now);
 
-	if (!skipped)
+	if (!skipped && !st->serialization_failure && !st->deadlock_failure)
 	{
 		/* compute latency & lag */
 		latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
@@ -2532,21 +2605,30 @@ processXactStats(TState *thread, CState *st, instr_time *now,
 
 	if (progress || throttle_delay || latency_limit)
 	{
-		accumStats(&thread->stats, skipped, latency, lag);
+		accumStats(&thread->stats, skipped, st->serialization_failure,
+				   st->deadlock_failure, latency, lag);
 
 		/* count transactions over the latency limit, if needed */
 		if (latency_limit && latency > latency_limit)
 			thread->latency_late++;
 	}
 	else
+	{
 		thread->stats.cnt++;
+		if (st->serialization_failure)
+			thread->stats.serialization_failures++;
+		if (st->deadlock_failure)
+			thread->stats.deadlock_failures++;
+	}
 
 	if (use_log)
 		doLog(thread, st, agg, skipped, latency, lag);
 
 	/* XXX could use a mutex here, but we choose not to */
 	if (per_script_stats)
-		accumStats(&sql_script[st->use_file].stats, skipped, latency, lag);
+		accumStats(&sql_script[st->use_file].stats, skipped,
+				   st->serialization_failure, st->deadlock_failure, latency,
+				   lag);
 }
 
 
@@ -3522,6 +3604,14 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
 	if (total->cnt <= 0)
 		return;
 
+	printf("number of transactions with serialization failures: " INT64_FORMAT " (%.3f %%)\n",
+		   total->serialization_failures,
+		   (100.0 * total->serialization_failures / total->cnt));
+
+	printf("number of transactions with deadlock failures: " INT64_FORMAT " (%.3f %%)\n",
+		   total->deadlock_failures,
+		   (100.0 * total->deadlock_failures / total->cnt));
+
 	if (throttle_delay && latency_limit)
 		printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
 			   total->skipped,
@@ -3576,6 +3666,16 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
 			else
 				printf("script statistics:\n");
 
+			printf(" - number of transactions with serialization failures: " INT64_FORMAT " (%.3f%%)\n",
+				   sql_script[i].stats.serialization_failures,
+				   (100.0 * sql_script[i].stats.serialization_failures /
+					sql_script[i].stats.cnt));
+
+			printf(" - number of transactions with deadlock failures: " INT64_FORMAT " (%.3f%%)\n",
+				   sql_script[i].stats.deadlock_failures,
+				   (100.0 * sql_script[i].stats.deadlock_failures /
+					sql_script[i].stats.cnt));
+
 			if (latency_limit)
 				printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
 					   sql_script[i].stats.skipped,
@@ -4340,6 +4440,8 @@ main(int argc, char **argv)
 		mergeSimpleStats(&stats.lag, &thread->stats.lag);
 		stats.cnt += thread->stats.cnt;
 		stats.skipped += thread->stats.skipped;
+		stats.serialization_failures += thread->stats.serialization_failures;
+		stats.deadlock_failures += thread->stats.deadlock_failures;
 		latency_late += thread->latency_late;
 		INSTR_TIME_ADD(conn_total_time, thread->conn_time);
 	}
@@ -4639,6 +4741,9 @@ threadRun(void *arg)
 					mergeSimpleStats(&cur.lag, &thread[i].stats.lag);
 					cur.cnt += thread[i].stats.cnt;
 					cur.skipped += thread[i].stats.skipped;
+					cur.serialization_failures +=
+						thread[i].stats.serialization_failures;
+					cur.deadlock_failures += thread[i].stats.deadlock_failures;
 				}
 
 				total_run = (now - thread_start) / 1000000.0;
@@ -4669,8 +4774,14 @@ threadRun(void *arg)
 					snprintf(tbuf, sizeof(tbuf), "%.1f s", total_run);
 
 				fprintf(stderr,
-						"progress: %s, %.1f tps, lat %.3f ms stddev %.3f",
-						tbuf, tps, latency, stdev);
+						"progress: %s, %.1f tps, " INT64_FORMAT " serialization failures transactions, " INT64_FORMAT " deadlock failures transactions, lat %.3f ms stddev %.3f",
+						tbuf,
+						tps,
+						(cur.serialization_failures -
+						 last.serialization_failures),
+						(cur.deadlock_failures - last.deadlock_failures),
+						latency,
+						stdev);
 
 				if (throttle_delay)
 				{
diff --git a/src/bin/pgbench/t/002_serialization_errors.pl b/src/bin/pgbench/t/002_serialization_errors.pl
new file mode 100644
index 0000000..8d0d99f
--- /dev/null
+++ b/src/bin/pgbench/t/002_serialization_errors.pl
@@ -0,0 +1,75 @@
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 18;
+
+# Test concurrent update in table row with different transaction isolation
+# levels.
+my $node = get_new_node('main');
+$node->init;
+$node->start;
+$node->safe_psql('postgres',
+	    'CREATE UNLOGGED TABLE xy (x integer, y integer); '
+	  . 'INSERT INTO xy VALUES (1, 2);');
+
+# Test serialization errors on transactions with Read committed isolation level
+my $script_read_committed = $node->basedir . '/pgbench_script_read_committed';
+append_to_file($script_read_committed,
+		"\\set delta random(-5000, 5000)\n"
+	  . "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;\n"
+	  . "UPDATE xy SET y = y + :delta WHERE x = 1;\n"
+	  . "END;\n");
+
+$node->command_like(
+	[   qw(pgbench --no-vacuum --client=5 --transactions=10 --file),
+		$script_read_committed ],
+	qr{processed: 50/50},
+	'concurrent update: Read Committed: check processed transactions');
+
+$node->command_like(
+	[   qw(pgbench --no-vacuum --client=5 --transactions=10 --file),
+		$script_read_committed ],
+	qr{serialization failures: 0 \(0\.000 %\)},
+	'concurrent update: Read Committed: check serialization failures');
+
+# Test serialization errors on transactions with Repeatable read isolation level
+my $script_repeatable_read = $node->basedir . '/pgbench_script_repeatable_read';
+append_to_file($script_repeatable_read,
+		"\\set delta random(-5000, 5000)\n"
+	  . "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;\n"
+	  . "UPDATE xy SET y = y + :delta WHERE x = 1;\n"
+	  . "END;\n");
+
+$node->command_like(
+	[   qw(pgbench --no-vacuum --client=5 --transactions=10 --file),
+		$script_repeatable_read ],
+	qr{processed: 50/50},
+	'concurrent update: Repeatable Read: check processed transactions');
+
+$node->command_like(
+	[   qw(pgbench --no-vacuum --client=5 --transactions=10 --file),
+		$script_repeatable_read ],
+	qr{serialization failures: [1-9]\d* \([1-9]\d*\.\d* %\)},
+	'concurrent update: Repeatable Read: check serialization failures');
+
+# Test serialization errors on transactions with Serializable isolation level
+my $script_serializable = $node->basedir . '/pgbench_script_serializable';
+append_to_file($script_serializable,
+		"\\set delta random(-5000, 5000)\n"
+	  . "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;\n"
+	  . "UPDATE xy SET y = y + :delta WHERE x = 1;\n"
+	  . "END;\n");
+
+$node->command_like(
+	[   qw(pgbench --no-vacuum --client=5 --transactions=10 --file),
+		$script_serializable ],
+	qr{processed: 50/50},
+	'concurrent update: Serializable: check processed transactions');
+
+$node->command_like(
+	[   qw(pgbench --no-vacuum --client=5 --transactions=10 --file),
+		$script_serializable ],
+	qr{serialization failures: [1-9]\d* \([1-9]\d*\.\d* %\)},
+	'concurrent update: Serializable: check serialization failures');
diff --git a/src/bin/pgbench/t/003_deadlock_errors.pl b/src/bin/pgbench/t/003_deadlock_errors.pl
new file mode 100644
index 0000000..791d456
--- /dev/null
+++ b/src/bin/pgbench/t/003_deadlock_errors.pl
@@ -0,0 +1,93 @@
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 18;
+
+# Test concurrent deadlock updates in table with different transaction isolation
+# levels.
+my $node = get_new_node('main');
+$node->init;
+$node->start;
+$node->safe_psql('postgres',
+	    'CREATE UNLOGGED TABLE xy (x integer, y integer); '
+	  . 'INSERT INTO xy VALUES (1, 2), (2, 3);');
+
+# Test deadlock errors on transactions with Read committed isolation level
+my $script_read_committed = $node->basedir . '/pgbench_script_read_committed';
+append_to_file($script_read_committed,
+		"\\set delta1 random(-5000, 5000)\n"
+	  . "\\set delta2 random(-5000, 5000)\n"
+	  . "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;\n"
+	  . "UPDATE xy SET y = y + :delta1 WHERE x = 1;\n"
+	  . "UPDATE xy SET y = y + :delta2 WHERE x = 2;\n"
+	  . "END;\n"
+	  . "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;\n"
+	  . "UPDATE xy SET y = y + :delta2 WHERE x = 2;\n"
+	  . "UPDATE xy SET y = y + :delta1 WHERE x = 1;\n"
+	  . "END;\n");
+
+$node->command_like(
+	[   qw(pgbench --no-vacuum --client=5 --transactions=10 --file),
+		$script_read_committed ],
+	qr{processed: 50/50},
+	'concurrent deadlock update: Read Committed: check processed transactions');
+
+$node->command_like(
+	[   qw(pgbench --no-vacuum --client=5 --transactions=10 --file),
+		$script_read_committed ],
+	qr{deadlock failures: [1-9]\d* \([1-9]\d*\.\d* %\)},
+	'concurrent deadlock update: Read Committed: check deadlock failures');
+
+# Test deadlock errors on transactions with Repeatable read isolation level
+my $script_repeatable_read = $node->basedir . '/pgbench_script_repeatable_read';
+append_to_file($script_repeatable_read,
+		"\\set delta1 random(-5000, 5000)\n"
+	  . "\\set delta2 random(-5000, 5000)\n"
+	  . "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;\n"
+	  . "UPDATE xy SET y = y + :delta1 WHERE x = 1;\n"
+	  . "UPDATE xy SET y = y + :delta2 WHERE x = 2;\n"
+	  . "END;\n"
+	  . "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;\n"
+	  . "UPDATE xy SET y = y + :delta2 WHERE x = 2;\n"
+	  . "UPDATE xy SET y = y + :delta1 WHERE x = 1;\n"
+	  . "END;\n");
+
+$node->command_like(
+	[   qw(pgbench --no-vacuum --client=5 --transactions=10 --file),
+		$script_repeatable_read ],
+	qr{processed: 50/50},
+	'concurrent deadlock update: Repeatable Read: check processed transactions');
+
+$node->command_like(
+	[   qw(pgbench --no-vacuum --client=5 --transactions=10 --file),
+		$script_repeatable_read ],
+	qr{deadlock failures: [1-9]\d* \([1-9]\d*\.\d* %\)},
+	'concurrent deadlock update: Repeatable Read: check deadlock failures');
+
+# Test deadlock errors on transactions with Serializable isolation level
+my $script_serializable = $node->basedir . '/pgbench_script_serializable';
+append_to_file($script_serializable,
+		"\\set delta1 random(-5000, 5000)\n"
+	  . "\\set delta2 random(-5000, 5000)\n"
+	  . "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;\n"
+	  . "UPDATE xy SET y = y + :delta1 WHERE x = 1;\n"
+	  . "UPDATE xy SET y = y + :delta2 WHERE x = 2;\n"
+	  . "END;\n"
+	  . "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;\n"
+	  . "UPDATE xy SET y = y + :delta2 WHERE x = 2;\n"
+	  . "UPDATE xy SET y = y + :delta1 WHERE x = 1;\n"
+	  . "END;\n");
+
+$node->command_like(
+	[   qw(pgbench --no-vacuum --client=5 --transactions=10 --file),
+		$script_serializable ],
+	qr{processed: 50/50},
+	'concurrent update: Serializable: check processed transactions');
+
+$node->command_like(
+	[   qw(pgbench --no-vacuum --client=5 --transactions=10 --file),
+		$script_serializable ],
+	qr{deadlock failures: [1-9]\d* \([1-9]\d*\.\d* %\)},
+	'concurrent update: Serializable: check deadlock failures');
\ No newline at end of file
-- 
1.9.1

