Skip to content

Reindex data stream indices on different nodes #125171

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Prev Previous commit
Next Next commit
Adding a comment
  • Loading branch information
masseyke committed Mar 18, 2025
commit 8eb2144848894cd9a6897c94cb4d290c1e42e29f
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
private final ClusterService clusterService;
private final Client client;
private final TransportService transportService;
private final AtomicInteger ingestNodeGenerator = new AtomicInteger(Randomness.get().nextInt());
private final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt());

@Inject
public ReindexDataStreamIndexTransportAction(
Expand Down Expand Up @@ -314,11 +314,16 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkBy
listener.onResponse(bulkByScrollResponse);
}
}, listener::onFailure);
/*
* Reindex will potentially run a pipeline for each document. If we run all reindex requests on the same node (locally), that
* becomes a bottleneck. This code round-robins reindex requests to all ingest nodes to spread out the pipeline workload. When a
* data stream has many indices, this can improve performance a good bit.
*/
final DiscoveryNode[] ingestNodes = clusterService.state().getNodes().getIngestNodes().values().toArray(DiscoveryNode[]::new);
if (ingestNodes.length == 0) {
listener.onFailure(new NoNodeAvailableException("No ingest nodes in cluster"));
} else {
DiscoveryNode ingestNode = ingestNodes[Math.floorMod(ingestNodeGenerator.incrementAndGet(), ingestNodes.length)];
DiscoveryNode ingestNode = ingestNodes[Math.floorMod(ingestNodeOffsetGenerator.incrementAndGet(), ingestNodes.length)];
Copy link
Contributor

@lukewhiting lukewhiting Mar 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking the API's for Math.floorMod and AtomicInteger, I think this line should correctly handle the case where the AtomicInteger overflows and the dividend becomes negative but is it worth adding an test for that case to future proof or are the current tests OK to handle it passively?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just tried it out, and it is a minor bug -- Math.floorMod(Integer.MAX_VALUE, 17) is equal to Math.floorMod(Integer.MAX_VALUE + 1, 17). So if the test were to start with a value near Integer.MAX_VALUE it would fail (although I'm not too worried about the round-robin repeating a node twice in that very rare situation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the maximum initial value of that random number to be much smaller so that it never exceeds MAX_VALUE and the test will never fail. I think the actual behavior (choosing the same node twice once every 4.3 billion times) is harmless, and not worth the complexity of fixing.

logger.debug("Sending reindex request to {}", ingestNode.getName());
transportService.sendRequest(
ingestNode,
Expand Down