rustc_thread_pool/scope/mod.rs

//! Methods for custom fork-join scopes, created by the [`scope()`]
//! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`].
//!
//! [`scope()`]: fn.scope.html
//! [`in_place_scope()`]: fn.in_place_scope.html
//! [`join()`]: ../fn.join.html

use std::any::Any;
use std::collections::HashSet;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::{Arc, Mutex};
use std::{fmt, ptr};

use crate::broadcast::BroadcastContext;
use crate::job::{ArcJob, HeapJob, JobFifo, JobRef, JobRefId};
use crate::latch::{CountLatch, Latch};
use crate::registry::{Registry, WorkerThread, global_registry, in_worker};
use crate::tlv::{self, Tlv};
use crate::unwind;

#[cfg(test)]
mod tests;

/// Represents a fork-join scope which can be used to spawn any number of tasks.
/// See [`scope()`] for more information.
///
/// [`scope()`]: fn.scope.html
pub struct Scope<'scope> {
    base: ScopeBase<'scope>,
}

/// Represents a fork-join scope which can be used to spawn any number of tasks.
/// Those spawned from the same thread are prioritized in relative FIFO order.
/// See [`scope_fifo()`] for more information.
///
/// [`scope_fifo()`]: fn.scope_fifo.html
pub struct ScopeFifo<'scope> {
    base: ScopeBase<'scope>,
    fifos: Vec<JobFifo>,
}

struct ScopeBase<'scope> {
    /// thread registry where `scope()` was executed or where `in_place_scope()`
    /// should spawn jobs.
    registry: Arc<Registry>,

    /// if some job panicked, the error is stored here; it will be
    /// propagated to the one who created the scope
    panic: AtomicPtr<Box<dyn Any + Send + 'static>>,

    /// latch to track job counts
    job_completed_latch: CountLatch,

    /// Jobs that have been spawned, but not yet started.
    #[allow(rustc::default_hash_types)]
    pending_jobs: Mutex<HashSet<JobRefId>>,

    /// The worker which will wait on scope completion, if any.
    worker: Option<usize>,

    /// You can think of a scope as containing a list of closures to execute,
    /// all of which outlive `'scope`. They're not actually required to be
    /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
    /// the closures are only *moved* across threads to be executed.
    #[allow(clippy::type_complexity)]
    marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,

    /// The TLV at the scope's creation. Used to set the TLV for spawned jobs.
    tlv: Tlv,
}

/// Creates a "fork-join" scope `s` and invokes the closure with a
/// reference to `s`. This closure can then spawn asynchronous tasks
/// into `s`. Those tasks may run asynchronously with respect to the
/// closure; they may themselves spawn additional tasks into `s`. When
/// the closure returns, it will block until all tasks that have been
/// spawned into `s` complete.
///
/// `scope()` is a more flexible building block compared to `join()`,
/// since a loop can be used to spawn any number of tasks without
/// recursing. However, that flexibility comes at a performance price:
/// tasks spawned using `scope()` must be allocated onto the heap,
/// whereas `join()` can make exclusive use of the stack. **Prefer
/// `join()` (or, even better, parallel iterators) where possible.**
///
/// # Example
///
/// The Rayon `join()` function launches two closures and waits for them
/// to stop. One could implement `join()` using a scope like so, although
/// it would be less efficient than the real implementation:
///
/// ```rust
/// # use rustc_thread_pool as rayon;
/// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB)
///     where A: FnOnce() -> RA + Send,
///           B: FnOnce() -> RB + Send,
///           RA: Send,
///           RB: Send,
/// {
///     let mut result_a: Option<RA> = None;
///     let mut result_b: Option<RB> = None;
///     rayon::scope(|s| {
///         s.spawn(|_| result_a = Some(oper_a()));
///         s.spawn(|_| result_b = Some(oper_b()));
///     });
///     (result_a.unwrap(), result_b.unwrap())
/// }
/// ```
///
/// # A note on threading
///
/// The closure given to `scope()` executes in the Rayon thread-pool,
/// as do those given to `spawn()`. This means that you can't access
/// thread-local variables (well, you can, but they may have
/// unexpected values).
///
/// # Task execution
///
/// Task execution potentially starts as soon as `spawn()` is called.
/// The task will end sometime before `scope()` returns. Note that the
/// *closure* given to scope may return much earlier. In general
/// the lifetime of a scope created like `scope(body)` goes something like this:
///
/// - Scope begins when `scope(body)` is called
/// - Scope body `body()` is invoked
///     - Scope tasks may be spawned
/// - Scope body returns
/// - Scope tasks execute, possibly spawning more tasks
/// - Once all tasks are done, scope ends and `scope()` returns
///
/// To see how and when tasks are joined, consider this example:
///
/// ```rust
/// # use rustc_thread_pool as rayon;
/// // point start
/// rayon::scope(|s| {
///     s.spawn(|s| { // task s.1
///         s.spawn(|s| { // task s.1.1
///             rayon::scope(|t| {
///                 t.spawn(|_| ()); // task t.1
///                 t.spawn(|_| ()); // task t.2
///             });
///         });
///     });
///     s.spawn(|s| { // task s.2
///     });
///     // point mid
/// });
/// // point end
/// ```
///
/// The various tasks that are run will execute roughly like so:
///
/// ```notrust
/// | (start)
/// |
/// | (scope `s` created)
/// +-----------------------------------------------+ (task s.2)
/// +-------+ (task s.1)                            |
/// |       |                                       |
/// |       +---+ (task s.1.1)                      |
/// |       |   |                                   |
/// |       |   | (scope `t` created)               |
/// |       |   +----------------+ (task t.2)       |
/// |       |   +---+ (task t.1) |                  |
/// | (mid) |   |   |            |                  |
/// :       |   + <-+------------+ (scope `t` ends) |
/// :       |   |                                   |
/// |<------+---+-----------------------------------+ (scope `s` ends)
/// |
/// | (end)
/// ```
///
/// The point here is that everything spawned into scope `s` will
/// terminate (at latest) at the same point -- right before the
/// original call to `rayon::scope` returns. This includes new
/// subtasks created by other subtasks (e.g., task `s.1.1`). If a new
/// scope is created (such as `t`), the things spawned into that scope
/// will be joined before that scope returns, which in turn occurs
/// before the creating task (task `s.1.1` in this case) finishes.
///
/// There is no guaranteed order of execution for spawns in a scope,
/// given that other threads may steal tasks at any time. However, they
/// are generally prioritized in a LIFO order on the thread from which
/// they were spawned. So in this example, absent any stealing, we can
/// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
/// threads always steal from the other end of the deque, like FIFO
/// order. The idea is that "recent" tasks are most likely to be fresh
/// in the local CPU's cache, while other threads can steal older
/// "stale" tasks. For an alternate approach, consider
/// [`scope_fifo()`] instead.
///
/// [`scope_fifo()`]: fn.scope_fifo.html
///
/// # Accessing stack data
///
/// In general, spawned tasks may access stack data in place that
/// outlives the scope itself. Other data must be fully owned by the
/// spawned task.
///
/// ```rust
/// # use rustc_thread_pool as rayon;
/// let ok: Vec<i32> = vec![1, 2, 3];
/// rayon::scope(|s| {
///     let bad: Vec<i32> = vec![4, 5, 6];
///     s.spawn(|_| {
///         // We can access `ok` because it outlives the scope `s`.
///         println!("ok: {:?}", ok);
///
///         // If we just try to use `bad` here, the closure will borrow `bad`
///         // (because we are just printing it out, and that only requires a
///         // borrow), which will result in a compilation error. Read on
///         // for options.
///         // println!("bad: {:?}", bad);
///     });
/// });
/// ```
///
/// As the comments in the example above suggest, to reference `bad` we must
/// take ownership of it. One way to do this is to detach the closure
/// from the surrounding stack frame, using the `move` keyword. This
/// will cause it to take ownership of *all* the variables it touches,
/// in this case including both `ok` *and* `bad`:
///
/// ```rust
/// # use rustc_thread_pool as rayon;
/// let ok: Vec<i32> = vec![1, 2, 3];
/// rayon::scope(|s| {
///     let bad: Vec<i32> = vec![4, 5, 6];
///     s.spawn(move |_| {
///         println!("ok: {:?}", ok);
///         println!("bad: {:?}", bad);
///     });
///
///     // That closure is fine, but now we can't use `ok` anywhere else,
///     // since it is owned by the previous task:
///     // s.spawn(|_| println!("ok: {:?}", ok));
/// });
/// ```
///
/// While this works, it could be a problem if we want to use `ok` elsewhere.
/// There are two choices. We can keep the closure as a `move` closure, but
/// instead of referencing the variable `ok`, we create a shadowed variable that
/// is a borrow of `ok` and capture *that*:
///
/// ```rust
/// # use rustc_thread_pool as rayon;
/// let ok: Vec<i32> = vec![1, 2, 3];
/// rayon::scope(|s| {
///     let bad: Vec<i32> = vec![4, 5, 6];
///     let ok: &Vec<i32> = &ok; // shadow the original `ok`
///     s.spawn(move |_| {
///         println!("ok: {:?}", ok); // captures the shadowed version
///         println!("bad: {:?}", bad);
///     });
///
///     // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
///     // can be shared freely. Note that we need a `move` closure here though,
///     // because otherwise we'd be trying to borrow the shadowed `ok`,
///     // and that doesn't outlive `scope`.
///     s.spawn(move |_| println!("ok: {:?}", ok));
/// });
/// ```
///
/// Another option is not to use the `move` keyword but instead to take ownership
/// of individual variables:
///
/// ```rust
/// # use rustc_thread_pool as rayon;
/// let ok: Vec<i32> = vec![1, 2, 3];
/// rayon::scope(|s| {
///     let bad: Vec<i32> = vec![4, 5, 6];
///     s.spawn(|_| {
///         // Transfer ownership of `bad` into a local variable (also named `bad`).
///         // This will force the closure to take ownership of `bad` from the environment.
///         let bad = bad;
///         println!("ok: {:?}", ok); // `ok` is only borrowed.
///         println!("bad: {:?}", bad); // refers to our local variable, above.
///     });
///
///     s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok`
/// });
/// ```
///
/// # Panics
///
/// If a panic occurs, either in the closure given to `scope()` or in
/// any of the spawned jobs, that panic will be propagated and the
/// call to `scope()` will panic. If multiple panics occur, it is
/// non-deterministic which of their panic values will propagate.
/// Regardless, once a task is spawned using `scope.spawn()`, it will
/// execute, even if the spawning task should later panic. `scope()`
/// returns once all spawned jobs have completed, and any panics are
/// propagated at that point.
pub fn scope<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(&Scope<'scope>) -> R + Send,
    R: Send,
{
    in_worker(|owner_thread, _| {
        let scope = Scope::<'scope>::new(Some(owner_thread), None);
        scope.base.complete(Some(owner_thread), || op(&scope))
    })
}

/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
/// closure with a reference to `s`. This closure can then spawn
/// asynchronous tasks into `s`. Those tasks may run asynchronously with
/// respect to the closure; they may themselves spawn additional tasks
/// into `s`. When the closure returns, it will block until all tasks
/// that have been spawned into `s` complete.
///
/// # Task execution
///
/// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a
/// difference in the order of execution. Consider a similar example:
///
/// [`scope()`]: fn.scope.html
///
/// ```rust
/// # use rustc_thread_pool as rayon;
/// // point start
/// rayon::scope_fifo(|s| {
///     s.spawn_fifo(|s| { // task s.1
///         s.spawn_fifo(|s| { // task s.1.1
///             rayon::scope_fifo(|t| {
///                 t.spawn_fifo(|_| ()); // task t.1
///                 t.spawn_fifo(|_| ()); // task t.2
///             });
///         });
///     });
///     s.spawn_fifo(|s| { // task s.2
///     });
///     // point mid
/// });
/// // point end
/// ```
///
/// The various tasks that are run will execute roughly like so:
///
/// ```notrust
/// | (start)
/// |
/// | (FIFO scope `s` created)
/// +--------------------+ (task s.1)
/// +-------+ (task s.2) |
/// |       |            +---+ (task s.1.1)
/// |       |            |   |
/// |       |            |   | (FIFO scope `t` created)
/// |       |            |   +----------------+ (task t.1)
/// |       |            |   +---+ (task t.2) |
/// | (mid) |            |   |   |            |
/// :       |            |   + <-+------------+ (scope `t` ends)
/// :       |            |   |
/// |<------+------------+---+ (scope `s` ends)
/// |
/// | (end)
/// ```
///
/// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
/// the thread from which they were spawned, as opposed to `scope()`'s
/// LIFO. So in this example, we can expect `s.1` to execute before
/// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
/// FIFO order, as usual. Overall, this has roughly the same order as
/// the now-deprecated [`breadth_first`] option, except the effect is
/// isolated to a particular scope. If spawns are intermingled from any
/// combination of `scope()` and `scope_fifo()`, or from different
/// threads, their order is only specified with respect to spawns in the
/// same scope and thread.
///
/// For more details on this design, see Rayon [RFC #1].
///
/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
///
/// # Panics
///
/// If a panic occurs, either in the closure given to `scope_fifo()` or
/// in any of the spawned jobs, that panic will be propagated and the
/// call to `scope_fifo()` will panic. If multiple panics occur, it is
/// non-deterministic which of their panic values will propagate.
/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it
/// will execute, even if the spawning task should later panic.
/// `scope_fifo()` returns once all spawned jobs have completed, and any
/// panics are propagated at that point.
pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
    R: Send,
{
    in_worker(|owner_thread, _| {
        let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None);
        scope.base.complete(Some(owner_thread), || op(&scope))
    })
}

/// Creates a "fork-join" scope `s` and invokes the closure with a
/// reference to `s`. This closure can then spawn asynchronous tasks
/// into `s`. Those tasks may run asynchronously with respect to the
/// closure; they may themselves spawn additional tasks into `s`. When
/// the closure returns, it will block until all tasks that have been
/// spawned into `s` complete.
///
/// This is just like `scope()` except the closure runs on the same thread
/// that calls `in_place_scope()`. Only work that it spawns runs in the
/// thread pool.
///
/// # Panics
///
/// If a panic occurs, either in the closure given to `in_place_scope()` or in
/// any of the spawned jobs, that panic will be propagated and the
/// call to `in_place_scope()` will panic. If multiple panics occur, it is
/// non-deterministic which of their panic values will propagate.
/// Regardless, once a task is spawned using `scope.spawn()`, it will
/// execute, even if the spawning task should later panic. `in_place_scope()`
/// returns once all spawned jobs have completed, and any panics are
/// propagated at that point.
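///
/// # Example
///
/// A minimal usage sketch (this assumes the crate-root re-export of
/// `in_place_scope`, matching how the other examples in this module call
/// `rayon::scope`; the variable names are illustrative only):
///
/// ```rust
/// # use rustc_thread_pool as rayon;
/// let mut results = vec![0usize; 4];
/// rayon::in_place_scope(|s| {
///     // The closure runs on the calling thread; each spawned task may run
///     // in the pool and mutably borrows one slot of `results`, which
///     // outlives the scope.
///     for (i, slot) in results.iter_mut().enumerate() {
///         s.spawn(move |_| *slot = i * 2);
///     }
/// });
/// assert_eq!(results, [0, 2, 4, 6]);
/// ```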
pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(&Scope<'scope>) -> R,
{
    do_in_place_scope(None, op)
}

pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
where
    OP: FnOnce(&Scope<'scope>) -> R,
{
    let thread = unsafe { WorkerThread::current().as_ref() };
    let scope = Scope::<'scope>::new(thread, registry);
    scope.base.complete(thread, || op(&scope))
}

/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
/// closure with a reference to `s`. This closure can then spawn
/// asynchronous tasks into `s`. Those tasks may run asynchronously with
/// respect to the closure; they may themselves spawn additional tasks
/// into `s`. When the closure returns, it will block until all tasks
/// that have been spawned into `s` complete.
///
/// This is just like `scope_fifo()` except the closure runs on the same thread
/// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the
/// thread pool.
///
/// # Panics
///
/// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in
/// any of the spawned jobs, that panic will be propagated and the
/// call to `in_place_scope_fifo()` will panic. If multiple panics occur, it is
/// non-deterministic which of their panic values will propagate.
/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will
/// execute, even if the spawning task should later panic. `in_place_scope_fifo()`
/// returns once all spawned jobs have completed, and any panics are
/// propagated at that point.
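///
/// # Example
///
/// A minimal usage sketch, analogous to the `in_place_scope()` example above
/// (again assuming the crate-root re-export used by the other examples):
///
/// ```rust
/// # use rustc_thread_pool as rayon;
/// let mut log = Vec::new();
/// rayon::in_place_scope_fifo(|s| {
///     // The closure itself runs on the calling thread; only spawned work
///     // is sent to the pool, prioritized in FIFO order per thread.
///     log.push("before spawns");
///     s.spawn_fifo(|_| { /* runs in the pool */ });
/// });
/// log.push("scope done");
/// assert_eq!(log, ["before spawns", "scope done"]);
/// ```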
pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(&ScopeFifo<'scope>) -> R,
{
    do_in_place_scope_fifo(None, op)
}

pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
where
    OP: FnOnce(&ScopeFifo<'scope>) -> R,
{
    let thread = unsafe { WorkerThread::current().as_ref() };
    let scope = ScopeFifo::<'scope>::new(thread, registry);
    scope.base.complete(thread, || op(&scope))
}

impl<'scope> Scope<'scope> {
    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
        let base = ScopeBase::new(owner, registry);
        Scope { base }
    }

    /// Spawns a job into the fork-join scope `self`. This job will
    /// execute sometime before the fork-join scope completes. The
    /// job is specified as a closure, and this closure receives its
    /// own reference to the scope `self` as argument. This can be
    /// used to inject new jobs into `self`.
    ///
    /// # Returns
    ///
    /// Nothing. The spawned closures cannot pass back values to the
    /// caller directly, though they can write to local variables on
    /// the stack (if those variables outlive the scope) or
    /// communicate through shared channels.
    ///
    /// (The intention is to eventually integrate with Rust futures to
    /// support spawns of functions that compute a value.)
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use rustc_thread_pool as rayon;
    /// let mut value_a = None;
    /// let mut value_b = None;
    /// let mut value_c = None;
    /// rayon::scope(|s| {
    ///     s.spawn(|s1| {
    ///           // ^ this is the same scope as `s`; this handle `s1`
    ///           //   is intended for use by the spawned task,
    ///           //   since scope handles cannot cross thread boundaries.
    ///
    ///         value_a = Some(22);
    ///
    ///         // the scope `s` will not end until all these tasks are done
    ///         s1.spawn(|_| {
    ///             value_b = Some(44);
    ///         });
    ///     });
    ///
    ///     s.spawn(|_| {
    ///         value_c = Some(66);
    ///     });
    /// });
    /// assert_eq!(value_a, Some(22));
    /// assert_eq!(value_b, Some(44));
    /// assert_eq!(value_c, Some(66));
    /// ```
    ///
    /// # See also
    ///
    /// The [`scope` function] has more extensive documentation about
    /// task spawning.
    ///
    /// [`scope` function]: fn.scope.html
    pub fn spawn<BODY>(&self, body: BODY)
    where
        BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
    {
        let scope_ptr = ScopePtr(self);
        let job = HeapJob::new(self.base.tlv, move |id| unsafe {
            // SAFETY: this job will execute before the scope ends.
            let scope = scope_ptr.as_ref();

            // Mark this job as started.
            scope.base.pending_jobs.lock().unwrap().remove(&id);

            ScopeBase::execute_job(&scope.base, move || body(scope))
        });
        let job_ref = self.base.heap_job_ref(job);

        // Mark this job as pending.
        self.base.pending_jobs.lock().unwrap().insert(job_ref.id());
        // Since `Scope` implements `Sync`, we can't be sure that we're still in a
        // thread of this pool, so we can't just push to the local worker thread.
        // Also, this might be an in-place scope.
        self.base.registry.inject_or_push(job_ref);
    }

    /// Spawns a job into every thread of the fork-join scope `self`. This job will
    /// execute on each thread sometime before the fork-join scope completes. The
    /// job is specified as a closure, and this closure receives its own reference
    /// to the scope `self` as argument, as well as a `BroadcastContext`.
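    ///
    /// # Example
    ///
    /// A minimal sketch that counts how many threads ran the broadcast job
    /// (this assumes `BroadcastContext::index()` as in Rayon's broadcast API;
    /// the names here are illustrative only):
    ///
    /// ```rust
    /// # use rustc_thread_pool as rayon;
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// let seen = AtomicUsize::new(0);
    /// rayon::scope(|s| {
    ///     s.spawn_broadcast(|_, ctx| {
    ///         // Runs once on every worker thread in the pool.
    ///         let _thread_index = ctx.index();
    ///         seen.fetch_add(1, Ordering::Relaxed);
    ///     });
    /// });
    /// // The scope waits for every per-thread copy of the job to finish.
    /// assert!(seen.load(Ordering::Relaxed) >= 1);
    /// ```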
    pub fn spawn_broadcast<BODY>(&self, body: BODY)
    where
        BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
    {
        let scope_ptr = ScopePtr(self);
        let job = ArcJob::new(move |id| unsafe {
            // SAFETY: this job will execute before the scope ends.
            let scope = scope_ptr.as_ref();
            let body = &body;

            let current_index = WorkerThread::current().as_ref().map(|worker| worker.index());
            if current_index == scope.base.worker {
                // Mark this job as started on the scope's worker thread.
                scope.base.pending_jobs.lock().unwrap().remove(&id);
            }

            let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
            ScopeBase::execute_job(&scope.base, func)
        });
        self.base.inject_broadcast(job)
    }
}

impl<'scope> ScopeFifo<'scope> {
    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
        let base = ScopeBase::new(owner, registry);
        let num_threads = base.registry.num_threads();
        let fifos = (0..num_threads).map(|_| JobFifo::new()).collect();
        ScopeFifo { base, fifos }
    }

    /// Spawns a job into the fork-join scope `self`. This job will
    /// execute sometime before the fork-join scope completes. The
    /// job is specified as a closure, and this closure receives its
    /// own reference to the scope `self` as argument. This can be
    /// used to inject new jobs into `self`.
    ///
    /// # See also
    ///
    /// This method is akin to [`Scope::spawn()`], but with a FIFO
    /// priority. The [`scope_fifo` function] has more details about
    /// this distinction.
    ///
    /// [`Scope::spawn()`]: struct.Scope.html#method.spawn
    /// [`scope_fifo` function]: fn.scope_fifo.html
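    ///
    /// # Example
    ///
    /// A minimal usage sketch; completion order is still not guaranteed
    /// (other threads may steal tasks), so the results are sorted before
    /// checking (the channel is illustrative only):
    ///
    /// ```rust
    /// # use rustc_thread_pool as rayon;
    /// use std::sync::mpsc::channel;
    ///
    /// let (tx, rx) = channel();
    /// rayon::scope_fifo(|s| {
    ///     for i in 0..3 {
    ///         let tx = tx.clone();
    ///         s.spawn_fifo(move |_| tx.send(i).unwrap());
    ///     }
    /// });
    /// let mut got: Vec<i32> = rx.try_iter().collect();
    /// got.sort();
    /// assert_eq!(got, [0, 1, 2]);
    /// ```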
    pub fn spawn_fifo<BODY>(&self, body: BODY)
    where
        BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
    {
        let scope_ptr = ScopePtr(self);
        let job = HeapJob::new(self.base.tlv, move |id| unsafe {
            // SAFETY: this job will execute before the scope ends.
            let scope = scope_ptr.as_ref();

            // Mark this job as started.
            scope.base.pending_jobs.lock().unwrap().remove(&id);

            ScopeBase::execute_job(&scope.base, move || body(scope))
        });
        let job_ref = self.base.heap_job_ref(job);

        // Mark this job as pending.
        self.base.pending_jobs.lock().unwrap().insert(job_ref.id());

        // Since `ScopeFifo` implements `Sync`, we can't be sure that we're still in a
        // thread of this pool, so we can't just push to the local worker thread.
        // Also, this might be an in-place scope.
        self.base.registry.inject_or_push(job_ref);
    }

    /// Spawns a job into every thread of the fork-join scope `self`. This job will
    /// execute on each thread sometime before the fork-join scope completes. The
    /// job is specified as a closure, and this closure receives its own reference
    /// to the scope `self` as argument, as well as a `BroadcastContext`.
    pub fn spawn_broadcast<BODY>(&self, body: BODY)
    where
        BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
    {
        let scope_ptr = ScopePtr(self);
        let job = ArcJob::new(move |id| unsafe {
            // SAFETY: this job will execute before the scope ends.
            let scope = scope_ptr.as_ref();

            let current_index = WorkerThread::current().as_ref().map(|worker| worker.index());
            if current_index == scope.base.worker {
                // Mark this job as started on the scope's worker thread.
                scope.base.pending_jobs.lock().unwrap().remove(&id);
            }
            let body = &body;
            let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
            ScopeBase::execute_job(&scope.base, func)
        });
        self.base.inject_broadcast(job)
    }
}

impl<'scope> ScopeBase<'scope> {
    /// Creates the base of a new scope for the given registry
    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
        let registry = registry.unwrap_or_else(|| match owner {
            Some(owner) => owner.registry(),
            None => global_registry(),
        });

        ScopeBase {
            registry: Arc::clone(registry),
            panic: AtomicPtr::new(ptr::null_mut()),
            job_completed_latch: CountLatch::new(owner),
            #[allow(rustc::default_hash_types)]
            pending_jobs: Mutex::new(HashSet::new()),
            worker: owner.map(|w| w.index()),
            marker: PhantomData,
            tlv: tlv::get(),
        }
    }

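    /// Converts a spawned `HeapJob` into a `JobRef` for this scope, incrementing
    /// the job-completed latch so that `complete()` will wait for the job.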
    fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
    where
        FUNC: FnOnce(JobRefId) + Send + 'scope,
    {
        unsafe {
            self.job_completed_latch.increment();
            job.into_job_ref()
        }
    }

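    /// Injects a broadcast job into every worker thread of the registry,
    /// incrementing the job-completed latch once per thread so the scope
    /// waits for all copies.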
    fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
    where
        FUNC: Fn(JobRefId) + Send + Sync + 'scope,
    {
        if self.worker.is_some() {
            let id = unsafe { ArcJob::as_job_ref(&job).id() };
            self.pending_jobs.lock().unwrap().insert(id);
        }
        let n_threads = self.registry.num_threads();
        let job_refs = (0..n_threads).map(|_| unsafe {
            self.job_completed_latch.increment();
            ArcJob::as_job_ref(&job)
        });

        self.registry.inject_broadcast(job_refs);
    }

    /// Executes `func` as the scope body, waits until all jobs spawned into
    /// the scope have completed, and then propagates any panic that occurred.
    fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
    where
        FUNC: FnOnce() -> R,
    {
        let result = unsafe { Self::execute_job_closure(self, func) };
        self.job_completed_latch.wait(
            owner,
            || self.pending_jobs.lock().unwrap().is_empty(),
            |job| self.pending_jobs.lock().unwrap().contains(&job.id()),
        );

        // Restore the TLV if we ran some jobs while waiting
        tlv::set(self.tlv);

        self.maybe_propagate_panic();
        result.unwrap() // only None if `op` panicked, and that would have been propagated
    }

    /// Executes `func` as a job in the given scope, discarding its result.
    unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC)
    where
        FUNC: FnOnce(),
    {
        let _: Option<()> = unsafe { Self::execute_job_closure(this, func) };
    }

    /// Executes `func` as a job in scope. Adjusts the "job completed"
    /// counters and also catches any panic and stores it into
    /// `scope`.
    unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R>
    where
        FUNC: FnOnce() -> R,
    {
        let result = match unwind::halt_unwinding(func) {
            Ok(r) => Some(r),
            Err(err) => {
                unsafe { (*this).job_panicked(err) };
                None
            }
        };
        unsafe { Latch::set(&(*this).job_completed_latch) };
        result
    }

    fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
        // capture the first error we see, free the rest
        if self.panic.load(Ordering::Relaxed).is_null() {
            let nil = ptr::null_mut();
            let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr
            let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err;
            if self
                .panic
                .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed)
                .is_ok()
            {
                // ownership now transferred into self.panic
            } else {
                // another panic raced in ahead of us, so drop ours
                let _: Box<Box<_>> = ManuallyDrop::into_inner(err);
            }
        }
    }

    fn maybe_propagate_panic(&self) {
        // propagate panic, if any occurred; at this point, all
        // outstanding jobs have completed, so we can use a relaxed
        // ordering:
        let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
        if !panic.is_null() {
            let value = unsafe { Box::from_raw(panic) };

            // Restore the TLV if we ran some jobs while waiting
            tlv::set(self.tlv);

            unwind::resume_unwinding(*value);
        }
    }
}

impl<'scope> fmt::Debug for Scope<'scope> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Scope")
            .field("pool_id", &self.base.registry.id())
            .field("panic", &self.base.panic)
            .field("job_completed_latch", &self.base.job_completed_latch)
            .finish()
    }
}

impl<'scope> fmt::Debug for ScopeFifo<'scope> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("ScopeFifo")
            .field("num_fifos", &self.fifos.len())
            .field("pool_id", &self.base.registry.id())
            .field("panic", &self.base.panic)
            .field("job_completed_latch", &self.base.job_completed_latch)
            .finish()
    }
}

/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime.
///
/// Unsafe code is still required to dereference the pointer, but that's fine in
/// scope jobs that are guaranteed to execute before the scope ends.
struct ScopePtr<T>(*const T);

// SAFETY: !Send for raw pointers is not for safety, just as a lint
unsafe impl<T: Sync> Send for ScopePtr<T> {}

// SAFETY: !Sync for raw pointers is not for safety, just as a lint
unsafe impl<T: Sync> Sync for ScopePtr<T> {}

impl<T> ScopePtr<T> {
    // Helper to avoid disjoint captures of `scope_ptr.0`
    unsafe fn as_ref(&self) -> &T {
        unsafe { &*self.0 }
    }
}