Skip to content

Commit 84dd6be

Browse files
committed
[#27316] YSQL: Improve TabletServer::GetTserverCatalogMessageLists
Summary: The function `TabletServer::GetTserverCatalogMessageLists` is used to serve a PG backend's local tserver request to get the invalidation messages associated with a consecutive sequence of catalog versions. For example, if a PG backend has current local catalog version 3, and the shared memory catalog version is 5, it will ask its local tserver for the invalidation messages associated with catalog version 4 and 5. For each database, a tserver maintains a queue of `(catalog version, message_list)` pairs. The maximum queue length is determined by `--ysql_max_invalidation_message_queue_size` (default value 1024). Right now `GetTserverCatalogMessageLists` performs a linear scan to find out the first catalog version that is asked for (4 in the previous example). For a full length queue, when a PG backend is not lagging far behind, it means that we need to scan from the beginning of the queue to nearly the end to find out the first catalog version. Consider that the queue is sorted by catalog version, a linear scan is not efficient. We should use `std::lower_bound` to locate the first catalog version more quickly. This diff replaces the linear scan with `std::lower_bound`, also made some change to use SCHECK, and improved logging when no match can be found. Jira: DB-16821 Test Plan: ./yb_build.sh --cxx-test pg_catalog_version-test Reviewers: kfranz, sanketh, mihnea Reviewed By: sanketh Subscribers: yql Differential Revision: https://phorge.dev.yugabyte.com/D44152
1 parent 4da46d1 commit 84dd6be

File tree

1 file changed

+57
-26
lines changed

1 file changed

+57
-26
lines changed

src/yb/tserver/tablet_server.cc

Lines changed: 57 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,49 +1055,80 @@ Status TabletServer::GetTserverCatalogMessageLists(
10551055
const auto db_oid = req.db_oid();
10561056
const auto ysql_catalog_version = req.ysql_catalog_version();
10571057
const auto num_catalog_versions = req.num_catalog_versions();
1058-
DCHECK_GT(db_oid, 0);
1059-
DCHECK_GT(num_catalog_versions, 0);
1058+
SCHECK_GT(db_oid, 0, IllegalState, "Invalid db_oid");
1059+
SCHECK_GT(num_catalog_versions, 0, IllegalState, "Invalid num_catalog_versions");
10601060
auto it = ysql_db_invalidation_messages_map_.find(db_oid);
10611061
if (it == ysql_db_invalidation_messages_map_.end()) {
1062-
DCHECK_EQ(resp->entries_size(), 0);
1062+
SCHECK_EQ(resp->entries_size(), 0, IllegalState, "Invalid entries_size");
10631063
LOG(WARNING) << "Could not find messages for database " << db_oid;
10641064
return Status::OK();
10651065
}
10661066
const auto& messages_vec = it->second.queue;
10671067
uint64_t expected_version = ysql_catalog_version + 1;
1068-
std::set<uint64_t> current_versions;
1069-
for (const auto& info : messages_vec) {
1070-
CHECK(current_versions.insert(info.first).second);
1071-
if (info.first <= ysql_catalog_version) {
1072-
continue;
1073-
}
1074-
if (info.first == expected_version) {
1075-
auto* entry = resp->add_entries();
1076-
if (info.second.has_value()) {
1077-
entry->set_message_list(info.second.value());
1078-
}
1079-
++expected_version;
1068+
// Because messages_vec is sorted, we can use std::lower_bound with a custom
1069+
// comparator function to find expected_version.
1070+
auto comp = [](const std::pair<uint64_t, std::optional<std::string>>& p,
1071+
uint64_t expected_version) {
1072+
return p.first < expected_version;
1073+
};
1074+
auto it2 = std::lower_bound(messages_vec.begin(), messages_vec.end(),
1075+
expected_version, comp);
1076+
// std::lower_bound: returns an iterator pointing to the first element in the range
1077+
// that is not less than (i.e., greater than or equal to) expected_version.
1078+
while (it2 != messages_vec.end() && it2->first == expected_version) {
1079+
auto* entry = resp->add_entries();
1080+
if (it2->second.has_value()) {
1081+
entry->set_message_list(it2->second.value());
10801082
}
1083+
++expected_version;
10811084
if (expected_version > ysql_catalog_version + num_catalog_versions) {
10821085
break;
10831086
}
1087+
++it2;
10841088
}
10851089
// We find a consecutive list (without any holes) matching with what the client asks for.
10861090
if (resp->entries_size() == static_cast<int32_t>(num_catalog_versions)) {
10871091
return Status::OK();
10881092
}
10891093

1090-
if (resp->entries_size() < static_cast<int32_t>(num_catalog_versions)) {
1091-
LOG(INFO) << "Could not find a matching consecutive list"
1092-
<< ", db_oid: " << db_oid
1093-
<< ", ysql_catalog_version: " << ysql_catalog_version
1094-
<< ", num_catalog_versions: " << num_catalog_versions
1095-
<< ", current_versions: " << yb::ToString(current_versions)
1096-
<< ", messages_vec.size(): " << messages_vec.size();
1097-
// Clear any entries that might have matched and added to ensure PG backend
1098-
// will do a full catalog cache refresh.
1099-
resp->mutable_entries()->Clear();
1100-
}
1094+
// The way we populate resp->entries() should ensure this assertion.
1095+
DCHECK_LT(resp->entries_size(), static_cast<int32_t>(num_catalog_versions));
1096+
std::set<uint64_t> current_versions;
1097+
uint64_t last_version = 0;
1098+
for (const auto& info : messages_vec) {
1099+
const auto current_version = info.first;
1100+
SCHECK_LT(last_version, current_version, IllegalState, "Not sorted by catalog version");
1101+
last_version = current_version;
1102+
// Because we have verified last_version < current_version, we can assume insert will
1103+
// be successful.
1104+
current_versions.insert(current_version);
1105+
}
1106+
std::string current_versions_str;
1107+
if (!current_versions.empty()) {
1108+
auto max_version = *current_versions.rbegin();
1109+
auto min_version = *current_versions.begin();
1110+
if (max_version - min_version + 1 > current_versions.size()) {
1111+
// There are holes from min_version to max_version. Print the entire list
1112+
// of versions to allow one to find out the holes.
1113+
current_versions_str = yb::ToString(current_versions);
1114+
} else {
1115+
// There are no holes from min_version to max_version. Print a shorthand
1116+
// rather than the entire list of versions.
1117+
current_versions_str = Format("[$0--$1]", min_version, max_version);
1118+
}
1119+
} else {
1120+
current_versions_str = "[]";
1121+
}
1122+
LOG(INFO) << "Could not find a matching consecutive list"
1123+
<< ", db_oid: " << db_oid
1124+
<< ", ysql_catalog_version: " << ysql_catalog_version
1125+
<< ", num_catalog_versions: " << num_catalog_versions
1126+
<< ", entries_size: " << resp->entries_size()
1127+
<< ", current_versions: " << current_versions_str
1128+
<< ", messages_vec.size(): " << messages_vec.size();
1129+
// Clear any entries that might have matched and added to ensure PG backend
1130+
// will do a full catalog cache refresh.
1131+
resp->mutable_entries()->Clear();
11011132
return Status::OK();
11021133
}
11031134

0 commit comments

Comments
 (0)