[CELEBORN-2005] Introduce numBytesIn, numBytesOut, numBytesInPerSecond, numBytesOutPerSecond metrics for RemoteShuffleServiceFactory #3272
Hi, @SteNicholas
Thank you for your contribution! This PR looks good to me. Could you please clarify the related metric names and information in the documentation?
@codenohup, I will clarify the related metric names and information in the documentation in another pull request.
@reswqa, @codenohup, thanks for the review. I have already added …
Thank you for your contribution; this PR looks good to me.
It would be advisable to test this PR by running a Flink job and obtaining shuffle metrics through the Flink metric reporter. Then compare the metrics from Flink and Celeborn to ensure they are effective and reasonable.
LGTM. Thanks. Merged into main (v0.7.0).
…, numBytesOut, numRecordsOut, numBytesInPerSecond, numBytesOutPerSecond, numRecordsOutPerSecond metrics

### What changes were proposed in this pull request?

Introduce `ShuffleMetricGroup` for the `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond`, and `numRecordsOutPerSecond` metrics. Follow-up to #3272.

### Why are the changes needed?

The `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond`, and `numRecordsOutPerSecond` metrics should put the shuffle id into the scope variables, which `ShuffleMetricGroup` is introduced to support. In addition, #3272 prints many identical warnings like the following, which should be avoided:

```
2025-05-28 10:48:54,433 WARN [flink-akka.actor.default-dispatcher-18] org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numRecordsOut'. Metric will not be reported.[11.66.62.202, taskmanager, antc4flink3980005426-taskmanager-3-70, antc4flink3980005426, [vertex-2]HashJoin(joinType=[LeftOuterJoin], where=[(f0 = f00)], select=[f0, f1, f2, f3, f4, f5, f6, f00, f10, f20, f30, f40, f50, f60], build=[right]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[f0, f1, f2, f3, f4, f5, f6, f00, f10, f20, f30, f40, f50, f60]), 2, Shuffle, Remote, 1]
```

### Does this PR introduce _any_ user-facing change?

Introduce the `celeborn.client.flink.metrics.scope.shuffle` config option, which defines the scope format string applied to all metrics scoped to a shuffle:

- Variables:
  - Shuffle: `<task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>, <shuffle_id>`.
- Metrics:

| Scope | Metrics | Description | Type |
| -- | -- | -- | -- |
| Shuffle | numBytesIn | The total number of bytes this shuffle has read. | Counter |
| Shuffle | numBytesOut | The total number of bytes this shuffle has written. | Counter |
| Shuffle | numRecordsOut | The total number of records this shuffle has written. | Counter |
| Shuffle | numBytesInPerSecond | The number of bytes this shuffle reads per second. | Meter |
| Shuffle | numBytesOutPerSecond | The number of bytes this shuffle writes per second. | Meter |
| Shuffle | numRecordsOutPerSecond | The number of records this shuffle writes per second. | Meter |

### How was this patch tested?

Manual test.

Closes #3296 from SteNicholas/CELEBORN-2005.

Authored-by: SteNicholas <[email protected]>
Signed-off-by: Weijie Guo <[email protected]>
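To make the role of the `<shuffle_id>` scope variable concrete, here is a minimal plain-Java sketch of how a scope format string could be expanded and why a registry keyed by scope plus metric name rejects a second registration under the same identifier. It does not use Flink's or Celeborn's classes: `ShuffleScopeSketch`, `expand`, `register`, and the format strings are hypothetical names chosen for illustration, and the real default of `celeborn.client.flink.metrics.scope.shuffle` is not stated in this PR. Whether this mechanism is exactly what triggered the warning quoted above is also an assumption.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Plain-Java illustration only; this is not Flink's or Celeborn's metric registry. */
final class ShuffleScopeSketch {

    /** Replaces every <variable> placeholder in the format with its value. */
    static String expand(String scopeFormat, Map<String, String> variables) {
        String result = scopeFormat;
        for (Map.Entry<String, String> e : variables.entrySet()) {
            result = result.replace("<" + e.getKey() + ">", e.getValue());
        }
        return result;
    }

    /** Builds the variable map for one shuffle of one subtask (hypothetical helper). */
    static Map<String, String> variables(String taskName, int subtaskIndex, int shuffleId) {
        Map<String, String> vars = new HashMap<>();
        vars.put("task_name", taskName);
        vars.put("subtask_index", Integer.toString(subtaskIndex));
        vars.put("shuffle_id", Integer.toString(shuffleId));
        return vars;
    }

    /** Returns true if the identifier was new, false on a name collision. */
    static boolean register(Set<String> registry, String scope, String metricName) {
        return registry.add(scope + "." + metricName);
    }

    public static void main(String[] args) {
        // Hypothetical format strings, not the option's actual default.
        String withoutShuffleId = "<task_name>.<subtask_index>.Shuffle";
        String withShuffleId = "<task_name>.<subtask_index>.Shuffle.<shuffle_id>";

        Set<String> registry = new HashSet<>();
        // Two shuffles of the same subtask share a scope when the id is left out.
        boolean first = register(registry, expand(withoutShuffleId, variables("HashJoin", 2, 1)), "numRecordsOut");
        boolean second = register(registry, expand(withoutShuffleId, variables("HashJoin", 2, 7)), "numRecordsOut");
        System.out.println("without shuffle id: " + first + ", " + second);

        registry.clear();
        // With the id in the scope, each shuffle gets its own identifier.
        first = register(registry, expand(withShuffleId, variables("HashJoin", 2, 1)), "numRecordsOut");
        second = register(registry, expand(withShuffleId, variables("HashJoin", 2, 7)), "numRecordsOut");
        System.out.println("with shuffle id: " + first + ", " + second);
    }
}
```

In this sketch, the second registration fails only when both shuffles expand to the same scope string, which is the situation a per-shuffle variable is meant to avoid.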
…d, numBytesOutPerSecond metrics for RemoteShuffleServiceFactory

### What changes were proposed in this pull request?

Introduce `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond`, and `numRecordsOutPerSecond` metrics for `RemoteShuffleServiceFactory`.

| Scope | Infix | Metrics | Description | Type |
| -- | -- | -- | -- | -- |
| Task | Shuffle.Remote.[ShuffleId] | numBytesIn | The total number of bytes this shuffle has read. | Counter |
| Task | Shuffle.Remote.[ShuffleId] | numBytesOut | The total number of bytes this shuffle has written. | Counter |
| Task | Shuffle.Remote.[ShuffleId] | numRecordsOut | The total number of records this shuffle has written. | Counter |
| Task | Shuffle.Remote.[ShuffleId] | numBytesInPerSecond | The number of bytes this shuffle reads per second. | Meter |
| Task | Shuffle.Remote.[ShuffleId] | numBytesOutPerSecond | The number of bytes this shuffle writes per second. | Meter |
| Task | Shuffle.Remote.[ShuffleId] | numRecordsOutPerSecond | The number of records this shuffle writes per second. | Meter |

Note:

- The `numBytesIn` and `numBytesOut` metrics include the total number of bytes for both records and events.
- The `numRecordsOut` metric includes only the total number of records, not events.

### Why are the changes needed?

There are currently no metrics for shuffle read operations or for operations that write shuffle data in the Flink shuffle. This PR proposes introducing the `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond`, and `numRecordsOutPerSecond` metrics for `RemoteShuffleServiceFactory`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- `RemoteShuffleOutputGateSuiteJ#testSimpleWriteData`
- `RemoteShuffleResultPartitionSuiteJ`

Closes #3272 from SteNicholas/CELEBORN-2005.

Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
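The distinction in the note above (bytes cover records and events, while `numRecordsOut` covers records only) and the relationship between a counter and its per-second meter can be sketched in plain Java. This is not the code from the PR and does not use Flink's `Counter` or `Meter` types: `ShuffleIoMetricsSketch`, `onRecordWritten`, `onEventWritten`, and `perSecond` are hypothetical names, and the rate is assumed to be a simple average between two counter samples.

```java
/** Plain-Java illustration of the write-side accounting; names are hypothetical. */
final class ShuffleIoMetricsSketch {
    private long numBytesOut;
    private long numRecordsOut;

    /** A record contributes to both the byte counter and the record counter. */
    void onRecordWritten(int sizeInBytes) {
        numBytesOut += sizeInBytes;
        numRecordsOut++;
    }

    /** An event contributes to the byte counter only. */
    void onEventWritten(int sizeInBytes) {
        numBytesOut += sizeInBytes;
    }

    long numBytesOut() {
        return numBytesOut;
    }

    long numRecordsOut() {
        return numRecordsOut;
    }

    /** Average rate between two samples of the same counter. */
    static double perSecond(long earlierCount, long laterCount, double elapsedSeconds) {
        if (elapsedSeconds <= 0) {
            throw new IllegalArgumentException("elapsedSeconds must be positive");
        }
        return (laterCount - earlierCount) / elapsedSeconds;
    }

    public static void main(String[] args) {
        ShuffleIoMetricsSketch metrics = new ShuffleIoMetricsSketch();
        long bytesBefore = metrics.numBytesOut();

        // Three 100-byte records and one 20-byte event.
        metrics.onRecordWritten(100);
        metrics.onRecordWritten(100);
        metrics.onRecordWritten(100);
        metrics.onEventWritten(20);

        System.out.println("numBytesOut = " + metrics.numBytesOut());
        System.out.println("numRecordsOut = " + metrics.numRecordsOut());
        // Assume the writes above took four seconds.
        System.out.println("numBytesOutPerSecond = " + perSecond(bytesBefore, metrics.numBytesOut(), 4.0));
    }
}
```

Under these assumptions, the byte counter ends at 320 while the record counter ends at 3, so the two per-second meters derived from them can diverge whenever events make up a noticeable share of the traffic.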
### What changes were proposed in this pull request?

Introduce `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond`, and `numRecordsOutPerSecond` metrics for `RemoteShuffleServiceFactory`.

Note:

- The `numBytesIn` and `numBytesOut` metrics include the total number of bytes for both records and events.
- The `numRecordsOut` metric includes only the total number of records, not events.

### Why are the changes needed?

There are currently no metrics for shuffle read operations or for operations that write shuffle data in the Flink shuffle. This PR proposes introducing the `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond`, and `numRecordsOutPerSecond` metrics for `RemoteShuffleServiceFactory`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- `RemoteShuffleOutputGateSuiteJ#testSimpleWriteData`
- `RemoteShuffleResultPartitionSuiteJ`