
HDDS-10374. Make container scanner generate merkle trees during the scan #7490

Draft — wants to merge 63 commits into base: HDDS-10239-container-reconciliation

Changes from 1 commit (63 commits total)
a8b8dbc
Merge branch 'HDDS-10239-container-reconciliation' into HDDS-10374-sc…
errose28 Nov 22, 2024
999a913
Add code to build and write the tree from the data scanners
errose28 Nov 22, 2024
b0d1ba9
Update todo in acceptance test
errose28 Nov 25, 2024
382bce2
Add unit tests for tree generation by scanners based on container state
errose28 Nov 25, 2024
28b1889
Add initial (failing) unit test for KeyValueContainerCheck
errose28 Nov 26, 2024
dc182e8
Update container data checksum when building the tree
errose28 Nov 26, 2024
a3401a9
Fix handling of fully truncated block of 0 size
errose28 Jan 7, 2025
a25d44d
Add unit tests for new addBlock method in tree
errose28 Jan 7, 2025
7550a3c
Test that SCM gets a checksum with the container report
errose28 Jan 7, 2025
847f8d8
Add (failing) tests that SCM sees updated checksums
errose28 Jan 7, 2025
452c294
Update acceptance test
errose28 Jan 8, 2025
dc45eca
Add javadoc for tree generation from metadata
errose28 Jan 8, 2025
1cb291f
Data integration tests passing
errose28 Jan 8, 2025
d6b21d2
Don't generate tree from metadata for unhealthy container
errose28 Jan 9, 2025
2a2dbbd
Checkstyle
errose28 Jan 9, 2025
c9a077c
Marking container unhealthy should not write a merkle tree (test fix)
errose28 Jan 9, 2025
0bbbdc5
Checkstyle
errose28 Jan 9, 2025
7b971a9
Address review comments
errose28 Jan 13, 2025
15d6848
Merge branch 'HDDS-10239-container-reconciliation' into HDDS-10374-sc…
errose28 Apr 11, 2025
0989881
Initial use of on demand scan in TestKeyValueHandler
errose28 Apr 11, 2025
834be96
Make on-demand scanner a normal instance
errose28 Apr 15, 2025
e73757e
Register on-demand scan callback in ContainerSet
errose28 Apr 15, 2025
f0d8efe
Migrate scanContainer usage in prod code
errose28 Apr 15, 2025
4cb054c
Switch terminology from error to scan. Add existence checks
errose28 Apr 15, 2025
8abedb6
Update tests
errose28 Apr 15, 2025
577a075
Add unit test for ContainerSet
errose28 Apr 16, 2025
4c8d843
Checkstyle
errose28 Apr 16, 2025
0bd4127
Improve comments and test
errose28 Apr 16, 2025
61fae12
Merge branch 'non-static-on-demand-scan' into HDDS-10374-scanner-buil…
errose28 Apr 16, 2025
61f30f3
WIP migrate reconciliation unit tests
errose28 Apr 17, 2025
192eb7b
Most tests passing
errose28 Apr 23, 2025
0cf79f6
Improve logging in test and prod code
errose28 Apr 28, 2025
8b30f54
Fix tree tracking during reconcile process
errose28 Apr 28, 2025
9c74f4b
Use mixin to standardize scanner operations, log checksum changes in …
errose28 Apr 29, 2025
d550669
Logging improvements
errose28 Apr 29, 2025
97e02ea
Add checksum validation, generate readable data
errose28 Apr 30, 2025
22b41b8
Use tree writer between peer updates. All tests pass
errose28 May 5, 2025
f49a9dd
Wait for on-demand scans to complete in test
errose28 May 5, 2025
f5d4dbf
Improve char data generation, reset scan metrics
errose28 May 5, 2025
1140c90
Update test name
errose28 May 5, 2025
e0aa7cb
Checkstyle
errose28 May 5, 2025
62d7794
Merge branch 'HDDS-10239-container-reconciliation' into HDDS-10374-sc…
errose28 May 6, 2025
9c3b87c
Merge branch 'reconcile-unit-test-framework' into HDDS-10374-scanner-…
errose28 May 6, 2025
9322b4a
Fix TODOs dependent on this patch
errose28 May 13, 2025
9b75957
Rename container scan helper
errose28 May 13, 2025
f615275
Add comment on failure type
errose28 May 13, 2025
dadc829
Fix checkstyle unique to this PR
errose28 May 13, 2025
076a82e
Merge branch 'HDDS-10239-container-reconciliation' into HDDS-10374-sc…
errose28 May 14, 2025
cc55527
Fix sending ICR when only checksum changes (pending test)
errose28 May 14, 2025
35879b4
Updates after reviewing diff
errose28 May 14, 2025
1ab8c14
Add unit test for KeyValueHandler#updateContainerChecksum
errose28 May 14, 2025
6c8be07
Improve and update scanner integration tests
errose28 May 14, 2025
60a1a6e
Add unit tests that checksum update failure does not stop container s…
errose28 May 14, 2025
d035c17
Checkstyle
errose28 May 14, 2025
53336ae
Fix scan gap for unit test
errose28 May 15, 2025
56e7ed4
Merge branch 'HDDS-10239-container-reconciliation' into HDDS-10374-sc…
errose28 May 16, 2025
2504638
Fix metadata scan test
errose28 May 16, 2025
4be9992
Update based on review
errose28 May 19, 2025
c0b89dd
pmd
errose28 May 19, 2025
e24a24e
Update ContainerData checksum info after reconcile with each peer
errose28 May 22, 2025
dc27f74
Support bypassing scan gap (tests are failing)
errose28 May 22, 2025
e2974b4
Checkstyle
errose28 May 27, 2025
34b4b9a
Fix scan gap bug. All tests expected to pass
errose28 May 27, 2025
WIP migrate reconciliation unit tests
errose28 committed Apr 17, 2025
commit 61f30f304e3f6828081645bf4f30a5a957d38ef9
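For context on what the scanner builds: a container merkle tree rolls per-chunk checksums up into block checksums, and block checksums up into a single container data checksum, so any corrupted chunk changes the container-level value that replicas compare during reconciliation. A minimal, self-contained sketch of that roll-up — class and method names here are illustrative only, not the Ozone API:

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

// Illustrative merkle-style roll-up: chunk hashes -> block hash -> container hash.
// Hypothetical names; the real tree lives in ContainerMerkleTree protos.
final class MerkleSketch {

  private static MessageDigest sha256() {
    try {
      return MessageDigest.getInstance("SHA-256");
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Hash of a single chunk's data, as a scanner could compute while reading. */
  static byte[] chunkHash(byte[] chunkData) {
    return sha256().digest(chunkData);
  }

  /** Parent hash: digest of the concatenated child hashes, in order. */
  static byte[] combine(List<byte[]> childHashes) {
    MessageDigest md = sha256();
    for (byte[] child : childHashes) {
      md.update(child);
    }
    return md.digest();
  }
}
```

Because each parent digests its children in order, two replicas agree on the container checksum exactly when every chunk hash agrees, which is what the tests below rely on.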
@@ -152,8 +152,6 @@ public class TestKeyValueHandler {

private static final long DUMMY_CONTAINER_ID = 9999;
private static final String DUMMY_PATH = "dummy/dir/doesnt/exist";
private static final int CHUNK_LEN = 3 * (int) OzoneConsts.KB;
private static final int CHUNKS_PER_BLOCK = 4;
private static final String DATANODE_UUID = UUID.randomUUID().toString();
private static final String CLUSTER_ID = UUID.randomUUID().toString();

@@ -162,26 +160,6 @@ public class TestKeyValueHandler {
private OzoneConfiguration conf;
private ContainerSet mockContainerSet;

/**
* Number of corrupt blocks and chunks.
*/
public static Stream<Arguments> corruptionValues() {
return Stream.of(
Arguments.of(5, 0),
Arguments.of(0, 5),
Arguments.of(0, 10),
Arguments.of(10, 0),
Arguments.of(5, 10),
Arguments.of(10, 5),
Arguments.of(2, 3),
Arguments.of(3, 2),
Arguments.of(4, 6),
Arguments.of(6, 4),
Arguments.of(6, 9),
Arguments.of(9, 6)
);
}

@BeforeEach
public void setup() throws IOException {
// Create mock HddsDispatcher and KeyValueHandler.
@@ -204,7 +182,6 @@ public void setup() throws IOException {
mock(ContainerMetrics.class),
mock(TokenVerifier.class)
);

}

/**
@@ -593,151 +570,6 @@ public void testContainerChecksumInvocation(ContainerLayoutVersion layoutVersion
Assertions.assertEquals(1, icrCount.get());
}

@ParameterizedTest
@MethodSource("corruptionValues")
public void testFullContainerReconciliation(int numBlocks, int numChunks) throws Exception {
KeyValueHandler kvHandler = createKeyValueHandler(tempDir);
ContainerChecksumTreeManager checksumManager = kvHandler.getChecksumManager();

ContainerController controller = new ContainerController(mockContainerSet,
Collections.singletonMap(ContainerType.KeyValueContainer, kvHandler));
OnDemandContainerDataScanner.init(conf.getObject(ContainerScannerConfiguration.class), controller, checksumManager);

DNContainerOperationClient dnClient = new DNContainerOperationClient(conf, null, null);
final long containerID = 100L;
// Create 3 containers with 15 blocks each and 3 replicas.
List<KeyValueContainer> containers = createContainerWithBlocks(kvHandler, containerID, 15, 3);
assertEquals(3, containers.size());

// Introduce corruption in each container on different replicas.
introduceCorruption(kvHandler, containers.get(1), numBlocks, numChunks, false);
introduceCorruption(kvHandler, containers.get(2), numBlocks, numChunks, true);
// Use synchronous on-demand scans to re-build the merkle trees after corruption.
waitForContainerScans(containers);

// Without reconciliation, checksums should be different because of the corruption.
Set<Long> checksumsBeforeReconciliation = new HashSet<>();
for (KeyValueContainer kvContainer : containers) {
Optional<ContainerProtos.ContainerChecksumInfo> containerChecksumInfo =
checksumManager.read(kvContainer.getContainerData());
assertTrue(containerChecksumInfo.isPresent());
long dataChecksum = containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum();
assertEquals(kvContainer.getContainerData().getDataChecksum(), dataChecksum);
checksumsBeforeReconciliation.add(dataChecksum);
}
// There should be more than 1 checksum because of the corruption.
assertTrue(checksumsBeforeReconciliation.size() > 1);

List<DatanodeDetails> datanodes = ImmutableList.of(randomDatanodeDetails(), randomDatanodeDetails(),
randomDatanodeDetails());
Map<String, KeyValueContainer> dnToContainerMap = new HashMap<>();
dnToContainerMap.put(datanodes.get(0).getUuidString(), containers.get(0));
dnToContainerMap.put(datanodes.get(1).getUuidString(), containers.get(1));
dnToContainerMap.put(datanodes.get(2).getUuidString(), containers.get(2));

// Setup mock for each datanode network calls needed for reconciliation.
try (MockedStatic<ContainerProtocolCalls> containerProtocolMock =
Mockito.mockStatic(ContainerProtocolCalls.class)) {
mockContainerProtocolCalls(containerProtocolMock, dnToContainerMap, checksumManager, kvHandler, containerID);

kvHandler.reconcileContainer(dnClient, containers.get(0), datanodes);
kvHandler.reconcileContainer(dnClient, containers.get(1), datanodes);
kvHandler.reconcileContainer(dnClient, containers.get(2), datanodes);

// After reconciliation, checksums should be the same for all containers.
// Reconciliation should have updated the tree based on the updated metadata that was obtained for the
// previously corrupted data. We do not need to wait for the full data scan to complete.
ContainerProtos.ContainerChecksumInfo prevContainerChecksumInfo = null;
for (KeyValueContainer kvContainer : containers) {
kvHandler.createContainerMerkleTreeFromMetadata(kvContainer);
Optional<ContainerProtos.ContainerChecksumInfo> containerChecksumInfo =
checksumManager.read(kvContainer.getContainerData());
assertTrue(containerChecksumInfo.isPresent());
long dataChecksum = containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum();
assertEquals(kvContainer.getContainerData().getDataChecksum(), dataChecksum);
if (prevContainerChecksumInfo != null) {
assertEquals(prevContainerChecksumInfo.getContainerMerkleTree().getDataChecksum(), dataChecksum);
}
prevContainerChecksumInfo = containerChecksumInfo.get();
}
}
}

public void waitForContainerScans(List<KeyValueContainer> containers) throws Exception {
for (KeyValueContainer container: containers) {
// The on-demand scanner has been initialized to pull from the mock container set.
// Make it pull the corresponding container instance to scan in this run based on ID.
long containerID = container.getContainerData().getContainerID();
Mockito.doReturn(container).when(mockContainerSet).getContainer(containerID);

Optional<Future<?>> scanFuture = OnDemandContainerDataScanner.scanContainer(container);
assertTrue(scanFuture.isPresent());
// Wait for on-demand scan to complete.
scanFuture.get().get();
}
}

private void mockContainerProtocolCalls(MockedStatic<ContainerProtocolCalls> containerProtocolMock,
Map<String, KeyValueContainer> dnToContainerMap,
ContainerChecksumTreeManager checksumManager,
KeyValueHandler kvHandler,
long containerID) {
// Mock getContainerChecksumInfo
containerProtocolMock.when(() -> ContainerProtocolCalls.getContainerChecksumInfo(any(), anyLong(), any()))
.thenAnswer(inv -> {
XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
Pipeline pipeline = xceiverClientSpi.getPipeline();
assertEquals(1, pipeline.size());
DatanodeDetails dn = pipeline.getFirstNode();
KeyValueContainer container = dnToContainerMap.get(dn.getUuidString());
ByteString checksumInfo = checksumManager.getContainerChecksumInfo(container.getContainerData());
return ContainerProtos.GetContainerChecksumInfoResponseProto.newBuilder()
.setContainerID(containerID)
.setContainerChecksumInfo(checksumInfo)
.build();
});

// Mock getBlock
containerProtocolMock.when(() -> ContainerProtocolCalls.getBlock(any(), any(), any(), any(), anyMap()))
.thenAnswer(inv -> {
XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
Pipeline pipeline = xceiverClientSpi.getPipeline();
assertEquals(1, pipeline.size());
DatanodeDetails dn = pipeline.getFirstNode();
KeyValueContainer container = dnToContainerMap.get(dn.getUuidString());
ContainerProtos.BlockData blockData = kvHandler.getBlockManager().getBlock(container, inv.getArgument(2))
.getProtoBufMessage();
return ContainerProtos.GetBlockResponseProto.newBuilder()
.setBlockData(blockData)
.build();
});

// Mock readChunk
containerProtocolMock.when(() -> ContainerProtocolCalls.readChunk(any(), any(), any(), any(), any()))
.thenAnswer(inv -> {
XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
Pipeline pipeline = xceiverClientSpi.getPipeline();
assertEquals(1, pipeline.size());
DatanodeDetails dn = pipeline.getFirstNode();
KeyValueContainer container = dnToContainerMap.get(dn.getUuidString());
return createReadChunkResponse(inv, container, kvHandler);
});
}

// Helper method to create readChunk responses
private ContainerProtos.ReadChunkResponseProto createReadChunkResponse(InvocationOnMock inv,
KeyValueContainer container,
KeyValueHandler kvHandler) throws IOException {
ContainerProtos.DatanodeBlockID blockId = inv.getArgument(2);
ContainerProtos.ChunkInfo chunkInfo = inv.getArgument(1);
return ContainerProtos.ReadChunkResponseProto.newBuilder()
.setBlockID(blockId)
.setChunkData(chunkInfo)
.setData(kvHandler.getChunkManager().readChunk(container, BlockID.getFromProtobuf(blockId),
ChunkInfo.getFromProtoBuf(chunkInfo), null).toByteString())
.build();
}

@Test
public void testGetContainerChecksumInfoOnInvalidContainerStates() {
when(handler.handleGetContainerChecksumInfo(any(), any())).thenCallRealMethod();
@@ -842,6 +674,7 @@ private static ContainerCommandRequestProto createContainerRequest(

private KeyValueHandler createKeyValueHandler(Path path) throws IOException {
final ContainerSet containerSet = new ContainerSet(1000);

final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);

HddsVolume hddsVolume = new HddsVolume.Builder(path.toString()).conf(conf)
@@ -859,165 +692,14 @@ private KeyValueHandler createKeyValueHandler(Path path) throws IOException {
hddsVolume.getVolumeInfoStats().unregister();
hddsVolume.getVolumeIOStats().unregister();
ContainerMetrics.remove();
return kvHandler;
}

/**
* Creates a container with normal and deleted blocks.
* First it will insert normal blocks, and then it will insert
* deleted blocks.
*/
protected List<KeyValueContainer> createContainerWithBlocks(KeyValueHandler kvHandler, long containerId,
int blocks, int numContainerCopy)
throws Exception {
String strBlock = "block";
String strChunk = "chunkFile";
List<KeyValueContainer> containers = new ArrayList<>();
MutableVolumeSet volumeSet = new MutableVolumeSet(DATANODE_UUID, conf, null,
StorageVolume.VolumeType.DATA_VOLUME, null);
createDbInstancesForTestIfNeeded(volumeSet, CLUSTER_ID, CLUSTER_ID, conf);
int bytesPerChecksum = 2 * (int) OzoneConsts.KB;
Checksum checksum = new Checksum(ContainerProtos.ChecksumType.SHA256,
bytesPerChecksum);
byte[] chunkData = RandomStringUtils.randomAscii(CHUNK_LEN).getBytes(UTF_8);
ChecksumData checksumData = checksum.computeChecksum(chunkData);

for (int j = 0; j < numContainerCopy; j++) {
KeyValueContainerData containerData = new KeyValueContainerData(containerId,
ContainerLayoutVersion.FILE_PER_BLOCK, (long) CHUNKS_PER_BLOCK * CHUNK_LEN * blocks,
UUID.randomUUID().toString(), UUID.randomUUID().toString());
Path kvContainerPath = Files.createDirectory(tempDir.resolve(containerId + "-" + j));
containerData.setMetadataPath(kvContainerPath.toString());
containerData.setDbFile(kvContainerPath.toFile());

KeyValueContainer container = new KeyValueContainer(containerData, conf);
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
.forEach(hddsVolume -> hddsVolume.setDbParentDir(kvContainerPath.toFile()));
container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), UUID.randomUUID().toString());
assertNotNull(containerData.getChunksPath());
File chunksPath = new File(containerData.getChunksPath());
ContainerLayoutTestInfo.FILE_PER_BLOCK.validateFileCount(chunksPath, 0, 0);

List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
for (int i = 0; i < blocks; i++) {
BlockID blockID = new BlockID(containerId, i);
BlockData blockData = new BlockData(blockID);

chunkList.clear();
for (long chunkCount = 0; chunkCount < CHUNKS_PER_BLOCK; chunkCount++) {
String chunkName = strBlock + i + strChunk + chunkCount;
long offset = chunkCount * CHUNK_LEN;
ChunkInfo info = new ChunkInfo(chunkName, offset, CHUNK_LEN);
info.setChecksumData(checksumData);
chunkList.add(info.getProtoBufMessage());
kvHandler.getChunkManager().writeChunk(container, blockID, info,
ByteBuffer.wrap(chunkData), WRITE_STAGE);
}
kvHandler.getChunkManager().finishWriteChunks(container, blockData);
blockData.setChunks(chunkList);
blockData.setBlockCommitSequenceId(i);
kvHandler.getBlockManager().putBlock(container, blockData);
}

ContainerLayoutTestInfo.FILE_PER_BLOCK.validateFileCount(chunksPath, blocks, (long) blocks * CHUNKS_PER_BLOCK);
container.markContainerForClose();
kvHandler.closeContainer(container);
containers.add(container);
}

return containers;
}

/**
 * Introduce corruption in the container:
 * 1. Delete blocks from the container.
 * 2. Corrupt chunks at an offset.
 * If reverse is true, the blocks and chunks are targeted in reverse order.
 */
private void introduceCorruption(KeyValueHandler kvHandler, KeyValueContainer keyValueContainer, int numBlocks,
int numChunks, boolean reverse) throws IOException {
Random random = new Random();
KeyValueContainerData containerData = keyValueContainer.getContainerData();
// Simulate missing blocks
try (DBHandle handle = BlockUtils.getDB(containerData, conf);
BatchOperation batch = handle.getStore().getBatchHandler().initBatchOperation()) {
List<BlockData> blockDataList = kvHandler.getBlockManager().listBlock(keyValueContainer, -1, 100);
int size = blockDataList.size();
for (int i = 0; i < numBlocks; i++) {
BlockData blockData = reverse ? blockDataList.get(size - 1 - i) : blockDataList.get(i);
File blockFile = getBlock(keyValueContainer, blockData.getBlockID().getLocalID());
Assertions.assertTrue(blockFile.delete());
handle.getStore().getBlockDataTable().deleteWithBatch(batch, containerData.getBlockKey(blockData.getLocalID()));
}
handle.getStore().getBatchHandler().commitBatchOperation(batch);
}
// Files.deleteIfExists(getContainerChecksumFile(keyValueContainer.getContainerData()).toPath());
// kvHandler.createContainerMerkleTreeFromMetadata(keyValueContainer);

// Corrupt chunks at an offset.
List<BlockData> blockDataList = kvHandler.getBlockManager().listBlock(keyValueContainer, -1, 100);
int size = blockDataList.size();
for (int i = 0; i < numChunks; i++) {
int blockIndex = reverse ? size - 1 - (i % size) : i % size;
BlockData blockData = blockDataList.get(blockIndex);
int chunkIndex = i / size;
File blockFile = getBlock(keyValueContainer, blockData.getBlockID().getLocalID());
List<ContainerProtos.ChunkInfo> chunks = new ArrayList<>(blockData.getChunks());
ContainerProtos.ChunkInfo chunkInfo = chunks.remove(chunkIndex);
corruptFileAtOffset(blockFile, (int) chunkInfo.getOffset(), (int) chunkInfo.getLen());

// TODO: On-demand scanner (HDDS-10374) should detect this corruption and generate container merkle tree.
// ContainerProtos.ContainerChecksumInfo.Builder builder = kvHandler.getChecksumManager()
// .read(containerData).get().toBuilder();
// List<ContainerProtos.BlockMerkleTree> blockMerkleTreeList = builder.getContainerMerkleTree()
// .getBlockMerkleTreeList();
// assertEquals(size, blockMerkleTreeList.size());

// builder.getContainerMerkleTreeBuilder().clearBlockMerkleTree();
// for (int j = 0; j < blockMerkleTreeList.size(); j++) {
// ContainerProtos.BlockMerkleTree.Builder blockMerkleTreeBuilder = blockMerkleTreeList.get(j).toBuilder();
// if (j == blockIndex) {
// List<ContainerProtos.ChunkMerkleTree.Builder> chunkMerkleTreeBuilderList =
// blockMerkleTreeBuilder.getChunkMerkleTreeBuilderList();
// chunkMerkleTreeBuilderList.get(chunkIndex).setIsHealthy(false).setDataChecksum(random.nextLong());
// blockMerkleTreeBuilder.setDataChecksum(random.nextLong());
// }
// builder.getContainerMerkleTreeBuilder().addBlockMerkleTree(blockMerkleTreeBuilder.build());
// }
// builder.getContainerMerkleTreeBuilder().setDataChecksum(random.nextLong());
// Files.deleteIfExists(getContainerChecksumFile(keyValueContainer.getContainerData()).toPath());
// writeContainerDataTreeProto(keyValueContainer.getContainerData(), builder.getContainerMerkleTree());
}
}

/**
 * Corrupt the file by bit-shifting two bytes within the given chunk range.
 */
public static void corruptFileAtOffset(File file, int offset, int chunkLength) {
  try {
    final int fileLength = (int) file.length();
    assertTrue(fileLength >= offset + chunkLength);
    final int chunkEnd = offset + chunkLength;

    Path path = file.toPath();
    final byte[] original = IOUtils.readFully(Files.newInputStream(path), fileLength);

    // Corrupt the last byte and middle bytes of the block. The scanner should log this as two errors.
    final byte[] corruptedBytes = Arrays.copyOf(original, fileLength);
    corruptedBytes[chunkEnd - 1] = (byte) (original[chunkEnd - 1] << 1);
    final long chunkMid = offset + ((long) chunkLength - offset) / 2;
    corruptedBytes[(int) (chunkMid / 2)] = (byte) (original[(int) (chunkMid / 2)] << 1);

    Files.write(path, corruptedBytes,
        StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC);

    assertThat(IOUtils.readFully(Files.newInputStream(path), fileLength))
        .isEqualTo(corruptedBytes)
        .isNotEqualTo(original);
  } catch (IOException ex) {
    // Fail the test.
    throw new UncheckedIOException(ex);
  }
}

// Added lines in createKeyValueHandler (displaced in the flattened diff view):
    // Register the on-demand container scanner with the container set used by the KeyValueHandler.
    ContainerController controller = new ContainerController(containerSet,
        Collections.singletonMap(ContainerType.KeyValueContainer, kvHandler));
    OnDemandContainerDataScanner onDemandScanner = new OnDemandContainerDataScanner(
        conf.getObject(ContainerScannerConfiguration.class), controller, kvHandler.getChecksumManager());
    containerSet.registerContainerScanHandler(onDemandScanner::scanContainer);

    return kvHandler;
}
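The corruption-helper pattern used in this test — mutate bytes inside a chunk's on-disk range, then verify the change is observable — generalizes to a small self-contained check that a checksum catches the corruption. A sketch with hypothetical names, not the test's actual helpers:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.security.MessageDigest;

// Sketch: corrupt one byte inside a chunk's [offset, offset + length) range
// on disk and confirm a SHA-256 checksum of the file changes. Illustrative only.
final class CorruptionSketch {

  static byte[] checksum(Path file) throws Exception {
    return MessageDigest.getInstance("SHA-256").digest(Files.readAllBytes(file));
  }

  /** Flip all bits of the last byte in the chunk range, guaranteeing a change. */
  static void corruptAt(Path file, int offset, int length) throws Exception {
    byte[] bytes = Files.readAllBytes(file);
    bytes[offset + length - 1] ^= (byte) 0xFF;  // XOR with 0xFF always alters the byte
    Files.write(file, bytes, StandardOpenOption.TRUNCATE_EXISTING);
  }
}
```

Flipping bits with XOR (rather than the left-shift the test uses) avoids the edge case where shifting a zero byte leaves it unchanged.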