Skip to content

[CELEBORN-1903] Enable connection heartbeat and close idle connection #3148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Deprecate celeborn.worker.closeIdleConnections and celeborn.client.cl…
…oseIdleConnections
  • Loading branch information
Austinfjq committed May 28, 2025
commit 94b38f474ecdc65bc9a7c8505674f452a5990fd9
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public FlinkShuffleClientImpl(
Utils.fromCelebornConf(conf, module, conf.networkIoThreads(module));
this.context =
new TransportContext(
dataTransportConf, readClientHandler, conf.clientCloseIdleConnections());
dataTransportConf, readClientHandler, conf.closeIdleConnections(module));
this.setupLifecycleManagerRef(driverHost, port);
this.driverTimestamp = driverTimestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ private void initDataClientFactoryIfNeeded() {
}
this.transportContext =
new TransportContext(
dataTransportConf, new BaseMessageHandler(), conf.clientCloseIdleConnections());
dataTransportConf, new BaseMessageHandler(), dataTransportConf.closeIdleConnections());
if (!authEnabled) {
logger.info("Initializing data client factory for {}.", appUniqueId);
dataClientFactory = transportContext.createClientFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,10 @@ public TransportContext(
AbstractSource source) {
this.conf = conf;
this.msgHandler = msgHandler;
this.closeIdleConnections = closeIdleConnections || conf.closeIdleConnections();
this.closeIdleConnections = closeIdleConnections;
this.sslFactory = SSLFactory.createSslFactory(conf);
this.channelsLimiter = channelsLimiter;
// either explicitly enabled, or from the transport conf
this.enableHeartbeat = enableHeartbeat || conf.channelHeartbeatEnabled();
this.enableHeartbeat = enableHeartbeat;
this.source = source;

logger.info(
Expand Down Expand Up @@ -133,11 +132,11 @@ public TransportContext(

public TransportContext(
TransportConf conf, BaseMessageHandler msgHandler, boolean closeIdleConnections) {
this(conf, msgHandler, closeIdleConnections, null, false, null);
this(conf, msgHandler, closeIdleConnections, null, conf.channelHeartbeatEnabled(), null);
}

public TransportContext(TransportConf conf, BaseMessageHandler msgHandler) {
this(conf, msgHandler, false, false, null);
this(conf, msgHandler, conf.closeIdleConnections(), conf.channelHeartbeatEnabled(), null);
}

public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def workerReplicateIoThreads: Option[Int] = get(WORKER_REPLICATE_IO_THREADS)
def registerWorkerTimeout: Long = get(WORKER_REGISTER_TIMEOUT)
def workerWorkingDir: String = get(WORKER_WORKING_DIR)
def workerCloseIdleConnections: Boolean = get(WORKER_CLOSE_IDLE_CONNECTIONS)
def workerReplicateFastFailDuration: Long = get(WORKER_REPLICATE_FAST_FAIL_DURATION)
def workerReplicateRandomConnectionEnabled: Boolean =
get(WORKER_REPLICATE_RANDOM_CONNECTION_ENABLED)
Expand Down Expand Up @@ -951,7 +950,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// //////////////////////////////////////////////////////
// Client //
// //////////////////////////////////////////////////////
def clientCloseIdleConnections: Boolean = get(CLIENT_CLOSE_IDLE_CONNECTIONS)
def clientRegisterShuffleMaxRetry: Int = get(CLIENT_REGISTER_SHUFFLE_MAX_RETRIES)
def clientRegisterShuffleRetryWaitMs: Long = get(CLIENT_REGISTER_SHUFFLE_RETRY_WAIT)
def clientReserveSlotsRackAwareEnabled: Boolean = get(CLIENT_RESERVE_SLOTS_RACKAWARE_ENABLED)
Expand Down Expand Up @@ -1717,7 +1715,15 @@ object CelebornConf extends Logging {
DeprecatedConfig(
"celeborn.worker.fetch.heartbeat.enabled",
"0.6.0",
"Please use celeborn.<module>.heartbeat.enabled"))
"Please use celeborn.<module>.heartbeat.enabled"),
DeprecatedConfig(
"celeborn.worker.closeIdleConnections",
"0.6.0",
"Please use celeborn.<module>.closeIdleConnections"),
DeprecatedConfig(
"celeborn.client.closeIdleConnections",
"0.6.0",
"Please use celeborn.<module>.closeIdleConnections"))

Map(configs.map { cfg => (cfg.key -> cfg) }: _*)
}
Expand Down Expand Up @@ -3613,14 +3619,6 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("180s")

val WORKER_CLOSE_IDLE_CONNECTIONS: ConfigEntry[Boolean] =
buildConf("celeborn.worker.closeIdleConnections")
.categories("worker")
.doc("Whether worker will close idle connections.")
.version("0.2.0")
.booleanConf
.createWithDefault(false)

val WORKER_REPLICATE_FAST_FAIL_DURATION: ConfigEntry[Long] =
buildConf("celeborn.worker.replicate.fastFail.duration")
.categories("worker")
Expand Down Expand Up @@ -5325,14 +5323,6 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(10000)

val CLIENT_CLOSE_IDLE_CONNECTIONS: ConfigEntry[Boolean] =
buildConf("celeborn.client.closeIdleConnections")
.categories("client")
.doc("Whether client will close idle connections.")
.version("0.3.0")
.booleanConf
.createWithDefault(true)

val CLIENT_PUSH_DYNAMIC_WRITE_MODE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.spark.push.dynamicWriteMode.enabled")
.categories("client")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ The followings are best practices of naming configs for some common cases:
- celeborn.master.ha.enabled
- celeborn.master.ha.client.maxRetries
2. When adding a boolean config, the name should be a verb that describes what
happens if this config is set to true, e.g. `celeborn.worker.closeIdleConnections`.
happens if this config is set to true, e.g. `celeborn.push.closeIdleConnections`.
3. When adding a config to specify a time duration, it's better to put the time unit
in the config name. For example, `featureName.timeoutMs`, which clearly indicates
that the time unit is millisecond. The config entry should be created with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ class NettyRpcEnv(
new TransportContext(
transportConf,
new NettyRpcHandler(dispatcher, this),
false,
false,
transportConf.closeIdleConnections(),
transportConf.channelHeartbeatEnabled(),
config.source.orNull)

private def createClientBootstraps(): java.util.List[TransportClientBootstrap] = {
Expand Down
4 changes: 2 additions & 2 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ license: |
| celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | false | If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. This value should be higher than celeborn.worker.directMemoryRatioToPauseReceive. | 0.2.0 | |
| celeborn.worker.directMemoryRatioToResume | 0.7 | false | If direct memory usage is less than this limit, worker will resume. | 0.2.0 | |
| celeborn.worker.disk.clean.threads | 4 | false | Thread number of worker to clean up directories of expired shuffle keys on disk. | 0.3.2 | |
| celeborn.worker.fetch.heartbeat.enabled | false | false | enable the heartbeat from worker to client when fetching data | 0.3.0 | |
| celeborn.fetch.heartbeat.enabled | false | false | enable the heartbeat from worker to client when fetching data | 0.3.0 | |
| celeborn.worker.fetch.io.threads | &lt;undefined&gt; | false | Netty IO thread number of worker to handle client fetch data. The default threads number is the number of flush thread. | 0.2.0 | |
| celeborn.worker.fetch.port | 0 | false | Server port for Worker to receive fetch data request from ShuffleClient. | 0.2.0 | |
| celeborn.worker.flusher.buffer.size | 256k | false | Size of buffer used by a single flusher. | 0.2.0 | |
Expand Down Expand Up @@ -156,7 +156,7 @@ license: |
| celeborn.worker.partition.initial.readBuffersMin | 1 | false | Min number of initial read buffers | 0.3.0 | |
| celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | false | Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition sorter memory, partition sorter will stop sorting. If this value is set to 0, partition files sorter will skip memory check and ServingState check. | 0.2.0 | |
| celeborn.worker.pinnedMemoryRatioToResume | 0.3 | false | If pinned memory usage is less than this limit, worker will resume, only takes effect when celeborn.network.memory.allocator.pooled and celeborn.worker.monitor.pinnedMemory.check.enabled are enabled | 0.6.0 | |
| celeborn.worker.push.heartbeat.enabled | false | false | enable the heartbeat from worker to client when pushing data | 0.3.0 | |
| celeborn.push.heartbeat.enabled | false | false | enable the heartbeat from worker to client when pushing data | 0.3.0 | |
| celeborn.worker.push.io.threads | &lt;undefined&gt; | false | Netty IO thread number of worker to handle client push data. The default threads number is the number of flush thread. | 0.2.0 | |
| celeborn.worker.push.port | 0 | false | Server port for Worker to receive push data request from ShuffleClient. | 0.2.0 | |
| celeborn.worker.readBuffer.allocationWait | 50ms | false | The time to wait when buffer dispatcher can not allocate a buffer. | 0.3.0 | |
Expand Down
6 changes: 3 additions & 3 deletions service/src/test/resources/celeborn-0.6.0-h2-ut-data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ INSERT INTO `celeborn_cluster_system_config` ( `id`, `cluster_id`, `config_key`,
VALUES
( 1, 1, 'celeborn.client.push.buffer.initial.size', '102400', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 2, 1, 'celeborn.client.push.buffer.max.size', '1024000', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 3, 1, 'celeborn.worker.fetch.heartbeat.enabled', 'true', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 3, 1, 'celeborn.fetch.heartbeat.enabled', 'true', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 5, 1, 'celeborn.client.push.buffer.initial.size.only', '10240', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 6, 1, 'celeborn.test.timeoutMs.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 7, 1, 'celeborn.test.enabled.only', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
Expand All @@ -31,14 +31,14 @@ INSERT INTO `celeborn_cluster_tenant_config` ( `id`, `cluster_id`, `tenant_id`,
VALUES
( 1, 1, 'tenant_id', 'TENANT', '', 'celeborn.client.push.buffer.initial.size', '10240', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 2, 1, 'tenant_id', 'TENANT', '', 'celeborn.client.push.buffer.initial.size.only', '102400', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 3, 1, 'tenant_id', 'TENANT', '', 'celeborn.worker.fetch.heartbeat.enabled', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 3, 1, 'tenant_id', 'TENANT', '', 'celeborn.fetch.heartbeat.enabled', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 4, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.timeoutMs.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 5, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.enabled.only', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 6, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.int.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 7, 1, 'tenant_id', 'TENANT', '', 'celeborn.client.push.queue.capacity', '1024', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 8, 1, 'tenant_id1', 'TENANT', '', 'celeborn.client.push.buffer.initial.size', '10240', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 9, 1, 'tenant_id1', 'TENANT', '', 'celeborn.client.push.buffer.initial.size.only', '102400', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 10, 1, 'tenant_id1', 'TENANT', '', 'celeborn.worker.fetch.heartbeat.enabled', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 10, 1, 'tenant_id1', 'TENANT', '', 'celeborn.fetch.heartbeat.enabled', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 11, 1, 'tenant_id1', 'TENANT', '', 'celeborn.test.tenant.timeoutMs.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 12, 1, 'tenant_id1', 'TENANT', '', 'celeborn.test.tenant.enabled.only', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 13, 1, 'tenant_id1', 'TENANT', '', 'celeborn.test.tenant.int.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
Expand Down
6 changes: 3 additions & 3 deletions service/src/test/resources/dynamicConfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
config:
celeborn.client.push.buffer.initial.size: 100k
celeborn.client.push.buffer.max.size: 1000k
celeborn.worker.fetch.heartbeat.enabled: true
celeborn.fetch.heartbeat.enabled: true
celeborn.client.push.buffer.initial.size.only: 10k
celeborn.test.timeoutMs.only: 100s
celeborn.test.enabled.only: false
Expand All @@ -36,7 +36,7 @@
config:
celeborn.client.push.buffer.initial.size: 10k
celeborn.client.push.buffer.initial.size.only: 100k
celeborn.worker.fetch.heartbeat.enabled: false
celeborn.fetch.heartbeat.enabled: false
celeborn.test.tenant.timeoutMs.only: 100s
celeborn.test.tenant.enabled.only: false
celeborn.test.tenant.int.only: 10
Expand All @@ -46,7 +46,7 @@
config:
celeborn.client.push.buffer.initial.size: 10k
celeborn.client.push.buffer.initial.size.only: 100k
celeborn.worker.fetch.heartbeat.enabled: false
celeborn.fetch.heartbeat.enabled: false
celeborn.test.tenant.timeoutMs.only: 100s
celeborn.test.tenant.enabled.only: false
celeborn.test.tenant.int.only: 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,18 +210,18 @@ private[celeborn] class Worker(

val pushDataHandler = new PushDataHandler(workerSource)
private val (pushServerTransportContext, pushServer) = {
val closeIdleConnections = conf.workerCloseIdleConnections
val numThreads = conf.workerPushIoThreads.getOrElse(storageManager.totalFlusherThread)
val transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.PUSH_MODULE, numThreads)
val closeIdleConnections = transportConf.closeIdleConnections()
val pushServerLimiter = new ChannelsLimiter(TransportModuleConstants.PUSH_MODULE, conf)
val transportContext: TransportContext =
new TransportContext(
transportConf,
pushDataHandler,
closeIdleConnections,
pushServerLimiter,
conf.workerPushHeartbeatEnabled,
transportConf.channelHeartbeatEnabled(),
workerSource)
(
transportContext,
Expand All @@ -230,11 +230,11 @@ private[celeborn] class Worker(

val replicateHandler = new PushDataHandler(workerSource)
val (replicateTransportContext, replicateServer, replicateClientFactory) = {
val closeIdleConnections = conf.workerCloseIdleConnections
val numThreads =
conf.workerReplicateIoThreads.getOrElse(storageManager.totalFlusherThread)
val transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.REPLICATE_MODULE, numThreads)
val closeIdleConnections = transportConf.closeIdleConnections()
val replicateLimiter = new ChannelsLimiter(TransportModuleConstants.REPLICATE_MODULE, conf)
val transportContext: TransportContext =
new TransportContext(
Expand All @@ -252,17 +252,17 @@ private[celeborn] class Worker(

var fetchHandler: FetchHandler = _
private val (fetchServerTransportContext, fetchServer) = {
val closeIdleConnections = conf.workerCloseIdleConnections
val numThreads = conf.workerFetchIoThreads.getOrElse(storageManager.totalFlusherThread)
val transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.FETCH_MODULE, numThreads)
val closeIdleConnections = transportConf.closeIdleConnections()
fetchHandler = new FetchHandler(conf, transportConf, workerSource)
val transportContext: TransportContext =
new TransportContext(
transportConf,
fetchHandler,
closeIdleConnections,
conf.workerFetchHeartbeatEnabled,
transportConf.channelHeartbeatEnabled(),
workerSource,
conf.metricsCollectCriticalEnabled)
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ trait HeartbeatFeature extends MiniClusterFeature {
def getTestHeartbeatFromWorker2ClientConf: (Map[String, String], CelebornConf) = {
val workerConf = Map(
"celeborn.push.heartbeat.interval" -> "4s",
"celeborn.worker.push.heartbeat.enabled" -> "true",
"celeborn.worker.fetch.heartbeat.enabled" -> "true",
"celeborn.push.heartbeat.enabled" -> "true",
"celeborn.fetch.heartbeat.enabled" -> "true",
"celeborn.fetch.heartbeat.interval" -> "4s")
val clientConf = new CelebornConf()
clientConf.set("celeborn.data.io.connectionTimeout", "6s")
Expand All @@ -87,8 +87,8 @@ trait HeartbeatFeature extends MiniClusterFeature {
val workerConf = Map(
"celeborn.push.heartbeat.interval" -> "4s",
"celeborn.fetch.heartbeat.interval" -> "4s",
"celeborn.worker.push.heartbeat.enabled" -> "false",
"celeborn.worker.fetch.heartbeat.enabled" -> "false")
"celeborn.push.heartbeat.enabled" -> "false",
"celeborn.fetch.heartbeat.enabled" -> "false")
val clientConf = new CelebornConf()
clientConf.set("celeborn.data.io.connectionTimeout", "6s")
(workerConf, clientConf)
Expand Down Expand Up @@ -117,9 +117,10 @@ trait HeartbeatFeature extends MiniClusterFeature {
"celeborn.push.io.connectionTimeout" -> "3s",
"celeborn.push.heartbeat.interval" -> "4s",
"celeborn.fetch.heartbeat.interval" -> "4s",
"celeborn.worker.push.heartbeat.enabled" -> "true",
"celeborn.worker.fetch.heartbeat.enabled" -> "true",
CelebornConf.WORKER_CLOSE_IDLE_CONNECTIONS.key -> "true")
"celeborn.push.heartbeat.enabled" -> "true",
"celeborn.fetch.heartbeat.enabled" -> "true",
"celeborn.push.closeIdleConnections" -> "true",
"celeborn.fetch.closeIdleConnections" -> "true")
val clientConf = new CelebornConf()
clientConf.set("celeborn.data.io.connectionTimeout", "6s")
(workerConf, clientConf)
Expand Down
Loading