Skip to content

Commit 280e689

Browse files
ricklavoie authored and facebook-github-bot committed
Rework SubprocessImpl for extern-worker
Summary: SubprocessImpl works, but is pretty slow. Most of this slowness comes down to how it stores blobs and its input/output format. For blobs, it uses a single file per blob. This can result in a huge number of files, which stresses the file system. Moreover, the input/output format it uses has a lot of sub-directories filled with symlinks, which is again stressful. Rework SubprocessImpl to be more efficient. Instead of an individual file per blob, it keeps a pool of blob files. Blobs are just appended to the end of the next free blob file. Blob files are allocated per-thread, so there's no need for synchronization. As an additional optimization, blobs below a certain threshold aren't stored on disk at all, and are instead stored "inline" in the RefId. To allow for all of this, RefId now has an m_size and an m_extra field. m_size is *always* the blob size (which is useful), and can be used to distinguish inline blobs from on-disk ones. m_extra contains the offset of the blob within the blob file. To address input/output overhead, the notion of creating directories is discarded entirely. Instead, the metadata is encoded in a string (using BlobEncoder and BlobDecoder), and sent over to the worker via stdin. Once the worker has done its work and written any blobs to disk, it likewise returns its output via a pipe, which the parent can read. This avoids writing to disk at all. Reviewed By: edwinsmith Differential Revision: D39153960 fbshipit-source-id: 933d10d39463e57897ba75e58e93b790d88be654
1 parent 2cb71fd commit 280e689

File tree

6 files changed

+1170
-379
lines changed

6 files changed

+1170
-379
lines changed

hphp/hhbbc/index.cpp

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5640,6 +5640,8 @@ materialize_inputs(const IndexData& index,
56405640
auto program = std::make_unique<php::Program>();
56415641

56425642
auto const loadAndParse = [&] (Chunk chunk) -> coro::Task<void> {
5643+
HPHP_CORO_RESCHEDULE_ON_CURRENT_EXECUTOR;
5644+
56435645
auto [classes, funcs, units] = HPHP_CORO_AWAIT(coro::collect(
56445646
client->load(std::move(chunk.classes)),
56455647
client->load(std::move(chunk.funcs)),

hphp/util/blob-encoder.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -127,7 +127,7 @@ struct BlobEncoder {
127127
void writeRaw(const char* ptr, size_t size) {
128128
auto const start = m_blob.size();
129129
m_blob.resize(start + size);
130-
std::copy(ptr, ptr + size, &m_blob[start]);
130+
std::copy(ptr, ptr + size, m_blob.data() + start);
131131
}
132132

133133
/*

hphp/util/extern-worker-detail.h

Lines changed: 34 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -315,6 +315,32 @@ auto typesToValues(F&& f) {
315315

316316
//////////////////////////////////////////////////////////////////////
317317

318+
// Abstracts away how a worker should obtain its inputs, and write
319+
// its outputs. These functions are tightly coupled with the logic in
320+
// JobBase::serialize and JobBase::deserialize.
321+
struct ISource {
322+
virtual ~ISource() = default;
323+
virtual std::string blob() = 0;
324+
virtual Optional<std::string> optBlob() = 0;
325+
virtual std::vector<std::string> variadic() = 0;
326+
virtual void initDone() = 0;
327+
virtual bool inputEnd() const = 0;
328+
virtual void nextInput() = 0;
329+
virtual void finish() = 0;
330+
};
331+
332+
struct ISink {
333+
virtual ~ISink() = default;
334+
virtual void blob(const std::string&) = 0;
335+
virtual void optBlob(const Optional<std::string>&) = 0;
336+
virtual void variadic(const std::vector<std::string>&) = 0;
337+
virtual void nextOutput() = 0;
338+
virtual void startFini() = 0;
339+
virtual void finish() = 0;
340+
};
341+
342+
//////////////////////////////////////////////////////////////////////
343+
318344
// Base class for Jobs. This provides a consistent interface to invoke
319345
// through.
320346
struct JobBase {
@@ -323,15 +349,16 @@ struct JobBase {
323349
explicit JobBase(const std::string& name);
324350
virtual ~JobBase() = default;
325351

326-
template <typename T> static T deserialize(const std::filesystem::path&);
327-
template <typename T> static void serialize(const T&,
328-
size_t,
329-
const std::filesystem::path&);
352+
template <typename T> static T deserialize(ISource&);
353+
template <typename T> static void serialize(const T&, ISink&);
330354

331355
private:
332-
virtual void init(const std::filesystem::path&) const = 0;
333-
virtual void fini(const std::filesystem::path&) const = 0;
334-
virtual void run(const std::filesystem::path&, const std::filesystem::path&) const = 0;
356+
template <typename T> static T deserializeBlob(std::string);
357+
template <typename T> static std::string serializeBlob(const T&);
358+
359+
virtual void init(ISource&) const = 0;
360+
virtual void fini(ISink&) const = 0;
361+
virtual void run(ISource&, ISink&) const = 0;
335362

336363
std::string m_name;
337364

hphp/util/extern-worker-inl.h

Lines changed: 99 additions & 75 deletions
Original file line number | Diff line number | Diff line change
@@ -43,63 +43,63 @@ Job<C>::Job() : detail::JobBase{C::name()} {}
4343
// expects, then invoke the function with those types.
4444

4545
template <typename C>
46-
void Job<C>::init(const std::filesystem::path& root) const {
46+
void Job<C>::init(detail::ISource& source) const {
4747
using namespace detail;
4848
using Args = typename Params<decltype(C::init)>::type;
4949

50-
// For each expected input, load the file, and deserialize it into
50+
// For each expected input, load it, and deserialize it into
5151
// the appropriate type. Return the types as a tuple, which can then
5252
// std::apply to C::init, passing the inputs.
53+
size_t DEBUG_ONLY nextIdx = 0;
5354
std::apply(
5455
C::init,
5556
typesToValues<Args>(
5657
[&] (size_t idx, auto tag) {
57-
return deserialize<typename decltype(tag)::Type>(
58-
root / folly::to<std::string>(idx)
59-
);
58+
assertx(idx == nextIdx++);
59+
return deserialize<typename decltype(tag)::Type>(source);
6060
}
6161
)
6262
);
63+
source.initDone();
6364

6465
using Ret = typename Return<decltype(C::init)>::type;
6566
static_assert(std::is_void_v<Ret>, "init() must return void");
6667
}
6768

6869
template <typename C>
69-
void Job<C>::fini(const std::filesystem::path& outputRoot) const {
70+
void Job<C>::fini(detail::ISink& sink) const {
7071
using namespace detail;
7172

73+
sink.startFini();
7274
using Ret = typename Return<decltype(C::fini)>::type;
7375
if constexpr (std::is_void_v<Ret>) {
7476
C::fini();
7577
} else {
76-
auto const output = outputRoot / "fini";
77-
std::filesystem::create_directory(output, outputRoot);
7878
auto const v = C::fini();
79-
time("writing fini outputs", [&] { return serialize(v, 0, output); });
79+
time("writing fini outputs", [&] { return serialize(v, sink); });
8080
}
8181
}
8282

8383
template <typename C>
84-
void Job<C>::run(const std::filesystem::path& inputRoot,
85-
const std::filesystem::path& outputRoot) const {
84+
void Job<C>::run(detail::ISource& source, detail::ISink& sink) const {
8685
using namespace detail;
8786

88-
// For each expected input, load the file, and deserialize it into
89-
// the appropriate type, turning all of the types into a tuple.
87+
// For each expected input, load it, and deserialize it into the
88+
// appropriate type, turning all of the types into a tuple.
9089
using Args = typename Params<decltype(C::run)>::type;
90+
size_t DEBUG_ONLY nextIdx = 0;
9191
auto inputs = time(
9292
"loading inputs",
9393
[&] {
9494
return typesToValues<Args>(
9595
[&] (size_t idx, auto tag) {
96-
return deserialize<typename decltype(tag)::Type>(
97-
inputRoot / folly::to<std::string>(idx)
98-
);
96+
assertx(idx == nextIdx++);
97+
return deserialize<typename decltype(tag)::Type>(source);
9998
}
10099
);
101100
}
102101
);
102+
source.nextInput();
103103

104104
// Apply the tuple to C::run, passing the types as parameters.
105105
auto outputs = time(
@@ -110,8 +110,9 @@ void Job<C>::run(const std::filesystem::path& inputRoot,
110110
using Ret = typename Return<decltype(C::run)>::type;
111111
static_assert(!std::is_void_v<Ret>, "run() must return something");
112112

113-
// Serialize the outputs into the output directory.
114-
time("writing outputs", [&] { return serialize(outputs, 0, outputRoot); });
113+
// Serialize the outputs
114+
time("writing outputs", [&] { return serialize(outputs, sink); });
115+
sink.nextOutput();
115116
}
116117

117118
//////////////////////////////////////////////////////////////////////
@@ -120,81 +121,99 @@ namespace detail {
120121

121122
//////////////////////////////////////////////////////////////////////
122123

123-
// Given a file path, load the contents of the file, deserialize them
124-
// into the type T, and return it.
124+
// Turn a blob into a specific (non-marker) type
125125
template <typename T>
126-
T JobBase::deserialize(const std::filesystem::path& path) {
126+
T JobBase::deserializeBlob(std::string blob) {
127127
using namespace detail;
128+
static_assert(!IsMarker<T>::value, "Special markers cannot be nested");
128129
if constexpr (std::is_same<T, std::string>::value) {
129130
// A std::string is always stored as itself (this lets us store
130-
// files as their contents without having to encode them).
131-
return readFile(path);
132-
} else if constexpr (IsVariadic<T>::value) {
131+
// files directly as their contents without having to encode
132+
// them).
133+
return blob;
134+
} else {
135+
// For most types, the data is encoded using BlobEncoder, so undo
136+
// that.
137+
BlobDecoder decoder{blob.data(), blob.size()};
138+
return decoder.makeWhole<T>();
139+
}
140+
}
141+
142+
// Deserialize the given input source into the type T and return
143+
// it. The type might include markers, which might trigger
144+
// sub-deserializations.
145+
template <typename T>
146+
T JobBase::deserialize(ISource& source) {
147+
using namespace detail;
148+
static_assert(!IsMulti<T>::value, "Multi can only be used as return type");
149+
150+
if constexpr (IsVariadic<T>::value) {
133151
static_assert(!IsMarker<typename T::Type>::value,
134152
"Special markers cannot be nested");
135-
// Variadic<T> is actually a directory, not a file. Recurse into
136-
// it, and do the deserialization for every file within it.
153+
auto const blobs = source.variadic();
137154
T out;
138-
for (size_t i = 0;; ++i) {
139-
auto const valPath = path / folly::to<std::string>(i);
140-
// A break in the numbering means the end of the vector.
141-
if (!std::filesystem::exists(valPath)) break;
142-
out.vals.emplace_back(deserialize<typename T::Type>(valPath));
155+
out.vals.reserve(blobs.size());
156+
for (auto const& blob : blobs) {
157+
out.vals.emplace_back(deserializeBlob<typename T::Type>(blob));
143158
}
144159
return out;
145160
} else if constexpr (IsOpt<T>::value) {
146161
static_assert(!IsMarker<typename T::Type>::value,
147162
"Special markers cannot be nested");
148-
// Opt<T> is like T, except the file may not exist (so is nullopt
163+
// Opt<T> is like T, except the data may not exist (so is nullopt
149164
// otherwise).
150165
T out;
151-
if (std::filesystem::exists(path)) {
152-
out.val.emplace(deserialize<typename T::Type>(path));
166+
if (auto const blob = source.optBlob()) {
167+
out.val.emplace(deserializeBlob<typename T::Type>(*blob));
153168
}
154169
return out;
155170
} else {
156-
// For most types, the data is encoded using BlobEncoder, so undo
157-
// that.
158-
static_assert(!IsMulti<T>::value, "Multi can only be used as return type");
159-
auto const data = readFile(path);
160-
BlobDecoder decoder{data.data(), data.size()};
161-
return decoder.makeWhole<T>();
171+
return deserializeBlob<T>(source.blob());
162172
}
163173
}
164174

165-
// Given a value, an index of that value (its position in the output
166-
// values), and an output root, serialize the value, and write its
167-
// contents to the appropriate file.
175+
// Serialize the given (non-marker) value into a blob
168176
template <typename T>
169-
void JobBase::serialize(const T& v,
170-
size_t idx,
171-
const std::filesystem::path& root) {
177+
std::string JobBase::serializeBlob(const T& v) {
172178
using namespace detail;
179+
static_assert(!IsMarker<T>::value,
180+
"Special markers cannot be nested");
173181
if constexpr (std::is_same<T, std::string>::value) {
174-
// std::string isn't serialized, but written as itself as
175-
// root/idx.
176-
return writeFile(root / folly::to<std::string>(idx), v.data(), v.size());
177-
} else if constexpr (IsVariadic<T>::value) {
178-
// For Variadic<T>, we create a directory root/idx, and under it,
179-
// write a file for every element in the vector.
182+
// std::string always encodes to itself
183+
return v;
184+
} else {
185+
BlobEncoder encoder;
186+
encoder(v);
187+
return std::string{(const char*)encoder.data(), encoder.size()};
188+
}
189+
}
190+
191+
// Serialize the given value into a blob and write it out to the given
192+
// output sink. The value's type might be a marker, which can trigger
193+
// sub-serializations.
194+
template <typename T>
195+
void JobBase::serialize(const T& v, ISink& sink) {
196+
using namespace detail;
197+
if constexpr (IsVariadic<T>::value) {
180198
static_assert(!IsMarker<typename T::Type>::value,
181199
"Special markers cannot be nested");
182-
auto const path = root / folly::to<std::string>(idx);
183-
std::filesystem::create_directory(path, root);
184-
for (size_t i = 0; i < v.vals.size(); ++i) {
185-
serialize(v.vals[i], i, path);
186-
}
200+
using namespace folly::gen;
201+
auto const blobs = from(v.vals)
202+
| map([&] (const typename T::Type& t) { return serializeBlob(t); })
203+
| as<std::vector>();
204+
sink.variadic(blobs);
187205
} else if constexpr (IsOpt<T>::value) {
188206
// Opt<T> is like T, except nothing is written if the value isn't
189207
// present.
190208
static_assert(!IsMarker<typename T::Type>::value,
191209
"Special markers cannot be nested");
192-
if (!v.val.has_value()) return;
193-
serialize(*v.val, idx, root);
210+
sink.optBlob(
211+
v.val.has_value() ? serializeBlob(*v.val) : Optional<std::string>{}
212+
);
194213
} else if constexpr (IsMulti<T>::value) {
195214
// Treat Multi as equivalent to std::tuple (IE, write each element
196-
// to a separate file).
197-
assertx(idx == 0);
215+
// separately).
216+
size_t DEBUG_ONLY nextTupleIdx = 0;
198217
for_each(
199218
v.vals,
200219
[&] (auto const& elem, size_t tupleIdx) {
@@ -204,18 +223,12 @@ void JobBase::serialize(const T& v,
204223
>::value,
205224
"Multi cannot be nested"
206225
);
207-
serialize(elem, tupleIdx, root);
226+
assertx(tupleIdx == nextTupleIdx++);
227+
serialize(elem, sink);
208228
}
209229
);
210230
} else {
211-
// Most types are just encoded with BlobEncoder and written as
212-
// root/idx
213-
BlobEncoder encoder;
214-
encoder(v);
215-
writeFile(
216-
root / folly::to<std::string>(idx),
217-
(const char*)encoder.data(), encoder.size()
218-
);
231+
sink.blob(serializeBlob(v));
219232
}
220233
}
221234

@@ -225,24 +238,35 @@ void JobBase::serialize(const T& v,
225238

226239
//////////////////////////////////////////////////////////////////////
227240

228-
inline RefId::RefId(std::string id, size_t size)
229-
: m_id{std::move(id)}, m_size{size}
241+
inline RefId::RefId(std::string id, size_t size, size_t extra)
242+
: m_id{std::move(id)}, m_size{size}, m_extra{extra}
230243
{}
231244

232245
inline bool RefId::operator==(const RefId& o) const {
233-
return std::tie(m_id, m_size) == std::tie(o.m_id, o.m_size);
246+
return
247+
std::tie(m_id, m_extra, m_size) ==
248+
std::tie(o.m_id, o.m_extra, o.m_size);
234249
}
235250

236251
inline bool RefId::operator!=(const RefId& o) const {
237252
return !(*this == o);
238253
}
239254

240255
inline bool RefId::operator<(const RefId& o) const {
241-
return std::tie(m_size, m_id) < std::tie(o.m_size, o.m_id);
256+
return
257+
std::tie(m_id, m_extra, m_size) <
258+
std::tie(o.m_id, o.m_extra, o.m_size);
242259
}
243260

244261
inline std::string RefId::toString() const {
245-
return folly::sformat("{}:{}", m_id, m_size);
262+
// Don't print out the extra field if it's zero, to avoid clutter
263+
// for implementations which don't use it. The id might contain
264+
// binary data, so escape it before printing.
265+
if (m_extra) {
266+
return folly::sformat("{}:{}:{}", folly::humanify(m_id), m_extra, m_size);
267+
} else {
268+
return folly::sformat("{}:{}", folly::humanify(m_id), m_size);
269+
}
246270
}
247271

248272
//////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)