HDDS-11904. Fix HealthyPipelineSafeModeRule logic. #8386

Merged (7 commits) on May 14, 2025
Changes from 3 commits
@@ -20,14 +20,21 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -59,13 +66,15 @@ public class HealthyPipelineSafeModeRule extends SafeModeExitRule<Pipeline> {
private final int minHealthyPipelines;
private final SCMContext scmContext;
private final Set<PipelineID> unProcessedPipelineSet = new HashSet<>();
private final NodeManager nodeManager;

HealthyPipelineSafeModeRule(EventQueue eventQueue,
PipelineManager pipelineManager, SCMSafeModeManager manager,
ConfigurationSource configuration, SCMContext scmContext) {
ConfigurationSource configuration, SCMContext scmContext, NodeManager nodeManager) {
super(manager, NAME, eventQueue);
this.pipelineManager = pipelineManager;
this.scmContext = scmContext;
this.nodeManager = nodeManager;
healthyPipelinesPercent =
configuration.getDouble(HddsConfigKeys.
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
@@ -122,21 +131,65 @@ protected synchronized boolean validate() {

@Override
protected synchronized void process(Pipeline pipeline) {
Preconditions.checkNotNull(pipeline);

// When SCM is in safe mode for long time, already registered
// datanode can send pipeline report again, or SCMPipelineManager will
// create new pipelines.
Preconditions.checkNotNull(pipeline);
if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
((RatisReplicationConfig) pipeline.getReplicationConfig())
.getReplicationFactor() == HddsProtos.ReplicationFactor.THREE &&
!processedPipelineIDs.contains(pipeline.getId())) {
getSafeModeMetrics().incCurrentHealthyPipelinesCount();
currentHealthyPipelineCount++;
processedPipelineIDs.add(pipeline.getId());
unProcessedPipelineSet.remove(pipeline.getId());

// Only handle RATIS + 3-replica pipelines.
if (pipeline.getType() != HddsProtos.ReplicationType.RATIS ||
((RatisReplicationConfig) pipeline.getReplicationConfig()).getReplicationFactor() !=
HddsProtos.ReplicationFactor.THREE) {
SCMSafeModeManager.getLogger().warn(
"Skipping pipeline safemode report processing as Replication type isn't RATIS " +
"or replication factor isn't 3.");
return;
}

// Skip already processed ones.
if (processedPipelineIDs.contains(pipeline.getId())) {
LOG.info("Skipping pipeline safemode report processing check as pipeline: {} is already recorded.",
pipeline.getId());
return;
}

List<DatanodeDetails> pipelineDns = pipeline.getNodes();
if (pipelineDns.size() != 3) {
LOG.warn("Only {} DNs reported this pipeline: {}, all 3 DNs should report the pipeline", pipelineDns.size(),
pipeline.getId());
return;
}

Map<DatanodeDetails, String> badDnsWithReasons = new LinkedHashMap<>();

for (DatanodeDetails dn : pipelineDns) {
try {
NodeStatus status = nodeManager.getNodeStatus(dn);
if (!status.equals(NodeStatus.inServiceHealthy())) {
String reason = String.format("Health: %s, Operational State: %s",
status.getHealth(), status.getOperationalState());
badDnsWithReasons.put(dn, reason);
}
} catch (NodeNotFoundException e) {
badDnsWithReasons.put(dn, "DN not registered with SCM");
}
}

if (!badDnsWithReasons.isEmpty()) {
LOG.warn("Below DNs reported by Pipeline: {} are either in bad health or un-registered with SCMs",
pipeline.getId());
for (Map.Entry<DatanodeDetails, String> entry : badDnsWithReasons.entrySet()) {
LOG.warn("DN {}: {}", entry.getKey().getID(), entry.getValue());
}
return;
}

getSafeModeMetrics().incCurrentHealthyPipelinesCount();
currentHealthyPipelineCount++;
processedPipelineIDs.add(pipeline.getId());
unProcessedPipelineSet.remove(pipeline.getId());

if (scmInSafeMode()) {
SCMSafeModeManager.getLogger().info(
"SCM in safe mode. Healthy pipelines reported count is {}, " +
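
For reference, the gist of the new gating logic in process(), as a minimal self-contained sketch. PipelineReport, DnState, and countsAsHealthy below are hypothetical stand-ins, not Ozone APIs: a pipeline adds to the healthy count only when it is RATIS factor-3, has not been counted before, all three member DNs appear in the report, and every member is registered and in-service healthy.

// Illustrative sketch only -- simplified stand-ins for the SCM types, showing
// the decision order the new process() follows. PipelineReport, DnState and
// countsAsHealthy are hypothetical names, not Ozone APIs.
import java.util.List;
import java.util.Map;
import java.util.Set;

class SafeModePipelineCheckSketch {

  enum DnState { IN_SERVICE_HEALTHY, STALE, DEAD, UNREGISTERED }

  static final class PipelineReport {
    final String pipelineId;
    final boolean ratisFactorThree;
    final List<String> datanodeIds;

    PipelineReport(String pipelineId, boolean ratisFactorThree, List<String> datanodeIds) {
      this.pipelineId = pipelineId;
      this.ratisFactorThree = ratisFactorThree;
      this.datanodeIds = datanodeIds;
    }
  }

  // Returns true when the pipeline should be counted toward the safe-mode
  // healthy-pipeline threshold.
  static boolean countsAsHealthy(PipelineReport report, Set<String> processed,
      Map<String, DnState> dnStates) {
    if (!report.ratisFactorThree) {
      return false;                           // only RATIS factor-3 pipelines are considered
    }
    if (processed.contains(report.pipelineId)) {
      return false;                           // each pipeline is counted at most once
    }
    if (report.datanodeIds.size() != 3) {
      return false;                           // all three members must appear in the report
    }
    for (String dn : report.datanodeIds) {
      DnState state = dnStates.getOrDefault(dn, DnState.UNREGISTERED);
      if (state != DnState.IN_SERVICE_HEALTHY) {
        return false;                         // a dead, stale, or unregistered DN disqualifies it
      }
    }
    processed.add(report.pipelineId);         // remember it so re-reports are ignored
    return true;
  }
}

The ordering mirrors the diff above: cheap type/factor and duplicate checks first, then the per-DN health check that this PR adds.
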
@@ -82,7 +82,7 @@ private void loadRules() {

if (pipelineManager != null) {
safeModeRules.add(new HealthyPipelineSafeModeRule(eventQueue, pipelineManager,
safeModeManager, config, scmContext));
safeModeManager, config, scmContext, nodeManager));
safeModeRules.add(new OneReplicaPipelineSafeModeRule(eventQueue, pipelineManager,
safeModeManager, config));
}
@@ -30,6 +30,7 @@
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
@@ -283,7 +284,7 @@ public void testHealthyPipelineSafeModeRuleWithMixedPipelines()
firePipelineEvent(pipeline1, eventQueue);

GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains(
"reported count is 1"),
"replication factor isn't 3"),
1000, 5000);
assertFalse(healthyPipelineSafeModeRule.validate());

@@ -299,6 +300,78 @@

}

@Test
public void testPipelineIgnoredWhenDnIsUnhealthy() throws Exception {
EventQueue eventQueue = new EventQueue();
SCMServiceManager serviceManager = new SCMServiceManager();
SCMContext scmContext = SCMContext.emptyContext();
List<ContainerInfo> containers =
new ArrayList<>(HddsTestUtils.getContainerInfo(1));

OzoneConfiguration config = new OzoneConfiguration();
MockNodeManager nodeManager = new MockNodeManager(true, 12);
ContainerManager containerManager = mock(ContainerManager.class);
when(containerManager.getContainers()).thenReturn(containers);
config.set(HddsConfigKeys.OZONE_METADATA_DIRS, tempFile.getPath());
config.setBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);

SCMMetadataStore scmMetadataStore = new SCMMetadataStoreImpl(config);
try {
PipelineManagerImpl pipelineManager =
PipelineManagerImpl.newPipelineManager(
config,
SCMHAManagerStub.getInstance(true),
nodeManager,
scmMetadataStore.getPipelineTable(),
eventQueue,
scmContext,
serviceManager,
Clock.system(ZoneOffset.UTC));

PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), config);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);

// Create a Ratis pipeline with 3 replicas
Pipeline pipeline =
pipelineManager.createPipeline(RatisReplicationConfig.getInstance(
ReplicationFactor.THREE));
pipelineManager.openPipeline(pipeline.getId());
pipeline = pipelineManager.getPipeline(pipeline.getId());
MockRatisPipelineProvider.markPipelineHealthy(pipeline);

// Mark one DN as DEAD
DatanodeDetails dnDead = pipeline.getNodes().get(0);
nodeManager.setNodeState(dnDead, HddsProtos.NodeState.DEAD);

SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(
config, containerManager, pipelineManager, nodeManager, eventQueue,
serviceManager, scmContext);

LogCapturer logCapturer = LogCapturer.captureLogs(
HealthyPipelineSafeModeRule.class);

HealthyPipelineSafeModeRule healthyPipelineSafeModeRule =
scmSafeModeManager.getHealthyPipelineSafeModeRule();

// Fire the pipeline report
firePipelineEvent(pipeline, eventQueue);

// Wait for log message indicating the pipeline's DN is in bad health.
GenericTestUtils.waitFor(
() -> logCapturer.getOutput().contains("are either in bad health or un-registered with SCMs"),
100, 5000);

// Ensure the rule is NOT satisfied due to unhealthy DN
assertFalse(healthyPipelineSafeModeRule.validate());
} finally {
scmMetadataStore.getStore().close();
}
}

private void firePipelineEvent(Pipeline pipeline, EventQueue eventQueue) {
eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
}