diff --git a/.clang-tidy b/.clang-tidy index 5da6c33f1106..be27a81823aa 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -12,6 +12,7 @@ Checks: > performance*, cert*, -cert-err58-cpp, + -cert-dcl58-cpp, # Ignore std changes # Doesn't work with abseil flags clang-analyzer*, google-*, @@ -81,4 +82,9 @@ Checks: > # modernize-use-nullptr, # modernize-use-equals-default, # readability-qualified-auto, -cppcoreguidelines-narrowing-conversions.WarnOnIntegerNarrowingConversion: 'false' + +CheckOptions: + - key: bugprone-narrowing-conversions.WarnOnIntegerNarrowingConversion + value: false + - key: bugprone-narrowing-conversions.WarnOnEquivalentBitWidth + value: false diff --git a/.github/workflows/regression-tests.yml b/.github/workflows/regression-tests.yml index 6d2b86a45286..05b79db8ed62 100644 --- a/.github/workflows/regression-tests.yml +++ b/.github/workflows/regression-tests.yml @@ -55,9 +55,6 @@ jobs: aws-access-key-id: ${{ secrets.AWS_S3_ACCESS_KEY }} aws-secret-access-key: ${{ secrets.AWS_S3_ACCESS_SECRET }} s3-bucket: ${{ secrets.S3_REGTEST_BUCKET }} - # Chain ternary oprator of the form (which can be nested) - # (expression == condition && || ) - epoll: ${{ matrix.proactor == 'Epoll' && 'epoll' || 'iouring' }} - name: Upload logs on failure if: failure() diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e5a1acce6c93..a320c31c07c0 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -22,8 +22,8 @@ cd build-dbg && ninja dragonfly cd dragonfly # project root # Make sure you have 'pre-commit', 'clang-format' and black is installed -pip install pre-commit clang-format -pip install pre-commit black +pipx install pre-commit clang-format +pipx install pre-commit black # IMPORTANT! Enable our pre-commit message hooks # This will ensure your commits match our formatting requirements diff --git a/src/core/dfly_core_test.cc b/src/core/dfly_core_test.cc index 3eb6bdafe068..e8ca8a66930b 100644 --- a/src/core/dfly_core_test.cc +++ b/src/core/dfly_core_test.cc @@ -155,6 +155,7 @@ TEST_F(StringMatchTest, Basic) { // Wildcards EXPECT_EQ(MatchLen("*", "hello", 0), 1); + EXPECT_EQ(MatchLen("*", "1234567890123456", 0), 1); EXPECT_EQ(MatchLen("h*", "hello", 0), 1); EXPECT_EQ(MatchLen("h*", "abc", 0), 0); EXPECT_EQ(MatchLen("h*o", "hello", 0), 1); diff --git a/src/core/glob_matcher.cc b/src/core/glob_matcher.cc index 24bee465e49d..72ae36a3142c 100644 --- a/src/core/glob_matcher.cc +++ b/src/core/glob_matcher.cc @@ -236,7 +236,11 @@ GlobMatcher::GlobMatcher(string_view pattern, bool case_sensitive) regex.push_back('i'); } regex.push_back(')'); - regex.append(Glob2Regex(pattern)); + if (pattern.empty()) { + regex.append(Glob2Regex("*")); + } else { + regex.append(Glob2Regex(pattern)); + } matcher_.pattern(regex); #elif USE_PCRE2 string regex("(?s"); // dotall mode diff --git a/src/core/search/indices.cc b/src/core/search/indices.cc index 9d77f1a1b5f9..9f739b05cf65 100644 --- a/src/core/search/indices.cc +++ b/src/core/search/indices.cc @@ -281,7 +281,10 @@ struct HnswlibAdapter { } void Remove(DocId id) { - world_.markDelete(id); + try { + world_.markDelete(id); + } catch (const std::exception& e) { + } } vector> Knn(float* target, size_t k, std::optional ef) { diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 4c325e092d8b..8221193038ba 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -13,6 +13,7 @@ #include #include +#include "base/cycle_clock.h" #include "base/flags.h" #include "base/histogram.h" #include "base/io_buf.h" @@ -23,7 +24,9 @@ #include "facade/memcache_parser.h" #include "facade/redis_parser.h" #include "facade/service_interface.h" +#include "glog/logging.h" #include "io/file.h" +#include "util/fibers/fibers.h" #include "util/fibers/proactor_base.h" #ifdef DFLY_USE_SSL @@ -93,6 +96,10 @@ ABSL_FLAG(bool, migrate_connections, true, "they operate. Currently this is only supported for Lua script invocations, and can " "happen at most once per connection."); +ABSL_FLAG(uint32_t, max_busy_read_usec, 100, + "Maximum time we read and parse from " + "a socket without yielding. In microseconds."); + using namespace util; using namespace std; using absl::GetFlag; @@ -146,7 +153,7 @@ struct TrafficLogger { void TrafficLogger::ResetLocked() { if (log_file) { - log_file->Close(); + std::ignore = log_file->Close(); log_file.reset(); } } @@ -196,7 +203,7 @@ void OpenTrafficLogger(string_view base_path) { // Write version, incremental numbering :) uint8_t version[1] = {2}; - tl_traffic_logger.log_file->Write(version); + std::ignore = tl_traffic_logger.log_file->Write(version); } void LogTraffic(uint32_t id, bool has_more, absl::Span resp, @@ -876,6 +883,7 @@ pair Connection::GetClientInfoBeforeAfterTid() const { absl::StrAppend(&after, " irqmatch=", int(cpu == my_cpu_id)); if (dispatch_q_.size()) { absl::StrAppend(&after, " pipeline=", dispatch_q_.size()); + absl::StrAppend(&after, " pbuf=", pending_pipeline_bytes_); } absl::StrAppend(&after, " age=", now - creation_time_, " idle=", now - last_interaction_); string_view phase_name = PHASE_NAMES[phase_]; @@ -1028,7 +1036,7 @@ void Connection::ConnectionFlow() { if (io_buf_.InputLen() > 0) { phase_ = PROCESS; if (redis_parser_) { - parse_status = ParseRedis(); + parse_status = ParseRedis(10000); } else { DCHECK(memcache_parser_); parse_status = ParseMemcache(); @@ -1136,19 +1144,6 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef invoke_ // Dispatch async if we're handling a pipeline or if we can't dispatch sync. if (optimize_for_async || !can_dispatch_sync) { SendAsync(cmd_msg_cb()); - - auto epoch = fb2::FiberSwitchEpoch(); - - if (async_fiber_epoch_ == epoch) { - // If we pushed too many items without context switching - yield - if (++async_streak_len_ >= 10 && !cc_->async_dispatch) { - async_streak_len_ = 0; - ThisFiber::Yield(); - } - } else { - async_streak_len_ = 0; - async_fiber_epoch_ = epoch; - } } else { ShrinkPipelinePool(); // Gradually release pipeline request pool. { @@ -1164,20 +1159,17 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef invoke_ } } -Connection::ParserStatus Connection::ParseRedis() { +Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) { uint32_t consumed = 0; RedisParser::Result result = RedisParser::OK; - // Re-use connection local resources to reduce allocations - RespVec& parse_args = tmp_parse_args_; - CmdArgVec& cmd_vec = tmp_cmd_vec_; - - auto dispatch_sync = [this, &parse_args, &cmd_vec] { - RespExpr::VecToArgList(parse_args, &cmd_vec); - service_->DispatchCommand(absl::MakeSpan(cmd_vec), reply_builder_.get(), cc_.get()); + auto dispatch_sync = [this] { + RespExpr::VecToArgList(tmp_parse_args_, &tmp_cmd_vec_); + service_->DispatchCommand(absl::MakeSpan(tmp_cmd_vec_), reply_builder_.get(), cc_.get()); }; - auto dispatch_async = [this, &parse_args, tlh = mi_heap_get_backing()]() -> MessageHandle { - return {FromArgs(std::move(parse_args), tlh)}; + + auto dispatch_async = [this, tlh = mi_heap_get_backing()]() -> MessageHandle { + return {FromArgs(std::move(tmp_parse_args_), tlh)}; }; ReadBuffer read_buffer = GetReadBuffer(); @@ -1186,10 +1178,10 @@ Connection::ParserStatus Connection::ParseRedis() { if (read_buffer.ShouldAdvance()) { // can happen only with io_uring/bundles read_buffer.slice = NextBundleBuffer(read_buffer.available_bytes); } - result = redis_parser_->Parse(read_buffer.slice, &consumed, &parse_args); + result = redis_parser_->Parse(read_buffer.slice, &consumed, &tmp_parse_args_); request_consumed_bytes_ += consumed; - if (result == RedisParser::OK && !parse_args.empty()) { - if (RespExpr& first = parse_args.front(); first.type == RespExpr::STRING) + if (result == RedisParser::OK && !tmp_parse_args_.empty()) { + if (RespExpr& first = tmp_parse_args_.front(); first.type == RespExpr::STRING) DVLOG(2) << "Got Args with first token " << ToSV(first.GetBuf()); if (io_req_size_hist) @@ -1198,12 +1190,20 @@ Connection::ParserStatus Connection::ParseRedis() { bool has_more = consumed < read_buffer.available_bytes; if (tl_traffic_logger.log_file && IsMain() /* log only on the main interface */) { - LogTraffic(id_, has_more, absl::MakeSpan(parse_args), service_->GetContextInfo(cc_.get())); + LogTraffic(id_, has_more, absl::MakeSpan(tmp_parse_args_), + service_->GetContextInfo(cc_.get())); } DispatchSingle(has_more, dispatch_sync, dispatch_async); } read_buffer.Consume(consumed); + + // We must yield from time to time to allow other fibers to run. + // Specifically, if a client sends a huge chunk of data resulting in a very long pipeline, + // we want to yield to allow AsyncFiber to actually execute on the pending pipeline. + if (ThisFiber::GetRunningTimeCycles() > max_busy_cycles) { + ThisFiber::Yield(); + } } while (RedisParser::OK == result && read_buffer.available_bytes > 0 && !reply_builder_->GetError()); @@ -1390,6 +1390,9 @@ auto Connection::IoLoop() -> variant { ParserStatus parse_status = OK; size_t max_iobfuf_len = GetFlag(FLAGS_max_client_iobuf_len); + unsigned max_busy_read_cycles = + (base::CycleClock::Frequency() * GetFlag(FLAGS_max_busy_read_usec)) / 1000000U; + auto* peer = socket_.get(); recv_buf_.res_len = 0; @@ -1404,12 +1407,16 @@ auto Connection::IoLoop() -> variant { bool is_iobuf_full = io_buf_.AppendLen() == 0; if (redis_parser_) { - parse_status = ParseRedis(); + parse_status = ParseRedis(max_busy_read_cycles); } else { DCHECK(memcache_parser_); parse_status = ParseMemcache(); } + if (reply_builder_->GetError()) { + return reply_builder_->GetError(); + } + if (parse_status == NEED_MORE) { parse_status = OK; @@ -1429,11 +1436,9 @@ auto Connection::IoLoop() -> variant { [&]() { io_buf_.Reserve(std::min(max_iobfuf_len, parser_hint)); }); } - // If we got a partial request and we couldn't parse the length, just - // double the capacity. // If we got a partial request because iobuf was full, grow it up to // a reasonable limit to save on Recv() calls. - if (io_buf_.AppendLen() < 64u || (is_iobuf_full && capacity < 4096)) { + if (is_iobuf_full && capacity < max_iobfuf_len / 2) { // Last io used most of the io_buf to the end. UpdateIoBufCapacity(io_buf_, stats_, [&]() { io_buf_.Reserve(capacity * 2); // Valid growth range. @@ -1441,21 +1446,11 @@ auto Connection::IoLoop() -> variant { } DCHECK_GT(io_buf_.AppendLen(), 0U); - } else if (io_buf_.AppendLen() == 0) { - // We have a full buffer and we can not progress with parsing. - // This means that we have request too large. - LOG(ERROR) << "Request is too large, closing connection"; - parse_status = ERROR; - break; } } else if (parse_status != OK) { break; } - ec = reply_builder_->GetError(); - } while (peer->IsOpen() && !ec); - - if (ec) - return ec; + } while (peer->IsOpen()); return parse_status; } @@ -1833,6 +1828,7 @@ void Connection::SendAsync(MessageHandle msg) { // Squashing is only applied to redis commands if (std::holds_alternative(msg.handle)) { pending_pipeline_cmd_cnt_++; + pending_pipeline_bytes_ += used_mem; } if (msg.IsControl()) { @@ -1869,7 +1865,10 @@ void Connection::RecycleMessage(MessageHandle msg) { // Retain pipeline message in pool. if (auto* pipe = get_if(&msg.handle); pipe) { + DCHECK_GE(pending_pipeline_bytes_, used_mem); + DCHECK_GE(pending_pipeline_cmd_cnt_, 1u); pending_pipeline_cmd_cnt_--; + pending_pipeline_bytes_ -= used_mem; if (stats_->pipeline_cmd_cache_bytes < qbp.pipeline_cache_limit) { stats_->pipeline_cmd_cache_bytes += (*pipe)->StorageCapacity(); pipeline_req_pool_.push_back(std::move(*pipe)); diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 3b3095a1ee53..5e1e1be97f3a 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -367,7 +367,7 @@ class Connection : public util::Connection { // Create new pipeline request, re-use from pool when possible. PipelineMessagePtr FromArgs(RespVec args, mi_heap_t* heap); - ParserStatus ParseRedis(); + ParserStatus ParseRedis(unsigned max_busy_cycles); ParserStatus ParseMemcache(); void OnBreakCb(int32_t mask); @@ -427,6 +427,7 @@ class Connection : public util::Connection { util::fb2::Fiber async_fb_; // async fiber (if started) uint64_t pending_pipeline_cmd_cnt_ = 0; // how many queued Redis async commands in dispatch_q + size_t pending_pipeline_bytes_ = 0; // how many bytes of the queued Redis async commands // how many bytes of the current request have been consumed size_t request_consumed_bytes_ = 0; @@ -455,10 +456,6 @@ class Connection : public util::Connection { unsigned parser_error_ = 0; - // amount of times we enqued requests asynchronously during the same async_fiber_epoch_. - unsigned async_streak_len_ = 0; - uint64_t async_fiber_epoch_ = 0; - BreakerCb breaker_cb_; // Used by redis parser to avoid allocations diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h index 1b8932838114..70f0feea540c 100644 --- a/src/facade/facade_types.h +++ b/src/facade/facade_types.h @@ -63,7 +63,8 @@ struct ArgRange { ArgRange(ArgRange& range) : ArgRange((const ArgRange&)range) { } - template ArgRange(T&& span) : span(std::forward(span)) { + template , bool> = true> + ArgRange(T&& span) : span(std::forward(span)) { // NOLINT google-explicit-constructor) } size_t Size() const { @@ -163,7 +164,9 @@ struct ErrorReply { std::string_view kind = {}) // to resolve ambiguity of constructors above : message{std::string_view{msg}}, kind{kind} { } - ErrorReply(OpStatus status) : message{}, kind{}, status{status} { + + ErrorReply(OpStatus status) // NOLINT google-explicit-constructor) + : status{status} { } std::string_view ToSv() const { diff --git a/src/server/bitops_family.cc b/src/server/bitops_family.cc index c550d8cdd495..9a58e82b8c7c 100644 --- a/src/server/bitops_family.cc +++ b/src/server/bitops_family.cc @@ -728,9 +728,17 @@ ResultType Get::ApplyTo(Overflow ov, const string* bitfield) { const size_t offset = attr_.offset; auto last_byte_offset = GetByteIndex(attr_.offset + attr_.encoding_bit_size - 1); + if (GetByteIndex(offset) >= total_bytes) { + return 0; + } + + const string* result_str = bitfield; + string buff; uint32_t lsb = attr_.offset + attr_.encoding_bit_size - 1; - if (last_byte_offset > total_bytes) { - return {}; + if (last_byte_offset >= total_bytes) { + buff = *bitfield; + buff.resize(last_byte_offset + 1, 0); + result_str = &buff; } const bool is_negative = @@ -738,7 +746,7 @@ ResultType Get::ApplyTo(Overflow ov, const string* bitfield) { int64_t result = 0; for (size_t i = 0; i < attr_.encoding_bit_size; ++i) { - uint8_t byte{GetByteValue(bytes, lsb)}; + uint8_t byte{GetByteValue(*result_str, lsb)}; int32_t index = GetNormalizedBitIndex(lsb); int64_t old_bit = CheckBitStatus(byte, index); result |= old_bit << i; @@ -830,10 +838,11 @@ ResultType IncrBy::ApplyTo(Overflow ov, string* bitfield) { string& bytes = *bitfield; Get get(attr_); auto res = get.ApplyTo(ov, &bytes); + const int32_t total_bytes = static_cast(bytes.size()); + auto last_byte_offset = GetByteIndex(attr_.offset + attr_.encoding_bit_size - 1); - if (!res) { - Set set(attr_, incr_value_); - return set.ApplyTo(ov, &bytes); + if (last_byte_offset >= total_bytes) { + bytes.resize(last_byte_offset + 1, 0); } if (!HandleOverflow(ov, &*res)) { diff --git a/src/server/bitops_family_test.cc b/src/server/bitops_family_test.cc index 17bcca6a0161..82ec8b209709 100644 --- a/src/server/bitops_family_test.cc +++ b/src/server/bitops_family_test.cc @@ -805,4 +805,21 @@ TEST_F(BitOpsFamilyTest, BitFieldOperations) { ASSERT_THAT(Run({"bitfield", "foo", "get", "u1", "15"}), IntArg(1)); } +TEST_F(BitOpsFamilyTest, BitFieldLargeOffset) { + Run({"set", "foo", "bar"}); + + auto resp = Run({"bitfield", "foo", "get", "u32", "0", "overflow", "fail", "incrby", "u32", "0", + "4294967295"}); + EXPECT_THAT(resp, RespArray(ElementsAre(IntArg(1650553344), ArgType(RespExpr::NIL)))); + + resp = Run({"strlen", "foo"}); + EXPECT_THAT(resp, 4); + + resp = Run({"get", "foo"}); + EXPECT_THAT(ToSV(resp.GetBuf()), Eq(std::string_view("bar\0", 4))); + + resp = Run({"bitfield", "foo", "get", "u32", "4294967295"}); + EXPECT_THAT(resp, 0); +} + } // end of namespace dfly diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 3581f0e6eacd..52d47b07be22 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -311,6 +311,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { dfly::Pause(server_family_->GetNonPriviligedListeners(), &namespaces->GetDefaultNamespace(), nullptr, ClientPause::ALL, is_pause_in_progress); + DCHECK(pause_fb_opt); if (!pause_fb_opt) { auto err = absl::StrCat("Migration finalization time out ", cf_->MyID(), " : ", migration_info_.node_info.id, " attempt ", attempt); diff --git a/src/server/cluster_support.cc b/src/server/cluster_support.cc index aea847ed015d..47754287da01 100644 --- a/src/server/cluster_support.cc +++ b/src/server/cluster_support.cc @@ -17,6 +17,10 @@ ABSL_FLAG(string, cluster_mode, "", "Cluster mode supported. Possible values are " "'emulated', 'yes' or ''"); +ABSL_FLAG(bool, experimental_cluster_shard_by_slot, false, + "If true, cluster mode is enabled and sharding is done by slot. " + "Otherwise, sharding is done by hash tag."); + namespace dfly { void UniqueSlotChecker::Add(std::string_view key) { @@ -43,16 +47,13 @@ optional UniqueSlotChecker::GetUniqueSlotId() const { return slot_id_ > kMaxSlotNum ? optional() : slot_id_; } -namespace { -enum class ClusterMode { - kUninitialized, - kNoCluster, - kEmulatedCluster, - kRealCluster, -}; - +namespace detail { ClusterMode cluster_mode = ClusterMode::kUninitialized; -} // namespace +bool cluster_shard_by_slot = false; + +} // namespace detail + +using namespace detail; void InitializeCluster() { string cluster_mode_str = absl::GetFlag(FLAGS_cluster_mode); @@ -67,14 +68,10 @@ void InitializeCluster() { LOG(ERROR) << "Invalid value for flag --cluster_mode. Exiting..."; exit(1); } -} -bool IsClusterEnabled() { - return cluster_mode == ClusterMode::kRealCluster; -} - -bool IsClusterEmulated() { - return cluster_mode == ClusterMode::kEmulatedCluster; + if (cluster_mode != ClusterMode::kNoCluster) { + cluster_shard_by_slot = absl::GetFlag(FLAGS_experimental_cluster_shard_by_slot); + } } SlotId KeySlot(std::string_view key) { @@ -82,10 +79,6 @@ SlotId KeySlot(std::string_view key) { return crc16(tag.data(), tag.length()) & kMaxSlotNum; } -bool IsClusterEnabledOrEmulated() { - return IsClusterEnabled() || IsClusterEmulated(); -} - bool IsClusterShardedByTag() { return IsClusterEnabledOrEmulated() || LockTagOptions::instance().enabled; } diff --git a/src/server/cluster_support.h b/src/server/cluster_support.h index 48de740cfd37..01a230531b68 100644 --- a/src/server/cluster_support.h +++ b/src/server/cluster_support.h @@ -10,6 +10,20 @@ namespace dfly { +namespace detail { + +enum class ClusterMode { + kUninitialized, + kNoCluster, + kEmulatedCluster, + kRealCluster, +}; + +extern ClusterMode cluster_mode; +extern bool cluster_shard_by_slot; + +}; // namespace detail + using SlotId = std::uint16_t; constexpr SlotId kMaxSlotNum = 0x3FFF; @@ -42,9 +56,23 @@ class UniqueSlotChecker { SlotId KeySlot(std::string_view key); void InitializeCluster(); -bool IsClusterEnabled(); -bool IsClusterEmulated(); -bool IsClusterEnabledOrEmulated(); + +inline bool IsClusterEnabled() { + return detail::cluster_mode == detail::ClusterMode::kRealCluster; +} + +inline bool IsClusterEmulated() { + return detail::cluster_mode == detail::ClusterMode::kEmulatedCluster; +} + +inline bool IsClusterEnabledOrEmulated() { + return IsClusterEnabled() || IsClusterEmulated(); +} + +inline bool IsClusterShardedBySlot() { + return detail::cluster_shard_by_slot; +} + bool IsClusterShardedByTag(); } // namespace dfly diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index a8c5b9aae7d3..4cd9b0e6e694 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -32,51 +32,62 @@ static void SendSubscriptionChangedResponse(string_view action, std::optionalSendLong(count); } -StoredCmd::StoredCmd(const CommandId* cid, ArgSlice args, facade::ReplyMode mode) - : cid_{cid}, buffer_{}, sizes_(args.size()), reply_mode_{mode} { +StoredCmd::StoredCmd(const CommandId* cid, bool own_args, ArgSlice args) + : cid_{cid}, args_{args}, reply_mode_{facade::ReplyMode::FULL} { + if (!own_args) + return; + + auto& own_storage = args_.emplace(args.size()); size_t total_size = 0; for (auto args : args) { total_size += args.size(); } - - buffer_.resize(total_size); - char* next = buffer_.data(); + own_storage.buffer.resize(total_size); + char* next = own_storage.buffer.data(); for (unsigned i = 0; i < args.size(); i++) { if (args[i].size() > 0) memcpy(next, args[i].data(), args[i].size()); next += args[i].size(); - sizes_[i] = args[i].size(); + own_storage.sizes[i] = args[i].size(); } } StoredCmd::StoredCmd(string&& buffer, const CommandId* cid, ArgSlice args, facade::ReplyMode mode) - : cid_{cid}, buffer_{std::move(buffer)}, sizes_(args.size()), reply_mode_{mode} { + : cid_{cid}, args_{OwnStorage{args.size()}}, reply_mode_{mode} { + OwnStorage& own_storage = std::get(args_); + own_storage.buffer = std::move(buffer); + for (unsigned i = 0; i < args.size(); i++) { // Assume tightly packed list. DCHECK(i + 1 == args.size() || args[i].data() + args[i].size() == args[i + 1].data()); - sizes_[i] = args[i].size(); + own_storage.sizes[i] = args[i].size(); } } -void StoredCmd::Fill(absl::Span args) { - DCHECK_GE(args.size(), sizes_.size()); - - unsigned offset = 0; - for (unsigned i = 0; i < sizes_.size(); i++) { - args[i] = MutableSlice{buffer_.data() + offset, sizes_[i]}; - offset += sizes_[i]; - } -} - -size_t StoredCmd::NumArgs() const { - return sizes_.size(); +CmdArgList StoredCmd::ArgList(CmdArgVec* scratch) const { + return std::visit( + Overloaded{[&](const OwnStorage& s) { + unsigned offset = 0; + scratch->resize(s.sizes.size()); + for (unsigned i = 0; i < s.sizes.size(); i++) { + (*scratch)[i] = string_view{s.buffer.data() + offset, s.sizes[i]}; + offset += s.sizes[i]; + } + return CmdArgList{*scratch}; + }, + [&](const CmdArgList& s) { return s; }}, + args_); } std::string StoredCmd::FirstArg() const { - if (sizes_.size() == 0) { + if (NumArgs() == 0) { return {}; } - return buffer_.substr(0, sizes_[0]); + return std::visit(Overloaded{[&](const OwnStorage& s) { return s.buffer.substr(0, s.sizes[0]); }, + [&](const ArgSlice& s) { + return std::string{s[0].data(), s[0].size()}; + }}, + args_); } facade::ReplyMode StoredCmd::ReplyMode() const { @@ -91,9 +102,14 @@ template size_t IsStoredInlined(const C& c) { } size_t StoredCmd::UsedMemory() const { - size_t buffer_size = IsStoredInlined(buffer_) ? 0 : buffer_.size(); - size_t sz_size = IsStoredInlined(sizes_) ? 0 : sizes_.size() * sizeof(uint32_t); - return buffer_size + sz_size; + return std::visit(Overloaded{[&](const OwnStorage& s) { + size_t buffer_size = + IsStoredInlined(s.buffer) ? 0 : s.buffer.capacity(); + size_t sz_size = IsStoredInlined(s.sizes) ? 0 : s.sizes.memsize(); + return buffer_size + sz_size; + }, + [&](const ArgSlice&) -> size_t { return 0U; }}, + args_); } const CommandId* StoredCmd::Cid() const { @@ -157,6 +173,11 @@ void ConnectionContext::ChangeMonitor(bool start) { EnableMonitoring(start); } +void ConnectionContext::SwitchTxCmd(const CommandId* cid) { + transaction->MultiSwitchCmd(cid); + this->cid = cid; +} + void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgList args, facade::RedisReplyBuilder* rb) { vector result = ChangeSubscriptions(args, false, to_add, to_reply); diff --git a/src/server/conn_context.h b/src/server/conn_context.h index 92542ff06268..aabcab5cf2a4 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -8,6 +8,7 @@ #include #include "acl/acl_commands_def.h" +#include "core/overloaded.h" #include "facade/acl_commands_def.h" #include "facade/conn_context.h" #include "facade/reply_capture.h" @@ -27,25 +28,20 @@ struct FlowInfo; // Used for storing MULTI/EXEC commands. class StoredCmd { public: - StoredCmd(const CommandId* cid, ArgSlice args, facade::ReplyMode mode = facade::ReplyMode::FULL); + StoredCmd(const CommandId* cid, bool own_args, CmdArgList args); // Create on top of already filled tightly-packed buffer. - StoredCmd(std::string&& buffer, const CommandId* cid, ArgSlice args, - facade::ReplyMode mode = facade::ReplyMode::FULL); + StoredCmd(std::string&& buffer, const CommandId* cid, CmdArgList args, facade::ReplyMode mode); - size_t NumArgs() const; - - size_t UsedMemory() const; - - // Fill the arg list with stored arguments, it should be at least of size NumArgs(). - // Between filling and invocation, cmd should NOT be moved. - void Fill(absl::Span args); - - void Fill(CmdArgVec* dest) { - dest->resize(sizes_.size()); - Fill(absl::MakeSpan(*dest)); + size_t NumArgs() const { + return std::visit(Overloaded{// + [](const OwnStorage& s) { return s.sizes.size(); }, + [](const CmdArgList& s) { return s.size(); }}, + args_); } + size_t UsedMemory() const; + facade::CmdArgList ArgList(CmdArgVec* scratch) const; std::string FirstArg() const; const CommandId* Cid() const; @@ -53,10 +49,16 @@ class StoredCmd { facade::ReplyMode ReplyMode() const; private: - const CommandId* cid_; // underlying command - std::string buffer_; // underlying buffer - absl::FixedArray sizes_; // sizes of arg part - facade::ReplyMode reply_mode_; // reply mode + const CommandId* cid_; // underlying command + struct OwnStorage { + std::string buffer; // underlying buffer + absl::FixedArray sizes; // sizes of arg part + explicit OwnStorage(size_t sz) : sizes(sz) { + } + }; + + std::variant args_; // args storage + facade::ReplyMode reply_mode_; // reply mode }; struct ConnectionState { @@ -301,6 +303,7 @@ class ConnectionContext : public facade::ConnectionContext { void UnsubscribeAll(bool to_reply, facade::RedisReplyBuilder* rb); void PUnsubscribeAll(bool to_reply, facade::RedisReplyBuilder* rb); void ChangeMonitor(bool start); // either start or stop monitor on a given connection + void SwitchTxCmd(const CommandId* cid); size_t UsedMemory() const override; diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 38d3d8e8164a..9b4a457c812c 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -1309,9 +1309,7 @@ void DebugCmd::DoPopulateBatch(const PopulateOptions& options, const PopulateBat args_view.push_back(arg); } auto args_span = absl::MakeSpan(args_view); - - stub_tx->MultiSwitchCmd(cid); - local_cntx.cid = cid; + local_cntx.SwitchTxCmd(cid); crb.SetReplyMode(ReplyMode::NONE); stub_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args_span); @@ -1332,8 +1330,7 @@ void DebugCmd::DoPopulateBatch(const PopulateOptions& options, const PopulateBat args_view.push_back(arg); } auto args_span = absl::MakeSpan(args_view); - stub_tx->MultiSwitchCmd(cid); - local_cntx.cid = cid; + local_cntx.SwitchTxCmd(cid); crb.SetReplyMode(ReplyMode::NONE); stub_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args_span); sf_.service().InvokeCmd(cid, args_span, &crb, &local_cntx); diff --git a/src/server/dfly_main.cc b/src/server/dfly_main.cc index 2ee33cd358b3..8dbc97137f6e 100644 --- a/src/server/dfly_main.cc +++ b/src/server/dfly_main.cc @@ -1,4 +1,3 @@ - // Copyright 2023, DragonflyDB authors. All rights reserved. // See LICENSE for licensing terms. // @@ -649,30 +648,36 @@ void sigill_hdlr(int signo) { } void PrintBasicUsageInfo() { - std::cout << " .--::--. \n"; - std::cout << " :+*=: =@@@@@@@@= :+*+: \n"; - std::cout << " %@@@@@@%*=. =@@@@@@@@- .=*%@@@@@@# \n"; - std::cout << " @@@@@@@@@@@@#+-. .%@@@@#. .-+#@@@@@@@@@@@% \n"; - std::cout << " -@@@@@@@@@@@@@@@@*:#@@#:*@@@@@@@@@@@@@@@@- \n"; - std::cout << " :+*********####-%@%%@%-####********++. \n"; - std::cout << " .%@@@@@@@@@@@@@%:@@@@@@:@@@@@@@@@@@@@@% \n"; - std::cout << " .@@@@@@@@%*+-: =@@@@= .:-+*%@@@@@@@%. \n"; - std::cout << " =*+-: ###* .:-+*= \n"; - std::cout << " %@@% \n"; - std::cout << " *@@* \n"; - std::cout << " +@@= \n"; - std::cout << " :##: \n"; - std::cout << " :@@: \n"; - std::cout << " @@ \n"; - std::cout << " .. \n"; - std::cout << "* Logs will be written to the first available of the following paths:\n"; + std::string output = + " .--::--. \n" + " :+*=: =@@@@@@@@= :+*+: \n" + " %@@@@@@%*=. =@@@@@@@@- .=*%@@@@@@# \n" + " @@@@@@@@@@@@#+-. .%@@@@#. .-+#@@@@@@@@@@@% \n" + " -@@@@@@@@@@@@@@@@*:#@@#:*@@@@@@@@@@@@@@@@- \n" + " :+*********####-%@%%@%-####********++. \n" + " .%@@@@@@@@@@@@@%:@@@@@@:@@@@@@@@@@@@@@% \n" + " .@@@@@@@@%*+-: =@@@@= .:-+*%@@@@@@@%. \n" + " =*+-: ###* .:-+*= \n" + " %@@% \n" + " *@@* \n" + " +@@= \n" + " :##: \n" + " :@@: \n" + " @@ \n" + " .. \n" + "* Logs will be written to the first available of the following paths:\n"; + for (const auto& dir : google::GetLoggingDirectories()) { const string_view maybe_slash = absl::EndsWith(dir, "/") ? "" : "/"; - std::cout << dir << maybe_slash << "dragonfly.*\n"; + absl::StrAppend(&output, dir, maybe_slash, "dragonfly.*\n"); } - std::cout << "* For the available flags type dragonfly [--help | --helpfull]\n"; - std::cout << "* Documentation can be found at: https://www.dragonflydb.io/docs"; - std::cout << endl; + + absl::StrAppend(&output, + "* For the available flags type dragonfly [--help | --helpfull]\n" + "* Documentation can be found at: https://www.dragonflydb.io/docs\n"); + + std::cout << output; + std::cout.flush(); } void ParseFlagsFromEnv() { diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 139b7dd966ba..75bbabba56bb 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -33,6 +34,8 @@ #include "util/fibers/synchronization.h" using namespace std; +ABSL_FLAG(uint32_t, allow_partial_sync_with_lsn_diff, 0, + "Do partial sync in case lsn diff is less than the given threshold"); ABSL_DECLARE_FLAG(bool, info_replication_valkey_compatible); ABSL_DECLARE_FLAG(uint32_t, replication_timeout); @@ -99,6 +102,20 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r return true; } +bool IsLSNDiffBellowThreshold(const std::vector& lsn_vec1, const std::vector& lsn_vec2) { + DCHECK_EQ(lsn_vec1.size(), lsn_vec2.size()); + uint32_t allow_diff = absl::GetFlag(FLAGS_allow_partial_sync_with_lsn_diff); + for (size_t i = 0; i < lsn_vec1.size(); ++i) { + uint32_t diff = + lsn_vec1[i] > lsn_vec2[i] ? lsn_vec1[i] - lsn_vec2[i] : lsn_vec2[i] - lsn_vec1[i]; + if (diff > allow_diff) { + VLOG(1) << "No partial sync due to diff: " << diff << " allow_diff is: " << allow_diff; + return false; + } + } + return true; +} + } // namespace void DflyCmd::ReplicaInfo::Cancel() { @@ -140,7 +157,7 @@ void DflyCmd::Run(CmdArgList args, Transaction* tx, facade::RedisReplyBuilder* r return Thread(args, rb, cntx); } - if (sub_cmd == "FLOW" && (args.size() == 4 || args.size() == 5)) { + if (sub_cmd == "FLOW" && (args.size() >= 4 && args.size() <= 6)) { return Flow(args, rb, cntx); } @@ -233,11 +250,16 @@ void DflyCmd::Flow(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn string_view flow_id_str = ArgS(args, 3); std::optional seqid; + std::optional last_master_id; + std::optional last_master_lsn; if (args.size() == 5) { seqid.emplace(); if (!absl::SimpleAtoi(ArgS(args, 4), &seqid.value())) { return rb->SendError(facade::kInvalidIntErr); } + } else if (args.size() == 6) { + last_master_id = ArgS(args, 4); + last_master_lsn = ArgS(args, 5); } VLOG(1) << "Got DFLY FLOW master_id: " << master_id << " sync_id: " << sync_id_str @@ -257,6 +279,7 @@ void DflyCmd::Flow(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn return; string eof_token; + std::string_view sync_type{"FULL"}; { util::fb2::LockGuard lk{replica_ptr->shared_mu}; @@ -276,17 +299,6 @@ void DflyCmd::Flow(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn flow.conn = cntx->conn(); flow.eof_token = eof_token; flow.version = replica_ptr->version; - } - if (!cntx->conn()->Migrate(shard_set->pool()->at(flow_id))) { - // Listener::PreShutdown() triggered - if (cntx->conn()->socket()->IsOpen()) { - return rb->SendError(kInvalidState); - } - return; - } - sf_->journal()->StartInThread(); - - std::string_view sync_type{"FULL"}; #if 0 // Partial synchronization is disabled if (seqid.has_value()) { @@ -307,6 +319,42 @@ void DflyCmd::Flow(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn } #endif + std::optional data = sf_->GetLastMasterData(); + // In this flow the master and the registered replica where synced from the same master. + if (last_master_id && data && data.value().id == last_master_id.value()) { + std::vector lsn_str_vec = absl::StrSplit(last_master_lsn.value(), '-'); + if (lsn_str_vec.size() != data.value().last_journal_LSNs.size()) { + return rb->SendError(facade::kSyntaxErr); // Unexpected flow. LSN vector of same master + // should be the same size on all replicas. + } + std::vector lsn_vec; + lsn_vec.reserve(lsn_str_vec.size()); + for (string_view lsn_str : lsn_str_vec) { + int64_t value; + if (!absl::SimpleAtoi(lsn_str, &value)) { + return rb->SendError(facade::kInvalidIntErr); + } + lsn_vec.push_back(value); + } + + if (IsLSNDiffBellowThreshold(data.value().last_journal_LSNs, lsn_vec)) { + sync_type = "PARTIAL"; + flow.start_partial_sync_at = sf_->journal()->GetLsn(); + VLOG(1) << "Partial sync requested from LSN=" << flow.start_partial_sync_at.value() + << " and is available. (current_lsn=" << sf_->journal()->GetLsn() << ")"; + } + } + } + + if (!cntx->conn()->Migrate(shard_set->pool()->at(flow_id))) { + // Listener::PreShutdown() triggered + if (cntx->conn()->socket()->IsOpen()) { + return rb->SendError(kInvalidState); + } + return; + } + sf_->journal()->StartInThread(); + rb->StartArray(2); rb->SendSimpleString(sync_type); rb->SendSimpleString(eof_token); diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 8e7e80584b71..829f05e5aeee 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -548,6 +548,23 @@ TEST_F(DflyEngineTest, PSubscribe) { EXPECT_EQ("a*", msg.pattern); } +TEST_F(DflyEngineTest, PSubscribeMatchOnlyStar) { + single_response_ = false; + auto resp = pp_->at(1)->Await([&] { return Run({"psubscribe", "*"}); }); + EXPECT_THAT(resp, ArrLen(3)); + resp = pp_->at(0)->Await([&] { return Run({"PUBLISH", "1234567890123456", "abc"}); }); + EXPECT_THAT(resp, IntArg(1)); + + pp_->AwaitFiberOnAll([](ProactorBase* pb) {}); + + ASSERT_EQ(1, SubscriberMessagesLen("IO1")); + + const auto& msg = GetPublishedMessage("IO1", 0); + EXPECT_EQ("abc", msg.message); + EXPECT_EQ("1234567890123456", msg.channel); + EXPECT_EQ("*", msg.pattern); +} + TEST_F(DflyEngineTest, Unsubscribe) { auto resp = Run({"unsubscribe", "a"}); EXPECT_THAT(resp.GetVec(), ElementsAre("unsubscribe", "a", IntArg(0))); diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 6a36458c75bc..be401c45e4fa 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -261,6 +261,17 @@ __thread EngineShard* EngineShard::shard_ = nullptr; uint64_t TEST_current_time_ms = 0; ShardId Shard(string_view v, ShardId shard_num) { + // This cluster sharding is not necessary and may degrade keys distribution among shard threads. + // For example, if we have 3 shards, then no single-char keys will be assigned to shard 2 and + // 32 single char keys in range ['_' - '~'] will be assigned to shard 0. + // Yes, SlotId function does not have great distribution properties. + // On the other side, slot based sharding may help with pipeline squashing optimizations, + // because they rely on commands being single-sharded. + // TODO: once we improve our squashing logic, we can remove this. + if (IsClusterShardedBySlot()) { + return KeySlot(v) % shard_num; + } + if (IsClusterShardedByTag()) { v = LockTagOptions::instance().Tag(v); } diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 97c03eb96b6a..99e00f360058 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -870,6 +870,9 @@ OpResult OpRen(const OpArgs& op_args, string_view from_key, string_view to is_prior_list = (to_res.it->second.ObjType() == OBJ_LIST); } + // Delete the "from" document from the search index before deleting from the database + op_args.shard->search_indices()->RemoveDoc(from_key, op_args.db_cntx, from_res.it->second); + bool sticky = from_res.it->first.IsSticky(); uint64_t exp_ts = db_slice.ExpireTime(from_res.exp_it); @@ -1900,13 +1903,13 @@ void GenericFamily::Register(CommandRegistry* registry) { << CI{"TOUCH", CO::READONLY | CO::FAST, -2, 1, -1, acl::kTouch}.HFUNC(Exists) << CI{"EXPIRE", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, -3, 1, 1, acl::kExpire}.HFUNC( Expire) - << CI{"EXPIREAT", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, acl::kExpireAt}.HFUNC( + << CI{"EXPIREAT", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, -3, 1, 1, acl::kExpireAt}.HFUNC( ExpireAt) << CI{"PERSIST", CO::WRITE | CO::FAST, 2, 1, 1, acl::kPersist}.HFUNC(Persist) << CI{"KEYS", CO::READONLY, 2, 0, 0, acl::kKeys}.HFUNC(Keys) - << CI{"PEXPIREAT", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, acl::kPExpireAt}.HFUNC( - PexpireAt) - << CI{"PEXPIRE", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, acl::kPExpire}.HFUNC( + << CI{"PEXPIREAT", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, -3, 1, 1, acl::kPExpireAt} + .HFUNC(PexpireAt) + << CI{"PEXPIRE", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, -3, 1, 1, acl::kPExpire}.HFUNC( Pexpire) << CI{"FIELDEXPIRE", CO::WRITE | CO::FAST | CO::DENYOOM, -4, 1, 1, acl::kFieldExpire}.HFUNC( FieldExpire) diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index ed6587795618..ec947b11ef12 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -145,6 +145,218 @@ TEST_F(GenericFamilyTest, ExpireOptions) { EXPECT_THAT(resp.GetInt(), 101); } +TEST_F(GenericFamilyTest, ExpireAtOptions) { + auto test_time_ms = TEST_current_time_ms; + auto time_s = (test_time_ms + 500) / 1000; + auto test_time_s = time_s; + + Run({"set", "key", "val"}); + // NX and XX are mutually exclusive + auto resp = Run({"expireat", "key", "3600", "NX", "XX"}); + ASSERT_THAT(resp, ErrArg("NX and XX, GT or LT options at the same time are not compatible")); + + // NX and GT are mutually exclusive + resp = Run({"expireat", "key", "3600", "NX", "GT"}); + ASSERT_THAT(resp, ErrArg("NX and XX, GT or LT options at the same time are not compatible")); + + // NX and LT are mutually exclusive + resp = Run({"expireat", "key", "3600", "NX", "LT"}); + ASSERT_THAT(resp, ErrArg("NX and XX, GT or LT options at the same time are not compatible")); + + // GT and LT are mutually exclusive + resp = Run({"expireat", "key", "3600", "GT", "LT"}); + ASSERT_THAT(resp, ErrArg("GT and LT options at the same time are not compatible")); + + // NX option should be added since there is no expiry + test_time_s = time_s + 5; + resp = Run({"expireat", "key", absl::StrCat(test_time_s), "NX"}); + EXPECT_THAT(resp, IntArg(1)); + EXPECT_EQ(test_time_s, CheckedInt({"EXPIRETIME", "key"})); + + // running again with NX option, should not change expiry + test_time_s = time_s + 9; + resp = Run({"expireat", "key", absl::StrCat(test_time_s), "NX"}); + EXPECT_THAT(resp, IntArg(0)); + + // given a key with no expiry + Run({"set", "key2", "val"}); + test_time_s = time_s + 9; + resp = Run({"expireat", "key2", absl::StrCat(test_time_s), "XX"}); + // XX does not apply expiry since key has no existing expiry + EXPECT_THAT(resp, IntArg(0)); + resp = Run({"ttl", "key2"}); + EXPECT_THAT(resp.GetInt(), -1); + + // set expiry to 101 + test_time_s = time_s + 101; + resp = Run({"expireat", "key", absl::StrCat(test_time_s)}); + EXPECT_THAT(resp, IntArg(1)); + + // GT should not apply expiry since new is not greater than the current one + auto less_test_time_s = time_s + 99; + resp = Run({"expireat", "key", absl::StrCat(less_test_time_s), "GT"}); + EXPECT_THAT(resp, IntArg(0)); + EXPECT_EQ(test_time_s, CheckedInt({"EXPIRETIME", "key"})); + + // GT should apply expiry since new is greater than the current one + test_time_s = time_s + 105; + resp = Run({"expireat", "key", absl::StrCat(test_time_s), "GT"}); + EXPECT_THAT(resp, IntArg(1)); + EXPECT_EQ(test_time_s, CheckedInt({"EXPIRETIME", "key"})); + + // LT should apply new expiry is smaller than current + test_time_s = time_s + 101; + resp = Run({"expireat", "key", absl::StrCat(test_time_s), "LT"}); + EXPECT_THAT(resp, IntArg(1)); + EXPECT_EQ(test_time_s, CheckedInt({"EXPIRETIME", "key"})); + + // LT should not apply expiry since new is not lesser than the current one + auto gt_test_time_s = time_s + 102; + resp = Run({"expireat", "key", absl::StrCat(gt_test_time_s), "LT"}); + EXPECT_THAT(resp, IntArg(0)); + EXPECT_EQ(test_time_s, CheckedInt({"EXPIRETIME", "key"})); +} + +TEST_F(GenericFamilyTest, PExpireOptions) { + // NX and XX are mutually exclusive + Run({"set", "key", "val"}); + auto resp = Run({"pexpire", "key", "3600", "NX", "XX"}); + ASSERT_THAT(resp, ErrArg("NX and XX, GT or LT options at the same time are not compatible")); + + // NX and GT are mutually exclusive + resp = Run({"pexpire", "key", "3600", "NX", "GT"}); + ASSERT_THAT(resp, ErrArg("NX and XX, GT or LT options at the same time are not compatible")); + + // NX and LT are mutually exclusive + resp = Run({"pexpire", "key", "3600", "NX", "LT"}); + ASSERT_THAT(resp, ErrArg("NX and XX, GT or LT options at the same time are not compatible")); + + // GT and LT are mutually exclusive + resp = Run({"pexpire", "key", "3600", "GT", "LT"}); + ASSERT_THAT(resp, ErrArg("GT and LT options at the same time are not compatible")); + + // NX option should be added since there is no expiry + resp = Run({"pexpire", "key", "3600000", "NX"}); + EXPECT_THAT(resp, IntArg(1)); + resp = Run({"pttl", "key"}); + EXPECT_THAT(resp.GetInt(), 3600000); + + // running again with NX option, should not change expiry + resp = Run({"pexpire", "key", "42", "NX"}); + EXPECT_THAT(resp, IntArg(0)); + + // given a key with no expiry + Run({"set", "key2", "val"}); + resp = Run({"pexpire", "key2", "404", "XX"}); + // XX does not apply expiry since key has no existing expiry + EXPECT_THAT(resp, IntArg(0)); + resp = Run({"pttl", "key2"}); + EXPECT_THAT(resp.GetInt(), -1); + + // set expiry to 101 + resp = Run({"pexpire", "key", "101000"}); + EXPECT_THAT(resp, IntArg(1)); + + // GT should not apply expiry since new is not greater than the current one + resp = Run({"pexpire", "key", "100000", "GT"}); + EXPECT_THAT(resp, IntArg(0)); + resp = Run({"pttl", "key"}); + EXPECT_THAT(resp.GetInt(), 101000); + + // GT should apply expiry since new is greater than the current one + resp = Run({"pexpire", "key", "102000", "GT"}); + EXPECT_THAT(resp, IntArg(1)); + resp = Run({"pttl", "key"}); + EXPECT_THAT(resp.GetInt(), 102000); + + // GT should not apply since expiry is smaller than current + resp = Run({"pexpire", "key", "101000", "GT"}); + EXPECT_THAT(resp, IntArg(0)); + resp = Run({"pttl", "key"}); + EXPECT_THAT(resp.GetInt(), 102000); + + // LT should apply new expiry is smaller than current + resp = Run({"pexpire", "key", "101000", "LT"}); + EXPECT_THAT(resp, IntArg(1)); + resp = Run({"pttl", "key"}); + EXPECT_THAT(resp.GetInt(), 101000); + + // LT should not apply since expiry is greater than current + resp = Run({"pexpire", "key", "102000", "LT"}); + EXPECT_THAT(resp, IntArg(0)); + resp = Run({"pttl", "key"}); + EXPECT_THAT(resp.GetInt(), 101000); +} + +TEST_F(GenericFamilyTest, PExpireAtOptions) { + auto test_time_ms = TEST_current_time_ms; + Run({"set", "key", "val"}); + // NX and XX are mutually exclusive + auto resp = Run({"pexpireat", "key", "3600", "NX", "XX"}); + ASSERT_THAT(resp, ErrArg("NX and XX, GT or LT options at the same time are not compatible")); + + // NX and GT are mutually exclusive + resp = Run({"pexpireat", "key", "3600", "NX", "GT"}); + ASSERT_THAT(resp, ErrArg("NX and XX, GT or LT options at the same time are not compatible")); + + // NX and LT are mutually exclusive + resp = Run({"pexpireat", "key", "3600", "NX", "LT"}); + ASSERT_THAT(resp, ErrArg("NX and XX, GT or LT options at the same time are not compatible")); + + // GT and LT are mutually exclusive + resp = Run({"pexpireat", "key", "3600", "GT", "LT"}); + ASSERT_THAT(resp, ErrArg("GT and LT options at the same time are not compatible")); + + // NX option should be added since there is no expiry + test_time_ms = TEST_current_time_ms + 3600; + resp = Run({"pexpireat", "key", absl::StrCat(test_time_ms), "NX"}); + EXPECT_THAT(resp, IntArg(1)); + EXPECT_EQ(test_time_ms, CheckedInt({"PEXPIRETIME", "key"})); + + // running again with NX option, should not change expiry + test_time_ms = TEST_current_time_ms + 42000; + resp = Run({"pexpireat", "key", absl::StrCat(test_time_ms), "NX"}); + EXPECT_THAT(resp, IntArg(0)); + + // given a key with no expiry + Run({"set", "key2", "val"}); + test_time_ms = TEST_current_time_ms + 404; + resp = Run({"pexpireat", "key2", absl::StrCat(test_time_ms), "XX"}); + // XX does not apply expiry since key has no existing expiry + EXPECT_THAT(resp, IntArg(0)); + resp = Run({"ttl", "key2"}); + EXPECT_THAT(resp.GetInt(), -1); + + // set expiry to 101 + test_time_ms = TEST_current_time_ms + 101; + resp = Run({"pexpireat", "key", absl::StrCat(test_time_ms)}); + EXPECT_THAT(resp, IntArg(1)); + + // GT should not apply expiry since new is not greater than the current one + auto less_test_time_ms = TEST_current_time_ms + 100; + resp = Run({"pexpireat", "key", absl::StrCat(less_test_time_ms), "GT"}); + EXPECT_THAT(resp, IntArg(0)); + EXPECT_EQ(test_time_ms, CheckedInt({"PEXPIRETIME", "key"})); + + // GT should apply expiry since new is greater than the current one + test_time_ms = TEST_current_time_ms + 105; + resp = Run({"pexpireat", "key", absl::StrCat(test_time_ms), "GT"}); + EXPECT_THAT(resp, IntArg(1)); + EXPECT_EQ(test_time_ms, CheckedInt({"PEXPIRETIME", "key"})); + + // LT should apply new expiry is smaller than current + test_time_ms = TEST_current_time_ms + 101; + resp = Run({"pexpireat", "key", absl::StrCat(test_time_ms), "LT"}); + EXPECT_THAT(resp, IntArg(1)); + EXPECT_EQ(test_time_ms, CheckedInt({"PEXPIRETIME", "key"})); + + // LT should not apply expiry since new is not lesser than the current one + auto gt_test_time_ms = TEST_current_time_ms + 102; + resp = Run({"pexpireat", "key", absl::StrCat(gt_test_time_ms), "LT"}); + EXPECT_THAT(resp, IntArg(0)); + EXPECT_EQ(test_time_ms, CheckedInt({"PEXPIRETIME", "key"})); +} + TEST_F(GenericFamilyTest, Del) { for (size_t i = 0; i < 1000; ++i) { Run({"set", StrCat("foo", i), "1"}); diff --git a/src/server/hll_family.cc b/src/server/hll_family.cc index d0c1f6d33c4f..a16afd43060a 100644 --- a/src/server/hll_family.cc +++ b/src/server/hll_family.cc @@ -255,14 +255,14 @@ OpResult PFMergeInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder if (result.ok()) { hlls[sid] = std::move(result.value()); } else { - success = false; + success.store(false, memory_order_relaxed); } - return result.status(); + return OpStatus::OK; }; tx->Execute(std::move(cb), false); - if (!success) { + if (!success.load(memory_order_relaxed)) { tx->Conclude(); return OpStatus::INVALID_VALUE; } diff --git a/src/server/hll_family_test.cc b/src/server/hll_family_test.cc index 7d162fc4fa63..752c640ca4b5 100644 --- a/src/server/hll_family_test.cc +++ b/src/server/hll_family_test.cc @@ -194,9 +194,12 @@ TEST_F(HllFamilyTest, MergeOverlapping) { } TEST_F(HllFamilyTest, MergeInvalid) { + Run({"exists", "key1", "key4"}); + ASSERT_EQ(GetDebugInfo().shards_count, 2); // ensure 2 shards + EXPECT_EQ(CheckedInt({"pfadd", "key1", "1", "2", "3"}), 1); - EXPECT_EQ(Run({"set", "key2", "..."}), "OK"); - EXPECT_THAT(Run({"pfmerge", "key1", "key2"}), ErrArg(HllFamily::kInvalidHllErr)); + EXPECT_EQ(Run({"set", "key4", "..."}), "OK"); + EXPECT_THAT(Run({"pfmerge", "key1", "key4"}), ErrArg(HllFamily::kInvalidHllErr)); EXPECT_EQ(CheckedInt({"pfcount", "key1"}), 3); } diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 59cd1a31866f..77213ae4c320 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -324,7 +324,7 @@ OpResult Peek(const OpArgs& op_args, string_view key, ListDir dir, bool } OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir dir, - bool skip_notexist, facade::ArgRange vals, bool journal_rewrite) { + bool skip_notexist, const facade::ArgRange& vals, bool journal_rewrite) { EngineShard* es = op_args.shard; DbSlice::ItAndUpdater res; @@ -666,7 +666,7 @@ OpResult OpInsert(const OpArgs& op_args, string_view key, string_view pivot QList* ql = GetQLV2(pv); QList::InsertOpt insert_opt = (insert_param == INSERT_BEFORE) ? QList::BEFORE : QList::AFTER; if (ql->Insert(pivot, elem, insert_opt)) { - res = ql->Size(); + res = int(ql->Size()); } } else { quicklist* ql = GetQL(pv); @@ -688,7 +688,7 @@ OpResult OpInsert(const OpArgs& op_args, string_view key, string_view pivot DCHECK_EQ(INSERT_BEFORE, insert_param); quicklistInsertBefore(qiter, &entry, elem.data(), elem.size()); } - res = quicklistCount(ql); + res = int(quicklistCount(ql)); } quicklistReleaseIterator(qiter); } @@ -809,7 +809,7 @@ OpStatus OpTrim(const OpArgs& op_args, string_view key, long start, long end) { auto it = it_res->it; - long llen = it->second.Size(); + long llen = long(it->second.Size()); /* convert negative indexes */ if (start < 0) @@ -1324,27 +1324,27 @@ void ListFamily::LMPop(CmdArgList args, const CommandContext& cmd_cntx) { } void ListFamily::LPush(CmdArgList args, const CommandContext& cmd_cntx) { - return PushGeneric(ListDir::LEFT, false, std::move(args), cmd_cntx.tx, cmd_cntx.rb); + return PushGeneric(ListDir::LEFT, false, args, cmd_cntx.tx, cmd_cntx.rb); } void ListFamily::LPushX(CmdArgList args, const CommandContext& cmd_cntx) { - return PushGeneric(ListDir::LEFT, true, std::move(args), cmd_cntx.tx, cmd_cntx.rb); + return PushGeneric(ListDir::LEFT, true, args, cmd_cntx.tx, cmd_cntx.rb); } void ListFamily::LPop(CmdArgList args, const CommandContext& cmd_cntx) { - return PopGeneric(ListDir::LEFT, std::move(args), cmd_cntx.tx, cmd_cntx.rb); + return PopGeneric(ListDir::LEFT, args, cmd_cntx.tx, cmd_cntx.rb); } void ListFamily::RPush(CmdArgList args, const CommandContext& cmd_cntx) { - return PushGeneric(ListDir::RIGHT, false, std::move(args), cmd_cntx.tx, cmd_cntx.rb); + return PushGeneric(ListDir::RIGHT, false, args, cmd_cntx.tx, cmd_cntx.rb); } void ListFamily::RPushX(CmdArgList args, const CommandContext& cmd_cntx) { - return PushGeneric(ListDir::RIGHT, true, std::move(args), cmd_cntx.tx, cmd_cntx.rb); + return PushGeneric(ListDir::RIGHT, true, args, cmd_cntx.tx, cmd_cntx.rb); } void ListFamily::RPop(CmdArgList args, const CommandContext& cmd_cntx) { - return PopGeneric(ListDir::RIGHT, std::move(args), cmd_cntx.tx, cmd_cntx.rb); + return PopGeneric(ListDir::RIGHT, args, cmd_cntx.tx, cmd_cntx.rb); } void ListFamily::LLen(CmdArgList args, const CommandContext& cmd_cntx) { @@ -1563,11 +1563,11 @@ void ListFamily::LSet(CmdArgList args, const CommandContext& cmd_cntx) { } void ListFamily::BLPop(CmdArgList args, const CommandContext& cmd_cntx) { - BPopGeneric(ListDir::LEFT, std::move(args), cmd_cntx.tx, cmd_cntx.rb, cmd_cntx.conn_cntx); + BPopGeneric(ListDir::LEFT, args, cmd_cntx.tx, cmd_cntx.rb, cmd_cntx.conn_cntx); } void ListFamily::BRPop(CmdArgList args, const CommandContext& cmd_cntx) { - BPopGeneric(ListDir::RIGHT, std::move(args), cmd_cntx.tx, cmd_cntx.rb, cmd_cntx.conn_cntx); + BPopGeneric(ListDir::RIGHT, args, cmd_cntx.tx, cmd_cntx.rb, cmd_cntx.conn_cntx); } void ListFamily::LMove(CmdArgList args, const CommandContext& cmd_cntx) { diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 91f003b5a466..20efa1a4a4ec 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -116,8 +116,8 @@ ABSL_FLAG(size_t, serialization_max_chunk_size, 64_KB, "Maximum size of a value that may be serialized at once during snapshotting or full " "sync. Values bigger than this threshold will be serialized using streaming " "serialization. 0 - to disable streaming mode"); -ABSL_FLAG(uint32_t, max_squashed_cmd_num, 32, - "Max number of commands squashed in command squash optimizaiton"); +ABSL_FLAG(uint32_t, max_squashed_cmd_num, 100, + "Max number of commands squashed in a single shard during squash optimizaiton"); namespace dfly { @@ -287,10 +287,10 @@ void DispatchMonitor(ConnectionContext* cntx, const CommandId* cid, CmdArgList t class InterpreterReplier : public RedisReplyBuilder { public: - InterpreterReplier(ObjectExplorer* explr) : RedisReplyBuilder(nullptr), explr_(explr) { + explicit InterpreterReplier(ObjectExplorer* explr) : RedisReplyBuilder(nullptr), explr_(explr) { } - void SendError(std::string_view str, std::string_view type = std::string_view{}) final; + void SendError(std::string_view str, std::string_view type) final; void SendBulkString(std::string_view str) final; void SendSimpleString(std::string_view str) final; @@ -313,7 +313,7 @@ class InterpreterReplier : public RedisReplyBuilder { // Serialized result of script invocation to Redis protocol class EvalSerializer : public ObjectExplorer { public: - EvalSerializer(RedisReplyBuilder* rb) : rb_(rb) { + explicit EvalSerializer(RedisReplyBuilder* rb) : rb_(rb) { } void OnBool(bool b) final { @@ -443,11 +443,8 @@ void InterpreterReplier::StartCollection(unsigned len, CollectionType type) { } bool IsSHA(string_view str) { - for (auto c : str) { - if (!absl::ascii_isxdigit(c)) - return false; - } - return true; + return std::all_of(str.begin(), str.end(), + [](unsigned char c) { return absl::ascii_isxdigit(c); }); } optional EvalValidator(CmdArgList args) { @@ -637,9 +634,8 @@ Transaction::MultiMode DeduceExecMode(ExecScriptUse state, // We can only tell if eval is transactional based on they keycount if (absl::StartsWith(scmd.Cid()->name(), "EVAL")) { CmdArgVec arg_vec{}; - StoredCmd cmd = scmd; - cmd.Fill(&arg_vec); - auto keys = DetermineKeys(scmd.Cid(), absl::MakeSpan(arg_vec)); + auto args = scmd.ArgList(&arg_vec); + auto keys = DetermineKeys(scmd.Cid(), args); transactional |= (keys && keys.value().NumArgs() > 0); } else { transactional |= scmd.Cid()->IsTransactional(); @@ -1203,9 +1199,8 @@ void Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder, bool is_trans_cmd = CO::IsTransKind(cid->name()); if (dfly_cntx->conn_state.exec_info.IsCollecting() && !is_trans_cmd) { // TODO: protect against aggregating huge transactions. - StoredCmd stored_cmd{cid, args_no_cmd}; - dfly_cntx->conn_state.exec_info.body.push_back(std::move(stored_cmd)); - if (stored_cmd.Cid()->IsWriteOnly()) { + dfly_cntx->conn_state.exec_info.body.emplace_back(cid, true, args_no_cmd); + if (cid->IsWriteOnly()) { dfly_cntx->conn_state.exec_info.is_write = true; } return builder->SendSimpleString("QUEUED"); @@ -1415,11 +1410,14 @@ size_t Service::DispatchManyCommands(absl::Span args_list, SinkReply DCHECK(!dfly_cntx->conn_state.exec_info.IsRunning()); DCHECK_EQ(builder->GetProtocol(), Protocol::REDIS); + auto* ss = dfly::ServerState::tlocal(); + // Don't even start when paused. We can only continue if DispatchTracker is aware of us running. + if (ss->IsPaused()) + return 0; + vector stored_cmds; intrusive_ptr dist_trans; - size_t dispatched = 0; - auto* ss = dfly::ServerState::tlocal(); auto perform_squash = [&] { if (stored_cmds.empty()) @@ -1448,10 +1446,6 @@ size_t Service::DispatchManyCommands(absl::Span args_list, SinkReply stored_cmds.clear(); }; - // Don't even start when paused. We can only continue if DispatchTracker is aware of us running. - if (ss->IsPaused()) - return 0; - for (auto args : args_list) { string cmd = absl::AsciiStrToUpper(ArgS(args, 0)); const auto [cid, tail_args] = registry_.FindExtended(cmd, args.subspan(1)); @@ -1471,7 +1465,7 @@ size_t Service::DispatchManyCommands(absl::Span args_list, SinkReply if (!is_multi && !is_eval && !is_blocking && cid != nullptr) { stored_cmds.reserve(args_list.size()); - stored_cmds.emplace_back(cid, tail_args); + stored_cmds.emplace_back(cid, false /* do not deep-copy commands*/, tail_args); continue; } @@ -2102,27 +2096,22 @@ bool CheckWatchedKeyExpiry(ConnectionContext* cntx, const CommandId* exists_cid, // Check if exec_info watches keys on dbs other than db_indx. bool IsWatchingOtherDbs(DbIndex db_indx, const ConnectionState::ExecInfo& exec_info) { - for (const auto& [key_db, _] : exec_info.watched_keys) { - if (key_db != db_indx) { - return true; - } - } - return false; + return std::any_of(exec_info.watched_keys.begin(), exec_info.watched_keys.end(), + [db_indx](const auto& pair) { return pair.first != db_indx; }); } -template void IterateAllKeys(ConnectionState::ExecInfo* exec_info, F&& f) { +template void IterateAllKeys(const ConnectionState::ExecInfo* exec_info, F&& f) { for (auto& [dbid, key] : exec_info->watched_keys) f(MutableSlice{key.data(), key.size()}); CmdArgVec arg_vec{}; - for (auto& scmd : exec_info->body) { + for (const auto& scmd : exec_info->body) { if (!scmd.Cid()->IsTransactional()) continue; - scmd.Fill(&arg_vec); - - auto key_res = DetermineKeys(scmd.Cid(), absl::MakeSpan(arg_vec)); + auto args = scmd.ArgList(&arg_vec); + auto key_res = DetermineKeys(scmd.Cid(), args); if (!key_res.ok()) continue; @@ -2224,16 +2213,12 @@ void Service::Exec(CmdArgList args, const CommandContext& cmd_cntx) { MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), rb, cntx, this, opts); } else { CmdArgVec arg_vec; - for (auto& scmd : exec_info.body) { + for (const auto& scmd : exec_info.body) { VLOG(2) << "TX CMD " << scmd.Cid()->name() << " " << scmd.NumArgs(); - cmd_cntx.tx->MultiSwitchCmd(scmd.Cid()); - cntx->cid = scmd.Cid(); - - arg_vec.resize(scmd.NumArgs()); - scmd.Fill(&arg_vec); + cntx->SwitchTxCmd(scmd.Cid()); - CmdArgList args = absl::MakeSpan(arg_vec); + CmdArgList args = scmd.ArgList(&arg_vec); if (scmd.Cid()->IsTransactional()) { OpStatus st = cmd_cntx.tx->InitByArgs(cntx->ns, cntx->conn_state.db_index, args); @@ -2275,7 +2260,7 @@ void SubscribeImpl(bool reject_cluster, CmdArgList args, const CommandContext& c if (reject_cluster && IsClusterEnabled()) { return cmd_cntx.rb->SendError("SUBSCRIBE is not supported in cluster mode yet"); } - cmd_cntx.conn_cntx->ChangeSubscription(true /*add*/, true /* reply*/, std::move(args), + cmd_cntx.conn_cntx->ChangeSubscription(true /*add*/, true /* reply*/, args, static_cast(cmd_cntx.rb)); } diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index e52423e39996..f6bd2a5965ca 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -92,7 +92,7 @@ MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(Shar return sinfo; } -MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cmd) { +MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(const StoredCmd* cmd) { DCHECK(cmd->Cid()); if (!cmd->Cid()->IsTransactional() || (cmd->Cid()->opt_mask() & CO::BLOCKING) || @@ -103,8 +103,7 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm return SquashResult::NOT_SQUASHED; } - cmd->Fill(&tmp_keylist_); - auto args = absl::MakeSpan(tmp_keylist_); + auto args = cmd->ArgList(&tmp_keylist_); if (args.empty()) return SquashResult::NOT_SQUASHED; @@ -136,11 +135,10 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm return need_flush ? SquashResult::SQUASHED_FULL : SquashResult::SQUASHED; } -bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, StoredCmd* cmd) { +bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, const StoredCmd* cmd) { DCHECK(order_.empty()); // check no squashed chain is interrupted - cmd->Fill(&tmp_keylist_); - auto args = absl::MakeSpan(tmp_keylist_); + auto args = cmd->ArgList(&tmp_keylist_); if (opts_.verify_commands) { if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) { @@ -151,8 +149,7 @@ bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, Stor } auto* tx = cntx_->transaction; - tx->MultiSwitchCmd(cmd->Cid()); - cntx_->cid = cmd->Cid(); + cntx_->SwitchTxCmd(cmd->Cid()); if (cmd->Cid()->IsTransactional()) tx->InitByArgs(cntx_->ns, cntx_->conn_state.db_index, args); @@ -171,13 +168,11 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v if (cntx_->conn()) { local_cntx.skip_acl_validation = cntx_->conn()->IsPrivileged(); } - absl::InlinedVector arg_vec; - for (auto* cmd : sinfo.cmds) { - arg_vec.resize(cmd->NumArgs()); - auto args = absl::MakeSpan(arg_vec); - cmd->Fill(args); + CmdArgVec arg_vec; + for (const auto* cmd : sinfo.cmds) { + auto args = cmd->ArgList(&arg_vec); if (opts_.verify_commands) { // The shared context is used for state verification, the local one is only for replies if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) { @@ -189,8 +184,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v } } - local_tx->MultiSwitchCmd(cmd->Cid()); - local_cntx.cid = cmd->Cid(); + local_cntx.SwitchTxCmd(cmd->Cid()); crb.SetReplyMode(cmd->ReplyMode()); local_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args); @@ -205,7 +199,6 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v CheckConnStateClean(local_state); } - reverse(sinfo.replies.begin(), sinfo.replies.end()); return OpStatus::OK; } @@ -255,15 +248,15 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { bool aborted = false; for (auto idx : order_) { - auto& replies = sharded_[idx].replies; - CHECK(!replies.empty()); + auto& sinfo = sharded_[idx]; + auto& replies = sinfo.replies; + DCHECK_LT(sinfo.reply_id, replies.size()); - aborted |= opts_.error_abort && CapturingReplyBuilder::TryExtractError(replies.back()); - - current_reply_size_.fetch_sub(Size(replies.back()), std::memory_order_relaxed); - CapturingReplyBuilder::Apply(std::move(replies.back()), rb); - replies.pop_back(); + auto& reply = replies[sinfo.reply_id++]; + aborted |= opts_.error_abort && CapturingReplyBuilder::TryExtractError(reply); + current_reply_size_.fetch_sub(Size(reply), std::memory_order_relaxed); + CapturingReplyBuilder::Apply(std::move(reply), rb); if (aborted) break; } @@ -271,8 +264,11 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { ServerState::SafeTLocal()->stats.multi_squash_exec_hop_usec += (after_hop - start) / 1000; ServerState::SafeTLocal()->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000; - for (auto& sinfo : sharded_) + for (auto& sinfo : sharded_) { sinfo.cmds.clear(); + sinfo.replies.clear(); + sinfo.reply_id = 0; + } order_.clear(); return !aborted; diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index b54c7b2f5f30..657ecef1e467 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -28,6 +28,7 @@ class MultiCommandSquasher { unsigned max_squash_size = 32; // How many commands to squash at once }; + // Returns number of processed commands. static size_t Execute(absl::Span cmds, facade::RedisReplyBuilder* rb, ConnectionContext* cntx, Service* service, const Opts& opts) { return MultiCommandSquasher{cmds, cntx, service, opts}.Run(rb); @@ -40,17 +41,17 @@ class MultiCommandSquasher { private: // Per-shard execution info. struct ShardExecInfo { - ShardExecInfo() : cmds{}, replies{}, local_tx{nullptr} { + ShardExecInfo() : local_tx{nullptr} { } - std::vector cmds; // accumulated commands + std::vector cmds; // accumulated commands std::vector replies; + unsigned reply_id = 0; boost::intrusive_ptr local_tx; // stub-mode tx for use inside shard }; enum class SquashResult { SQUASHED, SQUASHED_FULL, NOT_SQUASHED, ERROR }; - private: MultiCommandSquasher(absl::Span cmds, ConnectionContext* cntx, Service* Service, const Opts& opts); @@ -58,10 +59,10 @@ class MultiCommandSquasher { ShardExecInfo& PrepareShardInfo(ShardId sid); // Retrun squash flags - SquashResult TrySquash(StoredCmd* cmd); + SquashResult TrySquash(const StoredCmd* cmd); // Execute separate non-squashed cmd. Return false if aborting on error. - bool ExecuteStandalone(facade::RedisReplyBuilder* rb, StoredCmd* cmd); + bool ExecuteStandalone(facade::RedisReplyBuilder* rb, const StoredCmd* cmd); // Callback that runs on shards during squashed hop. facade::OpStatus SquashedHopCb(EngineShard* es, facade::RespVersion resp_v); @@ -69,12 +70,11 @@ class MultiCommandSquasher { // Execute all currently squashed commands. Return false if aborting on error. bool ExecuteSquashed(facade::RedisReplyBuilder* rb); - // Run all commands until completion. Returns number of squashed commands. + // Run all commands until completion. Returns number of processed commands. size_t Run(facade::RedisReplyBuilder* rb); bool IsAtomic() const; - private: absl::Span cmds_; // Input range of stored commands ConnectionContext* cntx_; // Underlying context Service* service_; diff --git a/src/server/replica.cc b/src/server/replica.cc index 58eb0298e240..32879b6e6a5b 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -126,18 +126,19 @@ GenericError Replica::Start() { return {}; } -void Replica::StartMainReplicationFiber() { - sync_fb_ = fb2::Fiber("main_replication", &Replica::MainReplicationFb, this); +void Replica::StartMainReplicationFiber(std::optional last_master_sync_data) { + sync_fb_ = fb2::Fiber("main_replication", &Replica::MainReplicationFb, this, + std::move(last_master_sync_data)); } void Replica::EnableReplication(facade::SinkReplyBuilder* builder) { VLOG(1) << "Enabling replication"; - state_mask_.store(R_ENABLED); // set replica state to enabled - sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this); // call replication fiber + state_mask_.store(R_ENABLED); // set replica state to enabled + sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this, nullopt); // call replication fiber } -void Replica::Stop() { +std::optional Replica::Stop() { VLOG(1) << "Stopping replication " << this; // Stops the loop in MainReplicationFb. @@ -154,6 +155,11 @@ void Replica::Stop() { for (auto& flow : shard_flows_) { flow.reset(); } + + if (last_journal_LSNs_.has_value()) { + return LastMasterSyncData{master_context_.master_repl_id, last_journal_LSNs_.value()}; + } + return nullopt; } void Replica::Pause(bool pause) { @@ -183,7 +189,7 @@ std::error_code Replica::TakeOver(std::string_view timeout, bool save_flag) { return ec; } -void Replica::MainReplicationFb() { +void Replica::MainReplicationFb(std::optional last_master_sync_data) { VLOG(1) << "Main replication fiber started " << this; // Switch shard states to replication. SetShardStates(true); @@ -231,9 +237,9 @@ void Replica::MainReplicationFb() { // 3. Initiate full sync if ((state_mask_.load() & R_SYNC_OK) == 0) { - if (HasDflyMaster()) - ec = InitiateDflySync(); - else + if (HasDflyMaster()) { + ec = InitiateDflySync(std::exchange(last_master_sync_data, nullopt)); + } else ec = InitiatePSync(); if (ec) { @@ -468,7 +474,7 @@ error_code Replica::InitiatePSync() { } // Initialize and start sub-replica for each flow. -error_code Replica::InitiateDflySync() { +error_code Replica::InitiateDflySync(std::optional last_master_sync_data) { auto start_time = absl::Now(); // Initialize MultiShardExecution. @@ -530,7 +536,8 @@ error_code Replica::InitiateDflySync() { auto ec = shard_flows_[id]->StartSyncFlow(sync_block, &exec_st_, last_journal_LSNs_.has_value() ? std::optional((*last_journal_LSNs_)[id]) - : std::nullopt); + : std::nullopt, + last_master_sync_data); if (ec.has_value()) is_full_sync[id] = ec.value(); else @@ -542,7 +549,7 @@ error_code Replica::InitiateDflySync() { lock_guard lk{flows_op_mu_}; shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb)); - + last_journal_LSNs_.reset(); size_t num_full_flows = std::accumulate(is_full_sync.get(), is_full_sync.get() + num_df_flows, 0); @@ -558,7 +565,6 @@ error_code Replica::InitiateDflySync() { } else if (num_full_flows == 0) { sync_type = "partial"; } else { - last_journal_LSNs_.reset(); exec_st_.ReportError(std::make_error_code(errc::state_not_recoverable), "Won't do a partial sync: some flows must fully resync"); } @@ -734,8 +740,9 @@ error_code Replica::SendNextPhaseRequest(string_view kind) { return std::error_code{}; } -io::Result DflyShardReplica::StartSyncFlow(BlockingCounter sb, ExecutionState* cntx, - std::optional lsn) { +io::Result DflyShardReplica::StartSyncFlow( + BlockingCounter sb, ExecutionState* cntx, std::optional lsn, + std::optional last_master_data) { using nonstd::make_unexpected; DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty()); proactor_index_ = ProactorBase::me()->GetPoolIndex(); @@ -746,6 +753,7 @@ io::Result DflyShardReplica::StartSyncFlow(BlockingCounter sb, ExecutionSt VLOG(1) << "Sending on flow " << master_context_.master_repl_id << " " << master_context_.dfly_session_id << " " << flow_id_; + // DFLY FLOW [lsn] [last_master_id lsn-vec] std::string cmd = StrCat("DFLY FLOW ", master_context_.master_repl_id, " ", master_context_.dfly_session_id, " ", flow_id_); // Try to negotiate a partial sync if possible. @@ -753,6 +761,12 @@ io::Result DflyShardReplica::StartSyncFlow(BlockingCounter sb, ExecutionSt absl::GetFlag(FLAGS_replica_partial_sync)) { absl::StrAppend(&cmd, " ", *lsn); } + if (last_master_data && master_context_.version >= DflyVersion::VER5 && + absl::GetFlag(FLAGS_replica_partial_sync)) { + string lsn_str = absl::StrJoin(last_master_data.value().last_journal_LSNs, "-"); + absl::StrAppend(&cmd, " ", last_master_data.value().id, " ", lsn_str); + VLOG(1) << "Sending last master sync flow " << last_master_data.value().id << " " << lsn_str; + } ResetParser(RedisParser::Mode::CLIENT); leftover_buf_.emplace(128); diff --git a/src/server/replica.h b/src/server/replica.h index 5c9fb9de834c..c2f7aaefc253 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -59,14 +59,18 @@ class Replica : ProtocolClient { // Returns true if initial link with master has been established or // false if it has failed. GenericError Start(); - void StartMainReplicationFiber(); + struct LastMasterSyncData { + std::string id; + std::vector last_journal_LSNs; // lsn for each master shard. + }; + void StartMainReplicationFiber(std::optional data); // Sets the server state to have replication enabled. // It is like Start(), but does not attempt to establish // a connection right-away, but instead lets MainReplicationFb do the work. void EnableReplication(facade::SinkReplyBuilder* builder); - void Stop(); // thread-safe + std::optional Stop(); // thread-safe void Pause(bool pause); @@ -78,15 +82,15 @@ class Replica : ProtocolClient { private: /* Main standalone mode functions */ // Coordinate state transitions. Spawned by start. - void MainReplicationFb(); + void MainReplicationFb(std::optional data); std::error_code Greet(); // Send PING and REPLCONF. std::error_code HandleCapaDflyResp(); std::error_code ConfigureDflyMaster(); - std::error_code InitiatePSync(); // Redis full sync. - std::error_code InitiateDflySync(); // Dragonfly full sync. + std::error_code InitiatePSync(); // Redis full sync. + std::error_code InitiateDflySync(std::optional data); // Dragonfly full sync. std::error_code ConsumeRedisStream(); // Redis stable state. std::error_code ConsumeDflyStream(); // Dragonfly stable state. @@ -185,7 +189,8 @@ class DflyShardReplica : public ProtocolClient { // Start replica initialized as dfly flow. // Sets is_full_sync when successful. io::Result StartSyncFlow(util::fb2::BlockingCounter block, ExecutionState* cntx, - std::optional); + std::optional, + std::optional data); // Transition into stable state mode as dfly flow. std::error_code StartStableSyncFlow(ExecutionState* cntx); diff --git a/src/server/search/search_family_test.cc b/src/server/search/search_family_test.cc index 82fbc3e71012..38dc0a9bdefb 100644 --- a/src/server/search/search_family_test.cc +++ b/src/server/search/search_family_test.cc @@ -2722,4 +2722,35 @@ TEST_F(SearchFamilyTest, JsonWithNullFields) { AreDocIds("doc:1", "doc:2")); } +TEST_F(SearchFamilyTest, TestHsetDeleteDocumentHnswSchemaCrash) { + EXPECT_EQ(Run({"FT.CREATE", "idx", "SCHEMA", "n", "NUMERIC", "v", "VECTOR", "HNSW", "8", "TYPE", + "FLOAT16", "DIM", "4", "DISTANCE_METRIC", "L2", "M", "65536"}), + "OK"); + + auto res = Run({"HSET", "doc", "n", "0"}); + EXPECT_EQ(res, 1); + + res = Run({"DEL", "doc"}); + EXPECT_EQ(res, 1); +} + +TEST_F(SearchFamilyTest, RenameDocumentBetweenIndices) { + absl::FlagSaver fs; + + SetTestFlag("cluster_mode", "emulated"); + ResetService(); + + EXPECT_EQ(Run({"ft.create", "idx1", "prefix", "1", "idx1", "filter", "@index==\"yes\"", "schema", + "t", "text"}), + "OK"); + EXPECT_EQ(Run({"ft.create", "idx2", "prefix", "1", "idx2", "filter", "@index==\"yes\"", "schema", + "t", "text"}), + "OK"); + + Run({"hset", "idx1:{doc}1", "t", "foo1", "index", "yes"}); + + EXPECT_EQ(Run({"rename", "idx1:{doc}1", "idx2:{doc}1"}), "OK"); + EXPECT_EQ(Run({"rename", "idx2:{doc}1", "idx1:{doc}1"}), "OK"); +} + } // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 40c4243fda80..9c7129cdffc3 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1314,6 +1314,20 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd AppendMetricWithoutLabels("pipeline_commands_duration_seconds", "", conn_stats.pipelined_cmd_latency * 1e-6, MetricType::COUNTER, &resp->body()); + + AppendMetricWithoutLabels("cmd_squash_hop_total", "", m.coordinator_stats.multi_squash_executions, + MetricType::COUNTER, &resp->body()); + + AppendMetricWithoutLabels("cmd_squash_commands_total", "", m.coordinator_stats.squashed_commands, + MetricType::COUNTER, &resp->body()); + + AppendMetricWithoutLabels("cmd_squash_hop_duration_seconds", "", + m.coordinator_stats.multi_squash_exec_hop_usec * 1e-6, + MetricType::COUNTER, &resp->body()); + AppendMetricWithoutLabels("cmd_squash_hop_reply_seconds", "", + m.coordinator_stats.multi_squash_exec_reply_usec * 1e-6, + MetricType::COUNTER, &resp->body()); + AppendMetricWithoutLabels("commands_squashing_replies_bytes", "", MultiCommandSquasher::GetRepliesMemSize(), MetricType::GAUGE, &resp->body()); @@ -2210,7 +2224,10 @@ void ServerFamily::ResetStat(Namespace* ns) { shard_set->pool()->AwaitBrief( [registry = service_.mutable_registry(), ns](unsigned index, auto*) { registry->ResetCallStats(index); - ns->GetCurrentDbSlice().ResetEvents(); + EngineShard* shard = EngineShard::tlocal(); + if (shard) { + ns->GetDbSlice(shard->shard_id()).ResetEvents(); + } facade::ResetStats(); ServerState::tlocal()->exec_freq_count.clear(); }); @@ -2486,7 +2503,6 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio append("total_commands_processed", conn_stats.command_cnt_main + conn_stats.command_cnt_other); append("instantaneous_ops_per_sec", m.qps); append("total_pipelined_commands", conn_stats.pipelined_cmd_cnt); - append("total_pipelined_squashed_commands", m.coordinator_stats.squashed_commands); append("pipeline_throttle_total", conn_stats.pipeline_throttle_count); append("pipelined_latency_usec", conn_stats.pipelined_cmd_latency); append("total_net_input_bytes", conn_stats.io_read_bytes); @@ -2628,9 +2644,6 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio append("eval_shardlocal_coordination_total", m.coordinator_stats.eval_shardlocal_coordination_cnt); append("eval_squashed_flushes", m.coordinator_stats.eval_squashed_flushes); - append("multi_squash_execution_total", m.coordinator_stats.multi_squash_executions); - append("multi_squash_execution_hop_usec", m.coordinator_stats.multi_squash_exec_hop_usec); - append("multi_squash_execution_reply_usec", m.coordinator_stats.multi_squash_exec_reply_usec); }; auto add_repl_info = [&] { @@ -2943,7 +2956,7 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx) cmd_cntx.rb->SendError(ec.Format()); return; } - add_replica->StartMainReplicationFiber(); + add_replica->StartMainReplicationFiber(nullopt); cluster_replicas_.push_back(std::move(add_replica)); cmd_cntx.rb->SendOk(); } @@ -2951,6 +2964,7 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx) void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ActionOnConnectionFail on_err) { std::shared_ptr new_replica; + std::optional last_master_data; { util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time @@ -2974,7 +2988,7 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply CHECK(replica_); SetMasterFlagOnAllThreads(true); // Flip flag before clearing replica - replica_->Stop(); + last_master_data_ = replica_->Stop(); replica_.reset(); StopAllClusterReplicas(); @@ -2987,8 +3001,9 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply } // If any replication is in progress, stop it, cancellation should kick in immediately + if (replica_) - replica_->Stop(); + last_master_data = replica_->Stop(); StopAllClusterReplicas(); // First, switch into the loading state @@ -3044,8 +3059,7 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply // If we are called by "Replicate", tx will be null but we do not need // to flush anything. if (on_err == ActionOnConnectionFail::kReturnOnError) { - Drakarys(tx, DbSlice::kDbAll); - new_replica->StartMainReplicationFiber(); + new_replica->StartMainReplicationFiber(last_master_data); } builder->SendOk(); } @@ -3118,7 +3132,7 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx) LOG(INFO) << "Takeover successful, promoting this instance to master."; SetMasterFlagOnAllThreads(true); - replica_->Stop(); + last_master_data_ = replica_->Stop(); replica_.reset(); return builder->SendOk(); } diff --git a/src/server/server_family.h b/src/server/server_family.h index 5e43ed9c5db9..013a75d2c551 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -242,6 +242,10 @@ class ServerFamily { return dfly_cmd_.get(); } + std::optional GetLastMasterData() const { + return last_master_data_; + } + absl::Span GetListeners() const { return listeners_; } @@ -368,6 +372,7 @@ class ServerFamily { std::unique_ptr dfly_cmd_; std::string master_replid_; + std::optional last_master_data_; time_t start_time_ = 0; // in seconds, epoch time. diff --git a/src/server/version.h b/src/server/version.h index e89d19bb7060..ff9f8e825ba9 100644 --- a/src/server/version.h +++ b/src/server/version.h @@ -33,8 +33,11 @@ enum class DflyVersion { // - Periodic lag checks from master to replica VER4, + // - Support partial sync from different master + VER5, + // Always points to the latest version - CURRENT_VER = VER4, + CURRENT_VER = VER5, }; } // namespace dfly diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index 3af54832a757..7d829f267444 100755 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -525,7 +525,7 @@ async def test_keyspace_events_config_set(async_client: aioredis.Redis): await collect_expiring_events(pclient, keys) -@pytest.mark.exclude_epoll +@dfly_args({"max_busy_read_usec": 10000}) async def test_reply_count(async_client: aioredis.Redis): """Make sure reply aggregations reduce reply counts for common cases""" @@ -537,6 +537,7 @@ async def measure(aw): await aw return await get_reply_count() - before - 1 + await async_client.config_resetstat() base = await get_reply_count() info_diff = await get_reply_count() - base assert info_diff == 1 @@ -561,13 +562,16 @@ async def measure(aw): e = async_client.pipeline(transaction=True) for _ in range(100): e.incr("num-1") - assert await measure(e.execute()) == 2 # OK + Response + + # one - for MULTI-OK, one for the rest. Depends on the squashing efficiency, + # can be either 1 or 2 replies. + assert await measure(e.execute()) <= 2 # Just pipeline p = async_client.pipeline(transaction=False) for _ in range(100): p.incr("num-1") - assert await measure(p.execute()) == 1 + assert await measure(p.execute()) <= 2 # Script result assert await measure(async_client.eval('return {1,2,{3,4},5,6,7,8,"nine"}', 0)) == 1 @@ -1118,14 +1122,15 @@ async def wait_for_stuck_on_send(): # Test that the cache pipeline does not grow or shrink under constant pipeline load. -@dfly_args({"proactor_threads": 1, "pipeline_squash": 9}) +@dfly_args({"proactor_threads": 1, "pipeline_squash": 9, "max_busy_read_usec": 10000}) async def test_pipeline_cache_only_async_squashed_dispatches(df_factory): server = df_factory.create() server.start() client = server.client() + await client.ping() # Make sure the connection and the protocol were established - async def push_pipeline(size=1): + async def push_pipeline(size): p = client.pipeline(transaction=True) for i in range(size): p.info() @@ -1136,14 +1141,15 @@ async def push_pipeline(size=1): # should be zero because: # We always dispatch the items that will be squashed, so when `INFO` gets called # the cache is empty because the pipeline consumed it throughout its execution - for i in range(0, 30): + # high max_busy_read_usec ensures that the connection fiber has enough time to push + # all the commands to reach the squashing limit. + for i in range(0, 10): # it's actually 11 commands. 8 INFO + 2 from the MULTI/EXEC block that is injected - # by the client. Connection fiber yields to dispatch/async fiber when - # ++async_streak_len_ >= 10. The minimum to squash is 9 so it will squash the pipeline + # by the client. The minimum to squash is 9 so it will squash the pipeline # and INFO ALL should return zero for all the squashed commands in the pipeline res = await push_pipeline(8) - for i in range(1): - assert res[i]["pipeline_cache_bytes"] == 0 + for r in res: + assert r["pipeline_cache_bytes"] == 0 # Non zero because we reclaimed/recycled the messages back to the cache info = await client.info() diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 2a4261c86fbf..eaa81cee9190 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -3038,3 +3038,75 @@ async def test_replica_snapshot_with_big_values_while_seeding(df_factory: DflyIn await wait_available_async(c_node) assert await c_node.execute_command("dbsize") > 0 await c_node.execute_command("FLUSHALL") + + +@pytest.mark.parametrize( + "use_takeover, allowed_diff", + [(False, 2), (False, 0), (True, 0)], +) +async def test_partial_replication_on_same_source_master(df_factory, use_takeover, allowed_diff): + master = df_factory.create() + replica1 = df_factory.create(allow_partial_sync_with_lsn_diff=allowed_diff) + replica2 = df_factory.create() + + df_factory.start_all([master, replica1, replica2]) + c_master = master.client() + c_replica1 = replica1.client() + c_replica2 = replica2.client() + + logging.debug("Fill master with test data") + seeder = DebugPopulateSeeder(key_target=50) + await seeder.run(c_master) + + logging.debug("Start replication and wait for full sync") + await c_replica1.execute_command(f"REPLICAOF localhost {master.port}") + await wait_for_replicas_state(c_replica1) + await c_replica2.execute_command(f"REPLICAOF localhost {master.port}") + await wait_for_replicas_state(c_replica2) + + # Send some traffic + seeder = SeederV2(key_target=8_000) + await seeder.run(c_master, target_deviation=0.01) + + # Wait for all journal changes propagate to replicas + await check_all_replicas_finished([c_replica1, c_replica2], c_master) + + if use_takeover: + # Promote first replica to master + await c_replica1.execute_command(f"REPLTAKEOVER 5") + else: + # Promote first replica to master + await c_replica1.execute_command(f"REPLICAOF NO ONE") + # Send 2 more commands to be propagated to second replica + # Sending 2 more commands will result in partial sync if allow_partial_sync_with_lsn_diff is equal or higher + await c_master.set("x", "y") + await c_master.set("x", "y") + await check_all_replicas_finished([c_replica2], c_master) + + # Start replication with new master + await c_replica2.execute_command(f"REPLICAOF localhost {replica1.port}") + + await check_all_replicas_finished([c_replica2], c_replica1) + # Validate data + if use_takeover: + hash1, hash2 = await asyncio.gather( + *(SeederV2.capture(c) for c in (c_replica1, c_replica2)) + ) + assert hash1 == hash2 + + # Check we can takeover to the second replica + await c_replica2.execute_command(f"REPLTAKEOVER 5") + + replica1.stop() + replica2.stop() + if use_takeover or (allowed_diff > 0 and not use_takeover): + # Check logs for partial replication + lines = replica2.find_in_logs(f"Started partial sync with localhost:{replica1.port}") + assert len(lines) == 1 + # Check no full sync logs + lines = replica2.find_in_logs(f"Started full with localhost:{replica1.port}") + assert len(lines) == 0 + else: + lines = replica2.find_in_logs(f"Started full with localhost:{replica1.port}") + assert len(lines) == 0 + assert len(replica1.find_in_logs("No partial sync due to diff")) > 0 diff --git a/tests/dragonfly/seeder_test.py b/tests/dragonfly/seeder_test.py index c3a5cb59aec4..8e1ebeb1086f 100644 --- a/tests/dragonfly/seeder_test.py +++ b/tests/dragonfly/seeder_test.py @@ -135,3 +135,45 @@ async def test_seeder_fake_redis( capture = await seeder.capture_fake_redis() assert await seeder.compare(capture, instance.port) + + +@pytest.mark.asyncio +@dfly_args({"proactor_threads": 2}) +async def test_seeder_huge_value( + df_factory: DflyInstanceFactory, df_seeder_factory: DflySeederFactory +): + instance = df_factory.create() + df_factory.start_all([instance]) + + expected_huge_value_count = 10 + seeder = df_seeder_factory.create( + keys=100, + port=instance.port, + huge_value_count=expected_huge_value_count, + huge_value_size=240_000, + ) + + def custom_command_generation_probability(): + return [ + 0.0, + 0.0, + 100.0, + ] # We will only execute GROW commands + + # Provide custom function for command generation probability + seeder.gen.size_change_probs = custom_command_generation_probability + + await seeder.run(target_ops=100) + + client = instance.client() + + keys = await client.execute_command("KEYS *") + huge_val_keys_count = 0 + + for key in keys: + key_size = await client.execute_command(f"MEMORY USAGE {key}") + # Count all keys that have memory - i.e. contain huge strings + if key_size != None and key_size > 100_000: + huge_val_keys_count += 1 + + assert huge_val_keys_count == expected_huge_value_count diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py index bfaec24193dc..5e6fe38c6d01 100644 --- a/tests/dragonfly/utility.py +++ b/tests/dragonfly/utility.py @@ -141,13 +141,26 @@ class ValueType(Enum): class CommandGenerator: """Class for generating complex command sequences""" - def __init__(self, target_keys, val_size, batch_size, max_multikey, unsupported_types=[]): + def __init__( + self, + target_keys, + val_size, + huge_val_count, + huge_val_size, + batch_size, + max_multikey, + unsupported_types=[], + ): self.key_cnt_target = target_keys self.val_size = val_size self.batch_size = min(batch_size, target_keys) self.max_multikey = max_multikey self.unsupported_types = unsupported_types + # Generate sorted list of random samples in target_keys range + self.huge_val_sample = sorted(random.sample(range(target_keys), huge_val_count)) + self.huge_val_size = huge_val_size + # Key management self.key_sets = [set() for _ in ValueType] self.key_cursor = 0 @@ -205,26 +218,49 @@ def randomize_key(self, t=None, pop=False): return k, t - def generate_val(self, t: ValueType): + def generate_val(self, t: ValueType, idx): """Generate filler value of configured size for type t""" + # If current key count matches huge val sample than we will create one element with huge val size. + generate_huge_val = False + if len(self.huge_val_sample) and self.huge_val_sample[0] == (self.key_cnt + idx): + generate_huge_val = True + # Remove this sample from list + self.huge_val_sample.pop(0) + def rand_str(k=3, s=""): # Use small k value to reduce mem usage and increase number of ops return s.join(random.choices(string.ascii_letters, k=k)) if t == ValueType.STRING: # Random string for MSET - return (rand_str(self.val_size),) + return (rand_str(self.huge_val_size if generate_huge_val else self.val_size),) elif t == ValueType.LIST: # Random sequence k-letter elements for LPUSH - return tuple(rand_str() for _ in range(self.val_size // 4)) + list_size = self.val_size // 4 + element_size = ( + self.huge_val_size // list_size if generate_huge_val else self.val_size // list_size + ) + return tuple(rand_str(element_size) for i in range(list_size)) elif t == ValueType.SET: # Random sequence of k-letter elements for SADD - return tuple(rand_str() for _ in range(self.val_size // 4)) + set_size = self.val_size // 4 + element_size = ( + self.huge_val_size // set_size if generate_huge_val else self.val_size // set_size + ) + return tuple(rand_str(element_size) for i in range(set_size)) elif t == ValueType.HSET: # Random sequence of k-letter keys + int and two start values for HSET + hset_size = self.val_size // 5 + element_size = ( + self.huge_val_size // hset_size if generate_huge_val else self.val_size // hset_size + ) elements = ( - (rand_str(), random.randint(0, self.val_size)) for _ in range(self.val_size // 5) + ( + rand_str(element_size), + random.randint(0, self.val_size), + ) + for i in range(hset_size) ) return ("v0", 0, "v1", 0) + tuple(itertools.chain(*elements)) elif t == ValueType.ZSET: @@ -233,17 +269,29 @@ def rand_str(k=3, s=""): # This ensures that we test both the ZSET implementation with listpack and the our custom BPtree. value_sizes = [self.val_size // 4, 130] probabilities = [8, 1] - value_size = random.choices(value_sizes, probabilities)[0] - elements = ((random.randint(0, self.val_size), rand_str()) for _ in range(value_size)) + zset_size = random.choices(value_sizes, probabilities)[0] + element_size = ( + self.huge_val_size // zset_size if generate_huge_val else self.val_size // zset_size + ) + elements = ( + ( + random.randint(0, self.val_size), + rand_str(element_size), + ) + for i in range(zset_size) + ) return tuple(itertools.chain(*elements)) - elif t == ValueType.JSON: # Json object with keys: # - arr (array of random strings) # - ints (array of objects {i:random integer}) # - i (random integer) - ints = [{"i": random.randint(0, 100)} for i in range(self.val_size // 6)] - strs = [rand_str() for _ in range(self.val_size // 6)] + json_size = self.val_size // 6 + element_size = ( + self.huge_val_size // json_size if generate_huge_val else self.val_size // json_size + ) + ints = [{"i": random.randint(0, 100)} for i in range(json_size)] + strs = [rand_str(element_size) for i in range(json_size)] return "$", json.dumps({"arr": strs, "ints": ints, "i": random.randint(0, 100)}) else: assert False, "Invalid ValueType" @@ -313,7 +361,9 @@ def gen_grow_cmd(self): count = 1 keys = (self.add_key(t) for _ in range(count)) - payload = itertools.chain(*((f"k{k}",) + self.generate_val(t) for k in keys)) + payload = itertools.chain( + *((f"k{k}",) + self.generate_val(t, idx) for idx, k in enumerate(keys)) + ) filtered_payload = filter(lambda p: p is not None, payload) return (self.GROW_ACTINONS[t],) + tuple(filtered_payload), count @@ -417,6 +467,8 @@ def __init__( port=6379, keys=1000, val_size=50, + huge_value_count=5, + huge_value_size=100000, batch_size=100, max_multikey=5, dbcount=1, @@ -434,7 +486,15 @@ def __init__( unsupported_types.append(ValueType.JSON) # Cluster aio client doesn't support JSON self.cluster_mode = cluster_mode - self.gen = CommandGenerator(keys, val_size, batch_size, max_multikey, unsupported_types) + self.gen = CommandGenerator( + keys, + val_size, + huge_value_count, + huge_value_size, + batch_size, + max_multikey, + unsupported_types, + ) self.port = port self.dbcount = dbcount self.multi_transaction_probability = multi_transaction_probability diff --git a/tools/local/monitoring/grafana/provisioning/dashboards/dragonfly.json b/tools/local/monitoring/grafana/provisioning/dashboards/dragonfly.json index 88e0476eaae1..85f1cb9d3c45 100644 --- a/tools/local/monitoring/grafana/provisioning/dashboards/dragonfly.json +++ b/tools/local/monitoring/grafana/provisioning/dashboards/dragonfly.json @@ -1301,7 +1301,8 @@ "value": 80 } ] - } + }, + "unit": "s" }, "overrides": [] }, @@ -1311,7 +1312,7 @@ "x": 12, "y": 29 }, - "id": 13, + "id": 27, "options": { "alertThreshold": true, "legend": { @@ -1335,15 +1336,14 @@ "editorMode": "code", "exemplar": true, "expr": - "sum (dragonfly_db_keys{namespace=\"$namespace\",pod=~\"$pod_name\"}) - sum (dragonfly_db_keys_expiring{namespace=\"$namespace\",pod=~\"$pod_name\"}) ", + "irate(dragonfly_pipeline_commands_duration_seconds{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_pipeline_commands_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])", "format": "time_series", "interval": "", "intervalFactor": 2, - "legendFormat": "not expiring", + "legendFormat": "pipeline", "range": true, "refId": "A", - "step": 240, - "target": "" + "step": 240 }, { "datasource": { @@ -1352,18 +1352,37 @@ }, "editorMode": "code", "exemplar": true, - "expr": "sum (dragonfly_db_keys_expiring{namespace=\"$namespace\",pod=~\"$pod_name\"})", + "expr": + "irate(dragonfly_cmd_squash_hop_duration_seconds{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_cmd_squash_hop_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])", "format": "time_series", + "hide": false, "interval": "", "intervalFactor": 2, - "legendFormat": "expiring", - "metric": "", + "legendFormat": "execute_hop", "range": true, "refId": "B", "step": 240 + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": true, + "expr": + "irate(dragonfly_cmd_squash_hop_reply_seconds{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_cmd_squash_hop_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 2, + "legendFormat": "reply", + "range": true, + "refId": "C", + "step": 240 } ], - "title": "Expiring vs Not-Expiring Keys", + "title": "Pipeline Latency", "type": "timeseries" }, { @@ -1428,7 +1447,7 @@ "gridPos": { "h": 7, "w": 12, - "x": 12, + "x": 0, "y": 36 }, "id": 16, @@ -1466,19 +1485,6 @@ "title": "Dragonfly connected clients", "type": "timeseries" }, - { - "collapsed": false, - "gridPos": { - "h": 1, - "w": 24, - "x": 0, - "y": 43 - }, - "id": 19, - "panels": [], - "title": "Advanced metrics", - "type": "row" - }, { "datasource": { "type": "prometheus", @@ -1520,6 +1526,7 @@ "mode": "off" } }, + "links": [], "mappings": [], "thresholds": { "mode": "absolute", @@ -1533,19 +1540,19 @@ "value": 80 } ] - }, - "unit": "s" + } }, "overrides": [] }, "gridPos": { - "h": 8, + "h": 7, "w": 12, - "x": 0, - "y": 44 + "x": 12, + "y": 36 }, - "id": 18, + "id": 13, "options": { + "alertThreshold": true, "legend": { "calcs": [], "displayMode": "list", @@ -1564,17 +1571,18 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "disableTextWrap": false, "editorMode": "code", + "exemplar": true, "expr": - "irate(dragonfly_fiber_switch_delay_seconds_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/rate(dragonfly_fiber_switch_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])", - "fullMetaSearch": false, - "includeNullMetadata": false, - "instant": false, - "legendFormat": "switch", + "sum (dragonfly_db_keys{namespace=\"$namespace\",pod=~\"$pod_name\"}) - sum (dragonfly_db_keys_expiring{namespace=\"$namespace\",pod=~\"$pod_name\"}) ", + "format": "time_series", + "interval": "", + "intervalFactor": 2, + "legendFormat": "not expiring", "range": true, "refId": "A", - "useBackend": false + "step": 240, + "target": "" }, { "datasource": { @@ -1582,16 +1590,19 @@ "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": - "irate(dragonfly_fiber_longrun_seconds{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_fiber_longrun_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])", - "hide": false, - "instant": false, - "legendFormat": "longrun", + "exemplar": true, + "expr": "sum (dragonfly_db_keys_expiring{namespace=\"$namespace\",pod=~\"$pod_name\"})", + "format": "time_series", + "interval": "", + "intervalFactor": 2, + "legendFormat": "expiring", + "metric": "", "range": true, - "refId": "B" + "refId": "B", + "step": 240 } ], - "title": "FiberSwitchDelay", + "title": "Expiring vs Not-Expiring Keys", "type": "timeseries" }, { @@ -1655,8 +1666,8 @@ "gridPos": { "h": 8, "w": 12, - "x": 12, - "y": 44 + "x": 0, + "y": 43 }, "id": 22, "options": { @@ -1694,6 +1705,7 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "description": "", "fieldConfig": { "defaults": { "color": { @@ -1747,13 +1759,123 @@ }, "overrides": [] }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 43 + }, + "id": 28, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.1.10", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": + "irate(dragonfly_cmd_squash_commands_total\n{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_cmd_squash_hop_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])", + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Average Squashing Length", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 51 + }, + "id": 19, + "panels": [], + "title": "Advanced metrics", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, "gridPos": { "h": 8, "w": 12, "x": 0, "y": 52 }, - "id": 21, + "id": 18, "options": { "legend": { "calcs": [], @@ -1773,13 +1895,17 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "disableTextWrap": false, "editorMode": "code", "expr": - "dragonfly_replication_full_sync_bytes{namespace=\"$namespace\",pod=~\"$pod_name\"}", + "irate(dragonfly_fiber_switch_delay_seconds_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/rate(dragonfly_fiber_switch_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])", + "fullMetaSearch": false, + "includeNullMetadata": false, "instant": false, - "legendFormat": "fullsync", + "legendFormat": "switch", "range": true, - "refId": "A" + "refId": "A", + "useBackend": false }, { "datasource": { @@ -1788,15 +1914,15 @@ }, "editorMode": "code", "expr": - "dragonfly_replication_streaming_bytes{namespace=\"$namespace\",pod=~\"$pod_name\"}", + "irate(dragonfly_fiber_longrun_seconds{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_fiber_longrun_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])", "hide": false, "instant": false, - "legendFormat": "stable_sync", + "legendFormat": "longrun", "range": true, "refId": "B" } ], - "title": "Master Replication memory", + "title": "FiberSwitchDelay", "type": "timeseries" }, { @@ -1853,8 +1979,7 @@ "value": 80 } ] - }, - "unit": "s" + } }, "overrides": [] }, @@ -1864,7 +1989,7 @@ "x": 12, "y": 52 }, - "id": 23, + "id": 21, "options": { "legend": { "calcs": [], @@ -1886,14 +2011,28 @@ }, "editorMode": "code", "expr": - "irate(dragonfly_pipeline_commands_duration_seconds{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_pipeline_commands_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])", + "dragonfly_replication_full_sync_bytes{namespace=\"$namespace\",pod=~\"$pod_name\"}", "instant": false, - "legendFormat": "{{pod}}", + "legendFormat": "fullsync", "range": true, "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": + "dragonfly_replication_streaming_bytes{namespace=\"$namespace\",pod=~\"$pod_name\"}", + "hide": false, + "instant": false, + "legendFormat": "stable_sync", + "range": true, + "refId": "B" } ], - "title": "Pipeline latency", + "title": "Master Replication memory", "type": "timeseries" } ],