Set max memory of SortBasedPusher based off Spark configs #3203
Conversation
```java
// set max task memory conf based on Spark conf
if (celebornConf.clientPushSortMaxMemoryBytes() <= 0L) {
  double memoryStorageFraction =
      sparkConf.getDouble(package$.MODULE$.MEMORY_FRACTION().key(), 0.6);
```
For spark-2.4 UT:

```
Error: /home/runner/work/celeborn/celeborn/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:553:48: error: cannot find symbol
```
Maybe you can use the string value directly?
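A minimal sketch of that suggestion, assuming the literal key "spark.memory.fraction" and Spark's 0.6 default; this sidesteps the MEMORY_FRACTION constant that does not resolve on Spark 2.4:

```java
import org.apache.spark.SparkConf;

// Sketch only: read the fraction by its literal key instead of
// package$.MODULE$.MEMORY_FRACTION(), which does not exist in Spark 2.4.
static double memoryStorageFraction(SparkConf sparkConf) {
  return sparkConf.getDouble("spark.memory.fraction", 0.6);
}
```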
Can we get the values of maxOffHeapMemory/maxHeapMemory from the MemoryManager through reflection?
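A rough sketch of that idea; the getter names maxHeapMemory/maxOffHeapMemory are assumptions based on Spark's UnifiedMemoryManager internals and may differ or be absent across Spark versions:

```java
import java.lang.reflect.Method;
import org.apache.spark.SparkEnv;

// Sketch only: walk the class hierarchy because maxOffHeapMemory is declared
// on MemoryManager while maxHeapMemory lives on UnifiedMemoryManager.
static long maxMemoryViaReflection(boolean offHeap) throws ReflectiveOperationException {
  Object memoryManager = SparkEnv.get().memoryManager();
  String getter = offHeap ? "maxOffHeapMemory" : "maxHeapMemory";
  for (Class<?> c = memoryManager.getClass(); c != null; c = c.getSuperclass()) {
    try {
      Method m = c.getDeclaredMethod(getter);
      m.setAccessible(true);
      return (Long) m.invoke(memoryManager);
    } catch (NoSuchMethodException ignored) {
      // keep searching parent classes
    }
  }
  throw new NoSuchMethodException(getter + " not found on " + memoryManager.getClass());
}
```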
Thank you for the feedback, I will try using reflection and re-submit.
Copilot reviewed 5 out of 6 changed files in this pull request and generated no comments.
Files not reviewed (1)
- common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala: Language not supported
This PR is stale because it has been open 20 days with no activity. Remove stale label or comment or this will be closed in 10 days.
This issue was closed because it has been staled for 10 days with no activity.
gentle ping @helenweng-stripe Any update?
Force-pushed from 5590ef0 to 0dffcf6
What changes were proposed in this pull request?
Set the max memory threshold to the actual memory allocated to the task. This is reverse-calculated from how Spark determines it, since Spark's TaskMemoryManager does not expose how much memory is available to a task.

The calculation depends on whether the memory mode is on-heap or off-heap:

```
maxMemory = (memoryTotal - reservedMemory)
            * spark.memory.fraction
            * celeborn.client.spark.push.sort.memory.maxMemoryFactor
            / spark.executor.cores
```

where memoryTotal is spark.memory.offHeap.size in off-heap mode or spark.executor.memory in on-heap mode, and reservedMemory is Spark's hardcoded reserve (300 MiB for on-heap).
Based on calculations here: https://github.com/apache/spark/blob/branch-3.3/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala#L213-L235
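As a sanity check, here is a worked example with hypothetical numbers (an 8 GiB on-heap executor, 4 cores, default fractions), not taken from the PR:

```java
// Hypothetical numbers for illustration only.
long executorMemory = 8L << 30;    // spark.executor.memory = 8g
long reservedMemory = 300L << 20;  // Spark's hardcoded on-heap reserve
double memoryFraction = 0.6;       // spark.memory.fraction default
double maxMemoryFactor = 0.4;      // assumed value of ...maxMemoryFactor
int executorCores = 4;             // spark.executor.cores

long perTaskMax = (long) ((executorMemory - reservedMemory)
    * memoryFraction * maxMemoryFactor / executorCores);
// perTaskMax ≈ 474 MiB per task, versus roughly 3.2 GiB if derived from
// Runtime.getRuntime().maxMemory() * maxMemoryFactor for the whole JVM.
```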
MaxMemory can be set statically with the new setting celeborn.client.spark.push.sort.memory.maxMemoryBytes. It is only dynamically calculated when celeborn.client.spark.push.sort.memory.calculateMaxMemoryBytes is set to true (default false); see the usage sketch below.
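A usage sketch, assuming Celeborn client settings are forwarded from Spark with the usual spark.celeborn.* prefix (verify the exact keys for your Celeborn version):

```java
import org.apache.spark.SparkConf;

public class CelebornPushMemoryConf {
  // Sketch only: opt in to the dynamic per-task calculation.
  static SparkConf withDynamicPushMemory(SparkConf conf) {
    return conf.set(
        "spark.celeborn.client.spark.push.sort.memory.calculateMaxMemoryBytes", "true");
    // Alternatively, pin a static cap via
    // "spark.celeborn.client.spark.push.sort.memory.maxMemoryBytes".
  }
}
```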
A better solution would probably be to expose getMaxMemory on the Spark side, but it is currently not available.

Why are the changes needed?
Currently maxMemory is set to Runtime.getRuntime().maxMemory() * maxMemoryFactor, where Runtime.getRuntime().maxMemory() equals the amount of memory available to the entire app rather than to a single task. For example, with an 8 GiB heap and a factor of 0.4, every concurrent task believes it can buffer about 3.2 GiB, so a few large tasks together can exceed the heap. Thus, for some large tasks, SortBasedPusher with celeborn.client.spark.push.sort.memory.useAdaptiveThreshold enabled will always OOM.

Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests
We are also running with this in production.