Support multi-turn evaluation in mlflow.genai.evaluate for DataFrame and list input #18971
Conversation
This PR implements multi-turn evaluation capability for mlflow.genai.evaluate, enabling evaluation of entire conversation sessions grouped by session_id.

Key changes:
1. Environment variable (mlflow/environment_variables.py):
   - Added MLFLOW_ENABLE_MULTI_TURN_EVALUATION flag (default: False)
   - Feature-gated for safe rollout and testing
2. Validation logic (mlflow/genai/evaluation/utils.py):
   - Added _validate_multi_turn_input() to validate the multi-turn configuration
   - Checks: feature flag enabled, no predict_fn, DataFrame input required
   - Added FEATURE_DISABLED import for proper error handling
3. Multi-turn evaluation (mlflow/genai/evaluation/harness.py):
   - Added _evaluate_multi_turn_scorers() to evaluate session groups
   - Modified run() to classify scorers and handle multi-turn evaluation
   - Groups traces by session_id and evaluates scorers on the session groups
   - Logs assessments to the chronologically first trace of each session
   - Adds session_id to the assessment metadata
4. Integration (mlflow/genai/evaluation/base.py):
   - Added a validation call in the evaluate() function
   - Imports _validate_multi_turn_input
5. Tests (tests/genai/evaluate/test_utils.py):
   - Added 6 validation tests covering the feature flag, predict_fn rejection, and the DataFrame requirement
   - Tests mixed single-turn and multi-turn scorers

Implementation follows the multi-turn evaluation plan (PR mlflow#3 + PR mlflow#4 combined). All tests passing (60 passed, 3 skipped).

🤖 Generated with [Claude Code](https://claude.com/claude-code)
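For orientation, here is a rough end-to-end sketch (not part of the diff) of how the flow described above could be exercised. The scorer shape mirrors the `_MultiTurnTestScorer` helper quoted later in this PR; whether `evaluate()` accepts such a duck-typed scorer directly, and the exact `search_traces` arguments, are assumptions rather than confirmed API behavior:

```python
import os

import mlflow

# Opt in to the feature-gated behavior (assumption: a truthy string flips the flag).
os.environ["MLFLOW_ENABLE_MULTI_TURN_EVALUATION"] = "true"


class ConversationLengthScorer:
    """Illustrative session-level scorer, shaped like the _MultiTurnTestScorer test helper."""

    def __init__(self):
        self.name = "conversation_length"
        self.is_session_level_scorer = True  # marks this scorer as multi-turn
        self.aggregations = []

    def __call__(self, traces=None, **kwargs):
        # `traces` is assumed to be the list of traces sharing one session_id.
        return float(len(traces or []))


# Pre-logged traces, each carrying a session_id in its trace metadata.
traces = mlflow.search_traces(return_type="list")

# No predict_fn: the multi-turn scorers run against the static traces ("answer sheet").
results = mlflow.genai.evaluate(data=traces, scorers=[ConversationLengthScorer()])
```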
@AveshCSingh Thank you for the contribution! Could you fix the following issue(s)?

⚠ DCO check: The DCO check failed. Please sign off your commit(s) by following the instructions here. See https://github.com/mlflow/mlflow/blob/master/CONTRIBUTING.md#sign-your-work for more details.
Documentation preview for 6051ef5 is available at:
This commit addresses several TODO items to improve the multi-turn evaluation implementation:
- Remove leading underscores from exported utility functions: renamed _classify_scorers -> classify_scorers, _group_traces_by_session -> group_traces_by_session, and _get_first_trace_in_session -> get_first_trace_in_session
- Optimize trace retrieval by avoiding a redundant get_trace call: find the matching eval_result in the existing list instead of fetching the trace
- Replace the hardcoded "session_id" string with the TraceMetadataKey.TRACE_SESSION constant, for consistency with other metadata keys
- Rename the validation function for clarity: _validate_multi_turn_input -> _validate_session_level_input, updating terminology from "multi_turn" to "session_level"
- Remove the unused data parameter from the validation function, simplifying its signature

All tests pass successfully after these changes.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
def classify_scorers(scorers: list[Scorer]) -> tuple[list[Scorer], list[Scorer]]:
    """
    Separate scorers into single-turn and multi-turn categories.

    Args:
        scorers: List of scorer instances.

    Returns:
        tuple: (single_turn_scorers, multi_turn_scorers)
    """
    single_turn_scorers = []
    multi_turn_scorers = []

    for scorer in scorers:
        if scorer.is_session_level_scorer:
            multi_turn_scorers.append(scorer)
        else:
            single_turn_scorers.append(scorer)

    return single_turn_scorers, multi_turn_scorers


def group_traces_by_session(eval_items: list["EvalItem"]) -> dict[str, list["EvalItem"]]:
    """
    Group evaluation items containing traces by session_id.

    Args:
        eval_items: List of EvalItem objects.

    Returns:
        dict: {session_id: [eval_item, ...]} where eval items are grouped by session.
            Only items with traces that have a session_id are included in the output.
    """
    session_groups = defaultdict(list)

    for idx, item in enumerate(eval_items):
        if not hasattr(item, "trace") or item.trace is None:
            continue

        trace_metadata = item.trace.info.trace_metadata

        if session_id := trace_metadata.get(TraceMetadataKey.TRACE_SESSION):
            session_groups[session_id].append(item)

    return dict(session_groups)


def get_first_trace_in_session(session_items: list["EvalItem"]) -> "EvalItem":
    """
    Find the chronologically first trace in a session based on request_time.
These 3 methods were copied over from utils.py.
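The quoted hunk cuts off before the body of `get_first_trace_in_session`; based on its docstring and the chronological-order test further down, a minimal sketch of what it presumably does is:

```python
def get_first_trace_in_session(session_items: list["EvalItem"]) -> "EvalItem":
    # Earliest request_time wins; tie-breaking behavior is an assumption.
    return min(session_items, key=lambda item: item.trace.info.request_time)
```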
    # Copy trace metadata
    if trace_metadata := info.get("trace_metadata"):
        trace.info.trace_metadata.update(trace_metadata)
We previously did not copy trace metadata when copying traces into EvalItems.
class _MultiTurnTestScorer:
    """Helper class for testing multi-turn scorers."""

    def __init__(self, name="test_multi_turn_scorer"):
        self.name = name
        self.is_session_level_scorer = True
        self.aggregations = []

    def run(self, session=None, **kwargs):
        return True

    def __call__(self, traces=None, **kwargs):
        return 1.0


# ==================== Tests for classify_scorers ====================


def test_classify_scorers_all_single_turn():
    """Test that all scorers are classified as single-turn when none are multi-turn."""

    @scorer
    def custom_scorer1(outputs):
        return 1.0

    @scorer
    def custom_scorer2(outputs):
        return 2.0

    scorers_list = [custom_scorer1, custom_scorer2]
    single_turn, multi_turn = classify_scorers(scorers_list)

    assert len(single_turn) == 2
    assert len(multi_turn) == 0
    assert single_turn == scorers_list


def test_classify_scorers_all_multi_turn():
    """Test that all scorers are classified as multi-turn.

    When all scorers have is_session_level_scorer=True.
    """
    multi_turn_scorer1 = _MultiTurnTestScorer(name="multi_turn_scorer1")
    multi_turn_scorer2 = _MultiTurnTestScorer(name="multi_turn_scorer2")

    scorers_list = [multi_turn_scorer1, multi_turn_scorer2]
    single_turn, multi_turn = classify_scorers(scorers_list)

    assert len(single_turn) == 0
    assert len(multi_turn) == 2
    assert multi_turn == scorers_list
    # Verify they are actually multi-turn
    assert multi_turn_scorer1.is_session_level_scorer is True
    assert multi_turn_scorer2.is_session_level_scorer is True


def test_classify_scorers_mixed():
    """Test classification of mixed single-turn and multi-turn scorers."""

    @scorer
    def single_turn_scorer(outputs):
        return 1.0

    multi_turn_scorer = _MultiTurnTestScorer(name="multi_turn_scorer")

    scorers_list = [single_turn_scorer, multi_turn_scorer]
    single_turn, multi_turn = classify_scorers(scorers_list)

    assert len(single_turn) == 1
    assert len(multi_turn) == 1
    assert single_turn[0] == single_turn_scorer
    assert multi_turn[0] == multi_turn_scorer
    # Verify properties
    assert single_turn_scorer.is_session_level_scorer is False
    assert multi_turn_scorer.is_session_level_scorer is True


def test_classify_scorers_empty_list():
    """Test classification of an empty list of scorers."""
    single_turn, multi_turn = classify_scorers([])

    assert len(single_turn) == 0
    assert len(multi_turn) == 0


# ==================== Tests for group_traces_by_session ====================


def _create_mock_trace(trace_id: str, session_id: str | None, request_time: int):
    """Helper to create a mock trace with session_id and request_time."""
    trace_metadata = {}
    if session_id is not None:
        trace_metadata[TraceMetadataKey.TRACE_SESSION] = session_id

    trace_info = TraceInfo(
        trace_id=trace_id,
        trace_location=TraceLocation.from_experiment_id("0"),
        request_time=request_time,
        execution_duration=1000,
        state=TraceState.OK,
        trace_metadata=trace_metadata,
        tags={},
    )

    trace = Mock(spec=Trace)
    trace.info = trace_info
    trace.data = TraceData(spans=[])
    return trace


def _create_mock_eval_item(trace):
    """Helper to create a mock EvalItem with a trace."""
    eval_item = Mock(spec=EvalItem)
    eval_item.trace = trace
    return eval_item


def test_group_traces_by_session_single_session():
    """Test grouping traces that all belong to a single session."""
    trace1 = _create_mock_trace("trace-1", "session-1", 1000)
    trace2 = _create_mock_trace("trace-2", "session-1", 2000)
    trace3 = _create_mock_trace("trace-3", "session-1", 3000)

    eval_item1 = _create_mock_eval_item(trace1)
    eval_item2 = _create_mock_eval_item(trace2)
    eval_item3 = _create_mock_eval_item(trace3)

    eval_items = [eval_item1, eval_item2, eval_item3]
    session_groups = group_traces_by_session(eval_items)

    assert len(session_groups) == 1
    assert "session-1" in session_groups
    assert len(session_groups["session-1"]) == 3

    # Check that all traces are included
    session_traces = [item.trace for item in session_groups["session-1"]]
    assert trace1 in session_traces
    assert trace2 in session_traces
    assert trace3 in session_traces


def test_group_traces_by_session_multiple_sessions():
    """Test grouping traces that belong to different sessions."""
    trace1 = _create_mock_trace("trace-1", "session-1", 1000)
    trace2 = _create_mock_trace("trace-2", "session-1", 2000)
    trace3 = _create_mock_trace("trace-3", "session-2", 1500)
    trace4 = _create_mock_trace("trace-4", "session-2", 2500)

    eval_items = [
        _create_mock_eval_item(trace1),
        _create_mock_eval_item(trace2),
        _create_mock_eval_item(trace3),
        _create_mock_eval_item(trace4),
    ]

    session_groups = group_traces_by_session(eval_items)

    assert len(session_groups) == 2
    assert "session-1" in session_groups
    assert "session-2" in session_groups
    assert len(session_groups["session-1"]) == 2
    assert len(session_groups["session-2"]) == 2


def test_group_traces_by_session_excludes_no_session_id():
    """Test that traces without session_id are excluded from grouping."""
    trace1 = _create_mock_trace("trace-1", "session-1", 1000)
    trace2 = _create_mock_trace("trace-2", None, 2000)  # No session_id
    trace3 = _create_mock_trace("trace-3", "session-1", 3000)

    eval_items = [
        _create_mock_eval_item(trace1),
        _create_mock_eval_item(trace2),
        _create_mock_eval_item(trace3),
    ]

    session_groups = group_traces_by_session(eval_items)

    assert len(session_groups) == 1
    assert "session-1" in session_groups
    assert len(session_groups["session-1"]) == 2
    # trace2 should not be included
    session_traces = [item.trace for item in session_groups["session-1"]]
    assert trace1 in session_traces
    assert trace2 not in session_traces
    assert trace3 in session_traces


def test_group_traces_by_session_excludes_none_traces():
    """Test that eval items without traces are excluded from grouping."""
    trace1 = _create_mock_trace("trace-1", "session-1", 1000)

    eval_item1 = _create_mock_eval_item(trace1)
    eval_item2 = Mock()
    eval_item2.trace = None  # No trace

    eval_items = [eval_item1, eval_item2]
    session_groups = group_traces_by_session(eval_items)

    assert len(session_groups) == 1
    assert "session-1" in session_groups
    assert len(session_groups["session-1"]) == 1


def test_group_traces_by_session_empty_list():
    """Test grouping an empty list of eval items."""
    session_groups = group_traces_by_session([])

    assert len(session_groups) == 0
    assert session_groups == {}


# ==================== Tests for get_first_trace_in_session ====================


def test_get_first_trace_in_session_chronological_order():
    """Test that the first trace is correctly identified by request_time."""
    trace1 = _create_mock_trace("trace-1", "session-1", 3000)
    trace2 = _create_mock_trace("trace-2", "session-1", 1000)  # Earliest
    trace3 = _create_mock_trace("trace-3", "session-1", 2000)

    eval_item1 = _create_mock_eval_item(trace1)
    eval_item2 = _create_mock_eval_item(trace2)
    eval_item3 = _create_mock_eval_item(trace3)

    session_items = [eval_item1, eval_item2, eval_item3]

    first_item = get_first_trace_in_session(session_items)

    assert first_item.trace == trace2
    assert first_item == eval_item2
These tests as well as the two below (test_get_first_trace_in_session_single_trace and test_get_first_trace_in_session_same_timestamp) were moved from test_utils.py.
    multi_turn_assessments = {}

    for session_id, session_items in session_groups.items():
It feels like we could also run this in parallel. Is this planned?
I'm also curious whether it's possible to run single-turn and multi-turn scorers in parallel, in addition to running each multi-turn scorer in parallel.
Yes, we should be able to run multi-turn scorers within the same thread pool in harness.py that single-turn scorers are executed in. This will also fix the progress bar.
Following the example of single-turn scorers, we can also parallelize the evaluation of multi-turn scorers on each session.
I'm planning on implementing this in a follow-up PR, since this one is already growing quite large.
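Not part of this PR, but as a sketch of the follow-up discussed here, the per-session work could be fanned out over a thread pool much like the single-turn path; all names below are illustrative:

```python
from concurrent.futures import ThreadPoolExecutor


def evaluate_sessions_in_parallel(session_groups, multi_turn_scorers, max_workers=8):
    """Illustrative only: run each (session, scorer) pair on a worker thread."""

    def _run(session_id, session_items, scorer):
        traces = [item.trace for item in session_items]
        return session_id, scorer.name, scorer(traces=traces)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(_run, session_id, items, scorer)
            for session_id, items in session_groups.items()
            for scorer in multi_turn_scorers
        ]
        return [future.result() for future in futures]
```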
Pull request overview
This PR implements multi-turn evaluation support for mlflow.genai.evaluate, enabling scorers to evaluate entire conversation sessions. The implementation focuses on DataFrame and list input evaluation, with dataset support and predict_fn integration planned for future releases.
Key Changes:
- Added session-level scorer classification and evaluation logic to group traces by session_id
- Introduced trace metadata copying functionality to preserve session information
- Created a feature flag (MLFLOW_ENABLE_MULTI_TURN_EVALUATION) to gate the new functionality
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.
Summary per file:
| File | Description |
|---|---|
| tests/tracing/utils/test_copy.py | New test file covering trace copying with metadata preservation |
| tests/genai/evaluate/test_session_utils.py | New comprehensive test suite for session-level evaluation utilities |
| tests/genai/evaluate/test_utils.py | Refactored to move multi-turn tests to dedicated session_utils test file |
| mlflow/tracing/utils/copy.py | Enhanced to copy trace metadata in addition to tags |
| mlflow/genai/scorers/base.py | Extended run() method signature to accept session parameter |
| mlflow/genai/evaluation/session_utils.py | New module containing all session-level evaluation logic and validation |
| mlflow/genai/evaluation/utils.py | Removed multi-turn helper functions (relocated to session_utils) |
| mlflow/genai/evaluation/harness.py | Integrated multi-turn scorer evaluation into the main evaluation flow |
| mlflow/genai/evaluation/base.py | Added validation for session-level evaluation inputs |
| mlflow/environment_variables.py | Added new feature flag for multi-turn evaluation |
smoorjani left a comment
Mostly LGTM! Conditionally stamping to unblock, just assuming all comments are addressed.
mlflow/genai/evaluation/utils.py (outdated)
    )

    # Check if data is a DataFrame-like object (has 'columns' attribute)
    if not hasattr(data, "columns"):
Should we also raise when we receive list[dict] type?
I've removed this code, since we'll be supporting other evaluation types in a follow-up PR.
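For reference, a rough sketch of the remaining validation described in the PR summary (feature flag required, no predict_fn), reflecting the later commits that dropped the unused data parameter and this DataFrame check; the exact error messages and the final shape in session_utils.py may differ:

```python
from mlflow.environment_variables import MLFLOW_ENABLE_MULTI_TURN_EVALUATION
from mlflow.exceptions import MlflowException
from mlflow.protos.databricks_pb2 import FEATURE_DISABLED


def _validate_session_level_input(predict_fn) -> None:
    """Illustrative sketch of the session-level input checks described in this PR."""
    if not MLFLOW_ENABLE_MULTI_TURN_EVALUATION.get():
        # Feature-gated rollout: reject session-level scorers unless the flag is set.
        raise MlflowException(
            "Session-level scorers require MLFLOW_ENABLE_MULTI_TURN_EVALUATION to be enabled.",
            error_code=FEATURE_DISABLED,
        )
    if predict_fn is not None:
        # predict_fn support is deferred to a future milestone per the PR description.
        raise MlflowException(
            "Session-level evaluation does not yet support predict_fn; "
            "pass pre-logged traces instead."
        )
```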
mlflow/genai/evaluation/harness.py (outdated)
    if eval_result.eval_item.trace is not None:
        trace_id = eval_result.eval_item.trace.info.trace_id
        try:
            eval_result.eval_item.trace = mlflow.get_trace(trace_id)
This becomes slow once the eval dataset is big.
Addressed with parallelization. We're replacing another get_trace call with this so it should ideally be net-zero.
serena-ruan left a comment
Overall LGTM! Left two comments about performance concerns.
What changes are proposed in this pull request?
This PR implements multi-turn evaluation for mlflow.genai.evaluate, enabling scorers to evaluate entire conversation sessions. The support is limited to:

(1) Evaluation on a DataFrame or list of traces. Dataset support will be implemented in a follow-up PR.
(2) Evaluation using a static "answer sheet". Multi-turn evaluation with predict_fn will be implemented in a future milestone.

How is this PR tested?
The manual validations below can be re-run. You must set up the following:
Manual validation: make_judge with real OpenAI LLM
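The validation script itself isn't pasted into the PR body; as a loosely hedged sketch, such a run with make_judge and a real OpenAI model might look roughly like the following (the make_judge arguments, the {{ trace }} template variable, and the model URI are assumptions, not confirmed by this PR):

```python
import mlflow
from mlflow.genai.judges import make_judge

# Assumes OPENAI_API_KEY is set and MLFLOW_ENABLE_MULTI_TURN_EVALUATION is enabled.
conversation_judge = make_judge(
    name="conversation_quality",
    instructions=(
        "Judge whether the assistant remains consistent and helpful across the "
        "entire conversation captured in {{ trace }}. Answer 'yes' or 'no'."
    ),
    model="openai:/gpt-4o-mini",
)

traces = mlflow.search_traces(return_type="list")
results = mlflow.genai.evaluate(data=traces, scorers=[conversation_judge])
```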
Does this PR require documentation update?
We'll add documentation in a follow-up.
Release Notes
Is this a user-facing change?
Added multi-turn evaluation support to mlflow.genai.evaluate, enabling scorers to evaluate entire conversation sessions.

What component(s), interfaces, languages, and integrations does this PR affect?
Components
- area/tracking: Tracking Service, tracking client APIs, autologging
- area/models: MLmodel format, model serialization/deserialization, flavors
- area/model-registry: Model Registry service, APIs, and the fluent client calls for Model Registry
- area/scoring: MLflow Model server, model deployment tools, Spark UDFs
- area/evaluation: MLflow model evaluation features, evaluation metrics, and evaluation workflows
- area/gateway: MLflow AI Gateway client APIs, server, and third-party integrations
- area/prompts: MLflow prompt engineering features, prompt templates, and prompt management
- area/tracing: MLflow Tracing features, tracing APIs, and LLM tracing functionality
- area/projects: MLproject format, project running backends
- area/uiux: Front-end, user experience, plotting, JavaScript, JavaScript dev server
- area/build: Build and test infrastructure for MLflow
- area/docs: MLflow documentation pages

How should the PR be classified in the release notes? Choose one:
- rn/none - No description will be included. The PR will be mentioned only by the PR number in the "Small Bugfixes and Documentation Updates" section
- rn/breaking-change - The PR will be mentioned in the "Breaking Changes" section
- rn/feature - A new user-facing feature worth mentioning in the release notes
- rn/bug-fix - A user-facing bug fix worth mentioning in the release notes
- rn/documentation - A user-facing documentation change worth mentioning in the release notes

Should this PR be included in the next patch release?
- Yes should be selected for bug fixes, documentation updates, and other small changes.
- No should be selected for new features and larger changes. If you're unsure about the release classification of this PR, leave this unchecked to let the maintainers decide.

What is a minor/patch release?
Bug fixes, doc updates and new features usually go into minor releases.
Bug fixes and doc updates usually go into patch releases.