@@ -270,21 +270,23 @@ class CommitManager(appUniqueId: String, val conf: CelebornConf, lifecycleManage
270
270
val partitionType = lifecycleManager.getPartitionType(shuffleId)
271
271
commitHandlers.computeIfAbsent(
272
272
partitionType,
273
- {
274
- case PartitionType .REDUCE => new ReducePartitionCommitHandler (
275
- appUniqueId,
276
- conf,
277
- lifecycleManager.shuffleAllocatedWorkers,
278
- committedPartitionInfo,
279
- lifecycleManager.workerStatusTracker)
280
- case PartitionType .MAP => new MapPartitionCommitHandler (
281
- appUniqueId,
282
- conf,
283
- lifecycleManager.shuffleAllocatedWorkers,
284
- committedPartitionInfo,
285
- lifecycleManager.workerStatusTracker)
286
- case partitionType => throw new UnsupportedOperationException (
287
- s " Unexpected ShufflePartitionType for CommitManager: $partitionType" )
273
+ (partitionType : PartitionType ) => {
274
+ partitionType match {
275
+ case PartitionType .REDUCE => new ReducePartitionCommitHandler (
276
+ appUniqueId,
277
+ conf,
278
+ lifecycleManager.shuffleAllocatedWorkers,
279
+ committedPartitionInfo,
280
+ lifecycleManager.workerStatusTracker)
281
+ case PartitionType .MAP => new MapPartitionCommitHandler (
282
+ appUniqueId,
283
+ conf,
284
+ lifecycleManager.shuffleAllocatedWorkers,
285
+ committedPartitionInfo,
286
+ lifecycleManager.workerStatusTracker)
287
+ case _ => throw new UnsupportedOperationException (
288
+ s " Unexpected ShufflePartitionType for CommitManager: $partitionType" )
289
+ }
288
290
})
289
291
}
290
292
0 commit comments