HDDS-12825. ReconIncrementalContainerReportHandler is not synchronized on datanode. #8272

Merged 2 commits on Apr 14, 2025. The diff below shows the changes from 1 commit.
AbstractContainerReportHandler:

@@ -61,7 +61,8 @@ abstract class AbstractContainerReportHandler {
   protected abstract Logger getLogger();

   /** @return the container in SCM and the replica from a datanode details for logging. */
-  static Object getDetailsForLogging(ContainerInfo container, ContainerReplicaProto replica, DatanodeDetails datanode) {
+  protected static Object getDetailsForLogging(ContainerInfo container, ContainerReplicaProto replica,
+      DatanodeDetails datanode) {
     Objects.requireNonNull(replica, "replica == null");
     Objects.requireNonNull(datanode, "datanode == null");
     if (container != null) {
@@ -93,21 +94,6 @@ public String toString() {
     };
   }

-  /**
-   * Process the given ContainerReplica received from specified datanode.
-   *
-   * @param datanodeDetails DatanodeDetails for the DN
-   * @param replicaProto Protobuf representing the replicas
-   * @param publisher EventPublisher instance
-   */
-  protected void processContainerReplica(final DatanodeDetails datanodeDetails,
-      final ContainerReplicaProto replicaProto, final EventPublisher publisher)
-      throws IOException, InvalidStateTransitionException {
-    ContainerInfo container = getContainerManager().getContainer(
-        ContainerID.valueOf(replicaProto.getContainerID()));
-    processContainerReplica(
-        datanodeDetails, container, replicaProto, publisher);
-  }

   /**
    * Process the given ContainerReplica received from specified datanode.
@@ -120,18 +106,15 @@ protected void processContainerReplica(final DatanodeDetails datanodeDetails,
    */
   protected void processContainerReplica(final DatanodeDetails datanodeDetails,
       final ContainerInfo containerInfo,
-      final ContainerReplicaProto replicaProto, final EventPublisher publisher)
+      final ContainerReplicaProto replicaProto, final EventPublisher publisher, Object detailsForLogging)
       throws IOException, InvalidStateTransitionException {
-    final ContainerID containerId = containerInfo.containerID();
-    final Object detailsForLogging = getDetailsForLogging(containerInfo, replicaProto, datanodeDetails);
-
     getLogger().debug("Processing replica {}", detailsForLogging);
     // Synchronized block should be replaced by container lock,
     // once we have introduced lock inside ContainerInfo.
     synchronized (containerInfo) {
       updateContainerStats(datanodeDetails, containerInfo, replicaProto, detailsForLogging);
       if (!updateContainerState(datanodeDetails, containerInfo, replicaProto, publisher, detailsForLogging)) {
-        updateContainerReplica(datanodeDetails, containerId, replicaProto);
+        updateContainerReplica(datanodeDetails, containerInfo.containerID(), replicaProto);
       }
     }
   }
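Note on the getDetailsForLogging helper made protected above: the "public String toString()" context in the second hunk suggests it returns an anonymous object whose message is built only when toString() is invoked, so it can be handed to parameterized log calls without formatting cost on the hot path. A minimal sketch of that idiom follows; the helper name, printed fields, and message format are assumptions for illustration, not the actual implementation:

    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
    import org.apache.hadoop.hdds.scm.container.ContainerInfo;

    /** Sketch only: a lazy log-details object in the style of getDetailsForLogging. */
    final class ReplicaLogDetails {

      private ReplicaLogDetails() {
      }

      // The printed fields are assumed for illustration; the real helper may differ.
      static Object of(ContainerInfo container, ContainerReplicaProto replica, DatanodeDetails datanode) {
        return new Object() {
          @Override
          public String toString() {
            // Fall back to the ID from the replica when the container is not yet resolved in SCM.
            final Object id = container != null ? container.containerID() : replica.getContainerID();
            return "container " + id + " in state " + replica.getState() + " from datanode " + datanode;
          }
        };
      }
    }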
ContainerReportHandler (processSingleReplica):

@@ -234,8 +234,7 @@ private void processSingleReplica(final DatanodeDetails datanodeDetails,
       return;
     }
     try {
-      processContainerReplica(
-          datanodeDetails, container, replicaProto, publisher);
+      processContainerReplica(datanodeDetails, container, replicaProto, publisher, detailsForLogging);
     } catch (IOException | InvalidStateTransitionException e) {
       getLogger().error("Failed to process {}", detailsForLogging, e);
     }
IncrementalContainerReportHandler:

@@ -36,12 +36,11 @@
 /**
  * Handles incremental container reports from datanode.
  */
-public class IncrementalContainerReportHandler extends
-    AbstractContainerReportHandler
+public class IncrementalContainerReportHandler
+    extends AbstractContainerReportHandler
     implements EventHandler<IncrementalContainerReportFromDatanode> {

-  private static final Logger LOG = LoggerFactory.getLogger(
-      IncrementalContainerReportHandler.class);
+  private static final Logger LOG = LoggerFactory.getLogger(IncrementalContainerReportHandler.class);

   public IncrementalContainerReportHandler(
       final NodeManager nodeManager,
@@ -58,14 +57,25 @@ protected Logger getLogger() {
   @Override
   public void onMessage(final IncrementalContainerReportFromDatanode report,
       final EventPublisher publisher) {
+    final DatanodeDetails datanode = getDatanodeDetails(report);
+    if (datanode == null) {
+      return;
+    }
+    processICR(report, publisher, datanode);
+  }
+
+  protected DatanodeDetails getDatanodeDetails(final IncrementalContainerReportFromDatanode report) {
     final DatanodeDetails dnFromReport = report.getDatanodeDetails();
     getLogger().debug("Processing incremental container report from datanode {}", dnFromReport);
     final DatanodeDetails dd = getNodeManager().getNode(dnFromReport.getID());
     if (dd == null) {
       getLogger().warn("Datanode not found: {}", dnFromReport);
-      return;
     }
+    return dd;
+  }

+  protected void processICR(IncrementalContainerReportFromDatanode report,
+      EventPublisher publisher, DatanodeDetails dd) {
     boolean success = false;
     // HDDS-5249 - we must ensure that an ICR and FCR for the same datanode
     // do not run at the same time or it can result in a data consistency
@@ -74,13 +84,15 @@ public void onMessage(final IncrementalContainerReportFromDatanode report,
     synchronized (dd) {
       for (ContainerReplicaProto replicaProto :
           report.getReport().getReportList()) {
+        Object detailsForLogging = getDetailsForLogging(null, replicaProto, dd);
         ContainerID id = ContainerID.valueOf(replicaProto.getContainerID());
-        ContainerInfo container = null;
+        final ContainerInfo container;
         try {
           try {
             container = getContainerManager().getContainer(id);
             // Ensure we reuse the same ContainerID instance in containerInfo
             id = container.containerID();
+            detailsForLogging = getDetailsForLogging(container, replicaProto, dd);
           } finally {
             if (replicaProto.getState() == State.DELETED) {
               getNodeManager().removeContainer(dd, id);
@@ -89,27 +101,23 @@ public void onMessage(final IncrementalContainerReportFromDatanode report,
             }
           }
           if (ContainerReportValidator.validate(container, dd, replicaProto)) {
-            processContainerReplica(dd, container, replicaProto, publisher);
+            processContainerReplica(dd, container, replicaProto, publisher, detailsForLogging);
           }
           success = true;
         } catch (ContainerNotFoundException e) {
-          getLogger().warn("Container {} not found!", replicaProto.getContainerID());
+          getLogger().warn("Container not found: {}", detailsForLogging);
         } catch (NodeNotFoundException ex) {
-          getLogger().error("Datanode not found {}", report.getDatanodeDetails(), ex);
+          getLogger().error("{}: {}", ex, detailsForLogging);
         } catch (ContainerReplicaNotFoundException e) {
-          getLogger().warn("Container {} replica not found!",
-              replicaProto.getContainerID());
+          getLogger().warn("Container replica not found: {}", detailsForLogging, e);
         } catch (SCMException ex) {
           if (ex.getResult() == SCMException.ResultCodes.SCM_NOT_LEADER) {
-            getLogger().info("Failed to process {} container {}: {}",
-                replicaProto.getState(), id, ex.getMessage());
+            getLogger().info("SCM_NOT_LEADER: Failed to process {}", detailsForLogging);
           } else {
-            getLogger().error("Exception while processing ICR for container {}",
-                replicaProto.getContainerID(), ex);
+            getLogger().info("Failed to process {}", detailsForLogging, ex);
           }
         } catch (IOException | InvalidStateTransitionException e) {
-          getLogger().error("Exception while processing ICR for container {}",
-              replicaProto.getContainerID(), e);
+          getLogger().info("Failed to process {}", detailsForLogging, e);
         }
       }
     }
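The net effect of the refactor above is a small template: onMessage now only resolves the datanode (getDatanodeDetails) and delegates to processICR, and processICR holds synchronized (dd) around the whole replica loop. A subclass therefore inherits the per-datanode synchronization simply by routing its own onMessage through processICR. A minimal sketch of such a subclass; the class name is hypothetical, while the constructor, hooks, and imports mirror those visible in the diff:

    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.container.ContainerManager;
    import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
    import org.apache.hadoop.hdds.scm.ha.SCMContext;
    import org.apache.hadoop.hdds.scm.node.NodeManager;
    import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
    import org.apache.hadoop.hdds.server.events.EventPublisher;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /** Hypothetical subclass sketch: reuses the refactored base-class hooks. */
    public class CustomIncrementalContainerReportHandler extends IncrementalContainerReportHandler {

      private static final Logger LOG =
          LoggerFactory.getLogger(CustomIncrementalContainerReportHandler.class);

      public CustomIncrementalContainerReportHandler(NodeManager nodeManager,
          ContainerManager containerManager, SCMContext scmContext) {
        super(nodeManager, containerManager, scmContext);
      }

      @Override
      protected Logger getLogger() {
        return LOG;
      }

      @Override
      public void onMessage(final IncrementalContainerReportFromDatanode report,
          final EventPublisher publisher) {
        // Resolve the datanode through NodeManager; null means it is unknown and already logged.
        final DatanodeDetails datanode = getDatanodeDetails(report);
        if (datanode == null) {
          return;
        }
        // Subclass-specific pre-processing of the report could happen here.
        // Delegating to processICR keeps all replica handling under synchronized (datanode),
        // so it does not interleave with other report processing that locks the same datanode
        // (the HDDS-5249 concern noted in the diff).
        processICR(report, publisher, datanode);
      }
    }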
ReconIncrementalContainerReportHandler:

@@ -17,19 +17,13 @@

 package org.apache.hadoop.ozone.recon.scm;

-import java.io.IOException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -39,10 +33,9 @@
 public class ReconIncrementalContainerReportHandler
     extends IncrementalContainerReportHandler {

-  private static final Logger LOG = LoggerFactory.getLogger(
-      ReconIncrementalContainerReportHandler.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ReconIncrementalContainerReportHandler.class);

-  public ReconIncrementalContainerReportHandler(NodeManager nodeManager,
+  ReconIncrementalContainerReportHandler(NodeManager nodeManager,
       ContainerManager containerManager, SCMContext scmContext) {
     super(nodeManager, containerManager, scmContext);
   }
@@ -55,16 +48,8 @@ protected Logger getLogger() {
   @Override
   public void onMessage(final IncrementalContainerReportFromDatanode report,
       final EventPublisher publisher) {
-    final DatanodeDetails dnFromReport = report.getDatanodeDetails();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing incremental container report from data node {}",
-          dnFromReport);
-    }
-
-    final DatanodeDetails dd = getNodeManager().getNode(dnFromReport.getID());
-    if (dd == null) {
-      LOG.warn("Received container report from unknown datanode {}",
-          dnFromReport);
+    final DatanodeDetails datanode = getDatanodeDetails(report);
+    if (datanode == null) {
       return;
     }

@@ -77,36 +62,6 @@ public void onMessage(final IncrementalContainerReportFromDatanode report,
       LOG.error("Exception while checking and adding new container.", ioEx);
       return;
     }
-    boolean success = true;
-    for (ContainerReplicaProto replicaProto :
-        report.getReport().getReportList()) {
-      ContainerID id = ContainerID.valueOf(replicaProto.getContainerID());
-      ContainerInfo container = null;
-      try {
-        try {
-          container = getContainerManager().getContainer(id);
-          // Ensure we reuse the same ContainerID instance in containerInfo
-          id = container.containerID();
-        } finally {
-          if (replicaProto.getState().equals(
-              ContainerReplicaProto.State.DELETED)) {
-            getNodeManager().removeContainer(dd, id);
-          } else {
-            getNodeManager().addContainer(dd, id);
-          }
-        }
-        processContainerReplica(dd, replicaProto, publisher);
-        success = true;
-      } catch (NodeNotFoundException ex) {
-        success = false;
-        LOG.error("Received ICR from unknown datanode {}.",
-            report.getDatanodeDetails(), ex);
-      } catch (IOException | InvalidStateTransitionException e) {
-        success = false;
-        LOG.error("Exception while processing ICR for container {}",
-            replicaProto.getContainerID());
-      }
-    }
-    containerManager.notifyContainerReportProcessing(false, success);
+    processICR(report, publisher, datanode);
   }
 }
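This is the actual fix named in the PR title: Recon's onMessage previously ran its own replica loop without any lock, while the SCM handler synchronized on the datanode. By resolving the datanode through getDatanodeDetails and delegating to processICR, Recon now takes the same per-datanode lock. The sketch below, which is not actual Ozone code, illustrates why the lock must be taken on the NodeManager-resolved DatanodeDetails rather than on the object carried inside the report; the full-report path is assumed to follow the same pattern, per the HDDS-5249 comment in the diff:

    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
    import org.apache.hadoop.hdds.scm.node.NodeManager;
    import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;

    /** Sketch only: why both report paths must lock the NodeManager-tracked instance. */
    final class PerDatanodeReportLock {

      private PerDatanodeReportLock() {
      }

      static void processLocked(NodeManager nodeManager, IncrementalContainerReportFromDatanode report) {
        // The DatanodeDetails carried in the report is a fresh object per message, so locking
        // on it would never contend. Lock on the single instance NodeManager tracks for that
        // datanode, which is what getDatanodeDetails()/processICR() do after this change.
        final DatanodeDetails datanode = nodeManager.getNode(report.getDatanodeDetails().getID());
        if (datanode == null) {
          return; // unknown datanode, nothing to lock
        }
        synchronized (datanode) {
          for (ContainerReplicaProto replica : report.getReport().getReportList()) {
            // handle each replica while holding the per-datanode lock
          }
        }
      }
    }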