datafusion_functions_window/
rank.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Implementation of `rank`, `dense_rank`, and `percent_rank` window functions,
19//! which can be evaluated at runtime during query execution.
20
21use crate::define_udwf_and_expr;
22use arrow::datatypes::FieldRef;
23use datafusion_common::arrow::array::ArrayRef;
24use datafusion_common::arrow::array::{Float64Array, UInt64Array};
25use datafusion_common::arrow::compute::SortOptions;
26use datafusion_common::arrow::datatypes::DataType;
27use datafusion_common::arrow::datatypes::Field;
28use datafusion_common::utils::get_row_at_idx;
29use datafusion_common::{exec_err, Result, ScalarValue};
30use datafusion_expr::window_doc_sections::DOC_SECTION_RANKING;
31use datafusion_expr::{
32    Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
33};
34use datafusion_functions_window_common::field;
35use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
36use field::WindowUDFFieldArgs;
37use std::any::Any;
38use std::fmt::Debug;
39use std::iter;
40use std::ops::Range;
41use std::sync::{Arc, LazyLock};
42
// Declares the `rank` window function via the crate-local
// `define_udwf_and_expr!` macro. The arguments passed are, in order: an
// identifier (`Rank`), the function name (`rank`), a human-readable
// description string, and the constructor `Rank::basic`.
// NOTE(review): the items the macro generates are defined elsewhere — confirm
// against the macro definition.
define_udwf_and_expr!(
    Rank,
    rank,
    "Returns rank of the current row with gaps. Same as `row_number` of its first peer",
    Rank::basic
);

// Declares the `dense_rank` window function; same macro and argument layout as
// above, using the constructor `Rank::dense_rank`.
define_udwf_and_expr!(
    DenseRank,
    dense_rank,
    "Returns rank of the current row without gaps. This function counts peer groups",
    Rank::dense_rank
);

// Declares the `percent_rank` window function; same macro and argument layout
// as above, using the constructor `Rank::percent_rank`.
define_udwf_and_expr!(
    PercentRank,
    percent_rank,
    "Returns the relative rank of the current row: (rank - 1) / (total rows - 1)",
    Rank::percent_rank
);
63
/// Window function that calculates a row's rank within an ordered (`ORDER BY`)
/// window.
///
/// A single type backs `rank`, `dense_rank` and `percent_rank`; the variant
/// that is computed is selected by `rank_type`.
#[derive(Debug)]
pub struct Rank {
    /// Name returned by `WindowUDFImpl::name` (for example `"rank"`).
    name: String,
    /// Argument signature; `Rank::new` sets it to a nullary, immutable one.
    signature: Signature,
    /// Which ranking variant this instance computes.
    rank_type: RankType,
}
71
72impl Rank {
73    /// Create a new `rank` function with the specified name and rank type
74    pub fn new(name: String, rank_type: RankType) -> Self {
75        Self {
76            name,
77            signature: Signature::nullary(Volatility::Immutable),
78            rank_type,
79        }
80    }
81
82    /// Create a `rank` window function
83    pub fn basic() -> Self {
84        Rank::new("rank".to_string(), RankType::Basic)
85    }
86
87    /// Create a `dense_rank` window function
88    pub fn dense_rank() -> Self {
89        Rank::new("dense_rank".to_string(), RankType::Dense)
90    }
91
92    /// Create a `percent_rank` window function
93    pub fn percent_rank() -> Self {
94        Rank::new("percent_rank".to_string(), RankType::Percent)
95    }
96}
97
/// Selects which ranking variant a [`Rank`] function computes.
///
/// The enum is field-less, so it also derives `PartialEq`, `Eq` and `Hash`,
/// letting callers compare variants directly or use them as map/set keys.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum RankType {
    /// `rank`: peers share a rank and the ranks after a tie are skipped,
    /// leaving gaps.
    Basic,
    /// `dense_rank`: peers share a rank and the next peer group gets the next
    /// consecutive rank, leaving no gaps.
    Dense,
    /// `percent_rank`: relative rank, `(rank - 1) / (total rows - 1)`,
    /// produced as a 64-bit float.
    Percent,
}
104
105static RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
106    Documentation::builder(
107        DOC_SECTION_RANKING,
108            "Returns the rank of the current row within its partition, allowing \
109            gaps between ranks. This function provides a ranking similar to `row_number`, but \
110            skips ranks for identical values.",
111
112        "rank()")
113        .with_sql_example(r#"```sql
114    --Example usage of the rank window function:
115    SELECT department,
116           salary,
117           rank() OVER (PARTITION BY department ORDER BY salary DESC) AS rank
118    FROM employees;
119```
120
121```sql
122+-------------+--------+------+
123| department  | salary | rank |
124+-------------+--------+------+
125| Sales       | 70000  | 1    |
126| Sales       | 50000  | 2    |
127| Sales       | 50000  | 2    |
128| Sales       | 30000  | 4    |
129| Engineering | 90000  | 1    |
130| Engineering | 80000  | 2    |
131+-------------+--------+------+
132```"#)
133        .build()
134});
135
136fn get_rank_doc() -> &'static Documentation {
137    &RANK_DOCUMENTATION
138}
139
140static DENSE_RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
141    Documentation::builder(DOC_SECTION_RANKING, "Returns the rank of the current row without gaps. This function ranks \
142            rows in a dense manner, meaning consecutive ranks are assigned even for identical \
143            values.", "dense_rank()")
144        .with_sql_example(r#"```sql
145    --Example usage of the dense_rank window function:
146    SELECT department,
147           salary,
148           dense_rank() OVER (PARTITION BY department ORDER BY salary DESC) AS dense_rank
149    FROM employees;
150```
151
152```sql
153+-------------+--------+------------+
154| department  | salary | dense_rank |
155+-------------+--------+------------+
156| Sales       | 70000  | 1          |
157| Sales       | 50000  | 2          |
158| Sales       | 50000  | 2          |
159| Sales       | 30000  | 3          |
160| Engineering | 90000  | 1          |
161| Engineering | 80000  | 2          |
162+-------------+--------+------------+
163```"#)
164        .build()
165});
166
167fn get_dense_rank_doc() -> &'static Documentation {
168    &DENSE_RANK_DOCUMENTATION
169}
170
171static PERCENT_RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
172    Documentation::builder(DOC_SECTION_RANKING, "Returns the percentage rank of the current row within its partition. \
173            The value ranges from 0 to 1 and is computed as `(rank - 1) / (total_rows - 1)`.", "percent_rank()")
174        .with_sql_example(r#"```sql
175    --Example usage of the percent_rank window function:
176    SELECT employee_id,
177           salary,
178           percent_rank() OVER (ORDER BY salary) AS percent_rank
179    FROM employees;
180```
181
182```sql
183+-------------+--------+---------------+
184| employee_id | salary | percent_rank  |
185+-------------+--------+---------------+
186| 1           | 30000  | 0.00          |
187| 2           | 50000  | 0.50          |
188| 3           | 70000  | 1.00          |
189+-------------+--------+---------------+
190```"#)
191        .build()
192});
193
194fn get_percent_rank_doc() -> &'static Documentation {
195    &PERCENT_RANK_DOCUMENTATION
196}
197
198impl WindowUDFImpl for Rank {
199    fn as_any(&self) -> &dyn Any {
200        self
201    }
202
203    fn name(&self) -> &str {
204        &self.name
205    }
206
207    fn signature(&self) -> &Signature {
208        &self.signature
209    }
210
211    fn partition_evaluator(
212        &self,
213        _partition_evaluator_args: PartitionEvaluatorArgs,
214    ) -> Result<Box<dyn PartitionEvaluator>> {
215        Ok(Box::new(RankEvaluator {
216            state: RankState::default(),
217            rank_type: self.rank_type,
218        }))
219    }
220
221    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
222        let return_type = match self.rank_type {
223            RankType::Basic | RankType::Dense => DataType::UInt64,
224            RankType::Percent => DataType::Float64,
225        };
226
227        let nullable = false;
228        Ok(Field::new(field_args.name(), return_type, nullable).into())
229    }
230
231    fn sort_options(&self) -> Option<SortOptions> {
232        Some(SortOptions {
233            descending: false,
234            nulls_first: false,
235        })
236    }
237
238    fn documentation(&self) -> Option<&Documentation> {
239        match self.rank_type {
240            RankType::Basic => Some(get_rank_doc()),
241            RankType::Dense => Some(get_dense_rank_doc()),
242            RankType::Percent => Some(get_percent_rank_doc()),
243        }
244    }
245}
246
/// Running state kept by the row-by-row (`evaluate`) path of the ranking
/// window functions.
///
/// All counters start at zero and `last_rank_data` starts as `None`
/// (via `Default`).
#[derive(Debug, Clone, Default)]
pub struct RankState {
    /// Ordering-column values of the row that opened the current peer group;
    /// `None` until the first row has been evaluated. When a row's values
    /// differ from these, a new peer group starts and `n_rank` is incremented.
    pub last_rank_data: Option<Vec<ScalarValue>>,
    /// Number of rows that precede the current peer group; `rank` is reported
    /// as this value plus one.
    pub last_rank_boundary: usize,
    /// Number of rows seen so far in the current peer group.
    pub current_group_count: usize,
    /// Number of peer groups seen so far; reported as the `dense_rank`.
    pub n_rank: usize,
}
259
/// Partition evaluator for the `rank`, `dense_rank` and `percent_rank`
/// window functions.
#[derive(Debug)]
struct RankEvaluator {
    /// Running counters updated by the row-by-row `evaluate` path.
    state: RankState,
    /// Which ranking variant this evaluator produces.
    rank_type: RankType,
}
266
267impl PartitionEvaluator for RankEvaluator {
268    fn is_causal(&self) -> bool {
269        matches!(self.rank_type, RankType::Basic | RankType::Dense)
270    }
271
272    fn evaluate(
273        &mut self,
274        values: &[ArrayRef],
275        range: &Range<usize>,
276    ) -> Result<ScalarValue> {
277        let row_idx = range.start;
278        // There is no argument, values are order by column values (where rank is calculated)
279        let range_columns = values;
280        let last_rank_data = get_row_at_idx(range_columns, row_idx)?;
281        let new_rank_encountered =
282            if let Some(state_last_rank_data) = &self.state.last_rank_data {
283                // if rank data changes, new rank is encountered
284                state_last_rank_data != &last_rank_data
285            } else {
286                // First rank seen
287                true
288            };
289        if new_rank_encountered {
290            self.state.last_rank_data = Some(last_rank_data);
291            self.state.last_rank_boundary += self.state.current_group_count;
292            self.state.current_group_count = 1;
293            self.state.n_rank += 1;
294        } else {
295            // data is still in the same rank
296            self.state.current_group_count += 1;
297        }
298
299        match self.rank_type {
300            RankType::Basic => Ok(ScalarValue::UInt64(Some(
301                self.state.last_rank_boundary as u64 + 1,
302            ))),
303            RankType::Dense => Ok(ScalarValue::UInt64(Some(self.state.n_rank as u64))),
304            RankType::Percent => {
305                exec_err!("Can not execute PERCENT_RANK in a streaming fashion")
306            }
307        }
308    }
309
310    fn evaluate_all_with_rank(
311        &self,
312        num_rows: usize,
313        ranks_in_partition: &[Range<usize>],
314    ) -> Result<ArrayRef> {
315        let result: ArrayRef = match self.rank_type {
316            RankType::Basic => Arc::new(UInt64Array::from_iter_values(
317                ranks_in_partition
318                    .iter()
319                    .scan(1_u64, |acc, range| {
320                        let len = range.end - range.start;
321                        let result = iter::repeat_n(*acc, len);
322                        *acc += len as u64;
323                        Some(result)
324                    })
325                    .flatten(),
326            )),
327
328            RankType::Dense => Arc::new(UInt64Array::from_iter_values(
329                ranks_in_partition
330                    .iter()
331                    .zip(1u64..)
332                    .flat_map(|(range, rank)| {
333                        let len = range.end - range.start;
334                        iter::repeat_n(rank, len)
335                    }),
336            )),
337
338            RankType::Percent => {
339                let denominator = num_rows as f64;
340
341                Arc::new(Float64Array::from_iter_values(
342                    ranks_in_partition
343                        .iter()
344                        .scan(0_u64, |acc, range| {
345                            let len = range.end - range.start;
346                            let value = (*acc as f64) / (denominator - 1.0).max(1.0);
347                            let result = iter::repeat_n(value, len);
348                            *acc += len as u64;
349                            Some(result)
350                        })
351                        .flatten(),
352                ))
353            }
354        };
355
356        Ok(result)
357    }
358
359    fn supports_bounded_execution(&self) -> bool {
360        matches!(self.rank_type, RankType::Basic | RankType::Dense)
361    }
362
363    fn include_rank(&self) -> bool {
364        true
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::cast::{as_float64_array, as_uint64_array};

    /// Evaluates `expr` over an eight-row partition split into the given peer
    /// groups and compares the `UInt64` output with `expected`.
    fn check_u64_ranks(
        expr: &Rank,
        peer_groups: Vec<Range<usize>>,
        expected: Vec<u64>,
    ) -> Result<()> {
        let evaluator = expr.partition_evaluator(PartitionEvaluatorArgs::default())?;
        let array = evaluator.evaluate_all_with_rank(8, &peer_groups)?;
        let actual = as_uint64_array(&array)?.values();
        assert_eq!(expected, *actual);
        Ok(())
    }

    /// Evaluates `expr` over a partition of `num_rows` rows split into the
    /// given peer groups and compares the `Float64` output with `expected`.
    fn check_f64_ranks(
        expr: &Rank,
        num_rows: usize,
        peer_groups: Vec<Range<usize>>,
        expected: Vec<f64>,
    ) -> Result<()> {
        let evaluator = expr.partition_evaluator(PartitionEvaluatorArgs::default())?;
        let array = evaluator.evaluate_all_with_rank(num_rows, &peer_groups)?;
        let actual = as_float64_array(&array)?.values();
        assert_eq!(expected, *actual);
        Ok(())
    }

    /// Eight rows split into five peer groups of sizes 2, 1, 3, 1 and 1.
    fn check_with_peer_groups(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        let peer_groups = vec![0..2, 2..3, 3..6, 6..7, 7..8];
        check_u64_ranks(expr, peer_groups, expected)
    }

    /// Eight rows that all belong to one peer group.
    // A one-element vector holding a range is intended here.
    #[allow(clippy::single_range_in_vec_init)]
    fn check_single_peer_group(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        let peer_groups = vec![0..8];
        check_u64_ranks(expr, peer_groups, expected)
    }

    #[test]
    fn test_rank() -> Result<()> {
        let rank = Rank::basic();
        check_single_peer_group(&rank, vec![1; 8])?;
        check_with_peer_groups(&rank, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
        Ok(())
    }

    #[test]
    fn test_dense_rank() -> Result<()> {
        let dense = Rank::dense_rank();
        check_single_peer_group(&dense, vec![1; 8])?;
        check_with_peer_groups(&dense, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
        Ok(())
    }

    #[test]
    // One-element vectors holding a range are intended in the cases below.
    #[allow(clippy::single_range_in_vec_init)]
    fn test_percent_rank() -> Result<()> {
        let percent = Rank::percent_rank();

        // (total rows, peer groups, expected percent ranks)
        let cases: Vec<(usize, Vec<Range<usize>>, Vec<f64>)> = vec![
            // empty case
            (0, vec![], vec![]),
            // singleton case
            (1, vec![0..1], vec![0.0]),
            // uniform case
            (7, vec![0..7], vec![0.0; 7]),
            // non-trivial case
            (7, vec![0..3, 3..7], vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5]),
        ];

        for (num_rows, peer_groups, expected) in cases {
            check_f64_ranks(&percent, num_rows, peer_groups, expected)?;
        }

        Ok(())
    }
}