Skip to content

HDDS-12209. Make replica verification concurrent #8363

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

ptlrs
Copy link
Contributor

@ptlrs ptlrs commented Apr 30, 2025

Please describe your PR in detail:

This PR is built on top of #8248 which provides the new JSON output format. It will be merged only after #8248 is merged to apply the new concurrency related changes.

This PR:

  1. Performs key verification in an async manner
  2. Creates a KeyVerificationResult class to hold the verification results
  3. Uses a SequenceWriter to write the JSON output incrementally

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/HDDS-12209

How was this patch tested?

Tested manually
CI: https://github.com/ptlrs/ozone/actions/runs/14747349135

@adoroszlai
Copy link
Contributor

Please wait for clean CI run in fork before opening PR.

Also: "This branch has conflicts that must be resolved"

@ptlrs
Copy link
Contributor Author

ptlrs commented Apr 30, 2025

Hi @adoroszlai, this PR is dependent on #8248. I will fix any conflicts once that PR is merged.

@errose28 errose28 added the tools Tools that helps with debugging label May 9, 2025
@ptlrs ptlrs force-pushed the HDDS-12209-Make-replica-verification-concurrent branch from df4fed1 to 566dc89 Compare May 15, 2025 17:56
Copy link
Contributor

@errose28 errose28 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this @ptlrs. I've reviewed everything except the json generation. We have at least basic acceptance tests for this already correct?

Comment on lines 116 to 117
verificationExecutor = Executors.newFixedThreadPool(threadCount);
writerExecutor = Executors.newSingleThreadExecutor();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These executors work from an unbounded queue which could blow up memory if verifiers are running much slower than listing. We need to create back pressure on the listing thread in this case. For the verifier thread pool we can probably bound the queue at a multiple of the consuming thread pool size to create some buffer if verifiers are finishing in a bursty manner. The writer thread should always run the fastest and could probably get away without bounding the queue, but since we may be working with millions of objects we should bound it to be safe.

Here I've suggested 2*threadCount as the bound for the verifier executor so it scales with thread count while leaving a reasonable buffer to prefetch listings so the verifier threads always have work. For the writer I've suggested bounding the queue at threadCount so that all verifier threads can potentially flush results simultaneously with minimal blocking (the queue push operation is still synchronized). The writer is expected to get through all of these serially before the next round of verifiers finish since it is the only stage that does not need to do network IO. I've suggested LinkedBlockingQueue over ArrayBlockingQueue since the linked list implementation does not share a lock between the push and pop operations required by the producer and consumer.

This initial suggestion is based more on intuition, and in the future we can benchmark this and make modifications as necessary.

Suggested change
verificationExecutor = Executors.newFixedThreadPool(threadCount);
writerExecutor = Executors.newSingleThreadExecutor();
verificationExecutor = new ThreadPoolExecutor(threadCount, threadCount, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(2 * threadCount), new ThreadPoolExecutor.CallerRunsPolicy());
writerExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(threadCount), new ThreadPoolExecutor.CallerRunsPolicy());


private ExecutorService verificationExecutor;
private ExecutorService writerExecutor;
private ThreadLocal<List<ReplicaVerifier>> threadLocalVerifiers;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not technically required in this context but static ThreadLocal is usually used to prevent confusion with multiple instances created by the same thread.

Suggested change
private ThreadLocal<List<ReplicaVerifier>> threadLocalVerifiers;
private static ThreadLocal<List<ReplicaVerifier>> threadLocalVerifiers;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe a non-static approach would be simpler here. We wouldn't want to share the ThreadLocal verifiers across instances if the replicas-verify command is executed in parallel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For parallel invocation at the command line, each command would be a separate process with a separate JVM and the static instance would not be shared. I don't think we need to worry about a CLI entrypoint having multiple instances invoked in parallel within the same process, so either option should work fine. However adding static conforms with best practices and example defined in the ThreadLocal javadoc.

ThreadLocal instances are typically private static fields in classes that wish to associate state with a thread (e.g., a user ID or Transaction ID).

// Wait for all tasks to complete
verificationExecutor.awaitTermination(1, TimeUnit.DAYS);
writerExecutor.awaitTermination(1, TimeUnit.DAYS);
threadLocalVerifiers.remove();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thread invoking this never created any thread local instances so it won't have any effect. I don't think we need to call this explicitly and can just let the thread local instances get gc'ed when the threads terminate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, although the remove() was added to satisfy sonarqube


try {
createOutputDirectory();
findCandidateKeys(client, address);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make findCandidateKeys not throw any checked exceptions and just continue reading keys if there are errors. Right now if there's one key that fails to read the whole thing will fail. After removing the output dir creation as suggested above this inner try/finally can be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The finally clause is there to ensure clean up resources.

OmKeyInfo keyInfo =
ozoneClient.getProxy().getKeyInfo(volumeName, bucketName, keyName, false);

List<KeyVerificationResult.BlockVerificationData> blockResults = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need all these inner classes. KeyVerificationResult can be constructed from just a volume, bucket, and key. It can have an add method that takes a the check type, BlockVerificationResult, and DatanodeDetails and maintain this list internally. The key's pass boolean can be updated on the fly with each block addition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I follow this. The inner classes are there to generate the expected JSON pattern by mapping the structure of the class via:

ObjectNode keyNode = OBJECT_MAPPER.convertValue(keyResult, ObjectNode.class);

* Class to hold the verification results for a key.
*/
public class KeyVerificationResult {
private final boolean keyPass;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is in the json output currently but we probably want a keyCompleted field as well which would map to the AND of all block verifier's completed statuses. This is used to indicate that a check failed without actually being able to run the check (like datanode not reachable). We would also set this to false if the getKeyInfo call fails.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
tools Tools that helps with debugging
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants