From: mason_s <ma...@us...> - 2010-11-23 23:20:03
Project "Postgres-XC". The branch, master has been updated
       via  c66ed018bf6e7295c576286bba275af109b4bcb9 (commit)
      from  392180900570a9f3f8bd482663013b6778b323cd (commit)

- Log -----------------------------------------------------------------
commit c66ed018bf6e7295c576286bba275af109b4bcb9
Author: Mason Sharp <ma...@us...>
Date:   Tue Nov 23 18:15:28 2010 -0500

    Fixed bug where, if there are subqueries in the SELECT clause like

        select (select col1 from mds2), col2 from mds1;

    we were not recognizing that this is multi-step and cannot simply be
    pushed down to the data nodes. We now consider this a multi-step query,
    except if all subqueries in the SELECT clause use only replicated
    tables. A future optimization is to allow partitioned tables if they
    all evaluate to the same node.

diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index d499149..83ee829 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -1266,6 +1266,35 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)

     /* Look for special conditions */

+    /* Examine projection list, to handle cases like
+     * SELECT col1, (SELECT col2 FROM non_replicated_table...), ...
+     * PGXCTODO: Improve this to allow for partitioned tables
+     * where all subqueries and the main query use the same single node
+     */
+    if (query->targetList)
+    {
+        foreach(item, query->targetList)
+        {
+            TargetEntry *target = (TargetEntry *) lfirst(item);
+
+            if (examine_conditions_walker((Node *) target->expr, context))
+                return true;
+
+            if (context->query_step->exec_nodes)
+            {
+                /*
+                 * if it is not replicated, assume it is something complicated and go
+                 * through standard planner
+                 */
+                if (context->query_step->exec_nodes->tableusagetype != TABLE_USAGE_TYPE_USER_REPLICATED)
+                    return true;
+
+                pfree(context->query_step->exec_nodes);
+                context->query_step->exec_nodes = NULL;
+            }
+        }
+    }
+
     /* Look for JOIN syntax joins */
     foreach(item, query->jointree->fromlist)
     {

-----------------------------------------------------------------------

Summary of changes:
 src/backend/pgxc/plan/planner.c |   29 +++++++++++++++++++++++++++++
 1 files changed, 29 insertions(+), 0 deletions(-)

hooks/post-receive
--
Postgres-XC

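To make the planner distinction above concrete, here is a minimal SQL sketch. It reuses mds1 and mds2 from the commit message; rep1 is a hypothetical replicated table added only for illustration.

    -- A SELECT-clause subquery on a non-replicated table now forces
    -- multi-step planning instead of a single-step push-down:
    SELECT (SELECT col1 FROM mds2), col2 FROM mds1;

    -- If every SELECT-clause subquery uses only replicated tables,
    -- the query can still be shipped to the data nodes in one step:
    SELECT (SELECT col1 FROM rep1), col2 FROM mds1;
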
From: mason_s <ma...@us...> - 2010-11-23 07:23:21
Project "Postgres-XC". The branch, master has been updated
       via  392180900570a9f3f8bd482663013b6778b323cd (commit)
      from  a1f26c32e159aeff6cdf1488b1020ff8bc115386 (commit)

- Log -----------------------------------------------------------------
commit 392180900570a9f3f8bd482663013b6778b323cd
Author: Mason Sharp <ma...@us...>
Date:   Tue Nov 23 02:18:55 2010 -0500

    Fix for sourceforge.net bug#3013984: Sequence scope

    When we drop databases, we need to make sure that
    we remove the sequences from GTM.

    By Benny Mei Le, Michael Paquier, with bug fix by Mason Sharp

diff --git a/src/backend/access/transam/gtm.c b/src/backend/access/transam/gtm.c
index 64437e7..fafa0b7 100644
--- a/src/backend/access/transam/gtm.c
+++ b/src/backend/access/transam/gtm.c
@@ -166,12 +166,14 @@ CommitPreparedTranGTM(GlobalTransactionId gxid, GlobalTransactionId prepared_gxi
 int
 RollbackTranGTM(GlobalTransactionId gxid)
 {
-    int ret;
+    int ret = -1;

     if (!GlobalTransactionIdIsValid(gxid))
         return 0;
     CheckConnection();
-    ret = abort_transaction(conn, gxid);
+
+    if (conn)
+        ret = abort_transaction(conn, gxid);

     /*
      * If something went wrong (timeout), try and reset GTM connection.
@@ -376,15 +378,20 @@ SetValGTM(char *seqname, GTM_Sequence nextval, bool iscalled)
 }

 /*
- * Drop the sequence
+ * Drop the sequence depending the key type
+ *
+ * Type of Sequence name use in key;
+ *    GTM_SEQ_FULL_NAME, full name of sequence
+ *    GTM_SEQ_DB_NAME, DB name part of sequence key
 */
 int
-DropSequenceGTM(char *seqname)
+DropSequenceGTM(const char *name, GTM_SequenceKeyType type)
 {
     GTM_SequenceKeyData seqkey;
     CheckConnection();
-    seqkey.gsk_keylen = strlen(seqname);
-    seqkey.gsk_key = seqname;
+    seqkey.gsk_keylen = strlen(name);
+    seqkey.gsk_key = name;
+    seqkey.gsk_type = type;

     return conn ? close_sequence(conn, &seqkey) : -1;
 }
diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index dbbca98..5333cf0 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -1160,7 +1160,7 @@ doDeletion(const ObjectAddress *object)
                 relseq = relation_open(object->objectId, AccessShareLock);
                 seqname = GetGlobalSeqName(relseq, NULL, NULL);

-                DropSequenceGTM(seqname);
+                DropSequenceGTM(seqname, GTM_SEQ_FULL_NAME);

                 pfree(seqname);

                 /* Then close the relation opened previously */
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index ec1db5a..208d621 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -57,7 +57,10 @@
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/tqual.h"
-
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#include "access/gtm.h"
+#endif

 typedef struct
 {
@@ -875,6 +878,13 @@ dropdb(const char *dbname, bool missing_ok)
      * according to pg_database, which is not good.
      */
     database_file_update_needed();
+
+#ifdef PGXC
+    /* Drop sequences on gtm that are on the database dropped. */
+    if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+        if (DropSequenceGTM(dbname, GTM_SEQ_DB_NAME))
+            elog(ERROR, "Deletion of sequences on database %s not completed", dbname);
+#endif
 }
diff --git a/src/gtm/client/gtm_client.c b/src/gtm/client/gtm_client.c
index 53ab3f3..6ff3996 100644
--- a/src/gtm/client/gtm_client.c
+++ b/src/gtm/client/gtm_client.c
@@ -574,7 +574,8 @@ close_sequence(GTM_Conn *conn, GTM_SequenceKey key)
     if (gtmpqPutMsgStart('C', true, conn) ||
         gtmpqPutInt(MSG_SEQUENCE_CLOSE, sizeof (GTM_MessageType), conn) ||
         gtmpqPutInt(key->gsk_keylen, 4, conn) ||
-        gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn))
+        gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn) ||
+        gtmpqPutnchar((char *)&key->gsk_type, sizeof(GTM_SequenceKeyType), conn))
         goto send_failed;

     /* Finish the message. */
diff --git a/src/gtm/main/gtm_seq.c b/src/gtm/main/gtm_seq.c
index 8611f40..6d946a9 100644
--- a/src/gtm/main/gtm_seq.c
+++ b/src/gtm/main/gtm_seq.c
@@ -37,11 +37,13 @@ static GTM_SeqInfoHashBucket GTMSequences[SEQ_HASH_TABLE_SIZE];

 static uint32 seq_gethash(GTM_SequenceKey key);
 static bool seq_keys_equal(GTM_SequenceKey key1, GTM_SequenceKey key2);
+static bool seq_key_dbname_equal(GTM_SequenceKey nsp, GTM_SequenceKey seq);
 static GTM_SeqInfo *seq_find_seqinfo(GTM_SequenceKey seqkey);
 static int seq_release_seqinfo(GTM_SeqInfo *seqinfo);
 static int seq_add_seqinfo(GTM_SeqInfo *seqinfo);
 static int seq_remove_seqinfo(GTM_SeqInfo *seqinfo);
 static GTM_SequenceKey seq_copy_key(GTM_SequenceKey key);
+static int seq_drop_with_dbkey(GTM_SequenceKey nsp);

 /*
  * Get the hash value given the sequence key
@@ -442,24 +444,143 @@ GTM_SeqRestore(GTM_SequenceKey seqkey,
     }
     return errcode;
 }
+
 /*
- * Destroy the given sequence
+ * Destroy the given sequence depending on type of given key
 */
 int
 GTM_SeqClose(GTM_SequenceKey seqkey)
 {
-    GTM_SeqInfo *seqinfo = seq_find_seqinfo(seqkey);
-    if (seqinfo != NULL)
+    int res;
+
+    switch(seqkey->gsk_type)
     {
-        seq_remove_seqinfo(seqinfo);
-        pfree(seqinfo->gs_key);
-        pfree(seqinfo);
-        return 0;
+        case GTM_SEQ_FULL_NAME:
+        {
+            GTM_SeqInfo *seqinfo = seq_find_seqinfo(seqkey);
+            if (seqinfo != NULL)
+            {
+                seq_remove_seqinfo(seqinfo);
+                pfree(seqinfo->gs_key);
+                pfree(seqinfo);
+                res = 0;
+            }
+            else
+                res = EINVAL;
+
+            break;
+        }
+        case GTM_SEQ_DB_NAME:
+            res = seq_drop_with_dbkey(seqkey);
+            break;
+
+        default:
+            res = EINVAL;
+            break;
     }
-    else
-        return EINVAL;
+
+    return res;
 }

+/* Check if sequence key contains only Database name */
+static bool
+seq_key_dbname_equal(GTM_SequenceKey nsp, GTM_SequenceKey seq)
+{
+    Assert(nsp);
+    Assert(seq);
+
+    /*
+     * Sequence key of GTM_SEQ_DB_NAME type has to be shorter
+     * than given sequence key.
+     */
+    if (nsp->gsk_keylen >= seq->gsk_keylen)
+        return false;
+
+    /*
+     * Check also if first part of sequence key name has a dot at the right place.
+     * This accelerates process instead of making numerous memcmp.
+     */
+    if (seq->gsk_key[nsp->gsk_keylen] != '.')
+        return false;
+
+    /* Then Check the strings */
+    return (memcmp(nsp->gsk_key, seq->gsk_key, nsp->gsk_keylen) == 0);
+}
+
+/*
+ * Remove all sequences with given key depending on its type.
+ */
+static int
+seq_drop_with_dbkey(GTM_SequenceKey nsp)
+{
+    int         ii = 0;
+    GTM_SeqInfoHashBucket *bucket;
+    ListCell   *cell, *prev;
+    GTM_SeqInfo *curr_seqinfo = NULL;
+    int         res = 0;
+    bool        deleted;
+
+    for (ii = 0; ii < SEQ_HASH_TABLE_SIZE; ii++)
+    {
+        bucket = &GTMSequences[ii];
+
+        GTM_RWLockAcquire(&bucket->shb_lock, GTM_LOCKMODE_READ);
+
+        prev = NULL;
+        cell = list_head(bucket->shb_list);
+        while (cell != NULL)
+        {
+            curr_seqinfo = (GTM_SeqInfo *) lfirst(cell);
+            deleted = false;
+
+            if (seq_key_dbname_equal(nsp, curr_seqinfo->gs_key))
+            {
+                GTM_RWLockAcquire(&curr_seqinfo->gs_lock, GTM_LOCKMODE_WRITE);
+
+                if (curr_seqinfo->gs_ref_count > 1)
+                {
+                    curr_seqinfo->gs_state = SEQ_STATE_DELETED;
+
+                    /* can not happen, be checked before called */
+                    elog(LOG, "Sequence %s is in use, mark for deletion only",
+                         curr_seqinfo->gs_key->gsk_key);
+
+                    /*
+                     * Continue to delete other sequences linked to this dbname,
+                     * sequences in use are deleted later.
+                     */
+                    res = EBUSY;
+                }
+                else
+                {
+                    /* Sequence is not is busy state, it can be deleted safely */
+                    bucket->shb_list = list_delete_cell(bucket->shb_list, cell, prev);
+                    elog(LOG, "Sequence %s was deleted from GTM",
+                         curr_seqinfo->gs_key->gsk_key);
+
+                    deleted = true;
+                }
+                GTM_RWLockRelease(&curr_seqinfo->gs_lock);
+            }
+            if (deleted)
+            {
+                if (prev)
+                    cell = lnext(prev);
+                else
+                    cell = list_head(bucket->shb_list);
+            }
+            else
+            {
+                prev = cell;
+                cell = lnext(cell);
+            }
+        }
+        GTM_RWLockRelease(&bucket->shb_lock);
+    }
+
+    return res;
+}

 /*
  * Rename an existing sequence with a new name
 */
@@ -1017,6 +1138,8 @@ ProcessSequenceCloseCommand(Port *myport, StringInfo message)
     seqkey.gsk_keylen = pq_getmsgint(message, sizeof (seqkey.gsk_keylen));
     seqkey.gsk_key = (char *)pq_getmsgbytes(message, seqkey.gsk_keylen);
+    memcpy(&seqkey.gsk_type, pq_getmsgbytes(message, sizeof (GTM_SequenceKeyType)),
+           sizeof (GTM_SequenceKeyType));

     if ((errcode = GTM_SeqClose(&seqkey)))
         ereport(ERROR,
diff --git a/src/include/access/gtm.h b/src/include/access/gtm.h
index 3fc4cfe..6687474 100644
--- a/src/include/access/gtm.h
+++ b/src/include/access/gtm.h
@@ -53,6 +53,6 @@ extern int CreateSequenceGTM(char *seqname, GTM_Sequence increment,
 extern int AlterSequenceGTM(char *seqname, GTM_Sequence increment,
                 GTM_Sequence minval, GTM_Sequence maxval,
                 GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart);
-extern int DropSequenceGTM(char *seqname);
+extern int DropSequenceGTM(const char *name, GTM_SequenceKeyType type);
 extern int RenameSequenceGTM(char *seqname, const char *newseqname);
 #endif /* ACCESS_GTM_H */
diff --git a/src/include/gtm/gtm_c.h b/src/include/gtm/gtm_c.h
index da15df3..e8b9984 100644
--- a/src/include/gtm/gtm_c.h
+++ b/src/include/gtm/gtm_c.h
@@ -63,13 +63,23 @@ typedef int32 GTM_TransactionHandle;
 typedef int64 GTM_Timestamp;    /* timestamp data is 64-bit based */

 typedef int64 GTM_Sequence;     /* a 64-bit sequence */
-typedef struct GTM_SequenceKeyData
+
+/* Type of sequence name used when dropping it */
+typedef enum GTM_SequenceKeyType
+{
+    GTM_SEQ_FULL_NAME,    /* Full sequence key */
+    GTM_SEQ_DB_NAME       /* DB name part of sequence key */
+} GTM_SequenceKeyType;
+
+typedef struct GTM_SequenceKeyData
 {
     uint32      gsk_keylen;
     char        *gsk_key;
-} GTM_SequenceKeyData;    /* Counter key, set by the client */
+    GTM_SequenceKeyType gsk_type; /* see constants below */
+} GTM_SequenceKeyData;    /* Counter key, set by the client */

 typedef GTM_SequenceKeyData *GTM_SequenceKey;
+
 #define GTM_MAX_SEQKEY_LENGTH        1024

 #define InvalidSequenceValue        0x7fffffffffffffffLL

-----------------------------------------------------------------------

Summary of changes:
 src/backend/access/transam/gtm.c  |   19 ++++--
 src/backend/catalog/dependency.c  |    2 +-
 src/backend/commands/dbcommands.c |   12 +++-
 src/gtm/client/gtm_client.c       |    3 +-
 src/gtm/main/gtm_seq.c            |  141 ++++++++++++++++++++++++++++++++++---
 src/include/access/gtm.h          |    2 +-
 src/include/gtm/gtm_c.h           |   14 +++-
 7 files changed, 172 insertions(+), 21 deletions(-)

hooks/post-receive
--
Postgres-XC

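The cleanup added above hinges on GTM sequence keys being prefixed with the database name followed by a dot (see seq_key_dbname_equal). A hedged usage sketch, with hypothetical database and sequence names:

    CREATE DATABASE testdb;
    -- in a session connected to testdb:
    CREATE SEQUENCE seq1;   -- registered on GTM under a key prefixed "testdb."
    -- later, from another database:
    DROP DATABASE testdb;   -- dropdb() now calls DropSequenceGTM("testdb",
                            -- GTM_SEQ_DB_NAME), removing from GTM every
                            -- sequence whose key starts with "testdb."
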
From: mason_s <ma...@us...> - 2010-11-18 08:14:55
Project "Postgres-XC". The branch, master has been updated
       via  a1f26c32e159aeff6cdf1488b1020ff8bc115386 (commit)
      from  afd3cfb8c6670fd2aed987d55fcf44062f4c7b9f (commit)

- Log -----------------------------------------------------------------
commit a1f26c32e159aeff6cdf1488b1020ff8bc115386
Author: Mason Sharp <ma...@us...>
Date:   Thu Nov 18 17:06:08 2010 +0900

    Minor cursor changes:
    - small optimization for read-only cursors (use simple query protocol)
    - meaningful error message if one tries to update replicated table
      through cursor declared without FOR UPDATE
    - the possible deadlock issue for replicated tables (lock for update
      row on the primary node first).

    By Andrei Martsinchyk

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index b87b07b..e1576fc 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -1723,7 +1723,7 @@ CopyTo(CopyState cstate)
     if (IS_PGXC_COORDINATOR && cstate->rel_loc)
     {
         cstate->processed = DataNodeCopyOut(
-                GetRelationNodes(cstate->rel_loc, NULL, true),
+                GetRelationNodes(cstate->rel_loc, NULL, RELATION_ACCESS_READ),
                 cstate->connections,
                 cstate->copy_file);
     }
@@ -2439,7 +2439,8 @@ CopyFrom(CopyState cstate)

             if (DataNodeCopyIn(cstate->line_buf.data,
                                cstate->line_buf.len,
-                               GetRelationNodes(cstate->rel_loc, (long *)hash_value, false),
+                               GetRelationNodes(cstate->rel_loc, (long *)hash_value,
+                                                RELATION_ACCESS_WRITE),
                                cstate->connections))
                 ereport(ERROR,
                         (errcode(ERRCODE_CONNECTION_EXCEPTION),
diff --git a/src/backend/pgxc/locator/locator.c b/src/backend/pgxc/locator/locator.c
index 845f9fc..4c65f49 100644
--- a/src/backend/pgxc/locator/locator.c
+++ b/src/backend/pgxc/locator/locator.c
@@ -43,7 +43,7 @@
 #include "catalog/namespace.h"

-/*
+/*
  * PGXCTODO For prototype, relations use the same hash mapping table.
  * Long term, make it a pointer in RelationLocInfo, and have
  * similarly handled tables point to the same mapping table,
@@ -265,7 +265,7 @@ GetRoundRobinNode(Oid relid)
 /*
  * GetRelationNodes
  *
- * Get list of relation nodes
+ * Get list of relation nodes
  * If the table is replicated and we are reading, we can just pick one.
  * If the table is partitioned, we apply partitioning column value, if possible.
  *
@@ -281,7 +281,8 @@ GetRoundRobinNode(Oid relid)
  * The returned List is a copy, so it should be freed when finished.
 */
 ExecNodes *
-GetRelationNodes(RelationLocInfo *rel_loc_info, long *partValue, int isRead)
+GetRelationNodes(RelationLocInfo *rel_loc_info, long *partValue,
+                 RelationAccessType accessType)
 {
     ListCell   *prefItem;
     ListCell   *stepItem;
@@ -293,21 +294,21 @@ GetRelationNodes(RelationLocInfo *rel_loc_info, long *partValue, int isRead)

     exec_nodes = makeNode(ExecNodes);
     exec_nodes->baselocatortype = rel_loc_info->locatorType;
-
+
     switch (rel_loc_info->locatorType)
     {
         case LOCATOR_TYPE_REPLICATED:

-            if (!isRead)
+            if (accessType == RELATION_ACCESS_WRITE)
             {
                 /* we need to write to all synchronously */
                 exec_nodes->nodelist = list_copy(rel_loc_info->nodeList);

-                /*
-                 * Write to primary node first, to reduce chance of a deadlock
-                 * on replicated tables. If 0, do not use primary copy.
+                /*
+                 * Write to primary node first, to reduce chance of a deadlock
+                 * on replicated tables. If 0, do not use primary copy.
                  */
-                if (primary_data_node && exec_nodes->nodelist
+                if (primary_data_node && exec_nodes->nodelist
                         && list_length(exec_nodes->nodelist) > 1) /* make sure more than 1 */
                 {
                     exec_nodes->primarynodelist = lappend_int(NULL, primary_data_node);
@@ -316,7 +317,17 @@ GetRelationNodes(RelationLocInfo *rel_loc_info, long *partValue, int isRead)
             }
             else
             {
-                if (globalPreferredNodes != NULL)
+                if (accessType == RELATION_ACCESS_READ_FOR_UPDATE
+                        && primary_data_node)
+                {
+                    /*
+                     * We should ensure row is locked on the primary node to
+                     * avoid distributed deadlock if updating the same row
+                     * concurrently
+                     */
+                    exec_nodes->nodelist = lappend_int(NULL, primary_data_node);
+                }
+                else if (globalPreferredNodes != NULL)
                 {
                     /* try and pick from the preferred list */
                     foreach(prefItem, globalPreferredNodes)
@@ -365,15 +376,15 @@ GetRelationNodes(RelationLocInfo *rel_loc_info, long *partValue, int isRead)

         case LOCATOR_TYPE_RROBIN:

             /* round robin, get next one */
-            if (isRead)
+            if (accessType == RELATION_ACCESS_WRITE)
             {
-                /* we need to read from all */
-                exec_nodes->nodelist = list_copy(rel_loc_info->nodeList);
+                /* write to just one of them */
+                exec_nodes->nodelist = lappend_int(NULL, GetRoundRobinNode(rel_loc_info->relid));
             }
             else
             {
-                /* write to just one of them */
-                exec_nodes->nodelist = lappend_int(NULL, GetRoundRobinNode(rel_loc_info->relid));
+                /* we need to read from all */
+                exec_nodes->nodelist = list_copy(rel_loc_info->nodeList);
             }
             break;
@@ -608,7 +619,7 @@ GetRelationLocInfo(Oid relid)
     return ret_loc_info;
 }

-/*
+/*
  * Copy the RelationLocInfo struct
 */
 RelationLocInfo *
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index 94f851a..d499149 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -125,7 +125,7 @@ typedef struct ColumnBase
 typedef struct XCWalkerContext
 {
     Query      *query;
-    bool        isRead;
+    RelationAccessType accessType;
     RemoteQuery *query_step;    /* remote query step being analized */
     Special_Conditions *conditions;
     bool        multilevel_join;
@@ -144,7 +144,7 @@ bool        StrictStatementChecking = true;
 /* Forbid multi-node SELECT statements with an ORDER BY clause */
 bool        StrictSelectChecking = false;

-static void get_plan_nodes(Query *query, RemoteQuery *step, bool isRead);
+static void get_plan_nodes(Query *query, RemoteQuery *step, RelationAccessType accessType);
 static bool get_plan_nodes_walker(Node *query_node, XCWalkerContext *context);
 static bool examine_conditions_walker(Node *expr_node, XCWalkerContext *context);
 static int handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stmt);
@@ -524,7 +524,8 @@ get_plan_nodes_insert(Query *query)
     }

     /* single call handles both replicated and partitioned types */
-    exec_nodes = GetRelationNodes(rel_loc_info, part_value_ptr, false);
+    exec_nodes = GetRelationNodes(rel_loc_info, part_value_ptr,
+                                  RELATION_ACCESS_WRITE);

     if (eval_expr)
         pfree(eval_expr);
@@ -648,12 +649,9 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context)
             int natts = get_relnatts(table->relid);
             char *attnames[natts];
             TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
-            /*
-             * ctid is the last attribute, but more correct to iterate over
-             * attributes and find by name, or store index for table
-             */
-            Datum ctid = slot->tts_values[slot->tts_tupleDescriptor->natts - 1];
-            char *ctid_str = (char *) DirectFunctionCall1(tidout, ctid);
+            TupleDesc slot_meta = slot->tts_tupleDescriptor;
+            Datum ctid = 0;
+            char *ctid_str = NULL;
             int nodenum = slot->tts_dataNode;
             AttrNumber att;
             StringInfoData buf;
@@ -661,6 +659,30 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context)
             int i;
             MemoryContext context_save;

+            /*
+             * Iterate over attributes and find CTID. This attribute is
+             * most likely at the end of the list, so iterate in
+             * reverse order to find it quickly.
+             * If not found, target table is not updatable through
+             * the cursor, report problem to client
+             */
+            for (i = slot_meta->natts - 1; i >= 0; i--)
+            {
+                Form_pg_attribute attr = slot_meta->attrs[i];
+                if (strcmp(attr->attname.data, "ctid") == 0)
+                {
+                    ctid = slot->tts_values[i];
+                    ctid_str = (char *) DirectFunctionCall1(tidout, ctid);
+                    break;
+                }
+            }
+
+            if (ctid_str == NULL)
+                ereport(ERROR,
+                        (errcode(ERRCODE_INVALID_CURSOR_STATE),
+                         errmsg("cursor \"%s\" is not a simply updatable scan of table \"%s\"",
+                                cexpr->cursor_name, tableName)));
+
             initStringInfo(&buf);

             /* Step 1: select tuple values by ctid */
@@ -1448,12 +1470,16 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
                 if (!rel_loc_info)
                     return true;

-                context->query_step->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead);
+                context->query_step->exec_nodes = GetRelationNodes(rel_loc_info,
+                                                                   NULL,
+                                                                   context->accessType);
             }
         }
         else
         {
-            context->query_step->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead);
+            context->query_step->exec_nodes = GetRelationNodes(rel_loc_info,
+                                                               NULL,
+                                                               context->accessType);
         }

         /* Note replicated table usage for determining safe queries */
@@ -1479,7 +1505,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
             Literal_Comparison *lit_comp = (Literal_Comparison *) lfirst(lc);

             test_exec_nodes = GetRelationNodes(
-                        lit_comp->rel_loc_info, &(lit_comp->constant), true);
+                        lit_comp->rel_loc_info, &(lit_comp->constant),
+                        RELATION_ACCESS_READ);

             test_exec_nodes->tableusagetype = table_usage_type;
             if (context->query_step->exec_nodes == NULL)
@@ -1505,7 +1532,9 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
             parent_child = (Parent_Child_Join *)
                     linitial(context->conditions->partitioned_parent_child);

-            context->query_step->exec_nodes = GetRelationNodes(parent_child->rel_loc_info1, NULL, context->isRead);
+            context->query_step->exec_nodes = GetRelationNodes(parent_child->rel_loc_info1,
+                                                               NULL,
+                                                               context->accessType);
             context->query_step->exec_nodes->tableusagetype = table_usage_type;
         }
@@ -1545,7 +1574,7 @@ static void
 InitXCWalkerContext(XCWalkerContext *context)
 {
     context->query = NULL;
-    context->isRead = true;
+    context->accessType = RELATION_ACCESS_READ;
     context->query_step = NULL;
     context->conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions));
     context->rtables = NIL;
@@ -1562,14 +1591,14 @@ InitXCWalkerContext(XCWalkerContext *context)
 *
 */
 static void
-get_plan_nodes(Query *query, RemoteQuery *step, bool isRead)
+get_plan_nodes(Query *query, RemoteQuery *step, RelationAccessType accessType)
 {
     XCWalkerContext context;

     InitXCWalkerContext(&context);
     context.query = query;
-    context.isRead = isRead;
+    context.accessType = accessType;
     context.query_step = step;
     context.rtables = lappend(context.rtables, query->rtable);
@@ -1594,7 +1623,9 @@ get_plan_nodes_command(Query *query, RemoteQuery *step)
     switch (query->commandType)
     {
         case CMD_SELECT:
-            get_plan_nodes(query, step, true);
+            get_plan_nodes(query, step, query->rowMarks ?
+                                            RELATION_ACCESS_READ_FOR_UPDATE :
+                                            RELATION_ACCESS_READ);
             break;

         case CMD_INSERT:
@@ -1604,7 +1635,7 @@ get_plan_nodes_command(Query *query, RemoteQuery *step)
         case CMD_UPDATE:
         case CMD_DELETE:
             /* treat as a select */
-            get_plan_nodes(query, step, false);
+            get_plan_nodes(query, step, RELATION_ACCESS_WRITE);
             break;

         default:
@@ -2470,8 +2501,8 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
         case CMD_UPDATE:
         case CMD_DELETE:
             /* PGXCTODO: This validation will not be removed
-             * until we support moving tuples from one node to another
-             * when the partition column of a table is updated
+             * until we support moving tuples from one node to another
+             * when the partition column of a table is updated
              */
             if (query->commandType == CMD_UPDATE)
                 validate_part_col_updatable(query);
@@ -2629,9 +2660,12 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
     /*
      * Support for multi-step cursor.
      * Ensure uniqueness of remote cursor name
+     * Small optimization for SCROLL (read-only) cursors: do not use Extended
+     * Query protocol
      */
     if (query->utilityStmt &&
-        IsA(query->utilityStmt, DeclareCursorStmt))
+        IsA(query->utilityStmt, DeclareCursorStmt) &&
+            (cursorOptions & CURSOR_OPT_SCROLL) == 0)
     {
         DeclareCursorStmt *stmt = (DeclareCursorStmt *) query->utilityStmt;
         set_cursor_name(result->planTree, stmt->portalname, 0);
@@ -2744,7 +2778,7 @@ IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode,
     *partitioned_replicated = false;

     InitXCWalkerContext(&context);
-    context.isRead = true;    /* PGXCTODO - determine */
+    context.accessType = RELATION_ACCESS_READ; /* PGXCTODO - determine */
     context.rtables = NIL;
     context.rtables = lappend(context.rtables, rtable_list); /* add to list of lists */
@@ -2825,9 +2859,9 @@ IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode,
 }

 /*
- * validate whether partition column of a table is being updated
+ * validate whether partition column of a table is being updated
 */
-static void
+static void
 validate_part_col_updatable(const Query *query)
 {
     RangeTblEntry *rte;
@@ -2853,7 +2887,7 @@ validate_part_col_updatable(const Query *query)
                 (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
                 (errmsg("Could not find relation for oid = %d", rte->relid))));
-
+
     /* Only LOCATOR_TYPE_HASH should be checked */
     if (rel_loc_info->locatorType == LOCATOR_TYPE_HASH &&
         rel_loc_info->partAttrName != NULL)
diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h
index afeff56..b01606f 100644
--- a/src/include/pgxc/locator.h
+++ b/src/include/pgxc/locator.h
@@ -68,6 +68,15 @@ typedef struct
     TableUsageType    tableusagetype;        /* track pg_catalog usage */
 } ExecNodes;

+/*
+ * How relation is accessed in the query
+ */
+typedef enum
+{
+    RELATION_ACCESS_READ,
+    RELATION_ACCESS_READ_FOR_UPDATE,
+    RELATION_ACCESS_WRITE
+} RelationAccessType;

 extern char *PreferredDataNodes;
@@ -79,7 +88,7 @@ extern char *GetRelationHashColumn(RelationLocInfo *rel_loc_info);
 extern RelationLocInfo *GetRelationLocInfo(Oid relid);
 extern RelationLocInfo *CopyRelationLocInfo(RelationLocInfo *src_info);
 extern ExecNodes *GetRelationNodes(RelationLocInfo *rel_loc_info, long *partValue,
-                                   int isRead);
+                                   RelationAccessType accessType);
 extern bool IsHashColumn(RelationLocInfo *rel_loc_info, char *part_col_name);
 extern bool IsHashColumnForRelId(Oid relid, char *part_col_name);
 extern int    GetRoundRobinNode(Oid relid);

-----------------------------------------------------------------------

Summary of changes:
 src/backend/commands/copy.c        |    5 +-
 src/backend/pgxc/locator/locator.c |   43 +++++++++++-------
 src/backend/pgxc/plan/planner.c    |   84 +++++++++++++++++++++++++-----------
 src/include/pgxc/locator.h         |   11 ++++-
 4 files changed, 99 insertions(+), 44 deletions(-)

hooks/post-receive
--
Postgres-XC

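A short session sketch of the cursor behavior this commit describes, against a hypothetical replicated table rep1 (the routing and error follow the commit message, not a verified trace):

    BEGIN;
    -- With FOR UPDATE, the row is fetched and locked on the primary data
    -- node first, so concurrent updates of the same row cannot deadlock
    -- across the replicas:
    DECLARE cur CURSOR FOR SELECT * FROM rep1 FOR UPDATE;
    FETCH cur;
    UPDATE rep1 SET col2 = 0 WHERE CURRENT OF cur;
    COMMIT;

    -- Declared without FOR UPDATE, an UPDATE ... WHERE CURRENT OF on a
    -- replicated table now fails with a meaningful error message instead
    -- of updating through a cursor that is not simply updatable.
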
From: Michael P. <mic...@us...> - 2010-11-16 04:23:13
Project "Postgres-XC". The branch, master has been updated
       via  afd3cfb8c6670fd2aed987d55fcf44062f4c7b9f (commit)
      from  54a648a3ffe7f8fc0273295ee17e91cebcb34948 (commit)

- Log -----------------------------------------------------------------
commit afd3cfb8c6670fd2aed987d55fcf44062f4c7b9f
Author: Michael P <mic...@us...>
Date:   Tue Nov 16 13:22:44 2010 +0900

    Block Node Commit on a Datanode.

    When COMMIT is issued on Datanodes, block the call PGXCNodeCommit
    in execRemote.c. This operation is only allowed on Coordinator.

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 22ccd16..7465847 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1801,7 +1801,8 @@ CommitTransaction(void)
      * There can be error on the data nodes. So go to data nodes before
      * changing transaction state and local clean up
      */
-    PGXCNodeCommit();
+    if (IS_PGXC_COORDINATOR)
+        PGXCNodeCommit();
 #endif

     /* Prevent cancel/die interrupt while cleaning up */

-----------------------------------------------------------------------

Summary of changes:
 src/backend/access/transam/xact.c |    3 ++-
 1 files changed, 2 insertions(+), 1 deletions(-)

hooks/post-receive
--
Postgres-XC

From: Michael P. <mic...@us...> - 2010-11-16 00:53:05
|
Project "Postgres-XC". The branch, master has been updated via 54a648a3ffe7f8fc0273295ee17e91cebcb34948 (commit) from 195026c49901d9526d217f4f826fe4de1e751b48 (commit) - Log ----------------------------------------------------------------- commit 54a648a3ffe7f8fc0273295ee17e91cebcb34948 Author: Michael P <mic...@us...> Date: Tue Nov 16 09:48:51 2010 +0900 Support for CLEAN CONNECTION Utility to clean up Postgres-XC Pooler connections. This utility is launched to all the Coordinators of the cluster Use of CLEAN CONNECTION is limited to a super user. It is advised to clean connections before dropping a Database. SQL query synopsis is as follows: CLEAN CONNECTION TO (COORDINATOR num | DATANODE num | ALL {FORCE}) FOR DATABASE dbname Connection cleaning has to be made on a chosen database called dbname. It is also possible to clean connections of several Coordinators or Datanodes Ex: CLEAN CONNECTION TO DATANODE 1,5,7 FOR DATABASE template1 CLEAN CONNECTION TO COORDINATOR 2,4,6 FOR DATABASE template1 Or even to all Coordinators/Datanodes at the same time Ex: CLEAN CONNECTION TO DATANODE * FOR DATABASE template1 CLEAN CONNECTION TO COORDINATOR * FOR DATABASE template1 When FORCE is used, all the transactions using pooler connections are aborted, and pooler connections are cleaned up. Ex: CLEAN CONNECTION TO ALL FORCE FOR DATABASE template1; FORCE can only be used with TO ALL, as it takes a lock on pooler to stop requests asking for connections, aborts all the connections in the cluster, and cleans up pool connections diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 9228a79..0c76d7a 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -191,7 +191,7 @@ static TypeName *TableFuncTypeName(List *columns); AlterForeignServerStmt AlterGroupStmt AlterObjectSchemaStmt AlterOwnerStmt AlterSeqStmt AlterTableStmt AlterUserStmt AlterUserMappingStmt AlterUserSetStmt AlterRoleStmt AlterRoleSetStmt - AnalyzeStmt ClosePortalStmt ClusterStmt CommentStmt + AnalyzeStmt CleanConnStmt ClosePortalStmt ClusterStmt CommentStmt ConstraintsSetStmt CopyStmt CreateAsStmt CreateCastStmt CreateDomainStmt CreateGroupStmt CreateOpClassStmt CreateOpFamilyStmt AlterOpFamilyStmt CreatePLangStmt @@ -328,7 +328,7 @@ static TypeName *TableFuncTypeName(List *columns); %type <boolean> opt_freeze opt_default opt_recheck %type <defelt> opt_binary opt_oids copy_delimiter -%type <list> node_list +%type <list> data_node_list coord_list %type <str> DirectStmt %type <boolean> copy_from @@ -445,7 +445,7 @@ static TypeName *TableFuncTypeName(List *columns); BOOLEAN_P BOTH BY CACHE CALLED CASCADE CASCADED CASE CAST CATALOG_P CHAIN CHAR_P - CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE + CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLEAN CLOSE CLUSTER COALESCE COLLATE COLUMN COMMENT COMMIT COMMITTED CONCURRENTLY CONFIGURATION CONNECTION CONSTRAINT CONSTRAINTS CONTENT_P CONTINUE_P CONVERSION_P COORDINATOR COPY COST CREATE CREATEDB @@ -637,6 +637,7 @@ stmt : | AlterUserStmt | AnalyzeStmt | CheckPointStmt + | CleanConnStmt | ClosePortalStmt | ClusterStmt | CommentStmt @@ -6545,7 +6546,7 @@ ExecDirectStmt: EXECUTE DIRECT ON COORDINATOR DirectStmt n->query = $5; $$ = (Node *)n; } - | EXECUTE DIRECT ON NODE node_list DirectStmt + | EXECUTE DIRECT ON NODE data_node_list DirectStmt { ExecDirectStmt *n = makeNode(ExecDirectStmt); n->coordinator = FALSE; @@ -6559,9 +6560,21 @@ DirectStmt: Sconst /* by default all are $$=$1 */ ; -node_list: +coord_list: Iconst { $$ = list_make1(makeInteger($1)); } - 
| node_list ',' Iconst { $$ = lappend($1, makeInteger($3)); } + | coord_list ',' Iconst { $$ = lappend($1, makeInteger($3)); } + | '*' + { + int i; + $$ = NIL; + for (i=1; i<=NumCoords; i++) + $$ = lappend($$, makeInteger(i)); + } + ; + +data_node_list: + Iconst { $$ = list_make1(makeInteger($1)); } + | data_node_list ',' Iconst { $$ = lappend($1, makeInteger($3)); } | '*' { int i; @@ -6574,6 +6587,44 @@ node_list: /***************************************************************************** * * QUERY: + * + * CLEAN CONNECTION TO (COORDINATOR num | NODE num | ALL {FORCE}) + * FOR DATABASE dbname + * + *****************************************************************************/ + +CleanConnStmt: CLEAN CONNECTION TO COORDINATOR coord_list FOR DATABASE database_name + { + CleanConnStmt *n = makeNode(CleanConnStmt); + n->is_coord = true; + n->nodes = $5; + n->is_force = false; + n->dbname = $8; + $$ = (Node *)n; + } + | CLEAN CONNECTION TO NODE data_node_list FOR DATABASE database_name + { + CleanConnStmt *n = makeNode(CleanConnStmt); + n->is_coord = false; + n->nodes = $5; + n->is_force = false; + n->dbname = $8; + $$ = (Node *)n; + } + | CLEAN CONNECTION TO ALL opt_force FOR DATABASE database_name + { + CleanConnStmt *n = makeNode(CleanConnStmt); + n->is_coord = true; + n->nodes = NIL; + n->is_force = $5; + n->dbname = $8; + $$ = (Node *)n; + } + ; + +/***************************************************************************** + * + * QUERY: * PREPARE <plan_name> [(args, ...)] AS <query> * *****************************************************************************/ @@ -10259,6 +10310,7 @@ unreserved_keyword: | CHARACTERISTICS | CHECKPOINT | CLASS + | CLEAN | CLOSE | CLUSTER | COMMENT diff --git a/src/backend/pgxc/locator/locator.c b/src/backend/pgxc/locator/locator.c index 098e254..845f9fc 100644 --- a/src/backend/pgxc/locator/locator.c +++ b/src/backend/pgxc/locator/locator.c @@ -466,8 +466,8 @@ GetAllDataNodes(void) /* * Return a list of all Coordinators - * This is used to send DDL to all nodes - * Do not put in the list the local Coordinator where this function is launched + * This is used to send DDL to all nodes and to clean up pooler connections. + * Do not put in the list the local Coordinator where this function is launched. */ List * GetAllCoordNodes(void) diff --git a/src/backend/pgxc/pool/Makefile b/src/backend/pgxc/pool/Makefile index f8679eb..8c2b66a 100644 --- a/src/backend/pgxc/pool/Makefile +++ b/src/backend/pgxc/pool/Makefile @@ -14,6 +14,6 @@ subdir = src/backend/pgxc/pool top_builddir = ../../../.. 
include $(top_builddir)/src/Makefile.global -OBJS = pgxcnode.o execRemote.o poolmgr.o poolcomm.o postgresql_fdw.o +OBJS = pgxcnode.o execRemote.o poolmgr.o poolcomm.o postgresql_fdw.o poolutils.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c index 3bc3a83..cbaf68c 100644 --- a/src/backend/pgxc/pool/pgxcnode.c +++ b/src/backend/pgxc/pool/pgxcnode.c @@ -268,7 +268,7 @@ pgxc_node_receive(const int conn_count, FD_ZERO(&readfds); for (i = 0; i < conn_count; i++) { - /* If connection finised sending do not wait input from it */ + /* If connection finished sending do not wait input from it */ if (connections[i]->state == DN_CONNECTION_STATE_IDLE || HAS_MESSAGE_BUFFERED(connections[i])) continue; diff --git a/src/backend/pgxc/pool/poolcomm.c b/src/backend/pgxc/pool/poolcomm.c index 853b385..bdb6cb1 100644 --- a/src/backend/pgxc/pool/poolcomm.c +++ b/src/backend/pgxc/pool/poolcomm.c @@ -480,10 +480,12 @@ pool_putmessage(PoolPort *port, char msgtype, const char *s, size_t len) /* message code('f'), size(8), node_count */ #define SEND_MSG_BUFFER_SIZE 9 - +/* message code('s'), result */ +#define SEND_RES_BUFFER_SIZE 5 +#define SEND_PID_BUFFER_SIZE (5 + (MaxConnections - 1) * 4) /* - * Build up a message carrying file deskriptors and send them over specified + * Build up a message carrying file descriptors or process numbers and send them over specified * connection */ int @@ -639,3 +641,178 @@ failure: free(cmptr); return EOF; } + +/* + * Send result to specified connection + */ +int +pool_sendres(PoolPort *port, int res) +{ + char buf[SEND_RES_BUFFER_SIZE]; + uint n32; + + /* Header */ + buf[0] = 's'; + /* Result */ + n32 = htonl(res); + memcpy(buf + 1, &n32, 4); + + if (send(Socket(*port), &buf, SEND_RES_BUFFER_SIZE, 0) != SEND_RES_BUFFER_SIZE) + return EOF; + + return 0; +} + +/* + * Read result from specified connection. + * Return 0 at success or EOF at error. + */ +int +pool_recvres(PoolPort *port) +{ + int r; + int res = 0; + uint n32; + char buf[SEND_RES_BUFFER_SIZE]; + + r = recv(Socket(*port), &buf, SEND_RES_BUFFER_SIZE, 0); + if (r < 0) + { + /* + * Report broken connection + */ + ereport(ERROR, + (errcode_for_socket_access(), + errmsg("could not receive data from client: %m"))); + goto failure; + } + else if (r == 0) + { + goto failure; + } + else if (r != SEND_RES_BUFFER_SIZE) + { + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("incomplete message from client"))); + goto failure; + } + + /* Verify response */ + if (buf[0] != 's') + { + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected message code"))); + goto failure; + } + + memcpy(&n32, buf + 1, 4); + n32 = ntohl(n32); + if (n32 != 0) + return EOF; + + return res; + +failure: + return EOF; +} + +/* + * Read a message from the specified connection carrying pid numbers + * of transactions interacting with pooler + */ +int +pool_recvpids(PoolPort *port, int **pids) +{ + int r, i; + uint n32; + char buf[SEND_PID_BUFFER_SIZE]; + + /* + * Buffer size is upper bounded by the maximum number of connections, + * as in the pooler each connection has one Pooler Agent. 
+ */ + + r = recv(Socket(*port), &buf, SEND_PID_BUFFER_SIZE, 0); + if (r < 0) + { + /* + * Report broken connection + */ + ereport(ERROR, + (errcode_for_socket_access(), + errmsg("could not receive data from client: %m"))); + goto failure; + } + else if (r == 0) + { + goto failure; + } + else if (r != SEND_PID_BUFFER_SIZE) + { + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("incomplete message from client"))); + goto failure; + } + + /* Verify response */ + if (buf[0] != 'p') + { + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected message code"))); + goto failure; + } + + memcpy(&n32, buf + 1, 4); + n32 = ntohl(n32); + if (n32 == 0) + { + elog(WARNING, "No transaction to abort"); + return n32; + } + + *pids = (int *) palloc(sizeof(int) * n32); + + for (i = 0; i < n32; i++) + { + int n; + memcpy(&n, buf + 5 + i * sizeof(int), sizeof(int)); + *pids[i] = ntohl(n); + } + return n32; + +failure: + return 0; +} + +/* + * Send a message containing pid numbers to the specified connection + */ +int +pool_sendpids(PoolPort *port, int *pids, int count) +{ + int res = 0; + int i; + char buf[SEND_PID_BUFFER_SIZE]; + uint n32; + + buf[0] = 'p'; + n32 = htonl((uint32) count); + memcpy(buf + 1, &n32, 4); + for (i = 0; i < count; i++) + { + int n; + n = htonl((uint32) pids[i]); + memcpy(buf + 5 + i * sizeof(int), &n, 4); + } + + if (send(Socket(*port), &buf, SEND_PID_BUFFER_SIZE,0) != SEND_PID_BUFFER_SIZE) + { + res = EOF; + } + + return res; +} diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c index 7ebced8..264271b 100644 --- a/src/backend/pgxc/pool/poolmgr.c +++ b/src/backend/pgxc/pool/poolmgr.c @@ -45,6 +45,7 @@ #include "libpq/pqformat.h" #include "pgxc/locator.h" #include "pgxc/pgxc.h" +#include "pgxc/poolutils.h" #include "../interfaces/libpq/libpq-fe.h" #include "postmaster/postmaster.h" /* For UnixSocketDir */ #include <stdlib.h> @@ -89,6 +90,7 @@ static PoolAgent **poolAgents; static PoolHandle *Handle = NULL; +static int is_pool_cleaning = false; static int server_fd = -1; static void agent_init(PoolAgent *agent, const char *database); @@ -109,6 +111,8 @@ static void destroy_slot(PGXCNodePoolSlot *slot); static void grow_pool(DatabasePool *dbPool, int index, char client_conn_type); static void destroy_node_pool(PGXCNodePool *node_pool); static void PoolerLoop(void); +static int clean_connection(List *dn_discard, List *co_discard, const char *database); +static int *abort_pids(int *count, int pid, const char *database); /* Signal handlers */ static void pooler_die(SIGNAL_ARGS); @@ -631,6 +635,7 @@ agent_create(void) agent->pool = NULL; agent->dn_connections = NULL; agent->coord_connections = NULL; + agent->pid = 0; /* Append new agent to the list */ poolAgents[agentCount++] = agent; @@ -644,14 +649,32 @@ agent_create(void) void PoolManagerConnect(PoolHandle *handle, const char *database) { + int n32; + char msgtype = 'c'; + Assert(handle); Assert(database); /* Save the handle */ Handle = handle; + /* Message type */ + pool_putbytes(&handle->port, &msgtype, 1); + + /* Message length */ + n32 = htonl(strlen(database) + 13); + pool_putbytes(&handle->port, (char *) &n32, 4); + + /* PID number */ + n32 = htonl(MyProcPid); + pool_putbytes(&handle->port, (char *) &n32, 4); + + /* Length of Database string */ + n32 = htonl(strlen(database) + 1); + pool_putbytes(&handle->port, (char *) &n32, 4); + /* Send database name followed by \0 terminator */ - pool_putmessage(&handle->port, 'c', database, strlen(database) + 1); + 
pool_putbytes(&handle->port, database, strlen(database) + 1); pool_flush(&handle->port); } @@ -779,6 +802,7 @@ PoolManagerGetConnections(List *datanodelist, List *coordlist) pool_putmessage(&Handle->port, 'g', (char *) nodes, sizeof(int) * (totlen + 2)); pool_flush(&Handle->port); + /* Receive response */ fds = (int *) palloc(sizeof(int) * totlen); if (fds == NULL) @@ -795,6 +819,84 @@ PoolManagerGetConnections(List *datanodelist, List *coordlist) return fds; } +/* + * Abort active transactions using pooler. + * Take a lock forbidding access to Pooler for new transactions. + */ +int +PoolManagerAbortTransactions(char *dbname, int **proc_pids) +{ + int num_proc_ids = 0; + + Assert(Handle); + + pool_putmessage(&Handle->port, 'a', dbname, strlen(dbname) + 1); + + pool_flush(&Handle->port); + + /* Then Get back Pids from Pooler */ + num_proc_ids = pool_recvpids(&Handle->port, proc_pids); + + return num_proc_ids; +} + + +/* + * Clean up Pooled connections + */ +void +PoolManagerCleanConnection(List *datanodelist, List *coordlist, char *dbname) +{ + int totlen = list_length(datanodelist) + list_length(coordlist); + int nodes[totlen + 2]; + ListCell *nodelist_item; + int i, n32; + char msgtype = 'f'; + + nodes[0] = htonl(list_length(datanodelist)); + i = 1; + if (list_length(datanodelist) != 0) + { + foreach(nodelist_item, datanodelist) + { + nodes[i++] = htonl(lfirst_int(nodelist_item)); + } + } + /* Then with Coordinator list (can be nul) */ + nodes[i++] = htonl(list_length(coordlist)); + if (list_length(coordlist) != 0) + { + foreach(nodelist_item, coordlist) + { + nodes[i++] = htonl(lfirst_int(nodelist_item)); + } + } + + /* Message type */ + pool_putbytes(&Handle->port, &msgtype, 1); + + /* Message length */ + n32 = htonl(sizeof(int) * (totlen + 2) + strlen(dbname) + 9); + pool_putbytes(&Handle->port, (char *) &n32, 4); + + /* Send list of nodes */ + pool_putbytes(&Handle->port, (char *) nodes, sizeof(int) * (totlen + 2)); + + /* Length of Database string */ + n32 = htonl(strlen(dbname) + 1); + pool_putbytes(&Handle->port, (char *) &n32, 4); + + /* Send database name, followed by \0 terminator */ + pool_putbytes(&Handle->port, dbname, strlen(dbname) + 1); + pool_flush(&Handle->port); + + /* Receive result message */ + if (pool_recvres(&Handle->port) != CLEAN_CONNECTION_COMPLETED) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Clean connections not completed"))); +} + /* * Handle messages to agent @@ -816,14 +918,39 @@ agent_handle_input(PoolAgent * agent, StringInfo s) List *datanodelist = NIL; List *coordlist = NIL; int *fds; - int i; + int *pids; + int i, len, res; + + /* + * During a pool cleaning, Abort, Connect and Get Connections messages + * are not allowed on pooler side. + * It avoids to have new backends taking connections + * while remaining transactions are aborted during FORCE and then + * Pools are being shrinked. + */ + if (is_pool_cleaning && (qtype == 'a' || + qtype == 'c' || + qtype == 'g')) + elog(WARNING,"Pool operation cannot run during Pool cleaning"); switch (qtype) { - case 'c': /* CONNECT */ + case 'a': /* ABORT */ pool_getmessage(&agent->port, s, 0); database = pq_getmsgstring(s); - /* + pq_getmsgend(s); + pids = abort_pids(&len, agent->pid, database); + + pool_sendpids(&agent->port, pids, len); + if (pids) + pfree(pids); + break; + case 'c': /* CONNECT */ + pool_getmessage(&agent->port, s, 0); + agent->pid = pq_getmsgint(s, 4); + len = pq_getmsgint(s, 4); + database = pq_getmsgbytes(s, len); + /* * Coordinator pool is not initialized. 
* With that it would be impossible to create a Database by default. */ @@ -835,6 +962,29 @@ agent_handle_input(PoolAgent * agent, StringInfo s) agent_destroy(agent); pq_getmsgend(s); break; + case 'f': /* CLEAN CONNECTION */ + pool_getmessage(&agent->port, s, 0); + datanodecount = pq_getmsgint(s, 4); + /* It is possible to clean up only Coordinators connections */ + for (i = 0; i < datanodecount; i++) + datanodelist = lappend_int(datanodelist, pq_getmsgint(s, 4)); + coordcount = pq_getmsgint(s, 4); + /* It is possible to clean up only Datanode connections */ + for (i = 0; i < coordcount; i++) + coordlist = lappend_int(coordlist, pq_getmsgint(s, 4)); + len = pq_getmsgint(s, 4); + database = pq_getmsgbytes(s, len); + pq_getmsgend(s); + + /* Clean up connections here */ + res = clean_connection(datanodelist, coordlist, database); + + list_free(datanodelist); + list_free(coordlist); + + /* Send success result */ + pool_sendres(&agent->port, res); + break; case 'g': /* GET CONNECTIONS */ /* * Length of message is caused by: @@ -852,9 +1002,8 @@ agent_handle_input(PoolAgent * agent, StringInfo s) datanodelist = lappend_int(datanodelist, pq_getmsgint(s, 4)); coordcount = pq_getmsgint(s, 4); /* It is possible that no Coordinators are involved in the transaction */ - if (coordcount != 0) - for (i = 0; i < coordcount; i++) - coordlist = lappend_int(coordlist, pq_getmsgint(s, 4)); + for (i = 0; i < coordcount; i++) + coordlist = lappend_int(coordlist, pq_getmsgint(s, 4)); pq_getmsgend(s); /* * In case of error agent_acquire_connections will log @@ -1282,8 +1431,7 @@ insert_database_pool(DatabasePool *databasePool) /* * Find pool for specified database in the list */ -static DatabasePool -* +static DatabasePool * find_database_pool(const char *database) { DatabasePool *databasePool; @@ -1292,12 +1440,11 @@ find_database_pool(const char *database) databasePool = databasePools; while (databasePool) { - /* if match break the loop and return */ if (strcmp(database, databasePool->database) == 0) break; - databasePool = databasePool->next; + databasePool = databasePool->next; } return databasePool; } @@ -1724,6 +1871,147 @@ PoolerLoop(void) } } +/* + * Clean Connection in all Database Pools for given Datanode and Coordinator list + */ +#define TIMEOUT_CLEAN_LOOP 10 + +int +clean_connection(List *dn_discard, List *co_discard, const char *database) +{ + DatabasePool *databasePool; + int dn_len = list_length(dn_discard); + int co_len = list_length(co_discard); + int dn_list[list_length(dn_discard)]; + int co_list[list_length(co_discard)]; + int count, i; + int res = CLEAN_CONNECTION_COMPLETED; + ListCell *nodelist_item; + PGXCNodePool *nodePool; + + /* Save in array the lists of node number */ + count = 0; + foreach(nodelist_item,dn_discard) + dn_list[count++] = lfirst_int(nodelist_item); + + count = 0; + foreach(nodelist_item, co_discard) + co_list[count++] = lfirst_int(nodelist_item); + + /* Find correct Database pool to clean */ + databasePool = find_database_pool(database); + + /* Database pool has not been found */ + if (!databasePool) + return CLEAN_CONNECTION_NOT_COMPLETED; + + /* + * Clean each Pool Correctly + * First for Datanode Pool + */ + for (count = 0; count < dn_len; count++) + { + int node_num = dn_list[count]; + nodePool = databasePool->dataNodePools[node_num - 1]; + + if (nodePool) + { + /* Check if connections are in use */ + if (nodePool->freeSize != nodePool->size) + { + elog(WARNING, "Pool of Database %s is using Datanode %d connections", + databasePool->database, node_num); + res = 
CLEAN_CONNECTION_NOT_COMPLETED; + } + + /* Destroy connections currently in Node Pool */ + if (nodePool->slot) + { + for (i = 0; i < nodePool->freeSize; i++) + destroy_slot(nodePool->slot[i]); + + /* Move slots in use at the beginning of Node Pool array */ + for (i = nodePool->freeSize; i < nodePool->size; i++ ) + nodePool->slot[i - nodePool->freeSize] = nodePool->slot[i]; + } + nodePool->size -= nodePool->freeSize; + nodePool->freeSize = 0; + } + } + + /* Then for Coordinators */ + for (count = 0; count < co_len; count++) + { + int node_num = co_list[count]; + nodePool = databasePool->coordNodePools[node_num - 1]; + + if (nodePool) + { + /* Check if connections are in use */ + if (nodePool->freeSize != nodePool->size) + { + elog(WARNING, "Pool of Database %s is using Coordinator %d connections", + databasePool->database, node_num); + res = CLEAN_CONNECTION_NOT_COMPLETED; + } + + /* Destroy connections currently in Node Pool */ + if (nodePool->slot) + { + for (i = 0; i < nodePool->freeSize; i++) + destroy_slot(nodePool->slot[i]); + + /* Move slots in use at the beginning of Node Pool array */ + for (i = nodePool->freeSize; i < nodePool->size; i++ ) + nodePool->slot[i - nodePool->freeSize] = nodePool->slot[i]; + } + nodePool->size -= nodePool->freeSize; + nodePool->freeSize = 0; + } + } + + /* Release lock on Pooler, to allow transactions to connect again. */ + is_pool_cleaning = false; + return res; +} + +/* + * Take a Lock on Pooler. + * Abort PIDs registered with the agents for the given database. + * Send back to client list of PIDs signaled to watch them. + */ +int * +abort_pids(int *len, int pid, const char *database) +{ + int *pids = NULL; + int i = 0; + int count; + + Assert(!is_pool_cleaning); + Assert(agentCount > 0); + + is_pool_cleaning = true; + + pids = (int *) palloc((agentCount - 1) * sizeof(int)); + + /* Send a SIGTERM signal to all processes of Pooler agents except this one */ + for (count = 0; count < agentCount; count++) + { + if (poolAgents[count]->pid != pid && + strcmp(poolAgents[count]->pool->database, database) == 0) + { + if (kill(poolAgents[count]->pid, SIGTERM) < 0) + elog(ERROR, "kill(%ld,%d) failed: %m", + (long) poolAgents[count]->pid, SIGTERM); + + pids[i++] = poolAgents[count]->pid; + } + } + + *len = i; + + return pids; +} /* * diff --git a/src/backend/pgxc/pool/poolutils.c b/src/backend/pgxc/pool/poolutils.c new file mode 100644 index 0000000..e14c936 --- /dev/null +++ b/src/backend/pgxc/pool/poolutils.c @@ -0,0 +1,187 @@ +/*------------------------------------------------------------------------- + * + * poolutils.c + * + * Utilities for Postgres-XC pooler + * + * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group + * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation + * + * IDENTIFICATION + * $$ + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" +#include "libpq/pqsignal.h" + +#include "pgxc/pgxc.h" +#include "pgxc/poolmgr.h" +#include "pgxc/locator.h" +#include "pgxc/poolutils.h" +#include "access/gtm.h" +#include "commands/dbcommands.h" + +#include "nodes/parsenodes.h" + +/* + * CleanConnection() + * + * Utility to clean up Postgres-XC Pooler connections. + * This utility is launched to all the Coordinators of the cluster + * + * Use of CLEAN CONNECTION is limited to a super user. + * It is advised to clean connections before shutting down a Node or drop a Database. 
+ * + * SQL query synopsis is as follows: + * CLEAN CONNECTION TO + * (COORDINATOR num | DATANODE num | ALL {FORCE}) + * FOR DATABASE dbname + * + * Connection cleaning has to be made on a chosen database called dbname. + * + * It is also possible to clean connections of several Coordinators or Datanodes + * Ex: CLEAN CONNECTION TO DATANODE 1,5,7 FOR DATABASE template1 + * CLEAN CONNECTION TO COORDINATOR 2,4,6 FOR DATABASE template1 + * + * Or even to all Coordinators/Datanodes at the same time + * Ex: CLEAN CONNECTION TO DATANODE * FOR DATABASE template1 + * CLEAN CONNECTION TO COORDINATOR * FOR DATABASE template1 + * + * When FORCE is used, all the transactions using pooler connections are aborted, + * and pooler connections are cleaned up. + * Ex: CLEAN CONNECTION TO ALL FORCE FOR DATABASE template1; + * + * FORCE can only be used with TO ALL, as it takes a lock on pooler to stop requests + * asking for connections, aborts all the connections in the cluster, and cleans up + * pool connections. + */ +void +CleanConnection(CleanConnStmt *stmt) +{ + ListCell *nodelist_item; + List *co_list = NIL; + List *dn_list = NIL; + List *stmt_nodes = NIL; + char *dbname = stmt->dbname; + bool is_coord = stmt->is_coord; + bool is_force = stmt->is_force; + int max_node_number = 0; + Oid oid; + + /* Only a DB administrator can clean pooler connections */ + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to clean pool connections"))); + + /* Check if the Database exists by getting its Oid */ + oid = get_database_oid(dbname); + if (!OidIsValid(oid)) + { + ereport(WARNING, + (errcode(ERRCODE_UNDEFINED_DATABASE), + errmsg("database \"%s\" does not exist", dbname))); + return; + } + + /* + * FORCE is activated, + * Send a SIGTERM signal to all the processes and take a lock on Pooler + * to avoid backends to take new connections when cleaning. + * Only Disconnect is allowed. + */ + if (is_force) + { + int loop = 0; + int *proc_pids = NULL; + int num_proc_pids, count; + + num_proc_pids = PoolManagerAbortTransactions(dbname, &proc_pids); + + /* + * Watch the processes that received a SIGTERM. 
+ * At the end of the timestamp loop, processes are considered as not finished + * and force the connection cleaning has failed + */ + + while (num_proc_pids > 0 && loop < TIMEOUT_CLEAN_LOOP) + { + for (count = num_proc_pids - 1; count >= 0; count--) + { + switch(kill(proc_pids[count],0)) + { + case 0: /* Termination not done yet */ + break; + + default: + /* Move tail pid in free space */ + proc_pids[count] = proc_pids[num_proc_pids - 1]; + num_proc_pids--; + break; + } + } + pg_usleep(1000000); + loop++; + } + + if (proc_pids) + pfree(proc_pids); + + if (loop >= TIMEOUT_CLEAN_LOOP) + ereport(WARNING, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("All Transactions have not been aborted"))); + } + + /* Check node list */ + if (stmt->nodes && is_coord) + max_node_number = NumCoords; + else + max_node_number = NumDataNodes; + + foreach(nodelist_item, stmt->nodes) + { + int node_num = intVal(lfirst(nodelist_item)); + stmt_nodes = lappend_int(stmt_nodes, node_num); + + if (node_num > max_node_number || + node_num < 1) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Node Number %d is incorrect", node_num))); + } + + /* Build lists to be sent to Pooler Manager */ + if (stmt->nodes && is_coord) + co_list = stmt_nodes; + else if (stmt->nodes && !is_coord) + dn_list = stmt_nodes; + else + { + co_list = GetAllCoordNodes(); + dn_list = GetAllDataNodes(); + } + + /* + * If force is launched, send a signal to all the processes + * that are in transaction and take a lock. + * Get back their process number and watch them locally here. + * Process are checked as alive or not with pg_usleep and when all processes are down + * go out of the control loop. + * If at the end of the loop processes are not down send an error to client. + * Then Make a clean with normal pool cleaner. + * Always release the lock when calling CLEAN CONNECTION. + */ + + /* Finish by contacting Pooler Manager */ + PoolManagerCleanConnection(dn_list, co_list, dbname); + + /* Clean up memory */ + if (co_list) + list_free(co_list); + if (dn_list) + list_free(dn_list); +} diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 0f7910d..c83563b 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -2625,6 +2625,7 @@ die(SIGNAL_ARGS) errno = save_errno; } + /* * Timeout or shutdown signal from postmaster during client authentication. * Simply exit(1). 
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 4187887..968b921 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -61,6 +61,7 @@ #include "pgxc/locator.h" #include "pgxc/pgxc.h" #include "pgxc/planner.h" +#include "pgxc/poolutils.h" static void ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes, bool force_autocommit, RemoteQueryExecType exec_type); @@ -1428,6 +1429,14 @@ ProcessUtility(Node *parsetree, if (!IsConnFromCoord()) ExecRemoteUtility((RemoteQuery *) parsetree); break; + + case T_CleanConnStmt: + Assert(IS_PGXC_COORDINATOR); + CleanConnection((CleanConnStmt *) parsetree); + + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_COORDS); + break; #endif default: elog(ERROR, "unrecognized node type: %d", @@ -2432,6 +2441,9 @@ CreateCommandTag(Node *parsetree) case T_ExecDirectStmt: tag = "EXECUTE DIRECT"; break; + case T_CleanConnStmt: + tag = "CLEAN CONNECTION"; + break; default: elog(WARNING, "unrecognized node type: %d", @@ -2862,6 +2874,11 @@ GetCommandLogLevel(Node *parsetree) } break; +#ifdef PGXC + case T_CleanConnStmt: + lev = LOGSTMT_DDL; + break; +#endif default: elog(WARNING, "unrecognized node type: %d", diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 8bb49c6..7dc1981 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -355,6 +355,7 @@ typedef enum NodeTag T_AlterUserMappingStmt, T_DropUserMappingStmt, T_ExecDirectStmt, + T_CleanConnStmt, /* * TAGS FOR PARSE TREE NODES (parsenodes.h) diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 4d90052..7f4b20c 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -2415,6 +2415,18 @@ typedef struct ExecDirectStmt List *nodes; char *query; } ExecDirectStmt; + +/* + * CLEAN CONNECTION statement + */ +typedef struct CleanConnStmt +{ + NodeTag type; + List *nodes; /* list of nodes dropped */ + char *dbname; /* name of database to drop connections */ + bool is_coord; /* type of connections dropped */ + bool is_force; /* option force */ +} CleanConnStmt; /* PGXC_END */ #endif /* PARSENODES_H */ diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 45b8ab0..0565684 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -75,6 +75,9 @@ PG_KEYWORD("characteristics", CHARACTERISTICS, UNRESERVED_KEYWORD) PG_KEYWORD("check", CHECK, RESERVED_KEYWORD) PG_KEYWORD("checkpoint", CHECKPOINT, UNRESERVED_KEYWORD) PG_KEYWORD("class", CLASS, UNRESERVED_KEYWORD) +#ifdef PGXC +PG_KEYWORD("clean", CLEAN, UNRESERVED_KEYWORD) +#endif PG_KEYWORD("close", CLOSE, UNRESERVED_KEYWORD) PG_KEYWORD("cluster", CLUSTER, UNRESERVED_KEYWORD) PG_KEYWORD("coalesce", COALESCE, COL_NAME_KEYWORD) diff --git a/src/include/pgxc/poolcomm.h b/src/include/pgxc/poolcomm.h index 9e286ab..a10af3d 100644 --- a/src/include/pgxc/poolcomm.h +++ b/src/include/pgxc/poolcomm.h @@ -45,5 +45,9 @@ extern int pool_putbytes(PoolPort *port, const char *s, size_t len); extern int pool_flush(PoolPort *port); extern int pool_sendfds(PoolPort *port, int *fds, int count); extern int pool_recvfds(PoolPort *port, int *fds, int count); +extern int pool_sendres(PoolPort *port, int res); +extern int pool_recvres(PoolPort *port); +extern int pool_sendpids(PoolPort *port, int *pids, int count); +extern int pool_recvpids(PoolPort *port, int **pids); #endif /* POOLCOMM_H */ diff --git a/src/include/pgxc/poolmgr.h b/src/include/pgxc/poolmgr.h index 
3a615cc..7b6e4d4 100644 --- a/src/include/pgxc/poolmgr.h +++ b/src/include/pgxc/poolmgr.h @@ -63,6 +63,8 @@ typedef struct databasepool */ typedef struct { + /* Process ID of postmaster child process associated to pool agent */ + int pid; /* communication channel */ PoolPort port; DatabasePool *pool; @@ -132,6 +134,12 @@ extern void PoolManagerConnect(PoolHandle *handle, const char *database); /* Get pooled connections */ extern int *PoolManagerGetConnections(List *datanodelist, List *coordlist); +/* Clean pool connections */ +extern void PoolManagerCleanConnection(List *datanodelist, List *coordlist, char *dbname); + +/* Send Abort signal to transactions being run */ +extern int PoolManagerAbortTransactions(char *dbname, int **proc_pids); + /* Return connections back to the pool, for both Coordinator and Datanode connections */ extern void PoolManagerReleaseConnections(int dn_ndisc, int* dn_discard, int co_ndisc, int* co_discard); diff --git a/src/include/pgxc/poolutils.h b/src/include/pgxc/poolutils.h new file mode 100644 index 0000000..0f34561 --- /dev/null +++ b/src/include/pgxc/poolutils.h @@ -0,0 +1,30 @@ +/*------------------------------------------------------------------------- + * + * poolutils.h + * + * Utilities for Postgres-XC Pooler + * + * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group + * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation + * + * IDENTIFICATION + * $$ + * + *------------------------------------------------------------------------- + */ + +#ifndef POOLUTILS_H +#define POOLUTILS_H + +#include "nodes/parsenodes.h" + +#define TIMEOUT_CLEAN_LOOP 10 /* Wait 10s for all the transactions to shutdown */ + +/* Error codes for connection cleaning */ +#define CLEAN_CONNECTION_COMPLETED 0 +#define CLEAN_CONNECTION_NOT_COMPLETED 1 +#define CLEAN_CONNECTION_TX_REMAIN 2 +#define CLEAN_CONNECTION_EOF -1 + +void CleanConnection(CleanConnStmt *stmt); +#endif ----------------------------------------------------------------------- Summary of changes: src/backend/parser/gram.y | 64 +++++++- src/backend/pgxc/locator/locator.c | 4 +- src/backend/pgxc/pool/Makefile | 2 +- src/backend/pgxc/pool/pgxcnode.c | 2 +- src/backend/pgxc/pool/poolcomm.c | 181 +++++++++++++++++++++- src/backend/pgxc/pool/poolmgr.c | 310 ++++++++++++++++++++++++++++++++++-- src/backend/pgxc/pool/poolutils.c | 187 ++++++++++++++++++++++ src/backend/tcop/postgres.c | 1 + src/backend/tcop/utility.c | 17 ++ src/include/nodes/nodes.h | 1 + src/include/nodes/parsenodes.h | 12 ++ src/include/parser/kwlist.h | 3 + src/include/pgxc/poolcomm.h | 4 + src/include/pgxc/poolmgr.h | 8 + src/include/pgxc/poolutils.h | 30 ++++ 15 files changed, 803 insertions(+), 23 deletions(-) create mode 100644 src/backend/pgxc/pool/poolutils.c create mode 100644 src/include/pgxc/poolutils.h hooks/post-receive -- Postgres-XC |
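For illustration, a minimal usage sketch of the new statement. The gram.y changes are only summarized above, so the exact keyword layout shown here is an assumption inferred from the CleanConnStmt fields (a list of node numbers, a database name, and the is_coord/is_force flags); node numbers must fall between 1 and NumCoords or NumDataNodes, and per the utility.c hunk the statement runs on a Coordinator, which also forwards it to the other Coordinators.

    -- Hypothetical syntax: drop pooled connections to data node 2 for database mydb
    CLEAN CONNECTION TO NODE 2 FOR DATABASE mydb;

    -- FORCE variant: abort running transactions first, then watch the backend
    -- PIDs for up to TIMEOUT_CLEAN_LOOP (10 seconds) before cleaning the pool
    CLEAN CONNECTION TO NODE 1 FORCE FOR DATABASE mydb;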
From: mason_s <ma...@us...> - 2010-11-15 15:03:17
|
Project "Postgres-XC". The branch, master has been updated via 195026c49901d9526d217f4f826fe4de1e751b48 (commit) from df5bbea8a52731418be0d5346ec2a0464cc78b08 (commit) - Log ----------------------------------------------------------------- commit 195026c49901d9526d217f4f826fe4de1e751b48 Author: Mason Sharp <ma...@us...> Date: Tue Nov 16 00:02:48 2010 +0900 Fix PGXC macro usage diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 130dff3..cda0849 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -114,8 +114,9 @@ typedef struct List *subplans; /* List of subplans, in plan-tree case */ Plan *outer_plan; /* OUTER subplan, or NULL if none */ Plan *inner_plan; /* INNER subplan, or NULL if none */ - +#ifdef PGXC bool remotequery; /* deparse context for remote query */ +#endif } deparse_namespace; diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index 85384b5..8d1e35a 100644 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -595,8 +595,8 @@ extern Datum pg_get_function_identity_arguments(PG_FUNCTION_ARGS); extern Datum pg_get_function_result(PG_FUNCTION_ARGS); extern char *deparse_expression(Node *expr, List *dpcontext, bool forceprefix, bool showimplicit); -extern List *deparse_context_for_remotequery(const char *aliasname, Oid relid); #ifdef PGXC +extern List *deparse_context_for_remotequery(const char *aliasname, Oid relid); extern List *deparse_context_for(const char *aliasname, Oid relid); #endif extern List *deparse_context_for_plan(Node *plan, Node *outer_plan, ----------------------------------------------------------------------- Summary of changes: src/backend/utils/adt/ruleutils.c | 3 ++- src/include/utils/builtins.h | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) hooks/post-receive -- Postgres-XC |
From: mason_s <ma...@us...> - 2010-11-15 14:59:06
|
Project "Postgres-XC". The branch, master has been updated via df5bbea8a52731418be0d5346ec2a0464cc78b08 (commit) from 2de110a8491bf090c0dd51033a674b8de718d706 (commit) - Log ----------------------------------------------------------------- commit df5bbea8a52731418be0d5346ec2a0464cc78b08 Author: Mason Sharp <ma...@us...> Date: Mon Nov 15 23:58:30 2010 +0900 Fix some compiler warnings diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index e46065d..94f851a 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -22,6 +22,7 @@ #include "catalog/pg_type.h" #include "executor/executor.h" #include "lib/stringinfo.h" +#include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "nodes/nodes.h" #include "nodes/parsenodes.h" @@ -1586,7 +1587,6 @@ get_plan_nodes(Query *query, RemoteQuery *step, bool isRead) /* * get_plan_nodes_command - determine the nodes to execute the plan on * - * return NULL if it is not safe to be done in a single step. */ static void get_plan_nodes_command(Query *query, RemoteQuery *step) @@ -1606,6 +1606,9 @@ get_plan_nodes_command(Query *query, RemoteQuery *step) /* treat as a select */ get_plan_nodes(query, step, false); break; + + default: + break; } } @@ -2393,7 +2396,7 @@ handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stm for (c = &query_step->sql_statement[0]; c != pos && *c != '\0'; *newchar++ = *c++); if (query->limitCount) - sprintf(newchar, "LIMIT %I64d", newLimit); + sprintf(newchar, "LIMIT " INT64_FORMAT, newLimit); else *newchar = '\0'; @@ -2840,7 +2843,7 @@ validate_part_col_updatable(const Query *query) if (rte != NULL && rte->rtekind != RTE_RELATION) /* Bad relation type */ - return NULL; + return; /* See if we have the partitioned case. */ rel_loc_info = GetRelationLocInfo(rte->relid); ----------------------------------------------------------------------- Summary of changes: src/backend/pgxc/plan/planner.c | 9 ++++++--- 1 files changed, 6 insertions(+), 3 deletions(-) hooks/post-receive -- Postgres-XC |
From: mason_s <ma...@us...> - 2010-11-15 06:50:25
|
Project "Postgres-XC". The branch, master has been updated via 2de110a8491bf090c0dd51033a674b8de718d706 (commit) from 47477fd696a6c1110ea9c1b47ad60a403468dae3 (commit) - Log ----------------------------------------------------------------- commit 2de110a8491bf090c0dd51033a674b8de718d706 Author: Mason Sharp <ma...@us...> Date: Mon Nov 15 15:48:18 2010 +0900 Do not allow the partition column to be updated on hash partitioned tables until we support moving tuples. This corresponds to bug #3107683 on SourceForge. Written by Benny Mei Le diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index af368e4..e46065d 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -148,6 +148,7 @@ static bool get_plan_nodes_walker(Node *query_node, XCWalkerContext *context); static bool examine_conditions_walker(Node *expr_node, XCWalkerContext *context); static int handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stmt); static void InitXCWalkerContext(XCWalkerContext *context); +static void validate_part_col_updatable(const Query *query); /* * Find position of specified substring in the string @@ -2465,10 +2466,18 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) case CMD_INSERT: case CMD_UPDATE: case CMD_DELETE: + /* PGXCTODO: This validation will not be removed + * until we support moving tuples from one node to another + * when the partition column of a table is updated + */ + if (query->commandType == CMD_UPDATE) + validate_part_col_updatable(query); + if (query->returningList) ereport(ERROR, (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), (errmsg("RETURNING clause not yet supported")))); + /* Set result relations */ if (query->commandType != CMD_SELECT) result->resultRelations = list_make1_int(query->resultRelation); @@ -2812,4 +2821,56 @@ IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode, return result; } +/* + * validate whether partition column of a table is being updated + */ +static void +validate_part_col_updatable(const Query *query) +{ + RangeTblEntry *rte; + RelationLocInfo *rel_loc_info; + ListCell *lc; + + /* Make sure there is one table at least */ + if (query->rtable == NULL) + return; + + rte = (RangeTblEntry *) list_nth(query->rtable, query->resultRelation - 1); + + + if (rte != NULL && rte->rtekind != RTE_RELATION) + /* Bad relation type */ + return NULL; + + /* See if we have the partitioned case. 
*/ + rel_loc_info = GetRelationLocInfo(rte->relid); + + if (!rel_loc_info) + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("Could not find relation for oid = %d", rte->relid)))); + + + /* Only LOCATOR_TYPE_HASH should be checked */ + if (rel_loc_info->locatorType == LOCATOR_TYPE_HASH && + rel_loc_info->partAttrName != NULL) + { + /* It is a partitioned table, check partition column in targetList */ + foreach(lc, query->targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + + if (tle->resjunk) + continue; + /* + * See if we have a constant expression comparing against the + * designated partitioned column + */ + if (strcmp(tle->resname, rel_loc_info->partAttrName) == 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + (errmsg("Partition column can't be updated in current version")))); + } + } +} ----------------------------------------------------------------------- Summary of changes: src/backend/pgxc/plan/planner.c | 61 +++++++++++++++++++++++++++++++++++++++ 1 files changed, 61 insertions(+), 0 deletions(-) hooks/post-receive -- Postgres-XC |
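A behavioral sketch of the new check (table and column names are illustrative, and the DISTRIBUTE BY syntax is assumed to match later Postgres-XC releases). Only hash-partitioned tables are affected: validate_part_col_updatable raises an error as soon as a non-junk target entry's resname matches the partitioning column, while updates to other columns are unaffected.

    CREATE TABLE accounts (id int, balance numeric) DISTRIBUTE BY HASH (id);

    UPDATE accounts SET balance = balance - 10.0 WHERE id = 42;  -- still allowed
    UPDATE accounts SET id = 43 WHERE id = 42;
    -- ERROR:  Partition column can't be updated in current version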
From: mason_s <ma...@us...> - 2010-11-15 00:37:27
|
Project "Postgres-XC". The branch, master has been updated via 47477fd696a6c1110ea9c1b47ad60a403468dae3 (commit) from e0a1c694304e5ea591addb66faf5733311a9ba28 (commit) - Log ----------------------------------------------------------------- commit 47477fd696a6c1110ea9c1b47ad60a403468dae3 Author: Mason Sharp <ma...@us...> Date: Mon Nov 15 09:35:58 2010 +0900 Support for COPY SELECT Recent portal integration changes have allowed this support to happen "for free". We just remove the message that indicated that it was not supported. diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index a1da3a0..b87b07b 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -1295,13 +1295,6 @@ DoCopy(const CopyStmt *stmt, const char *queryString) Assert(!is_from); cstate->rel = NULL; -#ifdef PGXC - if (IS_PGXC_COORDINATOR) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY (SELECT) is not supported in PGXC"))); -#endif - /* Don't allow COPY w/ OIDs from a select */ if (cstate->oids) ereport(ERROR, ----------------------------------------------------------------------- Summary of changes: src/backend/commands/copy.c | 7 ------- 1 files changed, 0 insertions(+), 7 deletions(-) hooks/post-receive -- Postgres-XC |
From: mason_s <ma...@us...> - 2010-11-14 19:58:32
|
Project "Postgres-XC". The branch, master has been updated via e0a1c694304e5ea591addb66faf5733311a9ba28 (commit) from 894fc6d421a6ec402b3b02ba6f3988c1068cc113 (commit) - Log ----------------------------------------------------------------- commit e0a1c694304e5ea591addb66faf5733311a9ba28 Author: Mason Sharp <ma...@us...> Date: Mon Nov 15 04:52:45 2010 +0900 1. Support for UPDATE and DELETE WHERE CURRENT OF for replicated tables through a cursor. Cursor on replicated table should be declared with FOR UPDATE to be updateable. CTID is appended to the field list in this case. Plan for replicated WHERE CURRENT OF consists of multiple steps: - Select all values of current row by CTID (on single node) - Declare cursor for select from the table with WHERE clause looking like col1 = value1 AND col2 = value2 AND ..., all columns of the table are listed (on all nodes) - Move the cursor to the first position (on all nodes) - Update the cursor (on all nodes) 2. Support for Extended Query Protol (EQP) in coordinator-data node communication. Allows handling data node cursors effectively. Planner tells executor to use EQP by setting cursor field of RemoteQuery. 3. Enabled data node connection sharing between multiple concurrent RemoteQueries. Needed to better support concurrent user defined cursors and multi-step query plans. If another RemoteQuery needs to use data node connection, pending input intended for previous connection is buffered, and when previous RemoteQuery is active again it will consume buffered input first. It is recommended to use EQP for multi-step plans and user-defined cursors to minimize amount of buffered data if executor is switching between RemoteQueries. By Andrei Martsinchyk diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 604d48a..22ccd16 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2281,6 +2281,21 @@ AbortTransaction(void) */ SetUserIdAndSecContext(s->prevUser, s->prevSecContext); +#ifdef PGXC + /* + * We should rollback on the data nodes before cleaning up portals + * to be sure data structures used by connections are not freed yet + */ + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { + /* + * Make sure this is rolled back on the DataNodes + * if so it will just return + */ + PGXCNodeRollback(); + } +#endif + /* * do abort processing */ @@ -2301,11 +2316,6 @@ AbortTransaction(void) /* This is done by remote Coordinator */ if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) { - /* - * Make sure this is rolled back on the DataNodes - * if so it will just return - */ - PGXCNodeRollback(); RollbackTranGTM(s->globalTransactionId); latestXid = s->globalTransactionId; } diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c index 53e424b..3a65361 100644 --- a/src/backend/executor/execTuples.c +++ b/src/backend/executor/execTuples.c @@ -153,6 +153,7 @@ ExecCreateTupleTable(int tableSize) slot->tts_shouldFreeRow = false; slot->tts_dataRow = NULL; slot->tts_dataLen = -1; + slot->tts_dataNode = 0; slot->tts_attinmeta = NULL; #endif slot->tts_mcxt = CurrentMemoryContext; @@ -238,6 +239,7 @@ MakeSingleTupleTableSlot(TupleDesc tupdesc) slot->tts_shouldFreeRow = false; slot->tts_dataRow = NULL; slot->tts_dataLen = -1; + slot->tts_dataNode = 0; slot->tts_attinmeta = NULL; #endif slot->tts_mcxt = CurrentMemoryContext; @@ -440,6 +442,7 @@ ExecStoreTuple(HeapTuple tuple, slot->tts_shouldFreeRow = false; slot->tts_dataRow = NULL; slot->tts_dataLen = -1; + slot->tts_dataNode = 0; #endif 
/* @@ -509,6 +512,7 @@ ExecStoreMinimalTuple(MinimalTuple mtup, slot->tts_shouldFreeRow = false; slot->tts_dataRow = NULL; slot->tts_dataLen = -1; + slot->tts_dataNode = 0; #endif /* @@ -547,7 +551,8 @@ ExecStoreMinimalTuple(MinimalTuple mtup, * -------------------------------- */ TupleTableSlot * -ExecStoreDataRowTuple(char *msg, size_t len, TupleTableSlot *slot, bool shouldFree) +ExecStoreDataRowTuple(char *msg, size_t len, int node, TupleTableSlot *slot, + bool shouldFree) { /* * sanity checks @@ -586,6 +591,7 @@ ExecStoreDataRowTuple(char *msg, size_t len, TupleTableSlot *slot, bool shouldFr slot->tts_mintuple = NULL; slot->tts_dataRow = msg; slot->tts_dataLen = len; + slot->tts_dataNode = node; /* Mark extracted state invalid */ slot->tts_nvalid = 0; @@ -624,6 +630,7 @@ ExecClearTuple(TupleTableSlot *slot) /* slot in which to store tuple */ slot->tts_shouldFreeRow = false; slot->tts_dataRow = NULL; slot->tts_dataLen = -1; + slot->tts_dataNode = 0; #endif slot->tts_tuple = NULL; @@ -976,6 +983,7 @@ ExecMaterializeSlot(TupleTableSlot *slot) { slot->tts_dataRow = NULL; slot->tts_dataLen = -1; + slot->tts_dataNode = 0; } #endif diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index d4ae006..b184987 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -839,6 +839,7 @@ _copyRemoteQuery(RemoteQuery *from) COPY_NODE_FIELD(distinct); COPY_SCALAR_FIELD(read_only); COPY_SCALAR_FIELD(force_autocommit); + COPY_STRING_FIELD(cursor); COPY_STRING_FIELD(relname); COPY_SCALAR_FIELD(remotejoin); @@ -888,7 +889,7 @@ _copySimpleAgg(SimpleAgg *from) COPY_SCALAR_FIELD(transfn); COPY_SCALAR_FIELD(finalfn); if (!from->initValueIsNull) - newnode->initValue = datumCopy(from->initValue, from->transtypeByVal, + newnode->initValue = datumCopy(from->initValue, from->transtypeByVal, from->transtypeLen); COPY_SCALAR_FIELD(initValueIsNull); COPY_SCALAR_FIELD(inputtypeLen); diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index 5ea6d8a..af368e4 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -31,6 +31,7 @@ #include "optimizer/tlist.h" #include "parser/parse_agg.h" #include "parser/parse_coerce.h" +#include "pgxc/execRemote.h" #include "pgxc/locator.h" #include "pgxc/planner.h" #include "tcop/pquery.h" @@ -38,6 +39,7 @@ #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/portal.h" #include "utils/syscache.h" @@ -123,7 +125,7 @@ typedef struct XCWalkerContext { Query *query; bool isRead; - ExecNodes *exec_nodes; /* resulting execution nodes */ + RemoteQuery *query_step; /* remote query step being analized */ Special_Conditions *conditions; bool multilevel_join; List *rtables; /* a pointer to a list of rtables */ @@ -141,13 +143,44 @@ bool StrictStatementChecking = true; /* Forbid multi-node SELECT statements with an ORDER BY clause */ bool StrictSelectChecking = false; -static ExecNodes *get_plan_nodes(Query *query, bool isRead); +static void get_plan_nodes(Query *query, RemoteQuery *step, bool isRead); static bool get_plan_nodes_walker(Node *query_node, XCWalkerContext *context); static bool examine_conditions_walker(Node *expr_node, XCWalkerContext *context); static int handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stmt); static void InitXCWalkerContext(XCWalkerContext *context); /* + * Find position of specified substring in the string + * All non-printable symbols of str treated as spaces, all letters as uppercase + 
* Returns pointer to the beginning of the substring or NULL + */ +static char * +strpos(char *str, char *substr) +{ + char copy[strlen(str) + 1]; + char *src = str; + char *dst = copy; + + /* + * Initialize mutable copy, converting letters to uppercase and + * various witespace characters to spaces + */ + while (*src) + { + if (isspace(*src)) + { + src++; + *dst++ = ' '; + } + else + *dst++ = toupper(*src++); + } + *dst = '\0'; + dst = strstr(copy, substr); + return dst ? str + (dst - copy) : NULL; +} + +/* * True if both lists contain only one node and are the same */ static bool @@ -509,7 +542,7 @@ get_plan_nodes_insert(Query *query) * Get list of parent-child joins (partitioned together) * Get list of joins with replicated tables * - * If we encounter an expression such as a cross-node join that cannot + * If we encounter an expression such as a cross-node join that cannot * be easily handled in a single step, we stop processing and return true, * otherwise false. * @@ -536,6 +569,242 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) if (!context->conditions) context->conditions = new_special_conditions(); + /* Handle UPDATE/DELETE ... WHERE CURRENT OF ... */ + if (IsA(expr_node, CurrentOfExpr)) + { + /* Find referenced portal and figure out what was the last fetch node */ + Portal portal; + QueryDesc *queryDesc; + PlanState *state; + CurrentOfExpr *cexpr = (CurrentOfExpr *) expr_node; + char *cursor_name = cexpr->cursor_name; + char *node_cursor; + + /* Find the cursor's portal */ + portal = GetPortalByName(cursor_name); + if (!PortalIsValid(portal)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_CURSOR), + errmsg("cursor \"%s\" does not exist", cursor_name))); + + queryDesc = PortalGetQueryDesc(portal); + if (queryDesc == NULL || queryDesc->estate == NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_CURSOR_STATE), + errmsg("cursor \"%s\" is held from a previous transaction", + cursor_name))); + + /* + * The cursor must have a current result row: per the SQL spec, it's + * an error if not. + */ + if (portal->atStart || portal->atEnd) + ereport(ERROR, + (errcode(ERRCODE_INVALID_CURSOR_STATE), + errmsg("cursor \"%s\" is not positioned on a row", + cursor_name))); + + state = ExecGetActivePlanTree(queryDesc); + if (IsA(state, RemoteQueryState)) + { + RemoteQueryState *node = (RemoteQueryState *) state; + RemoteQuery *step = (RemoteQuery *) state->plan; + + /* + * 1. step query: SELECT * FROM <table> WHERE ctid = <cur_ctid>, + * <cur_ctid> is taken from the scantuple ot the target step + * step node list: current node of the target step. + * 2. step query: DECLARE <xxx> CURSOR FOR SELECT * FROM <table> + * WHERE <col1> = <val1> AND <col2> = <val2> ... FOR UPDATE + * <xxx> is generated from cursor name of the target step, + * <col> and <val> pairs are taken from the step 1. + * step node list: all nodes of <table> + * 3. step query: MOVE <xxx> + * step node list: all nodes of <table> + */ + RangeTblEntry *table = (RangeTblEntry *) linitial(context->query->rtable); + node_cursor = step->cursor; + rel_loc_info1 = GetRelationLocInfo(table->relid); + context->query_step->exec_nodes = makeNode(ExecNodes); + context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER; + context->query_step->exec_nodes->baselocatortype = rel_loc_info1->locatorType; + if (rel_loc_info1->locatorType == LOCATOR_TYPE_REPLICATED) + { + RemoteQuery *step1, *step2, *step3; + /* + * We do not need first three steps if cursor already exists and + * positioned. 
+ */ + if (node->update_cursor) + { + step3 = NULL; + node_cursor = node->update_cursor; + } + else + { + char *tableName = get_rel_name(table->relid); + int natts = get_relnatts(table->relid); + char *attnames[natts]; + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + /* + * ctid is the last attribute, but more correct to iterate over + * attributes and find by name, or store index for table + */ + Datum ctid = slot->tts_values[slot->tts_tupleDescriptor->natts - 1]; + char *ctid_str = (char *) DirectFunctionCall1(tidout, ctid); + int nodenum = slot->tts_dataNode; + AttrNumber att; + StringInfoData buf; + HeapTuple tp; + int i; + MemoryContext context_save; + + initStringInfo(&buf); + + /* Step 1: select tuple values by ctid */ + step1 = makeNode(RemoteQuery); + appendStringInfoString(&buf, "SELECT "); + for (att = 1; att <= natts; att++) + { + TargetEntry *tle; + Var *expr; + + tp = SearchSysCache(ATTNUM, + ObjectIdGetDatum(table->relid), + Int16GetDatum(att), + 0, 0); + if (HeapTupleIsValid(tp)) + { + Form_pg_attribute att_tup = (Form_pg_attribute) GETSTRUCT(tp); + + /* add comma before all except first attributes */ + if (att > 1) + appendStringInfoString(&buf, ", "); + attnames[att-1] = pstrdup(NameStr(att_tup->attname)); + appendStringInfoString(&buf, attnames[att - 1]); + expr = makeVar(att, att, att_tup->atttypid, + att_tup->atttypmod, 0); + tle = makeTargetEntry((Expr *) expr, att, + attnames[att - 1], false); + step1->scan.plan.targetlist = lappend(step1->scan.plan.targetlist, tle); + ReleaseSysCache(tp); + } + else + elog(ERROR, "cache lookup failed for attribute %d of relation %u", + att, table->relid); + } + appendStringInfo(&buf, " FROM %s WHERE ctid = '%s'", + tableName, ctid_str); + step1->sql_statement = pstrdup(buf.data); + step1->is_single_step = true; + step1->exec_nodes = makeNode(ExecNodes); + step1->read_only = true; + step1->exec_nodes->nodelist = list_make1_int(nodenum); + + /* Step 2: declare cursor for update target table */ + step2 = makeNode(RemoteQuery); + resetStringInfo(&buf); + + appendStringInfoString(&buf, step->cursor); + appendStringInfoString(&buf, "upd"); + /* This need to survive while the target Portal is alive */ + context_save = MemoryContextSwitchTo(PortalGetHeapMemory(portal)); + node_cursor = pstrdup(buf.data); + node->update_cursor = node_cursor; + MemoryContextSwitchTo(context_save); + resetStringInfo(&buf); + + appendStringInfo(&buf, + "DECLARE %s CURSOR FOR SELECT * FROM %s WHERE ", + node_cursor, tableName); + for (i = 0; i < natts; i++) + { + /* add comma before all except first attributes */ + if (i) + appendStringInfoString(&buf, "AND "); + appendStringInfo(&buf, "%s = $%d ", attnames[i], i+1); + } + appendStringInfoString(&buf, "FOR UPDATE"); + step2->sql_statement = pstrdup(buf.data); + step2->is_single_step = true; + step2->read_only = true; + step2->exec_nodes = makeNode(ExecNodes); + step2->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList); + innerPlan(step2) = (Plan *) step1; + /* Step 3: move cursor to first position */ + step3 = makeNode(RemoteQuery); + resetStringInfo(&buf); + appendStringInfo(&buf, "MOVE %s", node_cursor); + step3->sql_statement = pstrdup(buf.data); + step3->is_single_step = true; + step3->read_only = true; + step3->exec_nodes = makeNode(ExecNodes); + step3->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList); + innerPlan(step3) = (Plan *) step2; + + innerPlan(context->query_step) = (Plan *) step3; + + pfree(buf.data); + } + context->query_step->exec_nodes->nodelist = 
list_copy(rel_loc_info1->nodeList); + } + else + { + /* Take target node from last scan tuple of referenced step */ + int curr_node = node->ss.ss_ScanTupleSlot->tts_dataNode; + context->query_step->exec_nodes->nodelist = lappend_int(context->query_step->exec_nodes->nodelist, curr_node); + } + FreeRelationLocInfo(rel_loc_info1); + + context->query_step->is_single_step = true; + /* + * replace cursor name in the query if differs + */ + if (strcmp(cursor_name, node_cursor)) + { + StringInfoData buf; + char *str = context->query->sql_statement; + /* + * Find last occurence of cursor_name + */ + for (;;) + { + char *next = strstr(str + 1, cursor_name); + if (next) + str = next; + else + break; + } + + /* + * now str points to cursor name truncate string here + * do not care the string is modified - we will pfree it + * soon anyway + */ + *str = '\0'; + + /* and move str at the beginning of the reminder */ + str += strlen(cursor_name); + + /* build up new statement */ + initStringInfo(&buf); + appendStringInfoString(&buf, context->query->sql_statement); + appendStringInfoString(&buf, node_cursor); + appendStringInfoString(&buf, str); + + /* take the result */ + pfree(context->query->sql_statement); + context->query->sql_statement = buf.data; + } + return false; + } + // ??? current plan node is not a remote query + context->query_step->exec_nodes = makeNode(ExecNodes); + context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG; + context->exec_on_coord = true; + return false; + } + if (IsA(expr_node, Var)) { /* If we get here, that meant the previous call before recursing down did not @@ -800,13 +1069,13 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) bool is_multilevel; int save_parent_child_count = 0; SubLink *sublink = (SubLink *) expr_node; - ExecNodes *save_exec_nodes = context->exec_nodes; /* Save old exec_nodes */ + ExecNodes *save_exec_nodes = context->query_step->exec_nodes; /* Save old exec_nodes */ /* save parent-child count */ - if (context->exec_nodes) + if (context->query_step->exec_nodes) save_parent_child_count = list_length(context->conditions->partitioned_parent_child); - context->exec_nodes = NULL; + context->query_step->exec_nodes = NULL; context->multilevel_join = false; current_rtable = ((Query *) sublink->subselect)->rtable; @@ -823,31 +1092,31 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) context->multilevel_join = false; /* Allow for replicated tables */ - if (!context->exec_nodes) - context->exec_nodes = save_exec_nodes; + if (!context->query_step->exec_nodes) + context->query_step->exec_nodes = save_exec_nodes; else { if (save_exec_nodes) { - if (context->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED) + if (context->query_step->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED) { - context->exec_nodes = save_exec_nodes; + context->query_step->exec_nodes = save_exec_nodes; } else { if (save_exec_nodes->tableusagetype != TABLE_USAGE_TYPE_USER_REPLICATED) { /* See if they run on the same node */ - if (same_single_node (context->exec_nodes->nodelist, save_exec_nodes->nodelist)) + if (same_single_node (context->query_step->exec_nodes->nodelist, save_exec_nodes->nodelist)) return false; } else /* use old value */ - context->exec_nodes = save_exec_nodes; + context->query_step->exec_nodes = save_exec_nodes; } } else { - if (context->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED) + if (context->query_step->exec_nodes->tableusagetype == 
TABLE_USAGE_TYPE_USER_REPLICATED) return false; /* See if subquery safely joins with parent */ if (!is_multilevel) @@ -986,8 +1255,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) if (contains_only_pg_catalog (query->rtable)) { /* just pg_catalog tables */ - context->exec_nodes = makeNode(ExecNodes); - context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG; + context->query_step->exec_nodes = makeNode(ExecNodes); + context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG; context->exec_on_coord = true; return false; } @@ -1005,7 +1274,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) if (rte->rtekind == RTE_SUBQUERY) { - ExecNodes *save_exec_nodes = context->exec_nodes; + ExecNodes *save_exec_nodes = context->query_step->exec_nodes; Special_Conditions *save_conditions = context->conditions; /* Save old conditions */ List *current_rtable = rte->subquery->rtable; @@ -1025,8 +1294,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) context->rtables = list_delete_ptr(context->rtables, current_rtable); context->conditions = save_conditions; - current_nodes = context->exec_nodes; - context->exec_nodes = save_exec_nodes; + current_nodes = context->query_step->exec_nodes; + context->query_step->exec_nodes = save_exec_nodes; if (current_nodes) current_usage_type = current_nodes->tableusagetype; @@ -1103,8 +1372,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) /* If we are just dealing with pg_catalog, just return */ if (table_usage_type == TABLE_USAGE_TYPE_PGCATALOG) { - context->exec_nodes = makeNode(ExecNodes); - context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG; + context->query_step->exec_nodes = makeNode(ExecNodes); + context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG; context->exec_on_coord = true; return false; } @@ -1113,6 +1382,9 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) if (examine_conditions_walker(query->jointree->quals, context)) return true; + if (context->query_step->exec_nodes) + return false; + /* Examine join conditions, see if each join is single-node safe */ if (context->join_list != NULL) { @@ -1174,27 +1446,27 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) if (!rel_loc_info) return true; - context->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead); + context->query_step->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead); } } else { - context->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead); + context->query_step->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead); } /* Note replicated table usage for determining safe queries */ - if (context->exec_nodes) + if (context->query_step->exec_nodes) { if (table_usage_type == TABLE_USAGE_TYPE_USER && IsReplicated(rel_loc_info)) table_usage_type = TABLE_USAGE_TYPE_USER_REPLICATED; - context->exec_nodes->tableusagetype = table_usage_type; + context->query_step->exec_nodes->tableusagetype = table_usage_type; } } /* check for partitioned col comparison against a literal */ else if (list_length(context->conditions->partitioned_literal_comps) > 0) { - context->exec_nodes = NULL; + context->query_step->exec_nodes = NULL; /* * Make sure that if there are multiple such comparisons, that they @@ -1208,11 +1480,11 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) lit_comp->rel_loc_info, &(lit_comp->constant), true); test_exec_nodes->tableusagetype = 
table_usage_type; - if (context->exec_nodes == NULL) - context->exec_nodes = test_exec_nodes; + if (context->query_step->exec_nodes == NULL) + context->query_step->exec_nodes = test_exec_nodes; else { - if (!same_single_node(context->exec_nodes->nodelist, test_exec_nodes->nodelist)) + if (!same_single_node(context->query_step->exec_nodes->nodelist, test_exec_nodes->nodelist)) { return true; } @@ -1231,22 +1503,22 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) parent_child = (Parent_Child_Join *) linitial(context->conditions->partitioned_parent_child); - context->exec_nodes = GetRelationNodes(parent_child->rel_loc_info1, NULL, context->isRead); - context->exec_nodes->tableusagetype = table_usage_type; + context->query_step->exec_nodes = GetRelationNodes(parent_child->rel_loc_info1, NULL, context->isRead); + context->query_step->exec_nodes->tableusagetype = table_usage_type; } if (from_query_nodes) { - if (!context->exec_nodes) + if (!context->query_step->exec_nodes) { - context->exec_nodes = from_query_nodes; + context->query_step->exec_nodes = from_query_nodes; return false; } /* Just use exec_nodes if the from subqueries are all replicated or using the exact * same node */ else if (from_query_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED - || (same_single_node(from_query_nodes->nodelist, context->exec_nodes->nodelist))) + || (same_single_node(from_query_nodes->nodelist, context->query_step->exec_nodes->nodelist))) return false; else { @@ -1254,7 +1526,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) * but the parent query applies a condition on the from subquery. */ if (list_length(query->jointree->fromlist) == from_subquery_count - && list_length(context->exec_nodes->nodelist) == 1) + && list_length(context->query_step->exec_nodes->nodelist) == 1) return false; } /* Too complicated, give up */ @@ -1270,8 +1542,9 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) static void InitXCWalkerContext(XCWalkerContext *context) { + context->query = NULL; context->isRead = true; - context->exec_nodes = NULL; + context->query_step = NULL; context->conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions)); context->rtables = NIL; context->multilevel_join = false; @@ -1286,27 +1559,26 @@ InitXCWalkerContext(XCWalkerContext *context) * Top level entry point before walking query to determine plan nodes * */ -static ExecNodes * -get_plan_nodes(Query *query, bool isRead) +static void +get_plan_nodes(Query *query, RemoteQuery *step, bool isRead) { - ExecNodes *result_nodes = NULL; XCWalkerContext context; InitXCWalkerContext(&context); + context.query = query; context.isRead = isRead; + context.query_step = step; context.rtables = lappend(context.rtables, query->rtable); - if (!get_plan_nodes_walker((Node *) query, &context)) - result_nodes = context.exec_nodes; - if (context.exec_on_coord && result_nodes) + if ((get_plan_nodes_walker((Node *) query, &context) + || context.exec_on_coord) && context.query_step->exec_nodes) { - pfree(result_nodes); - result_nodes = NULL; + pfree(context.query_step->exec_nodes); + context.query_step->exec_nodes = NULL; } free_special_relations(context.conditions); free_join_list(context.join_list); - return result_nodes; } @@ -1315,32 +1587,25 @@ get_plan_nodes(Query *query, bool isRead) * * return NULL if it is not safe to be done in a single step. 
*/ -static ExecNodes * -get_plan_nodes_command(Query *query) +static void +get_plan_nodes_command(Query *query, RemoteQuery *step) { - ExecNodes *exec_nodes = NULL; - switch (query->commandType) { case CMD_SELECT: - exec_nodes = get_plan_nodes(query, true); + get_plan_nodes(query, step, true); break; case CMD_INSERT: - exec_nodes = get_plan_nodes_insert(query); + step->exec_nodes = get_plan_nodes_insert(query); break; case CMD_UPDATE: case CMD_DELETE: /* treat as a select */ - exec_nodes = get_plan_nodes(query, false); + get_plan_nodes(query, step, false); break; - - default: - return NULL; } - - return exec_nodes; } @@ -1645,8 +1910,6 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort, List *sub_tlist = step->scan.plan.targetlist; ListCell *l; StringInfo buf = makeStringInfo(); - char *sql; - char *cur; char *sql_from; context = deparse_context_for_plan((Node *) step, NULL, rtable, NIL); @@ -1677,23 +1940,9 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort, * Do not handle the case if " FROM " we found is not a "FROM" keyword, but, * for example, a part of string constant. */ - sql = pstrdup(step->sql_statement); /* mutable copy */ - /* string to upper case, for comparing */ - cur = sql; - while (*cur) - { - /* replace whitespace with a space */ - if (isspace((unsigned char) *cur)) - *cur = ' '; - *cur++ = toupper(*cur); - } - - /* find the keyword */ - sql_from = strstr(sql, " FROM "); + sql_from = strpos(step->sql_statement, " FROM "); if (sql_from) { - /* the same offset in the original string */ - int offset = sql_from - sql; /* * Truncate query at the position of terminating semicolon to be able * to append extra order by entries. If query is submitted from client @@ -1705,7 +1954,7 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort, if (*end == ';') *end = '\0'; - appendStringInfoString(buf, step->sql_statement + offset); + appendStringInfoString(buf, sql_from); } if (extra_sort) @@ -1728,9 +1977,6 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort, } } - /* do not need the copy */ - pfree(sql); - /* free previous query */ pfree(step->sql_statement); /* get a copy of new query */ @@ -1742,6 +1988,81 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort, /* + * Traverse the plan subtree and set cursor name for RemoteQuery nodes + * Cursor names must be unique, so append step_no parameter to the initial + * cursor name. Returns next step_no to be assigned + */ +static int +set_cursor_name(Plan *subtree, char *cursor, int step_no) +{ + if (innerPlan(subtree)) + step_no = set_cursor_name(innerPlan(subtree), cursor, step_no); + if (outerPlan(subtree)) + step_no = set_cursor_name(outerPlan(subtree), cursor, step_no); + if (IsA(subtree, RemoteQuery)) + { + RemoteQuery *step = (RemoteQuery *) subtree; + /* + * Keep the name for the very first step, hoping it is the only step and + * we do not have to modify WHERE CURRENT OF + */ + if (step_no) + { + StringInfoData buf; + initStringInfo(&buf); + appendStringInfo(&buf, "%s%d", cursor, step_no++); + /* make a copy before overwriting */ + step->cursor = buf.data; + } + else + { + step_no++; + step->cursor = pstrdup(cursor); + } + } + return step_no; +} + +/* + * Append ctid to the field list of step queries to support update + * WHERE CURRENT OF. 
The ctid is not sent down to client but used as a key + * to find target tuple + */ +static void +fetch_ctid_of(Plan *subtree, RowMarkClause *rmc) +{ + /* recursively process subnodes */ + if (innerPlan(subtree)) + fetch_ctid_of(innerPlan(subtree), rmc); + if (outerPlan(subtree)) + fetch_ctid_of(outerPlan(subtree), rmc); + + /* we are only interested in RemoteQueries */ + if (IsA(subtree, RemoteQuery)) + { + RemoteQuery *step = (RemoteQuery *) subtree; + /* + * TODO Find if the table is referenced by the step query + */ + + char *from_sql = strpos(step->sql_statement, " FROM "); + if (from_sql) + { + StringInfoData buf; + + initStringInfo(&buf); + appendBinaryStringInfo(&buf, step->sql_statement, + (int) (from_sql - step->sql_statement)); + /* TODO qualify with the table name */ + appendStringInfoString(&buf, ", ctid"); + appendStringInfoString(&buf, from_sql); + pfree(step->sql_statement); + step->sql_statement = buf.data; + } + } +} + +/* * Plan to sort step tuples * PGXC: copied and adopted from optimizer/plan/planner.c */ @@ -2114,44 +2435,9 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) query_step = makeNode(RemoteQuery); query_step->is_single_step = false; - /* - * Declare Cursor case: - * We should leave as a step query only SELECT statement - * Further if we need refer source statement for planning we should take - * the truncated string - */ - if (query->utilityStmt && + if (query->utilityStmt && IsA(query->utilityStmt, DeclareCursorStmt)) - { - - char *src = query->sql_statement; - char str[strlen(src) + 1]; /* mutable copy */ - char *dst = str; - - cursorOptions |= ((DeclareCursorStmt *) query->utilityStmt)->options; - - /* - * Initialize mutable copy, converting letters to uppercase and - * various witespace characters to spaces - */ - while (*src) - { - if (isspace(*src)) - { - src++; - *dst++ = ' '; - } - else - *dst++ = toupper(*src++); - } - *dst = '\0'; - /* search for SELECT keyword in the normalized string */ - dst = strstr(str, " SELECT "); - /* Take substring of the original string using found offset */ - query_step->sql_statement = pstrdup(query->sql_statement + (dst - str + 1)); - } - else - query_step->sql_statement = pstrdup(query->sql_statement); + cursorOptions |= ((DeclareCursorStmt *) query->utilityStmt)->options; query_step->exec_nodes = NULL; query_step->combine_type = COMBINE_TYPE_NONE; @@ -2187,7 +2473,7 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) if (query->commandType != CMD_SELECT) result->resultRelations = list_make1_int(query->resultRelation); - query_step->exec_nodes = get_plan_nodes_command(query); + get_plan_nodes_command(query, query_step); if (query_step->exec_nodes == NULL) { @@ -2217,26 +2503,29 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) } /* - * Use standard plan if we have more than one data node with either - * group by, hasWindowFuncs, or hasRecursive + * get_plan_nodes_command may alter original statement, so do not + * process it before the call + * + * Declare Cursor case: + * We should leave as a step query only SELECT statement + * Further if we need refer source statement for planning we should take + * the truncated string */ - /* - * PGXCTODO - this could be improved to check if the first - * group by expression is the partitioning column, in which - * case it is ok to treat as a single step. 
- */ - if (query->commandType == CMD_SELECT - && query_step->exec_nodes - && list_length(query_step->exec_nodes->nodelist) > 1 - && (query->groupClause || query->hasWindowFuncs || query->hasRecursive)) + if (query->utilityStmt && + IsA(query->utilityStmt, DeclareCursorStmt)) { - result = standard_planner(query, cursorOptions, boundParams); - return result; + + /* search for SELECT keyword in the normalized string */ + char *select = strpos(query->sql_statement, " SELECT "); + /* Take substring of the original string using found offset */ + query_step->sql_statement = pstrdup(select + 1); } + else + query_step->sql_statement = pstrdup(query->sql_statement); /* - * If there already is an active portal, we may be doing planning - * within a function. Just use the standard plan, but check if + * If there already is an active portal, we may be doing planning + * within a function. Just use the standard plan, but check if * it is part of an EXPLAIN statement so that we do not show that * we plan multiple steps when it is a single-step operation. */ @@ -2285,6 +2574,24 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) result = standard_planner(query, cursorOptions, boundParams); return result; } + + /* + * Use standard plan if we have more than one data node with either + * group by, hasWindowFuncs, or hasRecursive + */ + /* + * PGXCTODO - this could be improved to check if the first + * group by expression is the partitioning column, in which + * case it is ok to treat as a single step. + */ + if (query->commandType == CMD_SELECT + && query_step->exec_nodes + && list_length(query_step->exec_nodes->nodelist) > 1 + && (query->groupClause || query->hasWindowFuncs || query->hasRecursive)) + { + result->planTree = standardPlan; + return result; + } break; default: @@ -2307,6 +2614,33 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) result->planTree = materialize_finished_plan(result->planTree); } + /* + * Support for multi-step cursor. 
+ * Ensure uniqueness of remote cursor name + */ + if (query->utilityStmt && + IsA(query->utilityStmt, DeclareCursorStmt)) + { + DeclareCursorStmt *stmt = (DeclareCursorStmt *) query->utilityStmt; + set_cursor_name(result->planTree, stmt->portalname, 0); + } + + /* + * If query is FOR UPDATE fetch CTIDs from the remote node + * Use CTID as a key to update tuples on remote nodes when handling + * WHERE CURRENT OF + */ + if (query->rowMarks) + { + ListCell *lc; + foreach(lc, query->rowMarks) + { + RowMarkClause *rmc = (RowMarkClause *) lfirst(lc); + + fetch_ctid_of(result->planTree, rmc); + } + } + return result; } @@ -2321,6 +2655,8 @@ free_query_step(RemoteQuery *query_step) return; pfree(query_step->sql_statement); + if (query_step->cursor) + pfree(query_step->cursor); if (query_step->exec_nodes) { if (query_step->exec_nodes->nodelist) diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 7fe08be..a524c13 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -35,6 +35,8 @@ #define END_QUERY_TIMEOUT 20 #define CLEAR_TIMEOUT 5 +#define DATA_NODE_FETCH_SIZE 1 + extern char *deparseSql(RemoteQueryState *scanstate); @@ -70,6 +72,8 @@ static void pfree_pgxc_all_handles(PGXCNodeAllHandles *pgxc_handles); static int handle_response_clear(PGXCNodeHandle * conn); +static void close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor); + static PGXCNodeAllHandles *pgxc_get_all_transaction_nodes(void); #define MAX_STATEMENTS_PER_TRAN 10 @@ -200,8 +204,11 @@ CreateResponseCombiner(int node_count, CombineType combine_type) combiner->copy_out_count = 0; combiner->errorMessage = NULL; combiner->query_Done = false; - combiner->msg = NULL; - combiner->msglen = 0; + combiner->currentRow.msg = NULL; + combiner->currentRow.msglen = 0; + combiner->currentRow.msgnode = 0; + combiner->rowBuffer = NIL; + combiner->tapenodes = NULL; combiner->initAggregates = true; combiner->simple_aggregates = NULL; combiner->copy_file = NULL; @@ -671,10 +678,10 @@ HandleCopyDataRow(RemoteQueryState *combiner, char *msg_body, size_t len) * Caller must stop reading if function returns false */ static void -HandleDataRow(RemoteQueryState *combiner, char *msg_body, size_t len) +HandleDataRow(RemoteQueryState *combiner, char *msg_body, size_t len, int node) { /* We expect previous message is consumed */ - Assert(combiner->msg == NULL); + Assert(combiner->currentRow.msg == NULL); if (combiner->request_type != REQUEST_TYPE_QUERY) { @@ -695,9 +702,10 @@ HandleDataRow(RemoteQueryState *combiner, char *msg_body, size_t len) * We are copying message because it points into connection buffer, and * will be overwritten on next socket read */ - combiner->msg = (char *) palloc(len); - memcpy(combiner->msg, msg_body, len); - combiner->msglen = len; + combiner->currentRow.msg = (char *) palloc(len); + memcpy(combiner->currentRow.msg, msg_body, len); + combiner->currentRow.msglen = len; + combiner->currentRow.msgnode = node; } /* @@ -850,6 +858,10 @@ CloseCombiner(RemoteQueryState *combiner) FreeTupleDesc(combiner->tuple_desc); if (combiner->errorMessage) pfree(combiner->errorMessage); + if (combiner->cursor_connections) + pfree(combiner->cursor_connections); + if (combiner->tapenodes) + pfree(combiner->tapenodes); pfree(combiner); } } @@ -874,15 +886,24 @@ static bool ValidateAndResetCombiner(RemoteQueryState *combiner) { bool valid = validate_combiner(combiner); + ListCell *lc; if (combiner->connections) pfree(combiner->connections); if 
(combiner->tuple_desc) FreeTupleDesc(combiner->tuple_desc); - if (combiner->msg) - pfree(combiner->msg); + if (combiner->currentRow.msg) + pfree(combiner->currentRow.msg); + foreach(lc, combiner->rowBuffer) + { + RemoteDataRow dataRow = (RemoteDataRow) lfirst(lc); + pfree(dataRow->msg); + } + list_free_deep(combiner->rowBuffer); if (combiner->errorMessage) pfree(combiner->errorMessage); + if (combiner->tapenodes) + pfree(combiner->tapenodes); combiner->command_complete_count = 0; combiner->connections = NULL; @@ -894,8 +915,11 @@ ValidateAndResetCombiner(RemoteQueryState *combiner) combiner->copy_out_count = 0; combiner->errorMessage = NULL; combiner->query_Done = false; - combiner->msg = NULL; - combiner->msglen = 0; + combiner->currentRow.msg = NULL; + combiner->currentRow.msglen = 0; + combiner->currentRow.msgnode = 0; + combiner->rowBuffer = NIL; + combiner->tapenodes = NULL; combiner->simple_aggregates = NULL; combiner->copy_file = NULL; @@ -903,22 +927,247 @@ ValidateAndResetCombiner(RemoteQueryState *combiner) } /* + * It is possible if multiple steps share the same data node connection, when + * executor is running multi-step query or client is running multiple queries + * using Extended Query Protocol. After returning next tuple ExecRemoteQuery + * function passes execution control to the executor and then it can be given + * to the same RemoteQuery or to different one. It is possible that before + * returning a tuple the function do not read all data node responses. In this + * case pending responses should be read in context of original RemoteQueryState + * till ReadyForQuery message and data rows should be stored (buffered) to be + * available when fetch from that RemoteQueryState is requested again. + * BufferConnection function does the job. + * If a RemoteQuery is going to use connection it should check connection state. + * DN_CONNECTION_STATE_QUERY indicates query has data to read and combiner + * points to the original RemoteQueryState. If combiner differs from "this" the + * connection should be buffered. + */ +void +BufferConnection(PGXCNodeHandle *conn) +{ + RemoteQueryState *combiner = conn->combiner; + /* + * When BufferConnection is invoked CurrentContext is related to other + * portal, which is trying to control the connection. 
+ * TODO See if we can find better context to switch to + */ + MemoryContext oldcontext = MemoryContextSwitchTo(combiner->ss.ss_ScanTupleSlot->tts_mcxt); + + Assert(conn->state == DN_CONNECTION_STATE_QUERY && combiner); + + /* Verify the connection is in use by the combiner */ + combiner->current_conn = 0; + while (combiner->current_conn < combiner->conn_count) + { + if (combiner->connections[combiner->current_conn] == conn) + break; + combiner->current_conn++; + } + Assert(combiner->current_conn < combiner->conn_count); + + /* + * Buffer data rows until data node return number of rows specified by the + * fetch_size parameter of last Execute message (PortalSuspended message) + * or end of result set is reached (CommandComplete message) + */ + while (conn->state == DN_CONNECTION_STATE_QUERY) + { + int res; + + /* Move to buffer currentRow (received from the data node) */ + if (combiner->currentRow.msg) + { + RemoteDataRow dataRow = (RemoteDataRow) palloc(sizeof(RemoteDataRowData)); + *dataRow = combiner->currentRow; + combiner->currentRow.msg = NULL; + combiner->currentRow.msglen = 0; + combiner->currentRow.msgnode = 0; + combiner->rowBuffer = lappend(combiner->rowBuffer, dataRow); + } + + res = handle_response(conn, combiner); + /* + * If response message is a DataRow it will be handled on the next + * iteration. + * PortalSuspended will cause connection state change and break the loop + * The same is for CommandComplete, but we need additional handling - + * remove connection from the list of active connections. + * We may need to add handling error response + */ + if (res == RESPONSE_EOF) + { + /* incomplete message, read more */ + if (pgxc_node_receive(1, &conn, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to fetch from data node"))); + continue; + } + else if (res == RESPONSE_COMPLETE) + { + /* + * End of result set is reached, so either set the pointer to the + * connection to NULL (step with sort) or remove it from the list + * (step without sort) + */ + if (combiner->tuplesortstate) + { + combiner->connections[combiner->current_conn] = NULL; + if (combiner->tapenodes == NULL) + combiner->tapenodes = (int*) palloc0(NumDataNodes * sizeof(int)); + combiner->tapenodes[combiner->current_conn] = conn->nodenum; + } + else + /* Remove current connection, move last in-place, adjust current_conn */ + if (combiner->current_conn < --combiner->conn_count) + combiner->connections[combiner->current_conn] = combiner->connections[combiner->conn_count]; + else + combiner->current_conn = 0; + } + /* + * Before output RESPONSE_COMPLETE or PORTAL_SUSPENDED handle_response() + * changes connection state to DN_CONNECTION_STATE_IDLE, breaking the + * loop. 
We do not need to do anything specific in case of + * PORTAL_SUSPENDED so skiping "else if" block for that case + */ + } + MemoryContextSwitchTo(oldcontext); + conn->combiner = NULL; +} + +/* * Get next data row from the combiner's buffer into provided slot - * Just clear slot and return false if buffer is empty, that means more data - * should be read + * Just clear slot and return false if buffer is empty, that means end of result + * set is reached */ bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot) { - /* have messages in the buffer, consume them */ - if (combiner->msg) + bool have_tuple = false; + + while (combiner->conn_count > 0) { - ExecStoreDataRowTuple(combiner->msg, combiner->msglen, slot, true); - combiner->msg = NULL; - combiner->msglen = 0; + PGXCNodeHandle *conn; + int res; + + /* If we have message in the buffer, consume it */ + if (combiner->currentRow.msg) + { + ExecStoreDataRowTuple(combiner->currentRow.msg, + combiner->currentRow.msglen, + combiner->currentRow.msgnode, slot, true); + combiner->currentRow.msg = NULL; + combiner->currentRow.msglen = 0; + combiner->currentRow.msgnode = 0; + have_tuple = true; + } + /* + * If this is ordered fetch we can not know what is the node + * to handle next, so sorter will choose next itself and set it as + * currentRow to have it consumed on the next call to FetchTuple + */ + if (((RemoteQuery *)combiner->ss.ps.plan)->sort) + return have_tuple; + + /* + * Note: If we are fetching not sorted results we can not have both + * currentRow and buffered rows. When connection is buffered currentRow + * is moved to buffer, and then it is cleaned after buffering is + * completed. Afterwards rows will be taken from the buffer bypassing + * currentRow until buffer is empty, and only after that data are read + * from a connection. + */ + if (list_length(combiner->rowBuffer) > 0) + { + RemoteDataRow dataRow = (RemoteDataRow) linitial(combiner->rowBuffer); + combiner->rowBuffer = list_delete_first(combiner->rowBuffer); + ExecStoreDataRowTuple(dataRow->msg, dataRow->msglen, + dataRow->msgnode, slot, true); + pfree(dataRow); + return true; + } + + conn = combiner->connections[combiner->current_conn]; + + /* Going to use a connection, buffer it if needed */ + if (conn->state == DN_CONNECTION_STATE_QUERY && conn->combiner != NULL + && conn->combiner != combiner) + BufferConnection(conn); + + /* + * If current connection is idle it means portal on the data node is + * suspended. If we have a tuple do not hurry to request more rows, + * leave connection clean for other RemoteQueries. 
+ * If we do not have, request more and try to get it + */ + if (conn->state == DN_CONNECTION_STATE_IDLE) + { + /* + * If we have tuple to return do not hurry to request more, keep + * connection clean + */ + if (have_tuple) + return have_tuple; + else + { + if (pgxc_node_send_execute(conn, combiner->cursor, 1) != 0) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to fetch from data node"))); + if (pgxc_node_send_sync(conn) != 0) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to fetch from data node"))); + if (pgxc_node_receive(1, &conn, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to fetch from data node"))); + conn->combiner = combiner; + } + } + + /* read messages */ + res = handle_response(conn, combiner); + if (res == RESPONSE_EOF) + { + /* incomplete message, read more */ + if (pgxc_node_receive(1, &conn, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to fetch from data node"))); + continue; + } + else if (res == RESPONSE_SUSPENDED) + { + /* Make next connection current */ + if (++combiner->current_conn >= combiner->conn_count) + combiner->current_conn = 0; + } + else if (res == RESPONSE_COMPLETE) + { + /* Remove current connection, move last in-place, adjust current_conn */ + if (combiner->current_conn < --combiner->conn_count) + combiner->connections[combiner->current_conn] = combiner->connections[combiner->conn_count]; + else + combiner->current_conn = 0; + } + + /* If we have a tuple we can leave now. */ + if (have_tuple) + return true; + } + /* Wrap up last message if exists */ + if (combiner->currentRow.msg) + { + ExecStoreDataRowTuple(combiner->currentRow.msg, + combiner->currentRow.msglen, + combiner->currentRow.msgnode, slot, true); + combiner->currentRow.msg = NULL; + combiner->currentRow.msglen = 0; + combiner->currentRow.msgnode = 0; return true; } - /* inform caller that buffer is empty */ + /* otherwise report end of data to the caller */ ExecClearTuple(slot); return false; } @@ -986,10 +1235,11 @@ pgxc_node_receive_responses(const int conn_count, PGXCNodeHandle ** connections, * Long term, we should look into cancelling executing statements * and closing the connections. * Return values: - * EOF - need to receive more data for the connection - * 0 - done with the connection - * 1 - got data row - * 2 - got copy response + * RESPONSE_EOF - need to receive more data for the connection + * RESPONSE_COMPLETE - done with the connection + * RESPONSE_TUPLEDESC - got tuple description + * RESPONSE_DATAROW - got data row + * RESPONSE_COPY - got copy response */ int handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) @@ -997,38 +1247,40 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) char *msg; int msg_len; char msg_type; + bool suspended = false; for (;;) { - /* No data available, exit */ - if (conn->state == DN_CONNECTION_STATE_QUERY) - return RESPONSE_EOF; + Assert(conn->state != DN_CONNECTION_STATE_IDLE); + Assert(conn->combiner == combiner || conn->combiner == NULL); /* * If we are in the process of shutting down, we - * may be rolling back, and the buffer may contain other messages. - * We want to avoid a procarray exception - * as well as an error stack overflow. - */ + * may be rolling back, and the buffer may contain other messages. + * We want to avoid a procarray exception + * as well as an error stack overflow. 
+ */ if (proc_exit_inprogress) - { conn->state = DN_CONNECTION_STATE_ERROR_FATAL; + + /* don't read from from the connection if there is a fatal error */ + if (conn->state == DN_CONNECTION_STATE_ERROR_FATAL) + return RESPONSE_COMPLETE; + + /* No data available, exit */ + if (!HAS_MESSAGE_BUFFERED(conn)) return RESPONSE_EOF; - } /* TODO handle other possible responses */ msg_type = get_message(conn, &msg_len, &msg); switch (msg_type) { case '\0': /* Not enough data in the buffer */ - conn->state = DN_CONNECTION_STATE_QUERY; return RESPONSE_EOF; case 'c': /* CopyToCommandComplete */ - conn->state = DN_CONNECTION_STATE_COMPLETED; HandleCopyOutComplete(combiner); break; case 'C': /* CommandComplete */ - conn->state = DN_CONNECTION_STATE_COMPLETED; HandleCommandComplete(combiner, msg, msg_len); break; case 'T': /* RowDescription */ @@ -1043,8 +1295,17 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) #ifdef DN_CONNECTION_DEBUG Assert(conn->have_row_desc); #endif - HandleDataRow(combiner, msg, msg_len); + HandleDataRow(combiner, msg, msg_len, conn->nodenum); return RESPONSE_DATAROW; + case 's': /* PortalSuspended */ + suspended = true; + break; + case '1': /* ParseComplete */ + case '2': /* BindComplete */ + case '3': /* CloseComplete */ + case 'n': /* NoData */ + /* simple notifications, continue reading */ + break; case 'G': /* CopyInResponse */ conn->state = DN_CONNECTION_STATE_COPY_IN; HandleCopyIn(combiner); @@ -1060,7 +1321,6 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) break; case 'E': /* ErrorResponse */ HandleError(combiner, msg, msg_len); - conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY; /* * Do not return with an error, we still need to consume Z, * ready-for-query @@ -1074,12 +1334,22 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) */ break; case 'Z': /* ReadyForQuery */ + { + /* + * Return result depends on previous connection state. + * If it was PORTAL_SUSPENDED coordinator want to send down + * another EXECUTE to fetch more rows, otherwise it is done + * with the connection + */ + int result = suspended ? RESPONSE_SUSPENDED : RESPONSE_COMPLETE; conn->transaction_status = msg[0]; conn->state = DN_CONNECTION_STATE_IDLE; + conn->combiner = NULL; #ifdef DN_CONNECTION_DEBUG conn->have_row_desc = false; #endif - return RESPONSE_COMPLETE; + return result; + } case 'I': /* EmptyQuery */ default: /* sync lost? */ @@ -1092,6 +1362,7 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) return RESPONSE_EOF; } + /* * Like handle_response, but for consuming the messages, * in case we of an error to clean the data node connection. 
@@ -1138,8 +1409,8 @@ handle_response_clear(PGXCNodeHandle * conn) case 'N': /* NoticeResponse */ break; case 'E': /* ErrorResponse */ - conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY; /* + * conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY; * Do not return with an error, we still need to consume Z, * ready-for-query */ @@ -1176,6 +1447,8 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle ** connections, /* Send BEGIN */ for (i = 0; i < conn_count; i++) { + if (connections[i]->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(connections[i]); if (GlobalTransactionIdIsValid(gxid) && pgxc_node_send_gxid(connections[i], gxid)) return EOF; @@ -1372,7 +1645,7 @@ finish: rollback_xid = BeginTranGTM(NULL); - /* + /* * Send xid and rollback prepared down to Datanodes and Coordinators * Even if we get an error on one, we try and send to the others */ @@ -1398,7 +1671,7 @@ finish: /* - * Commit prepared transaction on Datanodes and Coordinators (as necessary) + * Commit prepared transaction on Datanodes and Coordinators (as necessary) * where it has been prepared. * Connection to backends has been cut when transaction has been prepared, * So it is necessary to send the COMMIT PREPARE message to all the nodes. @@ -1831,18 +2104,6 @@ pgxc_node_rollback(PGXCNodeAllHandles *pgxc_handles) int co_conn_count = pgxc_handles->co_conn_count; int dn_conn_count = pgxc_handles->dn_conn_count; - /* - * Rollback is a special case, being issued because of an error. - * We try to read and throw away any extra data on the connection before - * issuing our rollbacks so that we did not read the results of the - * previous command. - */ - for (i = 0; i < dn_conn_count; i++) - clear_socket_data(pgxc_handles->datanode_handles[i]); - - for (i = 0; i < co_conn_count; i++) - clear_socket_data(pgxc_handles->coord_handles[i]); - /* Send ROLLBACK to all handles */ if (pgxc_all_handles_send_query(pgxc_handles, "ROLLBACK", false)) result = EOF; @@ -1963,6 +2224,8 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_ /* Send query to nodes */ for (i = 0; i < conn_count; i++) { + if (connections[i]->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(connections[i]); /* If explicit transaction is needed gxid is already sent */ if (!need_tran && pgxc_node_send_gxid(connections[i], gxid)) { @@ -2369,7 +2632,6 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) RemoteQueryState *remotestate; Relation currentRelation; - remotestate = CreateResponseCombiner(0, node->combine_type); remotestate->ss.ps.plan = (Plan *) node; remotestate->ss.ps.state = estate; @@ -2409,6 +2671,10 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); } + + if (innerPlan(node)) + innerPlanState(remotestate) = ExecInitNode(innerPlan(node), estate, eflags); + if (outerPlan(node)) outerPlanState(remotestate) = ExecInitNode(outerPlan(node), estate, eflags); @@ -2425,7 +2691,9 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst) if (src->tts_mcxt == dst->tts_mcxt) { /* now dst slot controls the backing message */ - ExecStoreDataRowTuple(src->tts_dataRow, src->tts_dataLen, dst, src->tts_shouldFreeRow); + ExecStoreDataRowTuple(src->tts_dataRow, src->tts_dataLen, + src->tts_dataNode, dst, + src->tts_shouldFreeRow); src->tts_shouldFreeRow = false; } else @@ -2433,10 +2701,11 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst) /* have to make a copy */ MemoryContext oldcontext = 
MemoryContextSwitchTo(dst->tts_mcxt); int len = src->tts_dataLen; + int node = src->tts_dataNode; char *msg = (char *) palloc(len); memcpy(msg, src->tts_dataRow, len); - ExecStoreDataRowTuple(msg, len, dst, true); + ExecStoreDataRowTuple(msg, len, node, dst, true); MemoryContextSwitchTo(oldcontext); } } @@ -2610,20 +2879,21 @@ ExecRemoteQuery(RemoteQueryState *node) int total_conn_count; bool need_tran; PGXCNodeAllHandles *pgxc_connections; + TupleTableSlot *innerSlot = NULL; /* - * If coordinator plan is specified execute it first. - * If the plan is returning we are returning these tuples immediately. - * If it is not returning or returned them all by current invocation - * we will go ahead and execute remote query. Then we will never execute - * the outer plan again because node->query_Done flag will be set and - * execution won't get to that place. + * Inner plan for RemoteQuery supplies parameters. + * We execute inner plan to get a tuple and use values of the tuple as + * parameter values when executing this remote query. + * If returned slot contains NULL tuple break execution. + * TODO there is a problem how to handle the case if both inner and + * outer plans exist. We can decide later, since it is never used now. */ - if (outerPlanState(node)) + if (innerPlanState(node)) { - TupleTableSlot *slot = ExecProcNode(outerPlanState(node)); - if (!TupIsNull(slot)) - return slot; + innerSlot = ExecProcNode(innerPlanState(node)); +// if (TupIsNull(innerSlot)) +// return innerSlot; } /* @@ -2712,6 +2982,9 @@ ExecRemoteQuery(RemoteQueryState *node) /* See if we have a primary node, execute on it first before the others */ if (primaryconnection) { + if (primaryconnection->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(primaryconnection); + /* If explicit transaction is needed gxid is already sent */ if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid)) { @@ -2751,7 +3024,6 @@ ExecRemoteQuery(RemoteQueryState *node) (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } - Assert(node->combine_type == COMBINE_TYPE_SAME); while (node->command_complete_count < 1) @@ -2760,11 +3032,7 @@ ExecRemoteQuery(RemoteQueryState *node) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to read response from data nodes"))); - while (handle_response(primaryconnection, node) == RESPONSE_EOF) - if (pgxc_node_receive(1, &primaryconnection, NULL)) - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to read response from data nodes"))); + ... [truncated message content] |
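A note on the connection buffering introduced above: it matters whenever two active RemoteQuery nodes have to share the same data node connections. A hypothetical query shape that exercises this path (table and column names invented):

    SELECT *
    FROM t1
    JOIN t2 ON t1.a = t2.b
    ORDER BY t1.a;

If the join cannot be pushed down, each side runs as its own cursor-based fetch (pgxc_node_send_execute with a fetch count of 1 followed by pgxc_node_send_sync, as in the code above). When one combiner needs a connection that is still in DN_CONNECTION_STATE_QUERY on behalf of another, BufferConnection() drains the pending rows into rowBuffer, and subsequent FetchTuple() calls serve rows from that buffer before reading from the socket again.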
From: mason_s <ma...@us...> - 2010-11-14 19:43:02
Project "Postgres-XC". The branch, master has been updated via 894fc6d421a6ec402b3b02ba6f3988c1068cc113 (commit) from cb6b36c3f6aab15eb924e7e476ace178935072fb (commit) - Log ----------------------------------------------------------------- commit 894fc6d421a6ec402b3b02ba6f3988c1068cc113 Author: Mason Sharp <ma...@us...> Date: Thu Nov 4 16:47:57 2010 -0400 Add a message that the RETURNING clause is not yet supported. diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index e678a14..5ea6d8a 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -2179,6 +2179,10 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) case CMD_INSERT: case CMD_UPDATE: case CMD_DELETE: + if (query->returningList) + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("RETURNING clause not yet supported")))); /* Set result relations */ if (query->commandType != CMD_SELECT) result->resultRelations = list_make1_int(query->resultRelation); ----------------------------------------------------------------------- Summary of changes: src/backend/pgxc/plan/planner.c | 4 ++++ 1 files changed, 4 insertions(+), 0 deletions(-) hooks/post-receive -- Postgres-XC |
From: mason_s <ma...@us...> - 2010-11-04 19:40:32
Project "Postgres-XC". The branch, master has been updated via cb6b36c3f6aab15eb924e7e476ace178935072fb (commit) from 44ca05af2742271abdc5c14f5ca313d5ea307875 (commit) - Log ----------------------------------------------------------------- commit cb6b36c3f6aab15eb924e7e476ace178935072fb Author: Mason Sharp <ma...@us...> Date: Thu Nov 4 15:39:28 2010 -0400 Fix bug with primary key in CREATE TABLE statement. By Benny Mei Le diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index cfa470f..7a31dcb 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -1293,7 +1293,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) } /* Existing table, check if it is safe */ - if (!cxt->distributeby && !isLocalSafe) + if (cxt->isalter && !cxt->distributeby && !isLocalSafe) isLocalSafe = CheckLocalIndexColumn ( cxt->rel->rd_locator_info->locatorType, cxt->rel->rd_locator_info->partAttrName, key); } diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index f01d4af..4187887 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -963,7 +963,7 @@ ProcessUtility(Node *parsetree, false, /* quiet */ stmt->concurrent); /* concurrent */ #ifdef PGXC - if (IS_PGXC_COORDINATOR) + if (IS_PGXC_COORDINATOR && !stmt->isconstraint) ExecUtilityStmtOnNodes(queryString, NULL, stmt->concurrent, EXEC_ON_ALL_NODES); #endif ----------------------------------------------------------------------- Summary of changes: src/backend/parser/parse_utilcmd.c | 2 +- src/backend/tcop/utility.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) hooks/post-receive -- Postgres-XC |
From: mason_s <ma...@us...> - 2010-11-03 17:38:58
Project "Postgres-XC". The branch, master has been updated via 44ca05af2742271abdc5c14f5ca313d5ea307875 (commit) from d62381f6e17e57fd4b3d7b51afce3032fe35507a (commit) - Log ----------------------------------------------------------------- commit 44ca05af2742271abdc5c14f5ca313d5ea307875 Author: Mason Sharp <ma...@us...> Date: Wed Nov 3 13:06:24 2010 -0400 Improve performance of "multi-step" queries (an on-going process). We already had code that detected when we can reduce the entire query to a single step, including joins. We also had general code that allowed for handling arbitrary queries that could not be reduced into one step, with the catch that joins will be done at the coordinator level. This commit allows for some joins to be pushed down when possible. For example, in a three-way join, if two tables are co-located, they may be joined in a single step down at the data nodes, and those results will be joined with the third table (on the coordinator, however). In addition, the previous code was based on a SQL/MED patch, which meant when accessing remote tables, it selected all of the columns. With this commit, we project only the needed columns. Written primarily by Pavan Deolasee (join push-dwon and column selection) and Mason Sharp (safe push-down detection). Squashed commit of the following: commit eb50a76cb929fbe4a31d093b43e1589382c892a0 Author: Pavan Deolasee <pav...@gm...> Date: Wed Oct 27 16:09:28 2010 +0530 Set remote relation stats (pages, rows etc) to a lower value so that NestLoop joins are preferred over other join types. This is necessary until we can handle other join types for remote join reduction commit 69bb66c62f71b9be918475ea65931adb3bbfba20 Author: Pavan Deolasee <pav...@gm...> Date: Tue Oct 19 12:20:44 2010 +0530 Set aliases properly for join reduction commit 2a313446f3e714ba36c9ccc5c5167309b7c89a95 Author: Mason Sharp <ma...@us...> Date: Mon Oct 18 16:15:16 2010 -0400 Added IsJoinReducible to determine if the two plan nodes can be joined. See comments for this function for more details. Basically, we use examine_conditions_walker to check if it is safe to join the two. Partitioned-partitioned joins are safe to collapse, and partitioned-replicated are safe iff one of the nodes does not already contain such a collapsed node. commit f275fa535e9673af0964ecc7ca93ab1b49df2317 Author: Pavan Deolasee <pav...@gm...> Date: Mon Oct 18 11:53:54 2010 +0530 Fix a bug where rte/alias were not getting set up properly commit 6af07721357944af801a384ed1eb54e363839403 Author: Pavan Deolasee <pav...@gm...> Date: Mon Oct 18 11:35:43 2010 +0530 Update some missing copy/out/read functions commit 7bcb490dc50eeb1ad1569d90cc5eb759b766aa91 Author: Pavan Deolasee <pav...@gm...> Date: Mon Oct 18 11:33:54 2010 +0530 Initial implementation of remote join reduction. We still don't have the logic to determine whether its safe to reduce two join trees or not commit aefc06e7bd90c657fb093a923f7b66177687561d Author: Pavan Deolasee <pav...@gm...> Date: Mon Oct 18 11:29:21 2010 +0530 First step to SQL-med integration. 
Moving query generation to planning stage diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index aa92917..5099162 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -686,7 +686,11 @@ explain_outNode(StringInfo str, Assert(rte->rtekind == RTE_RELATION); /* We only show the rel name, not schema name */ +#ifdef PGXC + relname = rte->relname; +#else relname = get_rel_name(rte->relid); +#endif appendStringInfo(str, " on %s", quote_identifier(relname)); diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index c58e2a0..d4ae006 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -840,6 +840,17 @@ _copyRemoteQuery(RemoteQuery *from) COPY_SCALAR_FIELD(read_only); COPY_SCALAR_FIELD(force_autocommit); + COPY_STRING_FIELD(relname); + COPY_SCALAR_FIELD(remotejoin); + COPY_SCALAR_FIELD(reduce_level); + COPY_NODE_FIELD(base_tlist); + COPY_STRING_FIELD(outer_alias); + COPY_STRING_FIELD(inner_alias); + COPY_SCALAR_FIELD(outer_reduce_level); + COPY_SCALAR_FIELD(inner_reduce_level); + COPY_BITMAPSET_FIELD(outer_relids); + COPY_BITMAPSET_FIELD(inner_relids); + return newnode; } @@ -1836,6 +1847,13 @@ _copyRangeTblEntry(RangeTblEntry *from) RangeTblEntry *newnode = makeNode(RangeTblEntry); COPY_SCALAR_FIELD(rtekind); + +#ifdef PGXC + COPY_STRING_FIELD(relname); + if (from->reltupdesc) + newnode->reltupdesc = CreateTupleDescCopy(from->reltupdesc); +#endif + COPY_SCALAR_FIELD(relid); COPY_NODE_FIELD(subquery); COPY_SCALAR_FIELD(jointype); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 1c8691a..85cfaca 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -1502,6 +1502,9 @@ _outPlannerInfo(StringInfo str, PlannerInfo *node) WRITE_BOOL_FIELD(hasHavingQual); WRITE_BOOL_FIELD(hasPseudoConstantQuals); WRITE_BOOL_FIELD(hasRecursion); +#ifdef PGXC + WRITE_INT_FIELD(rs_alias_index); +#endif WRITE_INT_FIELD(wt_param_id); } @@ -2015,16 +2018,39 @@ _outSetOperationStmt(StringInfo str, SetOperationStmt *node) static void _outRangeTblEntry(StringInfo str, RangeTblEntry *node) { +#ifdef PGXC + int i; +#endif + WRITE_NODE_TYPE("RTE"); /* put alias + eref first to make dump more legible */ WRITE_NODE_FIELD(alias); WRITE_NODE_FIELD(eref); WRITE_ENUM_FIELD(rtekind, RTEKind); +#ifdef PGXC + WRITE_STRING_FIELD(relname); +#endif switch (node->rtekind) { case RTE_RELATION: +#ifdef PGXC + /* write tuple descriptor */ + appendStringInfo(str, " :tupdesc_natts %d (", node->reltupdesc->natts); + + for (i = 0 ; i < node->reltupdesc->natts ; i++) + { + appendStringInfo(str, ":colname "); + _outToken(str, NameStr(node->reltupdesc->attrs[i]->attname)); + appendStringInfo(str, " :coltypid %u ", + node->reltupdesc->attrs[i]->atttypid); + appendStringInfo(str, ":coltypmod %d ", + node->reltupdesc->attrs[i]->atttypmod); + } + + appendStringInfo(str, ") "); + #endif case RTE_SPECIAL: WRITE_OID_FIELD(relid); break; diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 1f562d7..c928bf8 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -31,7 +31,9 @@ #include "nodes/parsenodes.h" #include "nodes/readfuncs.h" - +#ifdef PGXC +#include "access/htup.h" +#endif /* * Macros to simplify reading of different kinds of fields. 
Use these @@ -1104,16 +1106,71 @@ _readFromExpr(void) static RangeTblEntry * _readRangeTblEntry(void) { +#ifdef PGXC + int natts, i; + char *colname; + Oid typid, typmod; +#endif + READ_LOCALS(RangeTblEntry); /* put alias + eref first to make dump more legible */ READ_NODE_FIELD(alias); READ_NODE_FIELD(eref); READ_ENUM_FIELD(rtekind, RTEKind); +#ifdef PGXC + READ_STRING_FIELD(relname); +#endif switch (local_node->rtekind) { case RTE_RELATION: +#ifdef PGXC + /* read tuple descriptor */ + token = pg_strtok(&length); /* skip :tupdesc_natts */ + token = pg_strtok(&length); /* get field value */ + + natts = atoi(token); + + if (natts > 0 && natts <= MaxTupleAttributeNumber) + local_node->reltupdesc = CreateTemplateTupleDesc(natts, false); + else + elog(ERROR, "invalid node field to read"); + + token = pg_strtok(&length); /* skip '(' */ + + if (length == 1 && pg_strncasecmp(token, "(", length) == 0) + { + for (i = 0 ; i < natts ; i++) + { + token = pg_strtok(&length); /* skip :colname */ + token = pg_strtok(&length); /* get colname */ + colname = nullable_string(token, length); + + if (colname == NULL) + elog(ERROR, "invalid node field to read"); + + token = pg_strtok(&length); /* skip :coltypid */ + token = pg_strtok(&length); /* get typid */ + typid = atooid(token); + + token = pg_strtok(&length); /* skip :coltypmod */ + token = pg_strtok(&length); /* get typmod */ + typmod = atoi(token); + + TupleDescInitEntry(local_node->reltupdesc, + (i + 1), colname, typid, typmod, 0); + } + } + else + elog(ERROR, "invalid node field to read"); + + token = pg_strtok(&length); /* skip '(' */ + + if (!(length == 1 && pg_strncasecmp(token, ")", length) == 0)) + elog(ERROR, "invalid node field to read"); +#endif + case RTE_SPECIAL: READ_OID_FIELD(relid); break; diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index fcbb8ca..8bb9057 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -109,6 +109,9 @@ bool enable_hashagg = true; bool enable_nestloop = true; bool enable_mergejoin = true; bool enable_hashjoin = true; +#ifdef PGXC +bool enable_remotejoin = true; +#endif typedef struct { diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 818ea1b..a753e95 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -34,6 +34,11 @@ #include "parser/parsetree.h" #ifdef PGXC #include "pgxc/planner.h" +#include "access/sysattr.h" +#include "utils/builtins.h" +#include "utils/syscache.h" +#include "catalog/pg_proc.h" +#include "executor/executor.h" #endif #include "utils/lsyscache.h" @@ -72,6 +77,14 @@ static WorkTableScan *create_worktablescan_plan(PlannerInfo *root, Path *best_pa #ifdef PGXC static RemoteQuery *create_remotequery_plan(PlannerInfo *root, Path *best_path, List *tlist, List *scan_clauses); +static Plan *create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, + Plan *parent, Plan *outer_plan, Plan *inner_plan); +static void create_remote_target_list(PlannerInfo *root, + StringInfo targets, List *out_tlist, List *in_tlist, + char *out_alias, int out_index, + char *in_alias, int in_index); +static Alias *generate_remote_rte_alias(RangeTblEntry *rte, int varno, + char *aliasname, int reduce_level); #endif static NestLoop *create_nestloop_plan(PlannerInfo *root, NestPath *best_path, Plan *outer_plan, Plan *inner_plan); @@ -141,6 +154,14 @@ static Sort *make_sort(PlannerInfo *root, Plan *lefttree, int numCols, double 
limit_tuples); static Material *make_material(Plan *lefttree); +#ifdef PGXC +static void findReferencedVars(List *parent_vars, Plan *plan, List **out_tlist, Relids *out_relids); +extern bool is_foreign_qual(Node *clause); +static void create_remote_clause_expr(PlannerInfo *root, Plan *parent, StringInfo clauses, + List *qual, RemoteQuery *scan); +static void create_remote_expr(PlannerInfo *root, Plan *parent, StringInfo expr, + Node *node, RemoteQuery *scan); +#endif /* * create_plan @@ -221,9 +242,6 @@ create_scan_plan(PlannerInfo *root, Path *best_path) List *tlist; List *scan_clauses; Plan *plan; -#ifdef PGXC - Plan *matplan; -#endif /* * For table scans, rather than using the relation targetlist (which is @@ -445,9 +463,6 @@ disuse_physical_tlist(Plan *plan, Path *path) case T_ValuesScan: case T_CteScan: case T_WorkTableScan: -#ifdef PGXC - case T_RemoteQuery: -#endif plan->targetlist = build_relation_tlist(path->parent); break; default: @@ -557,9 +572,627 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path) get_actual_clauses(get_loc_restrictinfo(best_path)))); #endif +#ifdef PGXC + /* check if this join can be reduced to an equiv. remote scan node */ + plan = create_remotejoin_plan(root, best_path, plan, outer_plan, inner_plan); +#endif + return plan; } +#ifdef PGXC +/* + * create_remotejoin_plan + * check if the children plans involve remote entities from the same remote + * node. If so, this join can be reduced to an equivalent remote scan plan + * node + * + * RULES: + * + * * provide unique aliases to both inner and outer nodes to represent their + * corresponding subqueries + * + * * identify target entries from both inner and outer that appear in the join + * targetlist, only those need to be selected from these aliased subqueries + * + * * a join node has a joinqual list which represents the join condition. E.g. + * SELECT * from emp e LEFT JOIN emp2 d ON e.x = d.x + * Here the joinqual contains "e.x = d.x". If the joinqual itself has a local + * dependency, e.g "e.x = localfunc(d.x)", then this join cannot be reduced + * + * * other than the joinqual, the join node can contain additional quals. Even + * if they have any local dependencies, we can reduce the join and just + * append these quals into the reduced remote scan node. We DO do a pass to + * identify remote quals and ship those in the squery though + * + * * these quals (both joinqual and normal quals with no local dependencies) + * need to be converted into expressions referring to the aliases assigned to + * the nodes. These expressions will eventually become part of the squery of + * the reduced remote scan node + * + * * the children remote scan nodes themselves can have local dependencies in + * their quals (the remote ones are already part of the squery). We can still + * reduce the join and just append these quals into the reduced remote scan + * node + * + * * if we reached successfully so far, generate a new remote scan node with + * this new squery generated using the aliased references + * + * One important point to note here about targetlists is that this function + * does not set any DUMMY var references in the Var nodes appearing in it. It + * follows the standard mechanism as is followed by other nodes. Similar to the + * existing nodes, the references which point to DUMMY vars is done in + * set_remote_references() function in set_plan_references phase at the fag + * end. Avoiding such DUMMY references manipulations till the end also makes + * this code a lot much readable and easier. 
+ */ +static Plan * +create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Plan *outer_plan, Plan *inner_plan) +{ + NestLoop *nest_parent; + + if (!enable_remotejoin) + return parent; + + /* meh, what are these for :( */ + if (root->hasPseudoConstantQuals) + return parent; + + /* Works only for SELECT commands right now */ + if (root->parse->commandType != CMD_SELECT) + return parent; + + /* do not optimize CURSOR based select statements */ + if (root->parse->rowMarks != NIL) + return parent; + + /* + * optimize only simple NestLoop joins for now. Other joins like Merge and + * Hash can be reduced too. But they involve additional intermediate nodes + * and we need to understand them a bit more as yet + */ + if (!IsA(parent, NestLoop)) + return parent; + else + nest_parent = (NestLoop *)parent; + + /* check if both the nodes qualify for reduction */ + if (IsA(outer_plan, Material) && + IsA(((Material *) outer_plan)->plan.lefttree, RemoteQuery) && + IsA(inner_plan, Material) && + IsA(((Material *) inner_plan)->plan.lefttree, RemoteQuery)) + { + int i; + List *rtable_list = NIL; + bool partitioned_replicated_join = false; + + Material *outer_mat = (Material *)outer_plan; + Material *inner_mat = (Material *)inner_plan; + + RemoteQuery *outer = (RemoteQuery *)outer_mat->plan.lefttree; + RemoteQuery *inner = (RemoteQuery *)inner_mat->plan.lefttree; + + /* + * Check if both these plans are from the same remote node. If yes, + * replace this JOIN along with it's two children with one equivalent + * remote node + */ + + /* + * Build up rtable for XC Walker + * (was not sure I could trust this, but it seems to work in various cases) + */ + for (i = 0; i < root->simple_rel_array_size; i++) + { + RangeTblEntry *rte = root->simple_rte_array[i]; + + /* Check for NULL first, sometimes it is NULL at position 0 */ + if (rte) + rtable_list = lappend(rtable_list, root->simple_rte_array[i]); + } + + /* XXX Check if the join optimization is possible */ + if (IsJoinReducible(inner, outer, rtable_list, best_path, &partitioned_replicated_join)) + { + RemoteQuery *result; + Plan *result_plan; + StringInfoData targets, clauses, scan_clauses, fromlist; + StringInfoData squery; + List *parent_vars, *out_tlist = NIL, *in_tlist = NIL, *base_tlist; + ListCell *l; + char in_alias[15], out_alias[15]; + Relids out_relids = NULL, in_relids = NULL; + bool use_where = false; + Index dummy_rtindex; + RangeTblEntry *dummy_rte; + List *local_scan_clauses = NIL, *remote_scan_clauses = NIL; + char *pname; + + + /* KISS! As long as distinct aliases are provided for all the objects in + * involved in query, remote server should not crib! */ + sprintf(in_alias, "out_%d", root->rs_alias_index); + sprintf(out_alias, "in_%d", root->rs_alias_index); + + /* + * Walk the left, right trees and identify which vars appear in the + * parent targetlist, only those need to be selected. Note that + * depending on whether the parent targetlist is top-level or + * intermediate, the children vars may or may not be referenced + * multiple times in it. + */ + parent_vars = pull_var_clause((Node *)parent->targetlist, PVC_REJECT_PLACEHOLDERS); + + findReferencedVars(parent_vars, outer_plan, &out_tlist, &out_relids); + findReferencedVars(parent_vars, inner_plan, &in_tlist, &in_relids); + + /* + * If the JOIN ON clause has a local dependency then we cannot ship + * the join to the remote side at all, bail out immediately. 
+ */ + if (!is_foreign_qual((Node *)nest_parent->join.joinqual)) + { + elog(DEBUG1, "cannot reduce: local dependencies in the joinqual"); + return parent; + } + + /* + * If the normal plan qual has local dependencies, the join can + * still be shipped. Try harder to ship remote clauses out of the + * entire list. These local quals will become part of the quals + * list of the reduced remote scan node down later. + */ + if (!is_foreign_qual((Node *)nest_parent->join.plan.qual)) + { + elog(DEBUG1, "local dependencies in the join plan qual"); + + /* + * trawl through each entry and come up with remote and local + * clauses... sigh + */ + foreach(l, nest_parent->join.plan.qual) + { + Node *clause = lfirst(l); + + /* + * if the currentof in the above call to + * clause_is_local_bound is set, somewhere in the list there + * is currentof clause, so keep that information intact and + * pass a dummy argument here. + */ + if (!is_foreign_qual((Node *)clause)) + local_scan_clauses = lappend(local_scan_clauses, clause); + else + remote_scan_clauses = lappend(remote_scan_clauses, clause); + } + } + else + { + /* + * there is no local bound clause, all the clauses are remote + * scan clauses + */ + remote_scan_clauses = nest_parent->join.plan.qual; + } + + /* generate the tlist for the new RemoteScan node using out_tlist, in_tlist */ + initStringInfo(&targets); + create_remote_target_list(root, &targets, out_tlist, in_tlist, + out_alias, outer->reduce_level, in_alias, inner->reduce_level); + + /* + * generate the fromlist now. The code has to appropriately mention + * the JOIN type in the string being generated. + */ + initStringInfo(&fromlist); + appendStringInfo(&fromlist, " (%s) %s ", + outer->sql_statement, quote_identifier(out_alias)); + + use_where = false; + switch (nest_parent->join.jointype) + { + case JOIN_INNER: + pname = ", "; + use_where = true; + break; + case JOIN_LEFT: + pname = "LEFT JOIN"; + break; + case JOIN_FULL: + pname = "FULL JOIN"; + break; + case JOIN_RIGHT: + pname = "RIGHT JOIN"; + break; + case JOIN_SEMI: + case JOIN_ANTI: + default: + return parent; + } + + /* + * splendid! we can actually replace this join hierarchy with a + * single RemoteScan node now. Start off by constructing the + * appropriate new tlist and tupdescriptor + */ + result = makeNode(RemoteQuery); + + /* + * Save various information about the inner and the outer plans. We + * may need this information later if more entries are added to it + * as part of the remote expression optimization + */ + result->remotejoin = true; + result->inner_alias = pstrdup(in_alias); + result->outer_alias = pstrdup(out_alias); + result->inner_reduce_level = inner->reduce_level; + result->outer_reduce_level = outer->reduce_level; + result->inner_relids = in_relids; + result->outer_relids = out_relids; + + appendStringInfo(&fromlist, " %s (%s) %s", + pname, inner->sql_statement, quote_identifier(in_alias)); + + /* generate join.joinqual remote clause string representation */ + initStringInfo(&clauses); + if (nest_parent->join.joinqual != NIL) + { + create_remote_clause_expr(root, parent, &clauses, + nest_parent->join.joinqual, result); + } + + /* generate join.plan.qual remote clause string representation */ + initStringInfo(&scan_clauses); + if (remote_scan_clauses != NIL) + { + create_remote_clause_expr(root, parent, &scan_clauses, + remote_scan_clauses, result); + } + + /* + * set the base tlist of the involved base relations, useful in + * set_plan_refs later. 
Additionally the tupledescs should be + * generated using this base_tlist and not the parent targetlist. + * This is because we want to take into account any additional + * column references from the scan clauses too + */ + base_tlist = add_to_flat_tlist(NIL, list_concat(out_tlist, in_tlist)); + + /* cook up the reltupdesc using this base_tlist */ + dummy_rte = makeNode(RangeTblEntry); + dummy_rte->reltupdesc = ExecTypeFromTL(base_tlist, false); + dummy_rte->rtekind = RTE_RELATION; + + /* use a dummy relname... */ + dummy_rte->relname = "__FOREIGN_QUERY__"; + dummy_rte->eref = makeAlias("__FOREIGN_QUERY__", NIL); + /* not sure if we need to set the below explicitly.. */ + dummy_rte->inh = false; + dummy_rte->inFromCl = false; + dummy_rte->requiredPerms = 0; + dummy_rte->checkAsUser = 0; + dummy_rte->selectedCols = NULL; + dummy_rte->modifiedCols = NULL; + + /* + * Append the dummy range table entry to the range table. + * Note that this modifies the master copy the caller passed us, otherwise + * e.g EXPLAIN VERBOSE will fail to find the rte the Vars built below refer + * to. + */ + root->parse->rtable = lappend(root->parse->rtable, dummy_rte); + dummy_rtindex = list_length(root->parse->rtable); + + result_plan = &result->scan.plan; + + /* the join targetlist becomes this node's tlist */ + result_plan->targetlist = parent->targetlist; + result_plan->lefttree = NULL; + result_plan->righttree = NULL; + result->scan.scanrelid = dummy_rtindex; + + /* generate the squery for this node */ + + /* NOTE: it's assumed that the remote_paramNums array is + * filled in the same order as we create the query here. + * + * TODO: we need some way to ensure that the remote_paramNums + * is filled in the same order as the order in which the clauses + * are added in the query below. + */ + initStringInfo(&squery); + appendStringInfo(&squery, "SELECT %s FROM %s", targets.data, fromlist.data); + + if (clauses.data[0] != '\0') + appendStringInfo(&squery, " %s %s", use_where? " WHERE " : " ON ", clauses.data); + + if (scan_clauses.data[0] != '\0') + appendStringInfo(&squery, " %s %s", use_where? " AND " : " WHERE ", scan_clauses.data); + + result->sql_statement = squery.data; + /* don't forget to increment the index for the next time around! */ + result->reduce_level = root->rs_alias_index++; + + + /* set_plan_refs needs this later */ + result->base_tlist = base_tlist; + result->relname = "__FOREIGN_QUERY__"; + + result->partitioned_replicated = partitioned_replicated_join; + + /* + * if there were any local scan clauses stick them up here. They + * can come from the join node or from remote scan node themselves. 
+ * Because of the processing being done earlier in + * create_remotescan_plan, all of the clauses if present will be + * local ones and hence can be stuck without checking for + * remoteness again here into result_plan->qual + */ + result_plan->qual = list_concat(result_plan->qual, outer_plan->qual); + result_plan->qual = list_concat(result_plan->qual, inner_plan->qual); + result_plan->qual = list_concat(result_plan->qual, local_scan_clauses); + + /* we actually need not worry about costs since this is the final plan */ + result_plan->startup_cost = outer_plan->startup_cost; + result_plan->total_cost = outer_plan->total_cost; + result_plan->plan_rows = outer_plan->plan_rows; + result_plan->plan_width = outer_plan->plan_width; + + return (Plan *)make_material(result_plan); + } + } + + return parent; +} + +/* + * Generate aliases for columns of remote tables using the + * colname_varno_varattno_reduce_level nomenclature + */ +static Alias * +generate_remote_rte_alias(RangeTblEntry *rte, int varno, char *aliasname, int reduce_level) +{ + TupleDesc tupdesc; + int maxattrs; + int varattno; + List *colnames = NIL; + StringInfo attr = makeStringInfo(); + + if (rte->rtekind != RTE_RELATION) + elog(ERROR, "called in improper context"); + + if (reduce_level == 0) + return makeAlias(aliasname, NIL); + + tupdesc = rte->reltupdesc; + maxattrs = tupdesc->natts; + + for (varattno = 0; varattno < maxattrs; varattno++) + { + Form_pg_attribute att = tupdesc->attrs[varattno]; + Value *attrname; + + resetStringInfo(attr); + appendStringInfo(attr, "%s_%d_%d_%d", + NameStr(att->attname), varno, varattno + 1, reduce_level); + + attrname = makeString(pstrdup(attr->data)); + + colnames = lappend(colnames, attrname); + } + + return makeAlias(aliasname, colnames); +} + +/* create_remote_target_list + * generate a targetlist using out_alias and in_alias appropriately. It is + * possible that in case of multiple-hierarchy reduction, both sides can have + * columns with the same name. E.g. consider the following: + * + * select * from emp e join emp f on e.x = f.x, emp g; + * + * So if we just use new_alias.columnname it can + * very easily clash with other columnname from the same side of an already + * reduced join. To avoid this, we generate unique column aliases using the + * following convention: + * colname_varno_varattno_reduce_level_index + * + * Each RemoteScan node carries it's reduce_level index to indicate the + * convention that should be adopted while referring to it's columns. If the + * level is 0, then normal column names can be used because they will never + * clash at the join level + */ +static void +create_remote_target_list(PlannerInfo *root, StringInfo targets, List *out_tlist, List *in_tlist, + char *out_alias, int out_index, char *in_alias, int in_index) +{ + int i = 0; + ListCell *l; + StringInfo attrname = makeStringInfo(); + bool add_null_target = true; + + foreach(l, out_tlist) + { + Var *var = (Var *) lfirst(l); + RangeTblEntry *rte = planner_rt_fetch(var->varno, root); + char *attname; + + + if (i++ > 0) + appendStringInfo(targets, ", "); + + attname = get_rte_attribute_name(rte, var->varattno); + + if (out_index) + { + resetStringInfo(attrname); + /* varattno can be negative for sys attributes, hence the abs! 
*/ + appendStringInfo(attrname, "%s_%d_%d_%d", + attname, var->varno, abs(var->varattno), out_index); + appendStringInfo(targets, "%s.%s", + quote_identifier(out_alias), quote_identifier(attrname->data)); + } + else + appendStringInfo(targets, "%s.%s", + quote_identifier(out_alias), quote_identifier(attname)); + + /* generate the new alias now using root->rs_alias_index */ + resetStringInfo(attrname); + appendStringInfo(attrname, "%s_%d_%d_%d", + attname, var->varno, abs(var->varattno), root->rs_alias_index); + appendStringInfo(targets, " AS %s", quote_identifier(attrname->data)); + add_null_target = false; + } + + foreach(l, in_tlist) + { + Var *var = (Var *) lfirst(l); + RangeTblEntry *rte = planner_rt_fetch(var->varno, root); + char *attname; + + if (i++ > 0) + appendStringInfo(targets, ", "); + + attname = get_rte_attribute_name(rte, var->varattno); + + if (in_index) + { + resetStringInfo(attrname); + /* varattno can be negative for sys attributes, hence the abs! */ + appendStringInfo(attrname, "%s_%d_%d_%d", + attname, var->varno, abs(var->varattno), in_index); + appendStringInfo(targets, "%s.%s", + quote_identifier(in_alias), quote_identifier(attrname->data)); + } + else + appendStringInfo(targets, "%s.%s", + quote_identifier(in_alias), quote_identifier(attname)); + + /* generate the new alias now using root->rs_alias_index */ + resetStringInfo(attrname); + appendStringInfo(attrname, "%s_%d_%d_%d", + attname, var->varno, abs(var->varattno), root->rs_alias_index); + appendStringInfo(targets, " AS %s", quote_identifier(attrname->data)); + add_null_target = false; + } + + /* + * It's possible that in some cases, the targetlist might not refer to any + * vars from the joined relations, eg. + * select count(*) from t1, t2; select const from t1, t2; etc + * For such cases just add a NULL selection into this targetlist + */ + if (add_null_target) + appendStringInfo(targets, " NULL "); +} + +/* + * create_remote_clause_expr + * generate a string to represent the clause list expression using out_alias + * and in_alias references. This function does a cute hack by temporarily + * modifying the rte->eref entries of the involved relations to point to + * out_alias and in_alias appropriately. The deparse_expression call then + * generates a string using these erefs which is exactly what is desired here. + * + * Additionally it creates aliases for the column references based on the + * reduce_level values too. This handles the case when both sides have same + * named columns.. 
+ * + * Obviously this function restores the eref, alias values to their former selves + * appropriately too, after use + */ +static void +create_remote_clause_expr(PlannerInfo *root, Plan *parent, StringInfo clauses, + List *qual, RemoteQuery *scan) +{ + Node *node = (Node *) make_ands_explicit(qual); + + return create_remote_expr(root, parent, clauses, node, scan); +} + +static void +create_remote_expr(PlannerInfo *root, Plan *parent, StringInfo expr, + Node *node, RemoteQuery *scan) +{ + List *context; + List *leref = NIL; + ListCell *cell; + char *exprstr; + int rtindex; + Relids tmprelids, relids; + + relids = pull_varnos((Node *)node); + + tmprelids = bms_copy(relids); + + while ((rtindex = bms_first_member(tmprelids)) >= 0) + { + RangeTblEntry *rte = planner_rt_fetch(rtindex, root); + + /* + * This rtindex should be a member of either out_relids or + * in_relids and never both + */ + if (bms_is_member(rtindex, scan->outer_relids) && + bms_is_member(rtindex, scan->inner_relids)) + elog(ERROR, "improper relid references in the join clause list"); + + /* + * save the current rte->eref and rte->alias values and stick in a new + * one in the rte with the proper inner or outer alias + */ + leref = lappend(leref, rte->eref); + leref = lappend(leref, rte->alias); + + if (bms_is_member(rtindex, scan->outer_relids)) + { + rte->eref = makeAlias(scan->outer_alias, NIL); + + /* attach proper column aliases.. */ + rte->alias = generate_remote_rte_alias(rte, rtindex, + scan->outer_alias, scan->outer_reduce_level); + } + if (bms_is_member(rtindex, scan->inner_relids)) + { + rte->eref = makeAlias(scan->inner_alias, NIL); + + /* attach proper column aliases.. */ + rte->alias = generate_remote_rte_alias(rte, rtindex, + scan->inner_alias, scan->inner_reduce_level); + } + } + bms_free(tmprelids); + + /* Set up deparsing context */ + context = deparse_context_for_plan((Node *) parent, + NULL, + root->parse->rtable, + NULL); + + exprstr = deparse_expression(node, context, true, false); + + /* revert back the saved eref entries in the same order now! 
*/ + cell = list_head(leref); + tmprelids = bms_copy(relids); + while ((rtindex = bms_first_member(tmprelids)) >= 0) + { + RangeTblEntry *rte = planner_rt_fetch(rtindex, root); + + Assert(cell != NULL); + + rte->eref = lfirst(cell); + cell = lnext(cell); + + rte->alias = lfirst(cell); + cell = lnext(cell); + } + bms_free(tmprelids); + + appendStringInfo(expr, " %s", exprstr); + return; +} +#endif + /* * create_append_plan * Create an Append plan for 'best_path' and (recursively) plans @@ -1583,9 +2216,23 @@ create_remotequery_plan(PlannerInfo *root, Path *best_path, List *tlist, List *scan_clauses) { RemoteQuery *scan_plan; + bool prefix; Index scan_relid = best_path->parent->relid; RangeTblEntry *rte; - + char *wherestr = NULL; + Bitmapset *varattnos = NULL; + List *remote_scan_clauses = NIL; + List *local_scan_clauses = NIL; + Oid nspid; + char *nspname; + char *relname; + const char *nspname_q; + const char *relname_q; + const char *aliasname_q; + int i; + TupleDesc tupdesc; + bool first; + StringInfoData sql; Assert(scan_relid > 0); rte = planner_rt_fetch(scan_relid, root); @@ -1598,16 +2245,159 @@ create_remotequery_plan(PlannerInfo *root, Path *best_path, /* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */ scan_clauses = extract_actual_clauses(scan_clauses, false); + if (scan_clauses) + { + ListCell *l; + + foreach(l, (List *)scan_clauses) + { + Node *clause = lfirst(l); + + if (is_foreign_qual(clause)) + remote_scan_clauses = lappend(remote_scan_clauses, clause); + else + local_scan_clauses = lappend(local_scan_clauses, clause); + } + } + + /* + * Incorporate any remote_scan_clauses into the WHERE clause that + * we intend to push to the remote server. + */ + if (remote_scan_clauses) + { + char *sep = ""; + ListCell *l; + StringInfoData buf; + List *deparse_context; + + initStringInfo(&buf); + + deparse_context = deparse_context_for_remotequery( + get_rel_name(rte->relid), rte->relid); + + /* + * remote_scan_clauses is a list of scan clauses (restrictions) that we + * can push to the remote server. We want to deparse each of those + * expressions (that is, each member of the List) and AND them together + * into a WHERE clause. + */ + + foreach(l, (List *)remote_scan_clauses) + { + Node *clause = lfirst(l); + + appendStringInfo(&buf, "%s", sep ); + appendStringInfo(&buf, "%s", deparse_expression(clause, deparse_context, false, false)); + sep = " AND "; + } + + wherestr = buf.data; + } + + /* + * Now walk through the target list and the scan clauses to get the + * interesting attributes. Only those attributes will be fetched from the + * remote side. + */ + varattnos = pull_varattnos_varno((Node *) best_path->parent->reltargetlist, best_path->parent->relid, + varattnos); + varattnos = pull_varattnos_varno((Node *) local_scan_clauses, + best_path->parent->relid, varattnos); + /* + * Scanning multiple relations in a RemoteQuery node is not supported. + */ + prefix = false; +#if 0 + prefix = list_length(estate->es_range_table) > 1; +#endif + + /* Get quoted names of schema, table and alias */ + nspid = get_rel_namespace(rte->relid); + nspname = get_namespace_name(nspid); + relname = get_rel_name(rte->relid); + nspname_q = quote_identifier(nspname); + relname_q = quote_identifier(relname); + aliasname_q = quote_identifier(rte->eref->aliasname); + + initStringInfo(&sql); + + /* deparse SELECT clause */ + appendStringInfo(&sql, "SELECT "); + + /* + * TODO: omit (deparse to "NULL") columns which are not used in the + * original SQL. 
+ * + * We must parse nodes parents of this RemoteQuery node to determine unused + * columns because some columns may be used only in parent Sort/Agg/Limit + * nodes. + */ + tupdesc = best_path->parent->reltupdesc; + first = true; + for (i = 0; i < tupdesc->natts; i++) + { + /* skip dropped attributes */ + if (tupdesc->attrs[i]->attisdropped) + continue; + + if (!first) + appendStringInfoString(&sql, ", "); + + if (bms_is_member(i + 1 - FirstLowInvalidHeapAttributeNumber, varattnos)) + { + if (prefix) + appendStringInfo(&sql, "%s.%s", + aliasname_q, tupdesc->attrs[i]->attname.data); + else + appendStringInfo(&sql, "%s", tupdesc->attrs[i]->attname.data); + } + else + appendStringInfo(&sql, "%s", "NULL"); + first = false; + } + + /* if target list is composed only of system attributes, add dummy column */ + if (first) + appendStringInfo(&sql, "NULL"); + + /* deparse FROM clause */ + appendStringInfo(&sql, " FROM "); + /* + * XXX: should use GENERIC OPTIONS like 'foreign_relname' or something for + * the foreign table name instead of the local name ? + */ + appendStringInfo(&sql, "%s.%s %s", nspname_q, relname_q, aliasname_q); + pfree(nspname); + pfree(relname); + if (nspname_q != nspname_q) + pfree((char *) nspname_q); + if (relname_q != relname_q) + pfree((char *) relname_q); + if (aliasname_q != rte->eref->aliasname) + pfree((char *) aliasname_q); + + if (wherestr) + { + appendStringInfo(&sql, " WHERE "); + appendStringInfo(&sql, "%s", wherestr); + pfree(wherestr); + } + + bms_free(varattnos); + scan_plan = make_remotequery(tlist, rte, - scan_clauses, + local_scan_clauses, scan_relid); + scan_plan->sql_statement = sql.data; + copy_path_costsize(&scan_plan->scan.plan, best_path); /* PGXCTODO - get better estimates */ scan_plan->scan.plan.plan_rows = 1000; - + return scan_plan; } #endif @@ -3819,3 +4609,56 @@ is_projection_capable_plan(Plan *plan) } return true; } + +#ifdef PGXC +/* + * findReferencedVars() + * + * Constructs a list of those Vars in targetlist which are found in + * parent_vars (in other words, the intersection of targetlist and + * parent_vars). Returns a new list in *out_tlist and a bitmap of + * those relids found in the result. + * + * Additionally do look at the qual references to other vars! They + * also need to be selected.. 
+ */ +static void +findReferencedVars(List *parent_vars, Plan *plan, List **out_tlist, Relids *out_relids) +{ + List *vars; + Relids relids = NULL; + List *tlist = NIL; + ListCell *l; + + /* Pull vars from both the targetlist and the clauses attached to this plan */ + vars = pull_var_clause((Node *)plan->targetlist, PVC_REJECT_PLACEHOLDERS); + + foreach(l, vars) + { + Var *var = lfirst(l); + + if (search_tlist_for_var(var, parent_vars)) + tlist = lappend(tlist, var); + + if (!bms_is_member(var->varno, relids)) + relids = bms_add_member(relids, var->varno); + } + + /* now consider the local quals */ + vars = pull_var_clause((Node *)plan->qual, PVC_REJECT_PLACEHOLDERS); + + foreach(l, vars) + { + Var *var = lfirst(l); + + if (search_tlist_for_var(var, tlist) == NULL) + tlist = lappend(tlist, var); + + if (!bms_is_member(var->varno, relids)) + relids = bms_add_member(relids, var->varno); + } + + *out_tlist = tlist; + *out_relids = relids; +} +#endif diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 2c95815..dc6ff35 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -301,6 +301,9 @@ subquery_planner(PlannerGlobal *glob, Query *parse, root->eq_classes = NIL; root->append_rel_list = NIL; +#ifdef PGXC + root->rs_alias_index = 1; +#endif root->hasRecursion = hasRecursion; if (hasRecursion) root->wt_param_id = SS_assign_worktable_param(root); diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index cab7fb4..950e388 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -1401,6 +1401,32 @@ search_indexed_tlist_for_non_var(Node *node, return NULL; /* no match */ } +#ifdef PGXC +/* + * search_tlist_for_var --- find a Var in the provided tlist. This does a + * basic scan through the list. So not very efficient... + * + * If no match, return NULL. + * + */ +Var * +search_tlist_for_var(Var *var, List *jtlist) +{ + Index varno = var->varno; + AttrNumber varattno = var->varattno; + ListCell *l; + + foreach(l, jtlist) + { + Var *listvar = (Var *) lfirst(l); + + if (listvar->varno == varno && listvar->varattno == varattno) + return var; + } + return NULL; /* no match */ +} +#endif + /* * search_indexed_tlist_for_sortgroupref --- find a sort/group expression * (which is assumed not to be just a Var) diff --git a/src/backend/optimizer/util/relnode.c b/src/backend/optimizer/util/relnode.c index 1d93203..b1c8bcb 100644 --- a/src/backend/optimizer/util/relnode.c +++ b/src/backend/optimizer/util/relnode.c @@ -92,12 +92,30 @@ build_simple_rel(PlannerInfo *root, int relid, RelOptKind reloptkind) rel->index_outer_relids = NULL; rel->index_inner_paths = NIL; +#ifdef PGXC + rel->reltupdesc = rte->reltupdesc; +#endif + /* Check type of rtable entry */ switch (rte->rtekind) { case RTE_RELATION: /* Table --- retrieve statistics from the system catalogs */ get_relation_info(root, rte->relid, rte->inh, rel); +#ifdef PGXC + /* + * This is a remote table... we have no idea how many pages/rows + * we may get from a scan of this table. 
However, we should set the + * costs in such a manner that cheapest paths should pick up the + * ones involving these remote rels + * + * These allow for maximum query shipping to the remote + * side later during the planning phase + */ + rel->pages = 1; + rel->tuples = 1; + rel->rows = 1; +#endif break; case RTE_SUBQUERY: case RTE_FUNCTION: diff --git a/src/backend/optimizer/util/var.c b/src/backend/optimizer/util/var.c index 1a6826f..a574278 100644 --- a/src/backend/optimizer/util/var.c +++ b/src/backend/optimizer/util/var.c @@ -34,6 +34,14 @@ typedef struct int sublevels_up; } pull_varnos_context; +#ifdef PGXC +typedef struct +{ + Index varno; + Bitmapset *varattnos; +} pull_varattnos_context; +#endif + typedef struct { int var_location; @@ -68,6 +76,10 @@ typedef struct static bool pull_varnos_walker(Node *node, pull_varnos_context *context); static bool pull_varattnos_walker(Node *node, Bitmapset **varattnos); +#ifdef PGXC +static bool pull_varattnos_varno_walker(Node *node, + pull_varattnos_context *context); +#endif static bool contain_var_clause_walker(Node *node, void *context); static bool contain_vars_of_level_walker(Node *node, int *sublevels_up); static bool locate_var_of_level_walker(Node *node, @@ -228,6 +240,54 @@ contain_var_clause(Node *node) return contain_var_clause_walker(node, NULL); } +#ifdef PGXC +/* + * pull_varattnos_varno + * Find all the distinct attribute numbers present in an expression tree, + * and add them to the initial contents of *varattnos. + * + * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that + * we can include system attributes (e.g., OID) in the bitmap representation. + * + * This is same as pull_varattnos except for the fact that it gets attributes + * for the given varno + */ +Bitmapset * +pull_varattnos_varno(Node *node, Index varno, Bitmapset *varattnos) +{ + pull_varattnos_context context; + + context.varno = varno; + context.varattnos = varattnos; + + (void) pull_varattnos_varno_walker(node, &context); + + return context.varattnos; +} + +static bool +pull_varattnos_varno_walker(Node *node, pull_varattnos_context *context) +{ + if (node == NULL) + return false; + + Assert(context != NULL); + + if (IsA(node, Var)) + { + Var *var = (Var *) node; + + if (var->varno == context->varno) + context->varattnos = bms_add_member(context->varattnos, + var->varattno - FirstLowInvalidHeapAttributeNumber); + return false; + } + + return expression_tree_walker(node, pull_varattnos_varno_walker, + (void *) context); +} +#endif + static bool contain_var_clause_walker(Node *node, void *context) { diff --git a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c index 5a42451..229b16d 100644 --- a/src/backend/parser/parse_relation.c +++ b/src/backend/parser/parse_relation.c @@ -923,6 +923,11 @@ addRangeTableEntry(ParseState *pstate, rel = parserOpenTable(pstate, relation, lockmode); rte->relid = RelationGetRelid(rel); +#ifdef PGXC + rte->reltupdesc = CreateTupleDescCopyConstr(rel->rd_att); + rte->relname = RelationGetRelationName(rel); +#endif + /* * Build the list of effective column names using user-supplied aliases * and/or actual column names. @@ -985,6 +990,11 @@ addRangeTableEntryForRelation(ParseState *pstate, rte->alias = alias; rte->relid = RelationGetRelid(rel); +#ifdef PGXC + rte->reltupdesc = CreateTupleDescCopyConstr(rel->rd_att); + rte->relname = RelationGetRelationName(rel); +#endif + /* * Build the list of effective column names using user-supplied aliases * and/or actual column names. 
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index 29e4ee0..e678a14 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -87,7 +87,8 @@ typedef struct /* If two relations are joined based on special location information */ typedef enum PGXCJoinType { - JOIN_REPLICATED, + JOIN_REPLICATED_ONLY, + JOIN_REPLICATED_PARTITIONED, JOIN_COLOCATED_PARTITIONED, JOIN_OTHER } PGXCJoinType; @@ -144,6 +145,7 @@ static ExecNodes *get_plan_nodes(Query *query, bool isRead); static bool get_plan_nodes_walker(Node *query_node, XCWalkerContext *context); static bool examine_conditions_walker(Node *expr_node, XCWalkerContext *context); static int handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stmt); +static void InitXCWalkerContext(XCWalkerContext *context); /* * True if both lists contain only one node and are the same @@ -693,15 +695,20 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) if (rel_loc_info1->locatorType == LOCATOR_TYPE_REPLICATED) { + /* add to replicated join conditions */ context->conditions->replicated_joins = - lappend(context->conditions->replicated_joins, opexpr); + lappend(context->conditions->replicated_joins, pgxc_join); if (colvar->varlevelsup != colvar2->varlevelsup) context->multilevel_join = true; - if (rel_loc_info2->locatorType != LOCATOR_TYPE_REPLICATED) + if (rel_loc_info2->locatorType == LOCATOR_TYPE_REPLICATED) + pgxc_join->join_type = JOIN_REPLICATED_ONLY; + else { + pgxc_join->join_type = JOIN_REPLICATED_PARTITIONED; + /* Note other relation, saves us work later. */ context->conditions->base_rel_name = column_base2->relname; context->conditions->base_rel_loc_info = rel_loc_info2; @@ -717,23 +724,21 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) FreeRelationLocInfo(rel_loc_info2); } - /* note nature of join between the two relations */ - pgxc_join->join_type = JOIN_REPLICATED; return false; } else if (rel_loc_info2->locatorType == LOCATOR_TYPE_REPLICATED) { + /* note nature of join between the two relations */ + pgxc_join->join_type = JOIN_REPLICATED_PARTITIONED; + /* add to replicated join conditions */ context->conditions->replicated_joins = - lappend(context->conditions->replicated_joins, opexpr); + lappend(context->conditions->replicated_joins, pgxc_join); /* other relation not replicated, note it for later */ context->conditions->base_rel_name = column_base->relname; context->conditions->base_rel_loc_info = rel_loc_info1; - /* note nature of join between the two relations */ - pgxc_join->join_type = JOIN_REPLICATED; - if (rel_loc_info2) FreeRelationLocInfo(rel_loc_info2); @@ -1259,6 +1264,23 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) return false; } +/* + * Set initial values for expression walker + */ +static void +InitXCWalkerContext(XCWalkerContext *context) +{ + context->isRead = true; + context->exec_nodes = NULL; + context->conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions)); + context->rtables = NIL; + context->multilevel_join = false; + context->varno = 0; + context->within_or = false; + context->within_not = false; + context->exec_on_coord = false; + context->join_list = NIL; +} /* * Top level entry point before walking query to determine plan nodes @@ -1271,18 +1293,9 @@ get_plan_nodes(Query *query, bool isRead) XCWalkerContext context; - context.query = query; + InitXCWalkerContext(&context); context.isRead = isRead; - context.exec_nodes = NULL; - context.conditions = 
(Special_Conditions *) palloc0(sizeof(Special_Conditions)); - context.rtables = NIL; context.rtables = lappend(context.rtables, query->rtable); - context.multilevel_join = false; - context.varno = 0; - context.within_or = false; - context.within_not = false; - context.exec_on_coord = false; - context.join_list = NIL; if (!get_plan_nodes_walker((Node *) query, &context)) result_nodes = context.exec_nodes; @@ -2315,3 +2328,148 @@ free_query_step(RemoteQuery *query_step) list_free_deep(query_step->simple_aggregates); pfree(query_step); } + + +/* + * See if we can reduce the passed in RemoteQuery nodes to a single step. + * + * We need to check when we can further collapse already collapsed nodes. + * We cannot always collapse: we do not want to allow a replicated table + * to be used twice. That is, if we have + * + * partitioned_1 -- replicated -- partitioned_2 + * + * partitioned_1 and partitioned_2 cannot (usually) be safely joined + * locally on their own. + * We can detect this by checking (and possibly tracking) the join type, + * and by looking at context->conditions->replicated_joins + * + * The following cases are possible, along with whether or not it is OK + * to reduce. + * + * If the join between the two RemoteQuery nodes is replicated + * + * Node 1 Node 2 + * rep-part folded rep-part folded ok to reduce? + * 0 0 0 1 1 + * 0 0 1 1 1 + * 0 1 0 1 1 + * 0 1 1 1 1 + * 1 1 1 1 0 + * + * + * If the join between the two RemoteQuery nodes is replicated - partitioned + * + * Node 1 Node 2 + * rep-part folded rep-part folded ok to reduce? + * 0 0 0 1 1 + * 0 0 1 1 0 + * 0 1 0 1 1 + * 0 1 1 1 0 + * 1 1 1 1 0 + * + * + * If the join between the two RemoteQuery nodes is partitioned - partitioned, + * it is always safely reducible. + * + * RemoteQuery *innernode - the inner node + * RemoteQuery *outernode - the outer node + * bool *partitioned_replicated - set to true if we have a partitioned-replicated + * join. We want to use replicated tables with non-replicated + * tables only once. Only use this value if this function + * returns true. + */ +bool +IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode, + List *rtable_list, JoinPath *join_path, bool *partitioned_replicated) +{ + XCWalkerContext context; + ListCell *cell; + bool maybe_reducible = false; + bool result = false; + + + *partitioned_replicated = false; + + InitXCWalkerContext(&context); + context.isRead = true; /* PGXCTODO - determine */ + context.rtables = NIL; + context.rtables = lappend(context.rtables, rtable_list); /* add to list of lists */ + + + + foreach(cell, join_path->joinrestrictinfo) + { + RestrictInfo *node = (RestrictInfo *) lfirst(cell); + + /* + * Check if we can fold these safely. + * + * If examine_conditions_walker() returns true, + * then it definitely is not collapsible. + * If it returns false, it may or may not be; we have to check + * context.conditions at the end. + * We keep trying, because another condition may fulfill the criteria. + */ + maybe_reducible = !examine_conditions_walker((Node *) node->clause, &context); + + if (!maybe_reducible) + break; + + } + + /* check to see if we found any partitioned or replicated joins */ + if (maybe_reducible && + (context.conditions->partitioned_parent_child + || context.conditions->replicated_joins)) + { + /* + * If we get here, we think that we can fold the + * RemoteQuery nodes into a single one. 
+ */ + result = true; + + /* Check replicated-replicated and replicated-partitioned joins */ + if (context.conditions->replicated_joins) + { + ListCell *cell; + + /* If we have already reduced with replicated tables, we + * cannot do so again here. + * PGXCTODO - handle more cases and use outer_relids and inner_relids + * For now we just give up. + */ + if ((innernode->remotejoin && innernode->partitioned_replicated) && + (outernode->remotejoin && outernode->partitioned_replicated)) + { + /* not reducible after all */ + return false; + } + + foreach(cell, context.conditions->replicated_joins) + { + PGXC_Join *pgxc_join = (PGXC_Join *) lfirst(cell); + + if (pgxc_join->join_type == JOIN_REPLICATED_PARTITIONED) + { + *partitioned_replicated = true; + + /* + * If either of these already has such a join, we do not + * want to add it a second time. + */ + if ((innernode->remotejoin && innernode->partitioned_replicated) || + (outernode->remotejoin && outernode->partitioned_replicated)) + { + /* not reducible after all */ + return false; + } + } + } + } + } + + return result; +} + + diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 14dce33..7fe08be 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -2388,20 +2388,6 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) ExecInitScanTupleSlot(estate, &remotestate->ss); - /* - * Initialize scan relation. get the relation object id from the - * relid'th entry in the range table, open that relation and acquire - * appropriate lock on it. - * This is needed for deparseSQL - * We should remove these lines once we plan and deparse earlier. - */ - if (!node->is_single_step) - { - currentRelation = ExecOpenScanRelation(estate, node->scan.scanrelid); - remotestate->ss.ss_currentRelation = currentRelation; - ExecAssignScanType(&remotestate->ss, RelationGetDescr(currentRelation)); - } - remotestate->ss.ps.ps_TupFromTlist = false; /* @@ -2723,11 +2709,6 @@ ExecRemoteQuery(RemoteQueryState *node) errmsg("Could not begin transaction on data nodes."))); } - /* Get the SQL string */ - /* only do if not single step */ - if (!step->is_single_step) - step->sql_statement = deparseSql(node); - /* See if we have a primary node, execute on it first before the others */ if (primaryconnection) { diff --git a/src/backend/pgxc/pool/postgresql_fdw.c b/src/backend/pgxc/pool/postgresql_fdw.c index dabf5da..14c0ddb 100644 --- a/src/backend/pgxc/pool/postgresql_fdw.c +++ b/src/backend/pgxc/pool/postgresql_fdw.c @@ -45,7 +45,7 @@ /* deparse SQL from the request */ bool is_immutable_func(Oid funcid); -static bool is_foreign_qual(ExprState *state); +bool is_foreign_qual(Node *node); static bool foreign_qual_walker(Node *node, void *context); char *deparseSql(RemoteQueryState *scanstate); @@ -103,10 +103,10 @@ is_immutable_func(Oid funcid) * local server in the foreign server. 
* - scalar array operator (ANY/ALL) */ -static bool -is_foreign_qual(ExprState *state) +bool +is_foreign_qual(Node *node) { - return !foreign_qual_walker((Node *) state->expr, NULL); + return !foreign_qual_walker(node, NULL); } /* @@ -120,6 +120,9 @@ foreign_qual_walker(Node *node, void *context) switch (nodeTag(node)) { + case T_ExprState: + return foreign_qual_walker((Node *) ((ExprState *) node)->expr, NULL); + case T_Param: /* TODO: pass internal parameters to the foreign server */ if (((Param *) node)->paramkind != PARAM_EXTERN) @@ -286,7 +289,7 @@ elog(DEBUG2, "%s(%u) called", __FUNCTION__, __LINE__); { ExprState *state = lfirst(lc); - if (is_foreign_qual(state)) + if (is_foreign_qual((Node *) state)) { elog(DEBUG1, "foreign qual: %s", nodeToString(state->expr)); foreign_qual = lappend(foreign_qual, state); @@ -317,7 +320,7 @@ elog(DEBUG2, "%s(%u) called", __FUNCTION__, __LINE__); Node *node; node = (Node *) make_ands_explicit(foreign_expr); appendStringInfo(&sql, " WHERE "); - appendStringInfo(&sql, + appendStringInfo(&sql, "%s", deparse_expression(node, context, prefix, false)); /* * The contents of the list MUST NOT be free-ed because they are diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index c930701..130dff3 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -114,6 +114,8 @@ typedef struct List *subplans; /* List of subplans, in plan-tree case */ Plan *outer_plan; /* OUTER subplan, or NULL if none */ Plan *inner_plan; /* INNER subplan, or NULL if none */ + + bool remotequery; /* deparse context for remote query */ } deparse_namespace; @@ -1936,10 +1938,42 @@ deparse_context_for(const char *aliasname, Oid relid) dpns->ctes = NIL; dpns->subplans = NIL; dpns->outer_plan = dpns->inner_plan = NULL; +#ifdef PGXC + dpns->remotequery = false; +#endif + + /* Return a one-deep namespace stack */ + return list_make1(dpns); +} + +#ifdef PGXC +List * +deparse_context_for_remotequery(const char *aliasname, Oid relid) +{ + deparse_namespace *dpns; + RangeTblEntry *rte; + + dpns = (deparse_namespace *) palloc(sizeof(deparse_namespace)); + + /* Build a minimal RTE for the rel */ + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = relid; + rte->eref = makeAlias(aliasname, NIL); + rte->inh = false; + rte->inFromCl = true; + + /* Build one-element rtable */ + dpns->rtable = list_make1(rte); + dpns->ctes = NIL; + dpns->subplans = NIL; + dpns->outer_plan = dpns->inner_plan = NULL; + dpns->remotequery = true; /* Return a one-deep namespace stack */ return list_make1(dpns); } +#endif /* * deparse_context_for_plan - Build deparse context for a plan node @@ -1974,7 +2008,9 @@ deparse_context_for_plan(Node *plan, Node *outer_plan, dpns->rtable = rtable; dpns->ctes = NIL; dpns->subplans = subplans; - +#ifdef PGXC + dpns->remotequery = false; +#endif /* * Set up outer_plan and inner_plan from the Plan node (this includes * various special cases for particular Plan types). 
@@ -2138,7 +2174,9 @@ make_ruledef(StringInfo buf, HeapTuple ruletup, TupleDesc rulettc, dpns.ctes = query->cteList; dpns.subplans = NIL; dpns.outer_plan = dpns.inner_plan = NULL; - +#ifdef PGXC + dpns.remotequery = false; +#endif get_rule_expr(qual, &context, false); } @@ -2285,7 +2323,9 @@ get_query_def(Query *query, StringInfo buf, List *parentnamespace, dpns.ctes = query->cteList; dpns.subplans = NIL; dpns.outer_plan = dpns.inner_plan = NULL; - +#ifdef PGXC + dpns.remotequery = false; +#endif switch (query->commandType) { case CMD_SELECT: @@ -3379,6 +3419,14 @@ get_variable(Var *var, int levelsup, bool showstar, deparse_context *context) * likely that varno is OUTER or INNER, in which case we must dig down * into the subplans. */ +#ifdef PGXC + if (dpns->remotequery... [truncated message content] |
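[Editor's note: to make the fold/no-fold tables documented above for IsJoinReducible() concrete, here is a hedged sketch of a call site. The caller is hypothetical; inner_rq, outer_rq, rtable, join_path, and the combined node are stand-ins, not code from this commit. Only the function signature and the remotejoin/partitioned_replicated fields are taken from the diff.]

/*
 * Hypothetical caller during join planning.  If the two RemoteQuery
 * nodes can be folded into one step, remember whether a replicated
 * table was paired with a partitioned one, so that a later reduction
 * cannot consume the replicated side a second time.
 */
bool    partitioned_replicated = false;

if (IsJoinReducible(inner_rq, outer_rq, rtable, join_path,
                    &partitioned_replicated))
{
    combined->remotejoin = true;
    combined->partitioned_replicated = partitioned_replicated;
    /* ... build the single-step RemoteQuery for the folded join ... */
}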
From: mason_s <ma...@us...> - 2010-11-01 15:36:20
|
Project "Postgres-XC". The branch, master has been updated via d62381f6e17e57fd4b3d7b51afce3032fe35507a (commit) from d7d492eaeca181add193b4705de58637f5ba7c58 (commit) - Log ----------------------------------------------------------------- commit d62381f6e17e57fd4b3d7b51afce3032fe35507a Author: Mason Sharp <ma...@us...> Date: Mon Nov 1 11:27:00 2010 -0400 Special hanlding for ANALYZE on the data nodes. Before this commit, if autovacuum executed analyze, it would get the XID from GTM, and the XID would be included in all global snapshots, even though the XID will never be involved in a transaction on the other nodes. This commit adds special handling to track these separately. They are excluded from global snapshots, and when the data node receives a snapshot, it adds any of these XIDs to the snapshot. This helps to prevent any long-running auto-analyze on a single node slowing down execution on other nodes because of a low xmin value. diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 03e6d90..d87abed 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -25,6 +25,7 @@ #ifdef PGXC #include "pgxc/pgxc.h" #include "access/gtm.h" +#include "storage/procarray.h" #endif @@ -110,7 +111,10 @@ GetNewTransactionId(bool isSubXact) * block all other processes. * GXID can just be obtained from a remote Coordinator */ - xid = (TransactionId) BeginTranGTM(timestamp); + if (IsAutoVacuumWorkerProcess() && (MyProc->vacuumFlags & PROC_IN_VACUUM)) + xid = (TransactionId) BeginTranAutovacuumGTM(); + else + xid = (TransactionId) BeginTranGTM(timestamp); *timestamp_received = true; } #endif @@ -148,20 +152,22 @@ GetNewTransactionId(bool isSubXact) if (IsAutoVacuumWorkerProcess()) { if (MyProc->vacuumFlags & PROC_IN_VACUUM) - { elog (DEBUG1, "Getting XID for autovacuum"); - /* Try and get gxid directly from GTM. - * We use a different function so that GTM knows to - * exclude it from other snapshots. - */ - next_xid = (TransactionId) BeginTranAutovacuumGTM(); - } else { elog (DEBUG1, "Getting XID for autovacuum worker (analyze)"); - /* try and get gxid directly from GTM */ - next_xid = (TransactionId) BeginTranGTM(NULL); + /* + * Acquire the Analyze array lock. + * We track ANALYZE XIDs separately and add them only to local snapshots. + */ + LWLockAcquire(AnalyzeProcArrayLock, LW_EXCLUSIVE); } + /* + * Get gxid directly from GTM. + * We use a separate function so that GTM knows to exclude it from + * other snapshots. 
+ */ + next_xid = (TransactionId) BeginTranAutovacuumGTM(); } else if (GetForceXidFromGTM()) { @@ -328,6 +334,15 @@ GetNewTransactionId(bool isSubXact) } } +#ifdef PGXC + /* If it is auto-analyze, we need to add it to the array and unlock */ + if(IS_PGXC_DATANODE && IsAutoVacuumAnalyzeWorker()) + { + AnalyzeProcArrayAdd(MyProc); + LWLockRelease(AnalyzeProcArrayLock); + } +#endif + LWLockRelease(XidGenLock); return xid; } diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index f51672e..604d48a 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1835,7 +1835,18 @@ CommitTransaction(void) /* If we are autovacuum, commit on GTM */ if ((IsAutoVacuumWorkerProcess() || GetForceXidFromGTM()) && IsGTMConnected()) + { + if(IsAutoVacuumAnalyzeWorker()) + LWLockAcquire(AnalyzeProcArrayLock, LW_EXCLUSIVE); + CommitTranGTM((GlobalTransactionId) latestXid); + + if(IsAutoVacuumAnalyzeWorker()) + { + AnalyzeProcArrayRemove(MyProc, latestXid); + LWLockRelease(AnalyzeProcArrayLock); + } + } } #endif @@ -2300,10 +2311,19 @@ AbortTransaction(void) } else if (IS_PGXC_DATANODE || IsConnFromCoord()) { + if(IsAutoVacuumAnalyzeWorker()) + LWLockAcquire(AnalyzeProcArrayLock, LW_EXCLUSIVE); + /* If we are autovacuum, commit on GTM */ if ((IsAutoVacuumWorkerProcess() || GetForceXidFromGTM()) && IsGTMConnected()) RollbackTranGTM((GlobalTransactionId) latestXid); + + if(IsAutoVacuumAnalyzeWorker()) + { + AnalyzeProcArrayRemove(MyProc, latestXid); + LWLockRelease(AnalyzeProcArrayLock); + } } #endif /* diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 3022867..4e57082 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -109,6 +109,9 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, MultiXactShmemSize()); size = add_size(size, LWLockShmemSize()); size = add_size(size, ProcArrayShmemSize()); +#ifdef PGXC + size = add_size(size, AnalyzeProcArrayShmemSize()); +#endif size = add_size(size, BackendStatusShmemSize()); size = add_size(size, SInvalShmemSize()); size = add_size(size, PMSignalShmemSize()); @@ -197,6 +200,9 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) if (!IsUnderPostmaster) InitProcGlobal(); CreateSharedProcArray(); +#ifdef PGXC + CreateSharedAnalyzeProcArray(); +#endif CreateSharedBackendStatus(); /* diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 7551d95..6000fdb 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -17,6 +17,14 @@ * as are the myProcLocks lists. They can be distinguished from regular * backend PGPROCs at need by checking for pid == 0. * +#ifdef PGXC + * For Postgres-XC, there is some special handling for ANALYZE. + * An XID for a local ANALYZE command will never involve other nodes. + * Also, ANALYZE may run for a long time, affecting snapshot xmin values + * on other nodes unnecessarily. We want to exclude the XID + * in global snapshots, but include it in local ones. As a result, + * these are tracked in shared memory separately. 
+#endif * * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -42,6 +50,7 @@ #ifdef PGXC #include "pgxc/pgxc.h" #include "access/gtm.h" +#include "storage/ipc.h" /* PGXC_DATANODE */ #include "postmaster/autovacuum.h" #endif @@ -62,6 +71,10 @@ typedef struct ProcArrayStruct static ProcArrayStruct *procArray; +#ifdef PGXC +static ProcArrayStruct *analyzeProcArray; +#endif + #ifdef XIDCACHE_DEBUG @@ -1571,6 +1584,9 @@ GetSnapshotDataDataNode(Snapshot snapshot) if ((snapshot_source == SNAPSHOT_COORDINATOR || snapshot_source == SNAPSHOT_DIRECT) && TransactionIdIsValid(gxmin)) { + int index; + ProcArrayStruct *arrayP = analyzeProcArray; + snapshot->xmin = gxmin; snapshot->xmax = gxmax; snapshot->xcnt = gxcnt; @@ -1605,6 +1621,16 @@ GetSnapshotDataDataNode(Snapshot snapshot) (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); } + else if (snapshot->max_xcnt < gxcnt) + { + snapshot->xip = (TransactionId *) + realloc(snapshot->xip, gxcnt * sizeof(TransactionId)); + if (snapshot->xip == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + snapshot->max_xcnt = gxcnt; + } memcpy(snapshot->xip, gxip, gxcnt * sizeof(TransactionId)); snapshot->curcid = GetCurrentCommandId(false); @@ -1631,6 +1657,72 @@ GetSnapshotDataDataNode(Snapshot snapshot) snapshot->regd_count = 0; snapshot->copied = false; + /* + * Start of handling for local ANALYZE + * Make adjustments for any running auto ANALYZE commands + */ + LWLockAcquire(AnalyzeProcArrayLock, LW_SHARED); + + /* + * Spin over analyzeProcArray and add these local analyze XIDs to the + * local snapshot. + */ + for (index = 0; index < arrayP->numProcs; index++) + { + volatile PGPROC *proc = arrayP->procs[index]; + TransactionId xid; + + + /* Update globalxmin to be the smallest valid xmin */ + xid = proc->xmin; /* fetch just once */ + + if (TransactionIdIsNormal(xid) && + TransactionIdPrecedes(xid, RecentGlobalXmin)) + RecentGlobalXmin = xid; + + /* Fetch xid just once - see GetNewTransactionId */ + xid = proc->xid; + + /* + * If the transaction has been assigned an xid < xmax we add it to the + * snapshot, and update xmin if necessary. There's no need to store + * XIDs >= xmax, since we'll treat them as running anyway. We don't + * bother to examine their subxids either. + * + * We don't include our own XID (if any) in the snapshot, but we must + * include it into xmin. + */ + if (TransactionIdIsNormal(xid)) + { + if (TransactionIdFollowsOrEquals(xid, snapshot->xmax)) + continue; + if (proc != MyProc) + { + if (snapshot->xcnt >= snapshot->max_xcnt) + { + snapshot->max_xcnt += arrayP->numProcs; + + snapshot->xip = (TransactionId *) + realloc(snapshot->xip, snapshot->max_xcnt * sizeof(TransactionId)); + if (snapshot->xip == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } + snapshot->xip[snapshot->xcnt++] = xid; + elog(DEBUG1, "Adding Analyze for xid %d to snapshot", proc->xid); + } + if (TransactionIdPrecedes(xid, snapshot->xmin)) + snapshot->xmin = xid; + } + } + + if (!TransactionIdIsValid(MyProc->xmin)) + MyProc->xmin = snapshot->xmin; + + LWLockRelease(AnalyzeProcArrayLock); + /* End handling of local analyze XID in snapshots */ + return true; } return false; @@ -1745,4 +1837,101 @@ GetSnapshotDataCoordinator(Snapshot snapshot) } return false; } + + +/* + * Report shared-memory space needed by CreateSharedAnalyzeProcArray. 
+ */ +Size +AnalyzeProcArrayShmemSize(void) +{ + Size size; + + size = offsetof(ProcArrayStruct, procs); + size = add_size(size, mul_size(sizeof(PGPROC *), autovacuum_max_workers)); + + return size; +} + +/* + * Initialize the shared ANALYZE PGPROC array during postmaster startup. + */ +void +CreateSharedAnalyzeProcArray(void) +{ + bool found; + + /* Create or attach to the ProcArray shared structure */ + analyzeProcArray = (ProcArrayStruct *) + ShmemInitStruct("Analyze Proc Array", AnalyzeProcArrayShmemSize(), &found); + + if (!found) + { + /* + * We're the first - initialize. + */ + analyzeProcArray->numProcs = 0; + analyzeProcArray->maxProcs = autovacuum_max_workers; + } +} + +/* + * Add the specified PGPROC to the shared ANALYZE array. + * + * It assumes that AnalyzeProcArrayLock is already held, + * and will be held at exit. + */ +void +AnalyzeProcArrayAdd(PGPROC *proc) +{ + ProcArrayStruct *arrayP = analyzeProcArray; + + if (arrayP->numProcs >= arrayP->maxProcs) + { + /* + * Ooops, no room. (This really shouldn't happen, since there is a + * fixed supply of PGPROC structs too, and so we should have failed + * earlier.) + */ + LWLockRelease(AnalyzeProcArrayLock); + ereport(FATAL, + (errcode(ERRCODE_TOO_MANY_CONNECTIONS), + errmsg("sorry, too many analyze clients already"))); + } + + arrayP->procs[arrayP->numProcs] = proc; + arrayP->numProcs++; + + elog(DEBUG1, "Added analyze proc %p for xid %d in AnalyzeProcArray", proc, proc->xid); +} + +/* + * Remove the specified PGPROC from the shared ANALYZE array. + * We assume that AnalyzeProcArrayLock is already held, + * and it will still be held at exit + */ +void +AnalyzeProcArrayRemove(PGPROC *proc, TransactionId latestXid) +{ + ProcArrayStruct *arrayP = analyzeProcArray; + int index; + +#ifdef XIDCACHE_DEBUG + /* dump stats at backend shutdown, but not prepared-xact end */ + if (proc->pid != 0) + DisplayXidCache(); +#endif + + for (index = 0; index < arrayP->numProcs; index++) + { + if (arrayP->procs[index] == proc) + { + arrayP->procs[index] = arrayP->procs[arrayP->numProcs - 1]; + arrayP->procs[arrayP->numProcs - 1] = NULL; /* for debugging */ + arrayP->numProcs--; + elog(DEBUG1, "Removed analyze proc %p for xid %d in AnalyzeProcArray", proc, proc->xid); + return; + } + } +} #endif /* PGXC */ diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 0b918cb..9b47ff6 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -39,6 +39,9 @@ #include "access/xact.h" #include "miscadmin.h" #include "postmaster/autovacuum.h" +#ifdef PGXC +#include "pgxc/pgxc.h" +#endif #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/pmsignal.h" @@ -580,6 +583,15 @@ RemoveProcFromArray(int code, Datum arg) { Assert(MyProc != NULL); ProcArrayRemove(MyProc, InvalidTransactionId); +#ifdef PGXC + /* Remove from the analyze array */ + if (IS_PGXC_DATANODE && IsAutoVacuumAnalyzeWorker()) + { + LWLockAcquire(AnalyzeProcArrayLock, LW_EXCLUSIVE); + AnalyzeProcArrayRemove(MyProc, InvalidTransactionId); + LWLockRelease(AnalyzeProcArrayLock); + } +#endif } /* diff --git a/src/include/postmaster/autovacuum.h b/src/include/postmaster/autovacuum.h index 640e973..06be483 100644 --- a/src/include/postmaster/autovacuum.h +++ b/src/include/postmaster/autovacuum.h @@ -17,6 +17,11 @@ #include "storage/lock.h" + +#ifdef PGXC /* PGXC_DATANODE */ +#define IsAutoVacuumAnalyzeWorker() (IsAutoVacuumWorkerProcess() && !(MyProc->vacuumFlags & PROC_IN_VACUUM)) +#endif + /* GUC variables */ extern bool 
autovacuum_start_daemon; extern int autovacuum_max_workers; @@ -61,8 +66,4 @@ extern void AutovacuumLauncherIAm(void); extern Size AutoVacuumShmemSize(void); extern void AutoVacuumShmemInit(void); -#ifdef PGXC /* PGXC_DATANODE */ -bool IsAutoVacuumWorkerProcess(void); -#endif - #endif /* AUTOVACUUM_H */ diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index e389c61..0548159 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -67,6 +67,9 @@ typedef enum LWLockId AutovacuumLock, AutovacuumScheduleLock, SyncScanLock, +#ifdef PGXC + AnalyzeProcArrayLock, +#endif /* Individual lock IDs end here */ FirstBufMappingLock, FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS, diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 3cc1689..f872c48 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -28,6 +28,10 @@ extern void ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid); extern void ProcArrayClearTransaction(PGPROC *proc); #ifdef PGXC /* PGXC_DATANODE */ +extern Size AnalyzeProcArrayShmemSize(void); +extern void AnalyzeProcArrayAdd(PGPROC *proc); +extern void CreateSharedAnalyzeProcArray(void); +extern void AnalyzeProcArrayRemove(PGPROC *proc, TransactionId latestXid); extern void SetGlobalSnapshotData(int xmin, int xmax, int xcnt, int *xip); extern void UnsetGlobalSnapshotData(void); #endif /* PGXC */ ----------------------------------------------------------------------- Summary of changes: src/backend/access/transam/varsup.c | 35 +++++-- src/backend/access/transam/xact.c | 20 ++++ src/backend/storage/ipc/ipci.c | 6 + src/backend/storage/ipc/procarray.c | 189 +++++++++++++++++++++++++++++++++++ src/backend/storage/lmgr/proc.c | 12 ++ src/include/postmaster/autovacuum.h | 9 +- src/include/storage/lwlock.h | 3 + src/include/storage/procarray.h | 4 + 8 files changed, 264 insertions(+), 14 deletions(-) hooks/post-receive -- Postgres-XC |
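[Editor's note: taken together, the hunks above establish a simple locking discipline around the analyze proc array. Below is a condensed sketch of the life cycle of an auto-ANALYZE XID on a data node, assembled from the varsup.c, xact.c, and procarray.c hunks; it is a simplified illustration, with error paths and the XidGenLock interleaving omitted.]

/* Start: obtain a GXID that GTM will exclude from global snapshots */
LWLockAcquire(AnalyzeProcArrayLock, LW_EXCLUSIVE);
xid = (TransactionId) BeginTranAutovacuumGTM();
AnalyzeProcArrayAdd(MyProc);        /* local snapshots will re-add it */
LWLockRelease(AnalyzeProcArrayLock);

/* ... ANALYZE does its work ... */

/* End: commit on GTM and leave the analyze array under the same lock */
LWLockAcquire(AnalyzeProcArrayLock, LW_EXCLUSIVE);
CommitTranGTM((GlobalTransactionId) latestXid);
AnalyzeProcArrayRemove(MyProc, latestXid);
LWLockRelease(AnalyzeProcArrayLock);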