
Trigger merges after recovery #113102


Merged

Conversation

DaveCTurner (Contributor)

We may have shut a shard down while merges were still pending (or
adjusted the merge policy while the shard was down) meaning that after
recovery its segments do not reflect the desired state according to the
merge policy. With this commit we invoke `IndexWriter#maybeMerge()` at
the end of recovery to check for, and execute, any such lost merges.
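
A minimal sketch of the idea, assuming nothing beyond Lucene's public `IndexWriter#maybeMerge()`; the wrapper and failure handling here are illustrative, not the actual wiring in this PR:

```java
import org.apache.lucene.index.IndexWriter;

import java.io.IOException;
import java.util.function.Consumer;

final class PostRecoveryMergeSketch {
    // Wrap the recovery completion callback so a successful recovery also asks the
    // merge policy whether the recovered segments need merging.
    static Runnable wrapRecoveryListener(Runnable onRecoveryDone, IndexWriter writer, Consumer<Exception> onFailure) {
        return () -> {
            onRecoveryDone.run(); // complete recovery as before
            try {
                // Re-evaluates the merge policy and schedules (without waiting for)
                // any merges it selects.
                writer.maybeMerge();
            } catch (IOException e) {
                onFailure.accept(e);
            }
        };
    }
}
```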
DaveCTurner added the >enhancement, :Distributed Indexing/Recovery, and v9.0.0 labels on Sep 18, 2024
elasticsearchmachine (Collaborator)

Pinging @elastic/es-distributed (Team:Distributed)

elasticsearchmachine added the Team:Distributed (Obsolete) label on Sep 18, 2024
elasticsearchmachine (Collaborator)

Hi @DaveCTurner, I've created a changelog YAML for you.

@tlrx (Member) left a comment

LGTM, left two comments.

return recoveryListener;
}

logger.trace(Strings.format("wrapping listener for post-recovery merge of [%s]", shardId));
tlrx (Member)

Suggested change:
- logger.trace(Strings.format("wrapping listener for post-recovery merge of [%s]", shardId));
+ logger.trace(() -> Strings.format("wrapping listener for post-recovery merge of [%s]", shardId));

(same remark for other log traces)
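
For context, a minimal illustration of why the supplier form matters, assuming a Log4j 2-style logger with a `Supplier` overload (which is what the suggestion relies on); `String.format` stands in for `Strings.format` to keep the snippet self-contained:

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

final class LazyTraceSketch {
    private static final Logger logger = LogManager.getLogger(LazyTraceSketch.class);

    static void onRecovery(String shardId) {
        // Eager: the message is formatted even when TRACE is disabled.
        logger.trace(String.format("wrapping listener for post-recovery merge of [%s]", shardId));

        // Lazy: the lambda runs only if TRACE is actually enabled.
        logger.trace(() -> String.format("wrapping listener for post-recovery merge of [%s]", shardId));
    }
}
```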

@henningandersen (Contributor) left a comment

Looks good, one question on how to ensure a flush happens.


ensureGreen(indexName);
assertBusy(() -> {
refresh(indexName); // pick up the result of any merges
henningandersen (Contributor)

I wonder if we need to let `afterMerge` set `active` to true to ensure that we always get a flush after a merge? Otherwise I think we risk the flush happening too early, before the merge completes (especially if processing is lagging due to running on one thread), and then no further flush after the merge.

And if we do that, we should probably change this to call `flushOnIdle` instead?

DaveCTurner (Contributor, Author)

> ensure that we get a flush after a merge always?

That sounds sensible to me but I'm not sure if it could have any bad consequences. Why don't we do it already?

DaveCTurner (Contributor, Author)

Note that we already flush just before snapshotting, and before relocating the shard in stateless ES. Not saying that we shouldn't try and get this to flush in `flushOnIdle` too, but maybe we can think about that in a follow-up?

henningandersen (Contributor)

Thanks, that helps. I also now realize that the check here:

&& System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {

is more than likely to kick in if the active->inactive flush has already occurred, making this more of a benign race condition than something that would happen repeatedly.

I wonder if it would be better to set `indices.memory.shard_inactive_time=0` in this test and avoid the refresh here, demonstrating that it will indeed flush after the merge?
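
A sketch of the setting being suggested, built with the `Settings` API; how it gets injected into the test (node settings override, etc.) is assumed and depends on the test framework hook:

```java
import org.elasticsearch.common.settings.Settings;

final class InactiveShardSettingsSketch {
    static Settings zeroShardInactiveTime() {
        return Settings.builder()
            // Treat shards as inactive immediately, so the inactive-shard flush runs
            // right after the post-recovery merge without needing a manual refresh.
            .put("indices.memory.shard_inactive_time", "0s")
            .build();
    }
}
```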

DaveCTurner (Contributor, Author)

Sure, yes, that makes sense.

@ywangd (Member) left a comment

I have a question about the throttling behaviour. I'd appreciate it if you could help me understand it better. Thanks!

return;
}

indexShard.triggerPendingMerges();
ywangd (Member)

This ultimately calls `IndexWriter#maybeMerge`, which kicks off the actual merges on the "Lucene Merge Thread"s. It does not wait for the merges to complete. So my understanding is that we can kick off merges for multiple shards even though the throttled task runner has a single thread? Did I miss something, or is this intended?

@DaveCTurner (Contributor, Author) commented on Sep 19, 2024

That's intended; see the comment on ES-9313. I copied this info into a code comment in e70887c.
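
A standalone Lucene sketch of that behaviour (plain Lucene, not Elasticsearch code): `maybeMerge` hands any selected merges to the merge scheduler and returns, so a single-threaded throttled runner only limits how quickly merges are kicked off, not how many run concurrently.

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;

import java.io.IOException;

final class MaybeMergeIsAsyncSketch {
    public static void main(String[] args) throws IOException {
        IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer())
            // ConcurrentMergeScheduler (the default) runs merges on background
            // "Lucene Merge Thread"s rather than on the calling thread.
            .setMergeScheduler(new ConcurrentMergeScheduler());
        try (IndexWriter writer = new IndexWriter(new ByteBuffersDirectory(), config)) {
            // Returns once any merges selected by the merge policy have been handed
            // to the scheduler; it does not block until they finish.
            writer.maybeMerge();
        }
    }
}
```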

@@ -1514,6 +1514,13 @@ public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
engine.forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(), forceMerge.onlyExpungeDeletes(), forceMerge.forceMergeUUID());
}

public void triggerPendingMerges() throws IOException {
switch (state /* single volatile read */) {
case STARTED, POST_RECOVERY -> getEngine().forceMerge(false, ForceMergeRequest.Defaults.MAX_NUM_SEGMENTS, false, null);
ywangd (Member)

Nit: can we add a comment about passing `flush=false` and `null` for the UUID? My understanding is that they do not make sense when the downstream code calls `IndexWriter#maybeMerge`, but that is not obvious from here.

Also, for my own knowledge: since we don't flush actively, I guess this relies on flushes triggered by other mechanisms, such as scheduled refreshes, the indexing disk and memory controllers, and maybe other things?
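
One possible wording for such a comment, paraphrasing this thread rather than quoting what actually landed in c07131b:

```java
// flush=false: we deliberately don't flush here; segments produced by these merges
//              are persisted by the usual mechanisms (inactive-shard flush, the
//              flush before snapshotting or relocation, etc.).
// forceMergeUUID=null: this is not a user-requested force-merge, so there is no
//              UUID to record; with the default maxNumSegments this call just ends
//              up in IndexWriter#maybeMerge.
getEngine().forceMerge(false, ForceMergeRequest.Defaults.MAX_NUM_SEGMENTS, false, null);
```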

DaveCTurner (Contributor, Author)

More comments in c07131b.

Henning and I are still discussing how to ensure we flush the result of the merge.

@henningandersen (Contributor) left a comment

LGTM.

@@ -1600,7 +1600,7 @@ protected final BroadcastResponse flush(String... indices) {
*/
protected BroadcastResponse forceMerge() {
waitForRelocation();
BroadcastResponse actionGet = indicesAdmin().prepareForceMerge().setMaxNumSegments(1).get();
BroadcastResponse actionGet = indicesAdmin().prepareForceMerge().setMaxNumSegments(1).setFlush(true).get();
henningandersen (Contributor)

I think that is the default anyway, not sure why this change is necessary?

DaveCTurner (Contributor, Author)

Ah sorry, that was left over from debugging the `DiskThresholdDeciderIT` failure; not necessary indeed. I'll remove it.

DaveCTurner added the auto-merge-without-approval label (automatically merge the pull request when CI checks pass; doesn't wait for reviews) on Sep 20, 2024
DaveCTurner merged commit 33a73a8 into elastic:main on Sep 20, 2024
15 checks passed
DaveCTurner deleted the 2024/09/18/trigger-merge-after-recovery branch on September 20, 2024 at 16:16