Skip to content

HDDS-9151. Close EC Pipeline when container transitions to closing #5191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,24 +172,27 @@ public int getContainerStateCount(final LifeCycleState state) {
public ContainerInfo allocateContainer(
final ReplicationConfig replicationConfig, final String owner)
throws IOException {
// Acquire pipeline manager lock, to avoid any updates to pipeline
// while allocate container happens. This is to avoid scenario like
// mentioned in HDDS-5655.
pipelineManager.acquireReadLock();
lock.lock();
List<Pipeline> pipelines;
Pipeline pipeline;
ContainerInfo containerInfo = null;
lock.lock();
try {
pipelines = pipelineManager
.getPipelines(replicationConfig, Pipeline.PipelineState.OPEN);
if (!pipelines.isEmpty()) {
pipeline = pipelines.get(random.nextInt(pipelines.size()));
containerInfo = createContainer(pipeline, owner);
// Acquire pipeline manager lock, to avoid any updates to pipeline
// while allocate container happens. This is to avoid scenario like
// mentioned in HDDS-5655.
pipelineManager.acquireReadLock();
try {
pipelines = pipelineManager
.getPipelines(replicationConfig, Pipeline.PipelineState.OPEN);
if (!pipelines.isEmpty()) {
pipeline = pipelines.get(random.nextInt(pipelines.size()));
containerInfo = createContainer(pipeline, owner);
}
} finally {
pipelineManager.releaseReadLock();
}
} finally {
lock.unlock();
pipelineManager.releaseReadLock();
}

if (pipelines.isEmpty()) {
Expand All @@ -202,22 +205,26 @@ public ContainerInfo allocateContainer(
" matching pipeline for replicationConfig: " + replicationConfig
+ ", State:PipelineState.OPEN", e);
}
pipelineManager.acquireReadLock();

lock.lock();
try {
pipelines = pipelineManager
.getPipelines(replicationConfig, Pipeline.PipelineState.OPEN);
if (!pipelines.isEmpty()) {
pipeline = pipelines.get(random.nextInt(pipelines.size()));
containerInfo = createContainer(pipeline, owner);
} else {
throw new IOException("Could not allocate container. Cannot get any" +
" matching pipeline for replicationConfig: " + replicationConfig
+ ", State:PipelineState.OPEN");
pipelineManager.acquireReadLock();
try {
pipelines = pipelineManager
.getPipelines(replicationConfig, Pipeline.PipelineState.OPEN);
if (!pipelines.isEmpty()) {
pipeline = pipelines.get(random.nextInt(pipelines.size()));
containerInfo = createContainer(pipeline, owner);
} else {
throw new IOException("Could not allocate container. Cannot get " +
" any matching pipeline for replicationConfig: " +
replicationConfig + ", State:PipelineState.OPEN");
}
} finally {
pipelineManager.releaseReadLock();
}
} finally {
lock.unlock();
pipelineManager.releaseReadLock();
}
}
return containerInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,27 @@ public void addContainerToPipelineSCMStart(
/**
 * Removes a container from a pipeline and, when the pipeline is an EC
 * pipeline, closes that pipeline afterwards.
 *
 * <p>The whole operation runs under the pipeline manager write lock, so the
 * lookup, the removal and the possible close are not interleaved with other
 * operations that take the same lock.
 *
 * <p>If the pipeline cannot be found, the condition is logged at WARN level
 * and the method returns normally.
 *
 * @param pipelineID  ID of the pipeline the container belongs to
 * @param containerID ID of the container to remove from the pipeline
 * @throws IOException if the state manager or the pipeline close fails for a
 *                     reason other than the pipeline not being found
 */
@Override
public void removeContainerFromPipeline(
    PipelineID pipelineID, ContainerID containerID) throws IOException {
  acquireWriteLock();
  try {
    // Look the pipeline up before removing the container so that its
    // replication type is available for the EC check below.
    Pipeline pipeline = stateManager.getPipeline(pipelineID);
    stateManager.removeContainerFromPipeline(pipelineID, containerID);
    if (pipeline.getReplicationConfig().getReplicationType()
        == ReplicationType.EC) {
      // For EC, a pipeline is used for only a single container. When that
      // container is closed or removed from the pipeline, the pipeline should
      // also be closed. For EC, if a pipeline had a container and then the
      // container is removed via this method, the pipeline is no longer
      // useful - nothing will allocate a new container on it. Therefore, we
      // close the pipeline here to free up the pipeline slot for a new
      // pipeline to get created.
      // NOTE(review): the meaning of the boolean flag is defined by
      // closePipeline, which is not visible here - confirm it is intended.
      closePipeline(pipeline, true);
    }
  } catch (PipelineNotFoundException e) {
    // Best effort: a missing pipeline means there is nothing left to update.
    LOG.warn("Pipeline {} not found when removing container {}",
        pipelineID, containerID, e);
  } finally {
    releaseWriteLock();
  }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT;
import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.ALLOCATED;
import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
Expand Down Expand Up @@ -352,6 +353,60 @@ public void testRemovePipeline() throws Exception {
}
}

/**
 * Verifies that removing a container leaves a Ratis pipeline OPEN but
 * moves an EC pipeline to CLOSED.
 */
@Test
public void testRemoveContainerFromPipeline() throws Exception {
  try (PipelineManagerImpl manager = createPipelineManager(true)) {
    // Arrange: one open Ratis/THREE pipeline and one open EC(3, 2) pipeline.
    Pipeline ratis = manager.createPipeline(
        RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
    manager.openPipeline(ratis.getId());
    ECReplicationConfig ecConfig = new ECReplicationConfig(3, 2);
    Pipeline ec = manager.createPipeline(ecConfig);
    manager.openPipeline(ec.getId());

    // Attach exactly one open container to each pipeline.
    ContainerInfo ratisContainer = HddsTestUtils.getContainer(
        HddsProtos.LifeCycleState.OPEN, ratis.getId());
    ContainerInfo ecContainer = HddsTestUtils.getECContainer(
        HddsProtos.LifeCycleState.OPEN, ec.getId(), ecConfig);
    manager.addContainerToPipeline(
        ratis.getId(), ratisContainer.containerID());
    manager.addContainerToPipeline(
        ec.getId(), ecContainer.containerID());

    List<Pipeline> openRatis = manager.getPipelines(
        RatisReplicationConfig.getInstance(ReplicationFactor.THREE), OPEN);
    List<Pipeline> openEc = manager.getPipelines(ecConfig, OPEN);

    // The background creator may add further Ratis pipelines, so only
    // membership is checked for Ratis. EC pipelines are created on demand,
    // which makes an exact count safe.
    Assertions.assertTrue(openRatis.contains(ratis));
    Assertions.assertEquals(1, openEc.size());

    // Precondition: each pipeline holds a single container.
    Assertions.assertEquals(1,
        manager.getContainersInPipeline(ratis.getId()).size());
    Assertions.assertEquals(1,
        manager.getContainersInPipeline(ec.getId()).size());

    // Ratis: the pipeline must stay OPEN once its container is removed.
    manager.removeContainerFromPipeline(
        ratis.getId(), ratisContainer.containerID());
    Assertions.assertEquals(OPEN,
        manager.getPipeline(ratis.getId()).getPipelineState());

    // EC: removing the container must close the pipeline.
    manager.removeContainerFromPipeline(
        ec.getId(), ecContainer.containerID());
    Assertions.assertEquals(CLOSED,
        manager.getPipeline(ec.getId()).getPipelineState());
  }
}

@Test
public void testClosePipelineShouldFailOnFollower() throws Exception {
try (PipelineManagerImpl pipelineManager = createPipelineManager(true)) {
Expand Down Expand Up @@ -539,7 +594,7 @@ public void testScrubPipelines() throws Exception {
Assertions.assertTrue(pipelineManager
.getPipelines(RatisReplicationConfig
.getInstance(ReplicationFactor.THREE),
Pipeline.PipelineState.CLOSED).contains(closedPipeline));
CLOSED).contains(closedPipeline));

// Set the clock to "now". All pipelines were created before this.
testClock.set(Instant.now());
Expand All @@ -557,7 +612,7 @@ public void testScrubPipelines() throws Exception {
Assertions.assertFalse(pipelineManager
.getPipelines(RatisReplicationConfig
.getInstance(ReplicationFactor.THREE),
Pipeline.PipelineState.CLOSED).contains(closedPipeline));
CLOSED).contains(closedPipeline));

testClock.fastForward((60000));

Expand Down Expand Up @@ -592,7 +647,7 @@ public void testScrubOpenWithUnregisteredNodes() throws Exception {

pipelineManager.scrubPipelines();
pipeline = pipelineManager.getPipeline(pipeline.getId());
Assertions.assertEquals(Pipeline.PipelineState.CLOSED,
Assertions.assertEquals(CLOSED,
pipeline.getPipelineState());
}

Expand Down