Skip to content

HDDS-8781. Allow on demand metadata scanning of open containers #8442

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner;
import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerScanner;
import org.apache.hadoop.util.Time;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum;
Expand Down Expand Up @@ -419,7 +419,7 @@ && getMissingContainerSet().contains(containerID)) {
// Create a specific exception that signals for on demand scanning
// and move this general scan to where it is more appropriate.
// Add integration tests to test the full functionality.
OnDemandContainerDataScanner.scanContainer(container);
OnDemandContainerScanner.scanContainer(container);
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE,
new Exception(responseProto.getMessage()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@

/**
* Class for performing on demand scans of containers.
* Note: [OPEN] containers are scanned only for metadata,
* [CLOSED, QUASI_CLOSED] containers are scanned for metadata and data.
*/
public final class OnDemandContainerDataScanner {
public final class OnDemandContainerScanner {
private static final Logger LOG =
LoggerFactory.getLogger(OnDemandContainerDataScanner.class);
LoggerFactory.getLogger(OnDemandContainerScanner.class);

private static volatile OnDemandContainerDataScanner instance;
private static volatile OnDemandContainerScanner instance;

private final ExecutorService scanExecutor;
private final ContainerController containerController;
Expand All @@ -56,7 +58,7 @@ public final class OnDemandContainerDataScanner {
private final OnDemandScannerMetrics metrics;
private final long minScanGap;

private OnDemandContainerDataScanner(
private OnDemandContainerScanner(
ContainerScannerConfiguration conf, ContainerController controller) {
containerController = controller;
throttler = new DataTransferThrottler(
Expand All @@ -75,7 +77,7 @@ public static synchronized void init(
" a second time on a datanode.");
return;
}
instance = new OnDemandContainerDataScanner(conf, controller);
instance = new OnDemandContainerScanner(conf, controller);
}

private static boolean shouldScan(Container<?> container) {
Expand All @@ -97,7 +99,7 @@ private static boolean shouldScan(Container<?> container) {
}

return !ContainerUtils.recentlyScanned(container, instance.minScanGap,
LOG) && container.shouldScanData();
LOG) && (container.shouldScanMetadata() || container.shouldScanData());
}

public static Optional<Future<?>> scanContainer(Container<?> container) {
Expand Down Expand Up @@ -135,13 +137,18 @@ private static void performOnDemandScan(Container<?> container) {
ContainerData containerData = container.getContainerData();
logScanStart(containerData);

ScanResult result =
container.scanData(instance.throttler, instance.canceler);
// Metrics for skipped containers should not be updated.
if (result.getFailureType() == DELETED_CONTAINER) {
LOG.error("Container [{}] has been deleted.",
containerId, result.getException());
return;
ScanResult result = ScanResult.healthy();
// OPEN containers are scanned here for metadata only
if (container.shouldScanMetadata() && !container.shouldScanData()) {
result = container.scanMetaData();
} else if (container.shouldScanData()) {
result = container.scanData(instance.throttler, instance.canceler);
// Metrics for skipped containers should not be updated.
if (result.getFailureType() == DELETED_CONTAINER) {
LOG.error("Container [{}] has been deleted.",
containerId, result.getException());
return;
}
}
if (!result.isHealthy()) {
LOG.error("Corruption detected in container [{}]." +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ private void initOnDemandContainerScanner(ContainerScannerConfiguration c) {
"so the on-demand container data scanner will not start.");
return;
}
OnDemandContainerDataScanner.init(c, controller);
OnDemandContainerScanner.init(c, controller);
}

/**
Expand All @@ -453,7 +453,7 @@ private void stopContainerScrub() {
for (BackgroundContainerDataScanner s : dataScanners) {
s.shutdown();
}
OnDemandContainerDataScanner.shutdown();
OnDemandContainerScanner.shutdown();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
* Unit tests for the on-demand container scanner.
*/
@MockitoSettings(strictness = Strictness.LENIENT)
public class TestOnDemandContainerDataScanner extends
public class TestOnDemandContainerScanner extends
TestContainerScannersAbstract {

@Override
Expand Down Expand Up @@ -95,14 +95,14 @@ public void testUnscannedContainerIsScanned() throws Exception {

@AfterEach
public void tearDown() {
OnDemandContainerDataScanner.shutdown();
OnDemandContainerScanner.shutdown();
}

@Test
public void testScanTimestampUpdated() throws Exception {
OnDemandContainerDataScanner.init(conf, controller);
OnDemandContainerScanner.init(conf, controller);
Optional<Future<?>> scanFuture =
OnDemandContainerDataScanner.scanContainer(healthy);
OnDemandContainerScanner.scanContainer(healthy);
assertTrue(scanFuture.isPresent());
scanFuture.get().get();
verify(controller, atLeastOnce())
Expand All @@ -111,7 +111,7 @@ public void testScanTimestampUpdated() throws Exception {

// Metrics for deleted container should not be updated.
scanFuture =
OnDemandContainerDataScanner.scanContainer(healthy);
OnDemandContainerScanner.scanContainer(healthy);
assertTrue(scanFuture.isPresent());
scanFuture.get().get();
verify(controller, never())
Expand All @@ -121,33 +121,33 @@ public void testScanTimestampUpdated() throws Exception {

@Test
public void testContainerScannerMultipleInitsAndShutdowns() throws Exception {
OnDemandContainerDataScanner.init(conf, controller);
OnDemandContainerDataScanner.init(conf, controller);
OnDemandContainerDataScanner.shutdown();
OnDemandContainerDataScanner.shutdown();
OnDemandContainerScanner.init(conf, controller);
OnDemandContainerScanner.init(conf, controller);
OnDemandContainerScanner.shutdown();
OnDemandContainerScanner.shutdown();
//There shouldn't be an interaction after shutdown:
OnDemandContainerDataScanner.scanContainer(corruptData);
OnDemandContainerScanner.scanContainer(corruptData);
verifyContainerMarkedUnhealthy(corruptData, never());
}

@Test
public void testSameContainerQueuedMultipleTimes() throws Exception {
OnDemandContainerDataScanner.init(conf, controller);
OnDemandContainerScanner.init(conf, controller);
//Given a container that has not finished scanning
CountDownLatch latch = new CountDownLatch(1);
when(corruptData.scanData(
OnDemandContainerDataScanner.getThrottler(),
OnDemandContainerDataScanner.getCanceler()))
OnDemandContainerScanner.getThrottler(),
OnDemandContainerScanner.getCanceler()))
.thenAnswer((Answer<ScanResult>) invocation -> {
latch.await();
return getUnhealthyScanResult();
});
Optional<Future<?>> onGoingScan = OnDemandContainerDataScanner
Optional<Future<?>> onGoingScan = OnDemandContainerScanner
.scanContainer(corruptData);
assertTrue(onGoingScan.isPresent());
assertFalse(onGoingScan.get().isDone());
//When scheduling the same container again
Optional<Future<?>> secondScan = OnDemandContainerDataScanner
Optional<Future<?>> secondScan = OnDemandContainerScanner
.scanContainer(corruptData);
//Then the second scan is not scheduled and the first scan can still finish
assertFalse(secondScan.isPresent());
Expand All @@ -160,31 +160,31 @@ public void testSameContainerQueuedMultipleTimes() throws Exception {
@Test
@Override
public void testScannerMetrics() throws Exception {
OnDemandContainerDataScanner.init(conf, controller);
OnDemandContainerScanner.init(conf, controller);
ArrayList<Optional<Future<?>>> resultFutureList = Lists.newArrayList();
resultFutureList.add(OnDemandContainerDataScanner.scanContainer(
resultFutureList.add(OnDemandContainerScanner.scanContainer(
corruptData));
resultFutureList.add(
OnDemandContainerDataScanner.scanContainer(openContainer));
OnDemandContainerScanner.scanContainer(openContainer));
resultFutureList.add(
OnDemandContainerDataScanner.scanContainer(openCorruptMetadata));
resultFutureList.add(OnDemandContainerDataScanner.scanContainer(healthy));
OnDemandContainerScanner.scanContainer(openCorruptMetadata));
resultFutureList.add(OnDemandContainerScanner.scanContainer(healthy));
waitOnScannerToFinish(resultFutureList);
OnDemandScannerMetrics metrics = OnDemandContainerDataScanner.getMetrics();
OnDemandScannerMetrics metrics = OnDemandContainerScanner.getMetrics();
// Open containers are scanned for metadata, so all four containers count
// as scanned and both corrupt containers count as unhealthy.
assertEquals(1, metrics.getNumUnHealthyContainers());
assertEquals(2, metrics.getNumContainersScanned());
assertEquals(2, metrics.getNumUnHealthyContainers());
assertEquals(4, metrics.getNumContainersScanned());
}

@Test
@Override
public void testScannerMetricsUnregisters() {
OnDemandContainerDataScanner.init(conf, controller);
String metricsName = OnDemandContainerDataScanner.getMetrics().getName();
OnDemandContainerScanner.init(conf, controller);
String metricsName = OnDemandContainerScanner.getMetrics().getName();
assertNotNull(DefaultMetricsSystem.instance().getSource(metricsName));
OnDemandContainerDataScanner.shutdown();
OnDemandContainerDataScanner.scanContainer(healthy);
OnDemandContainerScanner.shutdown();
OnDemandContainerScanner.scanContainer(healthy);
assertNull(DefaultMetricsSystem.instance().getSource(metricsName));
}

Expand All @@ -193,15 +193,15 @@ public void testScannerMetricsUnregisters() {
public void testUnhealthyContainersDetected() throws Exception {
// Without initialization,
// there shouldn't be interaction with containerController
OnDemandContainerDataScanner.scanContainer(corruptData);
OnDemandContainerScanner.scanContainer(corruptData);
verifyNoInteractions(controller);

scanContainer(healthy);
verifyContainerMarkedUnhealthy(healthy, never());
scanContainer(corruptData);
verifyContainerMarkedUnhealthy(corruptData, atLeastOnce());
scanContainer(openCorruptMetadata);
verifyContainerMarkedUnhealthy(openCorruptMetadata, never());
verifyContainerMarkedUnhealthy(openCorruptMetadata, atLeastOnce());
scanContainer(openContainer);
verifyContainerMarkedUnhealthy(openContainer, never());
// Deleted containers should not be marked unhealthy
Expand All @@ -220,8 +220,8 @@ public void testUnhealthyContainersDetected() throws Exception {
public void testWithVolumeFailure() throws Exception {
when(vol.isFailed()).thenReturn(true);

OnDemandContainerDataScanner.init(conf, controller);
OnDemandScannerMetrics metrics = OnDemandContainerDataScanner.getMetrics();
OnDemandContainerScanner.init(conf, controller);
OnDemandScannerMetrics metrics = OnDemandContainerScanner.getMetrics();

scanContainer(healthy);
verifyContainerMarkedUnhealthy(healthy, never());
Expand All @@ -245,11 +245,11 @@ public void testShutdownDuringScan() throws Exception {
});

// Start the blocking scan.
OnDemandContainerDataScanner.init(conf, controller);
OnDemandContainerDataScanner.scanContainer(healthy);
OnDemandContainerScanner.init(conf, controller);
OnDemandContainerScanner.scanContainer(healthy);
// Shut down the on demand scanner. This will interrupt the blocked scan
// on the healthy container.
OnDemandContainerDataScanner.shutdown();
OnDemandContainerScanner.shutdown();
// Interrupting the healthy container's scan should not mark it unhealthy.
verifyContainerMarkedUnhealthy(healthy, never());
}
Expand All @@ -266,7 +266,7 @@ public void testUnhealthyContainerNotRescanned() throws Exception {
// First iteration should find the unhealthy container.
scanContainer(unhealthy);
verifyContainerMarkedUnhealthy(unhealthy, atMostOnce());
OnDemandScannerMetrics metrics = OnDemandContainerDataScanner.getMetrics();
OnDemandScannerMetrics metrics = OnDemandContainerScanner.getMetrics();
assertEquals(1, metrics.getNumContainersScanned());
assertEquals(1, metrics.getNumUnHealthyContainers());

Expand All @@ -291,9 +291,9 @@ public void testUnhealthyContainerNotRescanned() throws Exception {
}

private void scanContainer(Container<?> container) throws Exception {
OnDemandContainerDataScanner.init(conf, controller);
OnDemandContainerScanner.init(conf, controller);
Optional<Future<?>> scanFuture =
OnDemandContainerDataScanner.scanContainer(container);
OnDemandContainerScanner.scanContainer(container);
if (scanFuture.isPresent()) {
scanFuture.get().get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner;
import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerScanner;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -36,7 +36,7 @@
* is triggered when there is an error while a client interacts with a
* container.
*/
class TestOnDemandContainerDataScannerIntegration
class TestOnDemandContainerScannerIntegration
extends TestContainerScannerIntegrationAbstract {

private final GenericTestUtils.LogCapturer logCapturer =
Expand Down Expand Up @@ -64,6 +64,17 @@ static Collection<ContainerCorruptions> supportedCorruptionTypes() {
ContainerCorruptions.TRUNCATED_BLOCK);
}

/**
 * Parameter source for {@code testCorruptionDetectedForOpenContainers}.
 * The collection is whatever {@code ContainerCorruptions.getAllParamsExcept}
 * yields for the container-file, metadata-directory and block level
 * corruption types passed below.
 * NOTE(review): by its name the helper presumably returns every corruption
 * type other than the ones passed in; confirm against ContainerCorruptions.
 */
static Collection<ContainerCorruptions> supportedCorruptionTypesForOpen() {
  final Collection<ContainerCorruptions> openContainerCorruptions =
      ContainerCorruptions.getAllParamsExcept(
          ContainerCorruptions.MISSING_METADATA_DIR,
          ContainerCorruptions.MISSING_CONTAINER_FILE,
          ContainerCorruptions.CORRUPT_CONTAINER_FILE,
          ContainerCorruptions.TRUNCATED_CONTAINER_FILE,
          ContainerCorruptions.CORRUPT_BLOCK,
          ContainerCorruptions.TRUNCATED_BLOCK,
          ContainerCorruptions.MISSING_BLOCK);
  return openContainerCorruptions;
}

@BeforeAll
static void init() throws Exception {
OzoneConfiguration ozoneConfig = new OzoneConfiguration();
Expand All @@ -82,7 +93,7 @@ static void init() throws Exception {
}

/**
* {@link OnDemandContainerDataScanner} should detect corrupted blocks
* {@link OnDemandContainerScanner} should detect corrupted blocks
* in a closed container when a client reads from it.
*/
@ParameterizedTest
Expand All @@ -109,4 +120,36 @@ void testCorruptionDetected(ContainerCorruptions corruption)
waitForScmToSeeUnhealthyReplica(containerID);
corruption.assertLogged(logCapturer);
}

/**
 * {@link OnDemandContainerScanner} should detect corruption in an open
 * container when a client reads from it. The corruption types exercised
 * are supplied by {@code supportedCorruptionTypesForOpen}.
 */
@ParameterizedTest
@MethodSource("supportedCorruptionTypesForOpen")
void testCorruptionDetectedForOpenContainers(ContainerCorruptions corruption)
    throws Exception {
  // Write data and make sure the datanode replica is still OPEN.
  final long containerId = writeDataToOpenContainer();
  final Container<?> dnContainer = getDnContainer(containerId);
  assertEquals(State.OPEN, dnContainer.getContainerState());

  // Inject the corruption under test into the open replica.
  corruption.applyTo(dnContainer);

  // This helper checks that reading the corrupted key returns an error to
  // the client; that failed read is what should trigger the on-demand scan.
  readFromCorruptedKey("keyName");

  // Poll every 500 ms, for at most 5 s, until the scan has marked the
  // replica UNHEALTHY.
  GenericTestUtils.waitFor(
      () -> State.UNHEALTHY == dnContainer.getContainerState(),
      500, 5000);

  // SCM must receive a report of the unhealthy replica, and the corruption
  // must show up in the captured log output.
  waitForScmToSeeUnhealthyReplica(containerId);
  corruption.assertLogged(logCapturer);
}

}