Skip to content

HDDS-12209. Make replica verification concurrent #8363

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
HDDS-12715. fix review comments
  • Loading branch information
ptlrs committed May 20, 2025
commit ad2c3d1b68d311e3db7937059ce41662f9da847d
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,16 @@
import com.fasterxml.jackson.databind.SequenceWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand Down Expand Up @@ -104,17 +101,21 @@ static class Verification {
private ExecutorService verificationExecutor;
private ExecutorService writerExecutor;
private ThreadLocal<List<ReplicaVerifier>> threadLocalVerifiers;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not technically required in this context but static ThreadLocal is usually used to prevent confusion with multiple instances created by the same thread.

Suggested change
private ThreadLocal<List<ReplicaVerifier>> threadLocalVerifiers;
private static ThreadLocal<List<ReplicaVerifier>> threadLocalVerifiers;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe a non-static approach would be simpler here. We wouldn't want to share the ThreadLocal verifiers across instances if the replicas-verify command is executed in parallel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For parallel invocation at the command line, each command would be a separate process with a separate JVM and the static instance would not be shared. I don't think we need to worry about a CLI entrypoint having multiple instances invoked in parallel within the same process, so either option should work fine. However adding static conforms with best practices and example defined in the ThreadLocal javadoc.

ThreadLocal instances are typically private static fields in classes that wish to associate state with a thread (e.g., a user ID or Transaction ID).

private List<CompletableFuture<Void>> allFutures;

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(SerializationFeature.INDENT_OUTPUT, true);
private static final ObjectWriter WRITER = OBJECT_MAPPER.writer();

@Override
protected void execute(OzoneClient client, OzoneAddress address) throws IOException {
allFutures = new ArrayList<>();
verificationExecutor = Executors.newFixedThreadPool(threadCount);
writerExecutor = Executors.newSingleThreadExecutor();
if (threadCount < 1 || threadCount > 100) {
LOG.error("Thread count must be between 1 and 100");
return;
}
verificationExecutor = new ThreadPoolExecutor(threadCount, threadCount, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(2 * threadCount), new ThreadPoolExecutor.CallerRunsPolicy());
writerExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(threadCount), new ThreadPoolExecutor.CallerRunsPolicy());
threadLocalVerifiers = ThreadLocal.withInitial(() -> {
List<ReplicaVerifier> verifiers = new ArrayList<>();
try {
Expand All @@ -132,50 +133,32 @@ protected void execute(OzoneClient client, OzoneAddress address) throws IOExcept
return verifiers;
});

try {
createOutputDirectory();
findCandidateKeys(client, address);
} finally {
verificationExecutor.shutdown();
writerExecutor.shutdown();

try {
// Wait for all tasks to complete
verificationExecutor.awaitTermination(1, TimeUnit.DAYS);
writerExecutor.awaitTermination(1, TimeUnit.DAYS);
threadLocalVerifiers.remove();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while waiting for verification to complete", e);
}
}
findCandidateKeys(client, address);
}

@Override
protected OzoneAddress getAddress() throws OzoneClientException {
return new OzoneAddress(uri);
}

void findCandidateKeys(OzoneClient ozoneClient, OzoneAddress address) throws IOException {
void findCandidateKeys(OzoneClient ozoneClient, OzoneAddress address) {
ObjectStore objectStore = ozoneClient.getObjectStore();
String volumeName = address.getVolumeName();
String bucketName = address.getBucketName();
String keyName = address.getKeyName();

AtomicBoolean allKeysPassed = new AtomicBoolean(true);
File outputFile = new File(outputDir, "replicas-verify-result.json");

try (OutputStream outputStream = Files.newOutputStream(outputFile.toPath());
JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(outputStream, JsonEncoding.UTF8)) {
try (JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(System.out, JsonEncoding.UTF8)) {
// open json
jsonGenerator.useDefaultPrettyPrinter();
jsonGenerator.writeStartObject();
jsonGenerator.writeFieldName("keys");
jsonGenerator.writeStartArray();
jsonGenerator.flush();

// Process keys based on the provided address
try (SequenceWriter sequenceWriter = createSequenceWriter(false, jsonGenerator)) {
// Process keys based on the provided address
if (!keyName.isEmpty()) {
processKey(ozoneClient, volumeName, bucketName, keyName, sequenceWriter, allKeysPassed);
} else if (!bucketName.isEmpty()) {
Expand All @@ -190,65 +173,73 @@ void findCandidateKeys(OzoneClient ozoneClient, OzoneAddress address) throws IOE
checkVolume(ozoneClient, it.next(), sequenceWriter, allKeysPassed);
}
}

// Wait for all futures to complete
CompletableFuture<Void> allOf = CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0]));
} catch (IOException e) {
LOG.error("Error while verifying keys", e);
} finally {
verificationExecutor.shutdown();
writerExecutor.shutdown();
try {
allOf.join();
} catch (Exception e) {
LOG.error("Error during verification", e);
// Wait for all tasks to complete
if (!verificationExecutor.awaitTermination(1, TimeUnit.DAYS) ||
!writerExecutor.awaitTermination(1, TimeUnit.DAYS)) {
LOG.warn("Failed to wait for all tasks to complete");
}
threadLocalVerifiers.remove();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for verification to complete", e);
}
}

// close json
try {
jsonGenerator.writeEndArray();
jsonGenerator.writeBooleanField("pass", allKeysPassed.get());
jsonGenerator.writeEndObject();
} catch (Exception e) {
LOG.error("Exception in closing the JSON structure", e);
}
jsonGenerator.writeEndArray();
jsonGenerator.writeBooleanField("pass", allKeysPassed.get());
jsonGenerator.writeEndObject();
} catch (IOException e) {
LOG.error("Error generating verification result", e);
}
}

void checkVolume(OzoneClient ozoneClient, OzoneVolume volume,
SequenceWriter sequenceWriter, AtomicBoolean allKeysPassed) throws IOException {
SequenceWriter sequenceWriter, AtomicBoolean allKeysPassed) {
for (Iterator<? extends OzoneBucket> it = volume.listBuckets(null); it.hasNext();) {
OzoneBucket bucket = it.next();
checkBucket(ozoneClient, bucket, sequenceWriter, allKeysPassed);
}
}

void checkBucket(OzoneClient ozoneClient, OzoneBucket bucket,
SequenceWriter sequenceWriter, AtomicBoolean allKeysPassed) throws IOException {
for (Iterator<? extends OzoneKey> it = bucket.listKeys(null); it.hasNext();) {
OzoneKey key = it.next();
// TODO: Remove this check once HDDS-12094 is fixed
if (!key.getName().endsWith("/")) {
processKey(ozoneClient, key.getVolumeName(), key.getBucketName(),
key.getName(), sequenceWriter, allKeysPassed);
SequenceWriter sequenceWriter, AtomicBoolean allKeysPassed) {
try {
for (Iterator<? extends OzoneKey> it = bucket.listKeys(null); it.hasNext();) {
OzoneKey key = it.next();
// TODO: Remove this check once HDDS-12094 is fixed
if (!key.getName().endsWith("/")) {
processKey(ozoneClient, key.getVolumeName(), key.getBucketName(),
key.getName(), sequenceWriter, allKeysPassed);
}
}
} catch (IOException e) {
LOG.error("Error processing bucket {}/{}", bucket.getVolumeName(), bucket.getName(), e);
}
}

private void processKey(OzoneClient ozoneClient, String volumeName, String bucketName,
String keyName, SequenceWriter sequenceWriter, AtomicBoolean allKeysPassed) {
CompletableFuture<Void> future =
CompletableFuture.supplyAsync(() ->
verifyKey(ozoneClient, volumeName, bucketName, keyName), verificationExecutor)
.handleAsync((keyResult, throwable) -> {
if (throwable != null) {
LOG.error("Error verifying key: {}/{}/{}", volumeName, bucketName, keyName, throwable);
return new KeyVerificationResult(volumeName, bucketName, keyName, new ArrayList<>(), false);
}
return keyResult;
}, verificationExecutor)
.thenAcceptAsync(keyResult ->
writeVerificationResult(sequenceWriter, allKeysPassed, keyResult), writerExecutor);
allFutures.add(future);
OmKeyInfo keyInfo;
try {
keyInfo = ozoneClient.getProxy().getKeyInfo(volumeName, bucketName, keyName, false);
} catch (IOException e) {
LOG.error("Error processing key {}/{}/{}", volumeName, bucketName, keyName, e);
return;
}

CompletableFuture.supplyAsync(() -> verifyKey(volumeName, bucketName, keyName, keyInfo), verificationExecutor)
.thenAcceptAsync(keyResult ->
writeVerificationResult(sequenceWriter, allKeysPassed, keyResult), writerExecutor);
}

private void writeVerificationResult(SequenceWriter sequenceWriter,
private synchronized void writeVerificationResult(SequenceWriter sequenceWriter,
AtomicBoolean allKeysPassed, KeyVerificationResult keyResult) {
try {
allKeysPassed.compareAndSet(true, keyResult.isKeyPass());
Expand All @@ -262,78 +253,59 @@ private void writeVerificationResult(SequenceWriter sequenceWriter,
}
}

private KeyVerificationResult verifyKey(OzoneClient ozoneClient, String volumeName,
String bucketName, String keyName) {
try {
boolean keyPass = true;
OmKeyInfo keyInfo =
ozoneClient.getProxy().getKeyInfo(volumeName, bucketName, keyName, false);

List<KeyVerificationResult.BlockVerificationData> blockResults = new ArrayList<>();
List<ReplicaVerifier> localVerifiers = threadLocalVerifiers.get();

for (OmKeyLocationInfo keyLocation : keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly()) {
long containerID = keyLocation.getContainerID();
long localID = keyLocation.getLocalID();

List<KeyVerificationResult.ReplicaVerificationData> replicaResults = new ArrayList<>();
boolean blockPass = true;

for (DatanodeDetails datanode : keyLocation.getPipeline().getNodes()) {
List<KeyVerificationResult.CheckData> checkResults = new ArrayList<>();
boolean replicaPass = true;
int replicaIndex = keyLocation.getPipeline().getReplicaIndex(datanode);

for (ReplicaVerifier verifier : localVerifiers) {
BlockVerificationResult result = verifier.verifyBlock(datanode, keyLocation, replicaIndex);
KeyVerificationResult.CheckData checkResult = new KeyVerificationResult.CheckData(verifier.getType(),
result.isCompleted(), result.passed(), result.getFailures());
checkResults.add(checkResult);

if (!result.passed()) {
replicaPass = false;
}
}
private KeyVerificationResult verifyKey(String volumeName, String bucketName, String keyName, OmKeyInfo keyInfo) {
boolean keyPass = true;
List<KeyVerificationResult.BlockVerificationData> blockResults = new ArrayList<>();
List<ReplicaVerifier> localVerifiers = threadLocalVerifiers.get();

for (OmKeyLocationInfo keyLocation : Objects.requireNonNull(keyInfo.getLatestVersionLocations())
.getBlocksLatestVersionOnly()) {
long containerID = keyLocation.getContainerID();
long localID = keyLocation.getLocalID();

List<KeyVerificationResult.ReplicaVerificationData> replicaResults = new ArrayList<>();
boolean blockPass = true;

KeyVerificationResult.ReplicaVerificationData replicaResult =
new KeyVerificationResult.ReplicaVerificationData(datanode, replicaIndex, checkResults, replicaPass);
replicaResults.add(replicaResult);
for (DatanodeDetails datanode : keyLocation.getPipeline().getNodes()) {
List<KeyVerificationResult.CheckData> checkResults = new ArrayList<>();
boolean replicaPass = true;
int replicaIndex = keyLocation.getPipeline().getReplicaIndex(datanode);

if (!replicaPass) {
blockPass = false;
for (ReplicaVerifier verifier : localVerifiers) {
BlockVerificationResult result = verifier.verifyBlock(datanode, keyLocation, replicaIndex);
KeyVerificationResult.CheckData checkResult = new KeyVerificationResult.CheckData(verifier.getType(),
result.isCompleted(), result.passed(), result.getFailures());
checkResults.add(checkResult);

if (!result.passed()) {
replicaPass = false;
}
}

KeyVerificationResult.BlockVerificationData blockResult =
new KeyVerificationResult.BlockVerificationData(containerID, localID, replicaResults, blockPass);
blockResults.add(blockResult);
KeyVerificationResult.ReplicaVerificationData replicaResult =
new KeyVerificationResult.ReplicaVerificationData(datanode, replicaIndex, checkResults, replicaPass);
replicaResults.add(replicaResult);

if (!blockPass) {
keyPass = false;
if (!replicaPass) {
blockPass = false;
}
}

return new KeyVerificationResult(volumeName, bucketName, keyName, blockResults, keyPass);
} catch (IOException e) {
throw new CompletionException(e);
KeyVerificationResult.BlockVerificationData blockResult =
new KeyVerificationResult.BlockVerificationData(containerID, localID, replicaResults, blockPass);
blockResults.add(blockResult);

if (!blockPass) {
keyPass = false;
}
}

return new KeyVerificationResult(volumeName, bucketName, keyName, blockResults, keyPass);
}

private SequenceWriter createSequenceWriter(boolean doWrapinArray, JsonGenerator jsonGenerator) throws IOException {
SequenceWriter sequenceWriter = WRITER.writeValues(jsonGenerator);
sequenceWriter.init(doWrapinArray);
return sequenceWriter;
}

private void createOutputDirectory() throws IOException {
Path path = Paths.get(outputDir);
if (!Files.exists(path)) {
try {
Files.createDirectories(path);
System.out.println("Successfully created directory: " + path.toAbsolutePath());
} catch (IOException e) {
throw new IOException(String.format("Failed to create directory %s: %s", path, e.getMessage()));
}
}
}
}