From 79d1554ad648155d209abafc7ef1f602caa8cdda Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryzbyj@telsasoft.com>
Date: Tue, 31 Mar 2020 20:35:41 -0500
Subject: [PATCH v16 7/7] Implement vacuum full/cluster (INDEX_TABLESPACE
 <tablespace>)

TODO: docs , tests
---
 src/backend/commands/cluster.c      | 77 ++++++++++++++++++++---------
 src/backend/commands/matview.c      |  3 +-
 src/backend/commands/tablecmds.c    |  2 +-
 src/backend/commands/vacuum.c       | 65 ++++++++++++++----------
 src/backend/postmaster/autovacuum.c |  1 +
 src/include/commands/cluster.h      |  5 +-
 src/include/commands/vacuum.h       |  6 ++-
 7 files changed, 104 insertions(+), 55 deletions(-)

diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index cef4beb2c8..9083d39d26 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -70,7 +70,8 @@ typedef struct
 } RelToCluster;
 
 
-static void rebuild_relation(Relation OldHeap, Oid indexOid, Oid NewTableSpaceOid, bool verbose);
+static Oid cluster_get_tablespace_oid(const char *tablespaceName);
+static void rebuild_relation(Relation OldHeap, Oid indexOid, Oid NewTableSpaceOid, Oid NewIdxTableSpaceOid, bool verbose);
 static void copy_table_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex,
 							bool verbose, bool *pSwapToastByContent,
 							TransactionId *pFreezeXid, MultiXactId *pCutoffMulti);
@@ -106,8 +107,10 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
 {
 	ListCell *lc;
 	/* Name and Oid of tablespace to use for clustered relation. */
-	char	*tablespaceName = NULL;
+	char	*tablespaceName = NULL,
+		*idxtablespaceName = NULL;
 	Oid tablespaceOid = InvalidOid;
+	Oid idxtablespaceOid = InvalidOid;
 
 	/* Parse list of generic parameter not handled by the parser */
 	foreach(lc, stmt->params)
@@ -120,6 +123,8 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
 		else if (strcmp(opt->defname, "tablespace") == 0)
 			tablespaceName = defGetString(opt);
 			// XXX: if (tablespaceOid == GLOBALTABLESPACE_OID)
+		else if (strcmp(opt->defname, "index_tablespace") == 0)
+			idxtablespaceName = defGetString(opt);
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -128,18 +133,8 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
 					 parser_errposition(pstate, opt->location)));
 	}
 
-	/* Select tablespace Oid to use. */
-	if (tablespaceName)
-	{
-		tablespaceOid = get_tablespace_oid(tablespaceName, false);
-
-		/* Can't move a non-shared relation into pg_global */
-		if (tablespaceOid == GLOBALTABLESPACE_OID)
-			ereport(ERROR,
-					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-					 errmsg("cannot move non-shared relation to tablespace \"%s\"",
-							tablespaceName)));
-	}
+	tablespaceOid = cluster_get_tablespace_oid(tablespaceName);
+	idxtablespaceOid = cluster_get_tablespace_oid(idxtablespaceName);
 
 	if (stmt->relation != NULL)
 	{
@@ -222,7 +217,7 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
 		table_close(rel, NoLock);
 
 		/* Do the job. */
-		cluster_rel(tableOid, indexOid, tablespaceOid, stmt->options);
+		cluster_rel(tableOid, indexOid, tablespaceOid, idxtablespaceOid, stmt->options);
 	}
 	else
 	{
@@ -271,7 +266,7 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
 			PushActiveSnapshot(GetTransactionSnapshot());
 			/* Do the job. */
 			cluster_rel(rvtc->tableOid, rvtc->indexOid, tablespaceOid,
-						stmt->options | CLUOPT_RECHECK);
+						 idxtablespaceOid, stmt->options | CLUOPT_RECHECK);
 			PopActiveSnapshot();
 			CommitTransactionCommand();
 		}
@@ -305,7 +300,7 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
  * InvalidOid, use the tablespace in-use instead.
  */
 void
-cluster_rel(Oid tableOid, Oid indexOid, Oid tablespaceOid, int options)
+cluster_rel(Oid tableOid, Oid indexOid, Oid tablespaceOid, Oid idxtablespaceOid, int options)
 {
 	Relation	OldHeap;
 	bool		verbose = ((options & CLUOPT_VERBOSE) != 0);
@@ -485,7 +480,7 @@ cluster_rel(Oid tableOid, Oid indexOid, Oid tablespaceOid, int options)
 	TransferPredicateLocksToHeapRelation(OldHeap);
 
 	/* rebuild_relation does all the dirty work */
-	rebuild_relation(OldHeap, indexOid, tablespaceOid, verbose);
+	rebuild_relation(OldHeap, indexOid, tablespaceOid, idxtablespaceOid, verbose);
 
 	/* NB: rebuild_relation does table_close() on OldHeap */
 
@@ -635,6 +630,27 @@ mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
 	table_close(pg_index, RowExclusiveLock);
 }
 
+/* Return Oid of given tablespace */
+static Oid
+cluster_get_tablespace_oid(const char *tablespaceName)
+{
+	Oid ret;
+
+	if (tablespaceName == NULL)
+		return InvalidOid;
+
+	ret = get_tablespace_oid(tablespaceName, false);
+
+	// XXX: check this in cluster_rel ?
+	/* Can't move a non-shared relation into pg_global */
+	if (ret == GLOBALTABLESPACE_OID)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("cannot move non-shared relation to tablespace \"%s\"",
+						tablespaceName)));
+	return ret;
+}
+
 /*
  * rebuild_relation: rebuild an existing relation in index or physical order
  *
@@ -644,10 +660,11 @@ mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
  * NB: this routine closes OldHeap at the right time; caller should not.
  */
 static void
-rebuild_relation(Relation OldHeap, Oid indexOid, Oid NewTablespaceOid, bool verbose)
+rebuild_relation(Relation OldHeap, Oid indexOid, Oid NewTablespaceOid, Oid NewIdxTablespaceOid, bool verbose)
 {
 	Oid			tableOid = RelationGetRelid(OldHeap);
 	Oid			tableSpace = OldHeap->rd_rel->reltablespace;
+	Oid			idxtableSpace;
 	Oid			OIDNewHeap;
 	char		relpersistence;
 	bool		is_system_catalog;
@@ -659,6 +676,22 @@ rebuild_relation(Relation OldHeap, Oid indexOid, Oid NewTablespaceOid, bool verb
 	if (OidIsValid(NewTablespaceOid))
 		tableSpace = NewTablespaceOid;
 
+	if (OidIsValid(NewIdxTablespaceOid))
+		idxtableSpace = NewIdxTablespaceOid;
+	else if (!OidIsValid(indexOid))
+		idxtableSpace = InvalidOid;
+	else
+	{
+		/* Look up in syscache */
+		bool		isNull;
+		HeapTuple	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indexOid));
+		if (!HeapTupleIsValid(tuple))
+			elog(ERROR, "cache lookup failed for relation %u", indexOid);
+		idxtableSpace = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reltablespace,
+									 &isNull);
+		ReleaseSysCache(tuple);
+	}
+
 	/* Mark the correct index as clustered */
 	if (OidIsValid(indexOid))
 		mark_index_clustered(OldHeap, indexOid, true);
@@ -686,7 +719,7 @@ rebuild_relation(Relation OldHeap, Oid indexOid, Oid NewTablespaceOid, bool verb
 	finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog,
 					 swap_toast_by_content, false, true,
 					 frozenXid, cutoffMulti,
-					 relpersistence);
+					 relpersistence, idxtableSpace);
 }
 
 
@@ -1415,7 +1448,7 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
 				 bool is_internal,
 				 TransactionId frozenXid,
 				 MultiXactId cutoffMulti,
-				 char newrelpersistence)
+				 char newrelpersistence, Oid idxtableSpace)
 {
 	ObjectAddress object;
 	Oid			mapped_tables[4];
@@ -1477,7 +1510,7 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
 	pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE,
 								 PROGRESS_CLUSTER_PHASE_REBUILD_INDEX);
 
-	reindex_relation(OIDOldHeap, InvalidOid, reindex_flags, 0);
+	reindex_relation(OIDOldHeap, idxtableSpace, reindex_flags, 0);
 
 	/* Report that we are now doing clean up */
 	pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE,
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index e5a5eef102..40c57baccd 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -848,7 +848,8 @@ static void
 refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
 {
 	finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
-					 RecentXmin, ReadNextMultiXactId(), relpersistence);
+					 RecentXmin, ReadNextMultiXactId(), relpersistence,
+					 InvalidOid);
 }
 
 /*
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 5f6a9fe3e2..fc05c194ef 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -4916,7 +4916,7 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
 							 !OidIsValid(tab->newTableSpace),
 							 RecentXmin,
 							 ReadNextMultiXactId(),
-							 persistence);
+							 persistence, InvalidOid);
 		}
 		else
 		{
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 005fb97a7b..34f72844a9 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -90,6 +90,7 @@ static void vac_truncate_clog(TransactionId frozenXID,
 static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params);
 static double compute_parallel_delay(void);
 static VacOptTernaryValue get_vacopt_ternary_value(DefElem *def);
+static Oid vacuum_get_tablespace_oid(VacuumParams *params, const char *tablespacename);
 
 /*
  * Primary entry point for manual VACUUM and ANALYZE commands
@@ -110,9 +111,9 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 	bool		parallel_option = false;
 	ListCell   *lc;
 
-	/* Name and Oid of tablespace to use for relations after VACUUM FULL. */
-	char		*tablespacename = NULL;
-	Oid 		tablespaceOid = InvalidOid;
+	/* Tablespace to use for relations after VACUUM FULL. */
+	char		*tablespacename = NULL,
+			*idxtablespacename = NULL;
 
 	/* Set default value */
 	params.index_cleanup = VACOPT_TERNARY_DEFAULT;
@@ -152,6 +153,8 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 			params.truncate = get_vacopt_ternary_value(opt);
 		else if (strcmp(opt->defname, "tablespace") == 0)
 			tablespacename = defGetString(opt);
+		else if (strcmp(opt->defname, "index_tablespace") == 0)
+			idxtablespacename = defGetString(opt);
 		else if (strcmp(opt->defname, "parallel") == 0)
 		{
 			parallel_option = true;
@@ -214,25 +217,8 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 				 errmsg("cannot specify both FULL and PARALLEL options")));
 
 	/* Get tablespace Oid to use. */
-	if (tablespacename)
-	{
-		if ((params.options & VACOPT_FULL) == 0)
-			ereport(ERROR,
-					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-					errmsg("incompatible TABLESPACE option"),
-					errdetail("You can only use TABLESPACE with VACUUM FULL.")));
-
-		tablespaceOid = get_tablespace_oid(tablespacename, false);
-
-		/* Can't move a non-shared relation into pg_global */
-		if (tablespaceOid == GLOBALTABLESPACE_OID)
-			ereport(ERROR,
-					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-					errmsg("cannot move non-shared relation to tablespace \"%s\"",
-							tablespacename)));
-
-	}
-	params.tablespace_oid = tablespaceOid;
+	params.tablespace_oid = vacuum_get_tablespace_oid(&params, tablespacename);
+	params.idxtablespace_oid = vacuum_get_tablespace_oid(&params, idxtablespacename);
 
 	/*
 	 * Make sure VACOPT_ANALYZE is specified if any column lists are present.
@@ -1703,8 +1689,7 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
 	Relation	onerel;
 	LockRelId	onerelid;
 	Oid			toast_relid,
-				save_userid,
-				tablespaceOid = InvalidOid;
+				save_userid;
 	int			save_sec_context;
 	int			save_nestlevel;
 
@@ -1852,8 +1837,6 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
 						RelationGetRelationName(onerel)),
 				errdetail("Cannot move system relation, only VACUUM is performed.")));
 	}
-	else
-		tablespaceOid = params->tablespace_oid;
 
 	/*
 	 * Get a session-level lock too. This will protect our access to the
@@ -1924,7 +1907,8 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
 			cluster_options |= CLUOPT_VERBOSE;
 
 		/* VACUUM FULL is now a variant of CLUSTER; see cluster.c */
-		cluster_rel(relid, InvalidOid, tablespaceOid, cluster_options);
+		cluster_rel(relid, InvalidOid, params->tablespace_oid,
+				params->idxtablespace_oid, cluster_options);
 	}
 	else
 		table_relation_vacuum(onerel, params, vac_strategy);
@@ -2152,3 +2136,30 @@ get_vacopt_ternary_value(DefElem *def)
 {
 	return defGetBoolean(def) ? VACOPT_TERNARY_ENABLED : VACOPT_TERNARY_DISABLED;
 }
+
+/* Get tablespace OID with vacuum-specific checks */
+static Oid
+vacuum_get_tablespace_oid(VacuumParams *params, const char *tablespacename)
+{
+	Oid	ret;
+
+	if (tablespacename == NULL)
+		return InvalidOid;
+
+	if ((params->options & VACOPT_FULL) == 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("incompatible TABLESPACE option"),
+				errdetail("You can only use TABLESPACE with VACUUM FULL.")));
+
+	ret = get_tablespace_oid(tablespacename, false);
+
+	/* Can't move a non-shared relation into pg_global */
+	if (ret == GLOBALTABLESPACE_OID)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("cannot move non-shared relation to tablespace \"%s\"",
+						tablespacename)));
+
+	return ret;
+}
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 8acc321faa..7de5797f9c 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -2898,6 +2898,7 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
 		tab->at_params.is_wraparound = wraparound;
 		tab->at_params.log_min_duration = log_min_duration;
 		tab->at_params.tablespace_oid = InvalidOid;
+		tab->at_params.idxtablespace_oid = InvalidOid;
 		tab->at_vacuum_cost_limit = vac_cost_limit;
 		tab->at_vacuum_cost_delay = vac_cost_delay;
 		tab->at_relname = NULL;
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index bc9f881d8c..515e810505 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -20,7 +20,7 @@
 
 
 extern void cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel);
-extern void cluster_rel(Oid tableOid, Oid indexOid, Oid tablespaceOid, int options);
+extern void cluster_rel(Oid tableOid, Oid indexOid, Oid tablespaceOid, Oid indextablespaceOid, int options);
 extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid,
 									   bool recheck, LOCKMODE lockmode);
 extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal);
@@ -34,6 +34,7 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
 							 bool is_internal,
 							 TransactionId frozenXid,
 							 MultiXactId minMulti,
-							 char newrelpersistence);
+							 char newrelpersistence,
+							 Oid idxtablespaceOid);
 
 #endif							/* CLUSTER_H */
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 6758e9812f..d6f8e5de23 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -229,8 +229,10 @@ typedef struct VacuumParams
 	 * disabled.
 	 */
 	int			nworkers;
-	Oid			tablespace_oid; /* tablespace Oid to use for relations
-								 * after VACUUM FULL */
+
+	/* tablespace Oids to use for relations rebuilt by VACUUM FULL */
+	Oid			tablespace_oid;
+	Oid			idxtablespace_oid;
 } VacuumParams;
 
 /* GUC parameters */
-- 
2.17.0

