Skip to content

Commit 990c16f

Browse files
committed
[#27269] docdb: Add flag to allow continuing tablet writes after compaction failure
Summary: Added the `allow_compaction_failures_for_tablet_ids` tserver-side flag, which allows specifying a list of tablets for which compactions are allowed to fail (for example, due to on-disk data corruption) without failing subsequent writes to the affected RocksDB instance. It should be used with extra care, and only for troubleshooting while/if we don't have other options to mitigate the problem. Jira: DB-16750 Test Plan: Manually tested with local cluster and sample workload: ``` ./bin/yb-ctl destroy && ./bin/yb-ctl start java -jar ~/code/yb-sample-apps/target/yb-sample-apps.jar --workload CassandraBatchTimeseries --nodes 127.0.0.1:9042 --num_threads_read 0 --num_threads_write 4 --nouuid --min_metrics_count 10000 --max_metrics_count 20000 --num_unique_keys 100000000 find ~/yugabyte-data -name "*.sst" // cd to created tablet RocksDB directory dd if=/dev/zero of=000010.sst.sblock.0 bs=1 conv=notrunc count=1000000 seek=100000 // Will crash on compaction export TABLET_ID=... ./bin/yb-ctl restart --tserver_flags '"allow_compaction_failures_for_tablet_ids=$TABLET_ID"' java -jar ~/code/yb-sample-apps/target/yb-sample-apps.jar --workload CassandraBatchTimeseries --nodes 127.0.0.1:9042 --num_threads_read 0 --num_threads_write 4 --nouuid --min_metrics_count 10000 --max_metrics_count 20000 --num_unique_keys 100000000 // Shouldn't crash on failed compactions, flushes should work // Query accessing corrupt data should fail, but tserver shouldn't crash ./bin/ycqlsh --request-timeout 600 -e 'select count(*) from ybdemo_keyspace.batch_ts_metrics_raw' ./build/latest/bin/yb-ts-cli set_flag allow_compaction_failures_for_tablet_ids "" // Should crash on next compaction attempt which includes 000010.sst ./bin/yb-ctl restart ./build/latest/bin/yb-ts-cli set_flag allow_compaction_failures_for_tablet_ids "$TABLET_ID" java -jar ~/code/yb-sample-apps/target/yb-sample-apps.jar --workload CassandraBatchTimeseries --nodes 127.0.0.1:9042 --num_threads_read 0 --num_threads_write 4 --nouuid 
--min_metrics_count 10000 --max_metrics_count 20000 --num_unique_keys 100000000 // Shouldn't crash on failed compactions, flushes should work ``` Reviewers: hsunder, yyan, rthallam, arybochkin Reviewed By: rthallam, arybochkin Subscribers: yql, ybase Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D44079
1 parent e1b746a commit 990c16f

15 files changed

+130
-8
lines changed

src/yb/master/sys_catalog.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,7 @@ Status SysCatalogTable::OpenTablet(const scoped_refptr<tablet::RaftGroupMetadata
608608
.metric_registry = metric_registry_,
609609
.log_anchor_registry = tablet_peer()->log_anchor_registry(),
610610
.tablet_options = tablet_options,
611+
.mutable_tablet_options = tablet::MutableTabletOptions{},
611612
.log_prefix_suffix = " P " + tablet_peer()->permanent_uuid(),
612613
.transaction_participant_context = tablet_peer().get(),
613614
.local_tablet_filter = client::LocalTabletFilter(),

src/yb/rocksdb/db.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,12 @@ class DB {
930930
// Returns approximate middle key (see Version::GetMiddleKey).
931931
virtual yb::Result<std::string> GetMiddleKey() = 0;
932932

933+
// If true, allows compactions to fail without setting bg_error and without causing writes to
934+
// fail. Should only be used with extra care for troubleshooting when/while there are no other
935+
// options available.
936+
// Default: false
937+
virtual void SetAllowCompactionFailures(AllowCompactionFailures allow_compaction_failures) = 0;
938+
933939
// Returns a table reader for the largest SST file.
934940
virtual yb::Result<TableReader*> TEST_GetLargestSstTableReader() {
935941
return STATUS(NotSupported, "");

src/yb/rocksdb/db/db_impl.cc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2603,7 +2603,8 @@ Status DBImpl::CompactFilesImpl(
26032603
"[%s] [JOB %d] Compaction error: %s",
26042604
c->column_family_data()->GetName().c_str(), job_context->job_id,
26052605
status.ToString().c_str());
2606-
if (db_options_.paranoid_checks && bg_error_.ok()) {
2606+
if (db_options_.paranoid_checks && !allow_compaction_failures_ &&
2607+
bg_error_.ok()) {
26072608
bg_error_ = status;
26082609
}
26092610
}
@@ -4046,7 +4047,8 @@ Result<FileNumbersHolder> DBImpl::BackgroundCompaction(
40464047
} else {
40474048
RLOG(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "Compaction error: %s",
40484049
status.ToString().c_str());
4049-
if (db_options_.paranoid_checks && bg_error_.ok()) {
4050+
if (db_options_.paranoid_checks && !allow_compaction_failures_ &&
4051+
bg_error_.ok()) {
40504052
bg_error_ = status;
40514053
}
40524054
}
@@ -7140,4 +7142,10 @@ size_t DBImpl::TEST_NumNotStartedCompactionsUnlocked(CompactionSizeKind compacti
71407142
});
71417143
}
71427144

7145+
void DBImpl::SetAllowCompactionFailures(AllowCompactionFailures allow_compaction_failures) {
7146+
if (allow_compaction_failures_.exchange(allow_compaction_failures) != allow_compaction_failures) {
7147+
LOG_WITH_PREFIX(INFO) << "allow_compaction_failures changed to: " << allow_compaction_failures;
7148+
}
7149+
}
7150+
71437151
} // namespace rocksdb

src/yb/rocksdb/db/db_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,8 @@ class DBImpl : public DB {
508508

509509
Result<std::string> GetMiddleKey() override;
510510

511+
void SetAllowCompactionFailures(AllowCompactionFailures allow_compaction_failures) override;
512+
511513
// Returns a table reader for the largest SST file.
512514
Result<TableReader*> TEST_GetLargestSstTableReader() override;
513515

@@ -1027,6 +1029,8 @@ class DBImpl : public DB {
10271029

10281030
std::function<void()> files_changed_listener_ GUARDED_BY(files_changed_listener_mutex_);
10291031

1032+
std::atomic<bool> allow_compaction_failures_{false};
1033+
10301034
// No copying allowed
10311035
DBImpl(const DBImpl&) = delete;
10321036
void operator=(const DBImpl&) = delete;

src/yb/rocksdb/db/db_test.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4754,6 +4754,10 @@ class ModelDB: public DB {
47544754
return NotSupported();
47554755
}
47564756

4757+
void SetAllowCompactionFailures(AllowCompactionFailures allow_compaction_failures) override {
4758+
LOG(FATAL) << "SetAllowCompactionFailures is not supported.";
4759+
}
4760+
47574761
private:
47584762
Status NotSupported() const {
47594763
return STATUS(NotSupported, "Not supported in Model DB");

src/yb/rocksdb/rocksdb_fwd.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ class DirectWriteHandler;
6868

6969
YB_DEFINE_ENUM(FlushAbility, (kNoNewData)(kHasNewData)(kAlreadyFlushing));
7070

71+
YB_STRONGLY_TYPED_BOOL(AllowCompactionFailures);
72+
7173
// Frontier should be copyable, but should still preserve its polymorphic nature. We cannot use
7274
// shared_ptr here, because we are planning to modify the copied value. If we used shared_ptr and
7375
// modified the copied value, the original value would also change.

src/yb/tablet/tablet-harness.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ TabletInitData TabletHarness::MakeTabletInitData(const RaftGroupMetadataPtr& met
103103
.metric_registry = metrics_registry_.get(),
104104
.log_anchor_registry = new log::LogAnchorRegistry(),
105105
.tablet_options = TabletOptions(),
106+
.mutable_tablet_options = MutableTabletOptions{},
106107
.log_prefix_suffix = std::string(),
107108
.transaction_participant_context = nullptr,
108109
.local_tablet_filter = client::LocalTabletFilter(),

src/yb/tablet/tablet.cc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,7 @@ Tablet::Tablet(const TabletInitData& data)
649649
mvcc_(
650650
MakeTabletLogPrefix(data.metadata->raft_group_id(), data.log_prefix_suffix), data.clock),
651651
tablet_options_(data.tablet_options),
652+
mutable_tablet_options_(data.mutable_tablet_options),
652653
pending_op_counter_blocking_rocksdb_shutdown_start_(Format(
653654
"T $0 Read/write operations blocking start of RocksDB shutdown",
654655
metadata_->raft_group_id())),
@@ -1007,6 +1008,18 @@ Status Tablet::OpenKeyValueTablet() {
10071008
RETURN_NOT_OK(snapshots_->Open());
10081009
RETURN_NOT_OK(OpenRegularDB(common_options));
10091010
RETURN_NOT_OK(OpenIntentsDB(common_options));
1011+
rocksdb::AllowCompactionFailures allow_compaction_failures =
1012+
rocksdb::AllowCompactionFailures::kFalse;
1013+
{
1014+
SharedLock lock(mutable_tablet_options_mutex_);
1015+
allow_compaction_failures = mutable_tablet_options_.allow_compaction_failures;
1016+
}
1017+
for (auto* db : {regular_db_.get(), intents_db_.get()}) {
1018+
if (!db) {
1019+
continue;
1020+
}
1021+
db->SetAllowCompactionFailures(allow_compaction_failures);
1022+
}
10101023

10111024
// Don't allow reads at timestamps lower than the highest history cutoff of a past compaction.
10121025
auto regular_flushed_frontier = regular_db_->GetFlushedFrontier();
@@ -5553,6 +5566,21 @@ void Tablet::SetCompactFlushRateLimitBytesPerSec(int64_t bytes_per_sec) {
55535566
}
55545567
}
55555568

5569+
void Tablet::SetAllowCompactionFailures(
5570+
rocksdb::AllowCompactionFailures allow_compaction_failures) {
5571+
{
5572+
std::lock_guard lock(mutable_tablet_options_mutex_);
5573+
mutable_tablet_options_.allow_compaction_failures = allow_compaction_failures;
5574+
}
5575+
auto scoped_op = CreateScopedRWOperationBlockingRocksDbShutdownStart();
5576+
for (auto* db : {regular_db(), intents_db()}) {
5577+
if (!db) {
5578+
continue;
5579+
}
5580+
db->SetAllowCompactionFailures(allow_compaction_failures);
5581+
}
5582+
}
5583+
55565584
// ------------------------------------------------------------------------------------------------
55575585

55585586
Result<ScopedReadOperation> ScopedReadOperation::Create(

src/yb/tablet/tablet.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -993,6 +993,8 @@ class Tablet : public AbstractTablet,
993993
void RefreshCompactFlushRateLimitBytesPerSec();
994994
void SetCompactFlushRateLimitBytesPerSec(int64_t bytes_per_sec);
995995

996+
void SetAllowCompactionFailures(rocksdb::AllowCompactionFailures allow_compaction_failures);
997+
996998
private:
997999
friend class Iterator;
9981000
friend class TabletPeerTest;
@@ -1153,6 +1155,9 @@ class Tablet : public AbstractTablet,
11531155
// For the block cache and memory manager shared across tablets
11541156
const TabletOptions tablet_options_;
11551157

1158+
mutable std::shared_mutex mutable_tablet_options_mutex_;
1159+
MutableTabletOptions mutable_tablet_options_ GUARDED_BY (mutable_tablet_options_mutex_);
1160+
11561161
// A lightweight way to reject new operations when the tablet is shutting down. This is used to
11571162
// prevent race conditions between destructing the RocksDB in-memory instance and read/write
11581163
// operations.

src/yb/tablet/tablet_bootstrap-test.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ class BootstrapTest : public LogTestBase {
233233
.metric_registry = nullptr,
234234
.log_anchor_registry = log_anchor_registry,
235235
.tablet_options = tablet_options,
236+
.mutable_tablet_options = MutableTabletOptions{},
236237
.log_prefix_suffix = std::string(),
237238
.transaction_participant_context = nullptr,
238239
.local_tablet_filter = client::LocalTabletFilter(),

src/yb/tablet/tablet_options.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929

3030
#include "yb/docdb/local_waiting_txn_registry.h"
3131

32+
#include "yb/rocksdb/rocksdb_fwd.h"
33+
3234
#include "yb/server/server_fwd.h"
3335

3436
#include "yb/tablet/tablet_fwd.h"
@@ -64,6 +66,11 @@ struct TabletOptions {
6466
std::shared_ptr<rocksdb::RocksDBPriorityThreadPoolMetrics> priority_thread_pool_metrics;
6567
};
6668

69+
struct MutableTabletOptions {
70+
rocksdb::AllowCompactionFailures allow_compaction_failures =
71+
rocksdb::AllowCompactionFailures::kFalse;
72+
};
73+
6774
using TransactionManagerProvider = std::function<client::TransactionManager&()>;
6875

6976
YB_DEFINE_ENUM(VectorIndexThreadPoolType, (kBackground)(kBackfill)(kInsert));
@@ -81,7 +88,8 @@ struct TabletInitData {
8188
std::shared_ptr<MemTracker> block_based_table_mem_tracker;
8289
MetricRegistry* metric_registry = nullptr;
8390
log::LogAnchorRegistryPtr log_anchor_registry;
84-
const TabletOptions tablet_options;
91+
TabletOptions tablet_options;
92+
MutableTabletOptions mutable_tablet_options;
8593
std::string log_prefix_suffix;
8694
TransactionParticipantContext* transaction_participant_context = nullptr;
8795
client::LocalTabletFilter local_tablet_filter;

src/yb/tools/fs_tool.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ Status FsTool::DumpTabletData(const std::string& tablet_id) {
272272
.metric_registry = nullptr,
273273
.log_anchor_registry = reg.get(),
274274
.tablet_options = tablet_options,
275+
.mutable_tablet_options = tablet::MutableTabletOptions{},
275276
.log_prefix_suffix = std::string(),
276277
.transaction_participant_context = nullptr,
277278
.local_tablet_filter = client::LocalTabletFilter(),

src/yb/tools/sys-catalog-tool.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ Status MiniSysCatalogTable::OpenTablet(const scoped_refptr<tablet::RaftGroupMeta
205205
.metric_registry = metric_registry_.get(),
206206
.log_anchor_registry = nullptr,
207207
.tablet_options = tablet_options_,
208+
.mutable_tablet_options = tablet::MutableTabletOptions{},
208209
.log_prefix_suffix = "",
209210
.transaction_participant_context = tablet_peer_.get(),
210211
.local_tablet_filter = nullptr,

src/yb/tserver/ts_tablet_manager.cc

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,12 @@ DEFINE_RUNTIME_bool(enable_copy_retryable_requests_from_parent, true,
298298
"Whether to copy retryable requests from parent tablet when opening"
299299
"the child tablet");
300300

301+
DEFINE_RUNTIME_string(allow_compaction_failures_for_tablet_ids, "",
302+
"List of tablet IDs for which compaction failures are allowed and will not cause write "
303+
"failures and FATALs.");
304+
TAG_FLAG(allow_compaction_failures_for_tablet_ids, hidden);
305+
TAG_FLAG(allow_compaction_failures_for_tablet_ids, advanced);
306+
301307
DEFINE_NON_RUNTIME_uint32(deleted_tablet_cache_max_size, 10000,
302308
"Maximum size for the cache of recently deleted tablet ids. Used to "
303309
"reject remote bootstrap requests for recently deleted tablets.");
@@ -629,10 +635,14 @@ Status TSTabletManager::Init() {
629635
tablet_options_.rate_limiter = docdb::CreateRocksDBRateLimiter();
630636
}
631637

632-
rate_limiter_flag_callback_ = CHECK_RESULT(RegisterFlagUpdateCallback(
638+
flag_callbacks_.emplace_back(VERIFY_RESULT(RegisterFlagUpdateCallback(
639+
&FLAGS_allow_compaction_failures_for_tablet_ids,
640+
"allow_compaction_failures_for_tablet_ids",
641+
[this] { UpdateAllowCompactionFailures(); })));
642+
flag_callbacks_.emplace_back(VERIFY_RESULT(RegisterFlagUpdateCallback(
633643
&FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec,
634644
"RocksDBCompactFlushRateLimiter",
635-
[this] { UpdateCompactFlushRateLimitBytesPerSec(); }));
645+
[this] { UpdateCompactFlushRateLimitBytesPerSec(); })));
636646

637647
// Start the threadpool we'll use to open tablets.
638648
// This has to be done in Init() instead of the constructor, since the
@@ -771,6 +781,11 @@ Status TSTabletManager::Init() {
771781

772782
{
773783
std::lock_guard lock(mutex_);
784+
allow_compaction_failures_for_tablet_ids_ = FLAGS_allow_compaction_failures_for_tablet_ids;
785+
if (!allow_compaction_failures_for_tablet_ids_.empty()) {
786+
LOG_WITH_PREFIX(INFO) << "Flag allow_compaction_failures_for_tablet_ids is set to: "
787+
<< allow_compaction_failures_for_tablet_ids_;
788+
}
774789
state_ = MANAGER_RUNNING;
775790
}
776791

@@ -2058,6 +2073,14 @@ void TSTabletManager::OpenTablet(const RaftGroupMetadataPtr& meta,
20582073
}
20592074
TEST_PAUSE_IF_FLAG(TEST_pause_after_set_bootstrapping);
20602075

2076+
rocksdb::AllowCompactionFailures allow_compaction_failures =
2077+
rocksdb::AllowCompactionFailures::kFalse;
2078+
{
2079+
SharedLock lock(mutex_);
2080+
allow_compaction_failures = rocksdb::AllowCompactionFailures(
2081+
allow_compaction_failures_for_tablet_ids_.contains(tablet_id));
2082+
}
2083+
20612084
tablet::TabletInitData tablet_init_data = {
20622085
.metadata = meta,
20632086
.client_future = server_->client_future(),
@@ -2067,6 +2090,9 @@ void TSTabletManager::OpenTablet(const RaftGroupMetadataPtr& meta,
20672090
.metric_registry = metric_registry_,
20682091
.log_anchor_registry = tablet_peer->log_anchor_registry(),
20692092
.tablet_options = tablet_options_,
2093+
.mutable_tablet_options = {
2094+
.allow_compaction_failures = allow_compaction_failures,
2095+
},
20702096
.log_prefix_suffix = " P " + tablet_peer->permanent_uuid(),
20712097
.transaction_participant_context = tablet_peer.get(),
20722098
.local_tablet_filter = std::bind(&TSTabletManager::PreserveLocalLeadersOnly, this, _1),
@@ -2257,7 +2283,10 @@ void TSTabletManager::StartShutdown() {
22572283
}
22582284
}
22592285

2260-
rate_limiter_flag_callback_.Deregister();
2286+
for (auto& callback : flag_callbacks_) {
2287+
callback.Deregister();
2288+
}
2289+
flag_callbacks_.clear();
22612290

22622291
{
22632292
std::lock_guard lock(service_registration_mutex_);
@@ -3534,6 +3563,26 @@ void TSTabletManager::UpdateCompactFlushRateLimitBytesPerSec() {
35343563
}
35353564
}
35363565

3566+
void TSTabletManager::UpdateAllowCompactionFailures() {
3567+
std::string allow_compaction_failures_for_tablet_ids;
3568+
{
3569+
std::lock_guard lock(mutex_);
3570+
allow_compaction_failures_for_tablet_ids_ = FLAGS_allow_compaction_failures_for_tablet_ids;
3571+
allow_compaction_failures_for_tablet_ids = allow_compaction_failures_for_tablet_ids_;
3572+
}
3573+
for (const auto& tablet_peer : GetTabletPeers()) {
3574+
const auto shared_tablet = tablet_peer->shared_tablet();
3575+
if (!shared_tablet) {
3576+
continue;
3577+
}
3578+
3579+
const auto allow_compaction_failures = rocksdb::AllowCompactionFailures(
3580+
allow_compaction_failures_for_tablet_ids.contains(shared_tablet->tablet_id()));
3581+
3582+
shared_tablet->SetAllowCompactionFailures(allow_compaction_failures);
3583+
}
3584+
}
3585+
35373586
rpc::ThreadPool* TSTabletManager::VectorIndexThreadPool(tablet::VectorIndexThreadPoolType type) {
35383587
auto& thread_pool_ptr = vector_index_thread_pools_[std::to_underlying(type)];
35393588
auto result = thread_pool_ptr.get();

src/yb/tserver/ts_tablet_manager.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,8 @@ class TSTabletManager : public tserver::TabletPeerLookupIf, public tablet::Table
644644

645645
void UpdateCompactFlushRateLimitBytesPerSec();
646646

647+
void UpdateAllowCompactionFailures();
648+
647649
rpc::ThreadPool* VectorIndexThreadPool(tablet::VectorIndexThreadPoolType type);
648650
PriorityThreadPool* VectorIndexPriorityThreadPool(tablet::VectorIndexPriorityThreadPoolType type);
649651

@@ -822,8 +824,9 @@ class TSTabletManager : public tserver::TabletPeerLookupIf, public tablet::Table
822824
std::shared_ptr<client::YBMetaDataCache> metadata_cache_holder_;
823825
std::atomic<client::YBMetaDataCache*> metadata_cache_;
824826

825-
// Callback for FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec update handling.
826-
FlagCallbackRegistration rate_limiter_flag_callback_;
827+
std::vector<FlagCallbackRegistration> flag_callbacks_;
828+
829+
std::string allow_compaction_failures_for_tablet_ids_ GUARDED_BY(mutex_);
827830

828831
std::mutex vector_index_thread_pool_mutex_;
829832
std::array<AtomicUniquePtr<rpc::ThreadPool>, tablet::kVectorIndexThreadPoolTypeMapSize>

0 commit comments

Comments
 (0)