Skip to content

[CELEBORN-1997] RemoteShuffleInputGateDelegation should regard CelebornIOException as data consumption error for RestartPipelinedRegionFailoverStrategy #3256

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

SteNicholas
Copy link
Member

@SteNicholas SteNicholas commented May 14, 2025

What changes were proposed in this pull request?

RemoteShuffleInputGateDelegation should regard CelebornIOException as data consumption error for RestartPipelinedRegionFailoverStrategy.

Why are the changes needed?

RemoteShuffleInputGateDelegation only regards PartitionUnRetryAbleException as data consumption error, which marks the corresponding data partition to be failed for failover process. Similar to Spark's behavior in handling fetch exception, RemoteShuffleInputGateDelegation should regard CelebornIOException as data consumption error to restart upstream task. The tasks needing restart of RestartPipelinedRegionFailoverStrategy is get with the following way:

/**
  * Returns a set of IDs corresponding to the set of vertices that should be restarted. In this
  * strategy, all task vertices in 'involved' regions are proposed to be restarted. The
  * 'involved' regions are calculated with rules below: 1. The region containing the failed task
  * is always involved 2. If an input result partition of an involved region is not available,
  * i.e. Missing or Corrupted, the region containing the partition producer task is involved 3.
  * If a region is involved, all of its consumer regions are involved
  *
  * @param executionVertexId ID of the failed task
  * @param cause cause of the failure
  * @return set of IDs of vertices to restart
  */
@Override
public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause) {

        final SchedulingPipelinedRegion failedRegion =
                topology.getPipelinedRegionOfVertex(executionVertexId);
        if (failedRegion == null) {
            // TODO: show the task name in the log
            throw new IllegalStateException(
                    "Can not find the failover region for task " + executionVertexId, cause);
        }

        // if the failure cause is data consumption error, mark the corresponding data partition to
        // be failed,
        // so that the failover process will try to recover it
        Optional<PartitionException> dataConsumptionException =
                ExceptionUtils.findThrowable(cause, PartitionException.class);
        if (dataConsumptionException.isPresent()) {
            resultPartitionAvailabilityChecker.markResultPartitionFailed(
                    dataConsumptionException.get().getPartitionId().getPartitionId());
        }

        // calculate the tasks to restart based on the result of regions to restart
        Set<ExecutionVertexID> tasksToRestart = new HashSet<>();
        for (SchedulingPipelinedRegion region : getRegionsToRestart(failedRegion)) {
            for (SchedulingExecutionVertex vertex : region.getVertices()) {
                // we do not need to restart tasks which are already in the initial state
                if (vertex.getState() != ExecutionState.CREATED) {
                    tasksToRestart.add(vertex.getId());
                }
            }
        }

        // the previous failed partition will be recovered. remove its failed state from the checker
        if (dataConsumptionException.isPresent()) {
            resultPartitionAvailabilityChecker.removeResultPartitionFromFailedState(
                    dataConsumptionException.get().getPartitionId().getPartitionId());
        }

        return tasksToRestart;
}

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Manual.

…rnIOException as data consumption error for RestartPipelinedRegionFailoverStrategy
@SteNicholas
Copy link
Member Author

Ping @reswqa, @codenohup, @FMX, @RexXiong, @mridulm.

@SteNicholas SteNicholas requested review from FMX, RexXiong and reswqa May 14, 2025 02:58
Copy link
Member

@reswqa reswqa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if all CelebornIOException should trigger upstream recomputing. cc @RexXiong

if (throwable.getMessage() != null && throwable.getMessage().contains(clazz.getName())) {
String message = throwable.getMessage();
if (message != null
&& (message.contains(CelebornIOException.class.getName())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

openStreamInternal could trigger an exception, but it should not be considered a data consumption error because the task should retry for it. Maybe this change is too generic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FMX, is it better to change some CelebornIOException regarded as data consumption error to PartitionUnRetryAbleException?

@SteNicholas SteNicholas requested a review from FMX May 20, 2025 08:46
@SteNicholas SteNicholas force-pushed the main branch 2 times, most recently from 5590ef0 to 0dffcf6 Compare May 26, 2025 09:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants