Skip to content

Commit 47bb048

Browse files
Bikramjeet Vigfacebook-github-bot
Bikramjeet Vig
authored andcommitted
Add option for admission control for filesystem resources (facebookincubator#10452)
Summary: Pull Request resolved: facebookincubator#10452 This change adds a generic admission controller class that can be used for filesystem resources like read bytes in flight or number of read requests in flight. It also provides a way to report stats for resource usage, queued count, queued wait times by allowing the client to specify a metric name. Reviewed By: kevinwilfong, Yuhta Differential Revision: D59643306 fbshipit-source-id: ddc4ea9273d3148c248cddf8a5543c5bf3df75bb
1 parent d95d29f commit 47bb048

5 files changed

+231
-0
lines changed

velox/common/base/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ velox_add_library(
2828
BitUtil.cpp
2929
Counters.cpp
3030
Fs.cpp
31+
GenericAdmissionController.cpp
3132
PeriodicStatsReporter.cpp
3233
RandomUtil.cpp
3334
RawVector.cpp
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/common/base/GenericAdmissionController.h"
18+
19+
#include "velox/common/base/Exceptions.h"
20+
#include "velox/common/base/StatsReporter.h"
21+
#include "velox/common/time/Timer.h"
22+
23+
namespace facebook::velox::common {
24+
25+
void GenericAdmissionController::accept(uint64_t resourceUnits) {
26+
ContinueFuture future;
27+
uint64_t updatedValue = 0;
28+
VELOX_CHECK_LE(
29+
resourceUnits,
30+
config_.maxLimit,
31+
"A single request cannot exceed the max limit");
32+
{
33+
std::lock_guard<std::mutex> l(mtx_);
34+
if (unitsUsed_ + resourceUnits > config_.maxLimit) {
35+
auto [unblockPromise, unblockFuture] = makeVeloxContinuePromiseContract();
36+
Request req;
37+
req.unitsRequested = resourceUnits;
38+
req.promise = std::move(unblockPromise);
39+
queue_.push_back(std::move(req));
40+
future = std::move(unblockFuture);
41+
} else {
42+
updatedValue = unitsUsed_ += resourceUnits;
43+
}
44+
}
45+
if (future.valid()) {
46+
if (!config_.resourceQueuedCountMetric.empty()) {
47+
RECORD_METRIC_VALUE(config_.resourceQueuedCountMetric);
48+
}
49+
uint64_t waitTimeUs{0};
50+
{
51+
MicrosecondTimer timer(&waitTimeUs);
52+
future.wait();
53+
}
54+
if (!config_.resourceQueuedTimeMsMetric.empty()) {
55+
RECORD_HISTOGRAM_METRIC_VALUE(
56+
config_.resourceQueuedTimeMsMetric, waitTimeUs / 1'000);
57+
}
58+
return;
59+
}
60+
// Only upadate if there was no wait, as the releasing thread is responsible
61+
// for updating the metric.
62+
if (!config_.resourceUsageMetric.empty()) {
63+
RECORD_METRIC_VALUE(config_.resourceUsageMetric, updatedValue);
64+
}
65+
}
66+
67+
void GenericAdmissionController::release(uint64_t resourceUnits) {
68+
uint64_t updatedValue = 0;
69+
{
70+
std::lock_guard<std::mutex> l(mtx_);
71+
VELOX_CHECK_LE(
72+
resourceUnits,
73+
unitsUsed_,
74+
"Cannot release more units than have been acquired");
75+
unitsUsed_ -= resourceUnits;
76+
while (!queue_.empty()) {
77+
auto& request = queue_.front();
78+
if (unitsUsed_ + request.unitsRequested > config_.maxLimit) {
79+
break;
80+
}
81+
unitsUsed_ += request.unitsRequested;
82+
request.promise.setValue();
83+
queue_.pop_front();
84+
}
85+
updatedValue = unitsUsed_;
86+
}
87+
if (!config_.resourceUsageMetric.empty()) {
88+
RECORD_METRIC_VALUE(config_.resourceUsageMetric, updatedValue);
89+
}
90+
}
91+
} // namespace facebook::velox::common
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include <deque>
19+
#include <mutex>
20+
#include "velox/common/future/VeloxPromise.h"
21+
22+
namespace facebook::velox::common {
23+
24+
/// A generic admission controller that can be used to limit the number of
25+
/// resources in use and can log metrics like resource usage, queued count,
26+
/// queued wait times. When a calling thread's request for resources surpasses
27+
/// the set limit, it will be placed in a FIFO queue. The thread must then wait
28+
/// until sufficient resources are freed by other threads, addressing all
29+
/// preceding requests in the queue, before its own request can be granted.
30+
class GenericAdmissionController {
31+
public:
32+
struct Config {
33+
uint64_t maxLimit;
34+
/// The metric name for resource usage. If not set, it will not be reported.
35+
std::string resourceUsageMetric;
36+
/// The metric name for resource queued count. If not set, it will not be
37+
/// reported
38+
std::string resourceQueuedCountMetric;
39+
/// The metric name for resource queued wait time. If not set, it will not
40+
/// be reported
41+
std::string resourceQueuedTimeMsMetric;
42+
};
43+
explicit GenericAdmissionController(Config config) : config_(config) {}
44+
45+
void accept(uint64_t resourceUnits);
46+
void release(uint64_t resourceUnits);
47+
48+
uint64_t currentResourceUsage() const {
49+
std::lock_guard<std::mutex> l(mtx_);
50+
return unitsUsed_;
51+
}
52+
53+
private:
54+
struct Request {
55+
uint64_t unitsRequested;
56+
ContinuePromise promise;
57+
};
58+
Config config_;
59+
mutable std::mutex mtx_;
60+
uint64_t unitsUsed_{0};
61+
std::deque<Request> queue_;
62+
};
63+
} // namespace facebook::velox::common

velox/common/base/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ add_executable(
2121
ConcurrentCounterTest.cpp
2222
ExceptionTest.cpp
2323
FsTest.cpp
24+
GenericAdmissionControllerTest.cpp
2425
RangeTest.cpp
2526
RawVectorTest.cpp
2627
RuntimeMetricsTest.cpp
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/common/base/GenericAdmissionController.h"
18+
#include <gtest/gtest.h>
19+
#include <atomic>
20+
#include "velox/common/base/VeloxException.h"
21+
#include "velox/common/base/tests/GTestUtils.h"
22+
23+
using namespace facebook::velox;
24+
namespace facebook::velox::common {
25+
TEST(GenericAdmissionController, basic) {
26+
const uint64_t kLimit = 100000;
27+
GenericAdmissionController::Config config;
28+
config.maxLimit = kLimit;
29+
GenericAdmissionController admissionController(config);
30+
EXPECT_EQ(admissionController.currentResourceUsage(), 0);
31+
32+
admissionController.accept(100);
33+
EXPECT_EQ(admissionController.currentResourceUsage(), 100);
34+
35+
admissionController.accept(100);
36+
EXPECT_EQ(admissionController.currentResourceUsage(), 200);
37+
38+
admissionController.release(100);
39+
EXPECT_EQ(admissionController.currentResourceUsage(), 100);
40+
41+
VELOX_ASSERT_THROW(
42+
admissionController.release(101),
43+
"Cannot release more units than have been acquired");
44+
45+
VELOX_ASSERT_THROW(
46+
admissionController.accept(kLimit + 1),
47+
"A single request cannot exceed the max limit");
48+
}
49+
50+
TEST(GenericAdmissionController, multiThreaded) {
51+
// Ensure that resource usage never exceeds the limit set in the admission
52+
// controller.
53+
const uint64_t kLimit = 10;
54+
std::atomic_uint64_t currentUsage{0};
55+
GenericAdmissionController::Config config;
56+
config.maxLimit = kLimit;
57+
GenericAdmissionController admissionController(config);
58+
59+
std::vector<std::thread> threads;
60+
for (int i = 0; i < 20; i++) {
61+
threads.push_back(std::thread([&]() {
62+
for (int j = 0; j < 1000; j++) {
63+
admissionController.accept(1);
64+
uint64_t curr = currentUsage.fetch_add(1);
65+
ASSERT_LE(curr + 1, kLimit);
66+
currentUsage--;
67+
admissionController.release(1);
68+
}
69+
}));
70+
}
71+
for (auto& thread : threads) {
72+
thread.join();
73+
}
74+
}
75+
} // namespace facebook::velox::common

0 commit comments

Comments
 (0)