Infrastructure for assuming cluster features in the next major version #118143

Merged (18 commits, Dec 17, 2024)
@@ -155,10 +155,8 @@ org.elasticsearch.cluster.ClusterState#compatibilityVersions()

@defaultMessage ClusterFeatures#nodeFeatures is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster.
org.elasticsearch.cluster.ClusterFeatures#nodeFeatures()
@defaultMessage ClusterFeatures#allNodeFeatures is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster.
org.elasticsearch.cluster.ClusterFeatures#allNodeFeatures()
@defaultMessage ClusterFeatures#clusterHasFeature is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster.
org.elasticsearch.cluster.ClusterFeatures#clusterHasFeature(org.elasticsearch.features.NodeFeature)
org.elasticsearch.cluster.ClusterFeatures#clusterHasFeature(org.elasticsearch.cluster.node.DiscoveryNodes, org.elasticsearch.features.NodeFeature)

@defaultMessage Do not construct these records outside the source files they are declared in
org.elasticsearch.cluster.SnapshotsInProgress$ShardSnapshotStatus#<init>(java.lang.String, org.elasticsearch.cluster.SnapshotsInProgress$ShardState, org.elasticsearch.repositories.ShardGeneration, java.lang.String, org.elasticsearch.repositories.ShardSnapshotResult)
5 changes: 5 additions & 0 deletions docs/changelog/118143.yaml
@@ -0,0 +1,5 @@
pr: 118143
summary: Infrastructure for assuming cluster features in the next major version
area: "Infra/Core"
type: feature
issues: []
56 changes: 45 additions & 11 deletions server/src/main/java/org/elasticsearch/cluster/ClusterFeatures.java
@@ -9,11 +9,12 @@

package org.elasticsearch.cluster;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.xcontent.ToXContent;

@@ -79,28 +80,61 @@ public Map<String, Set<String>> nodeFeatures() {
return nodeFeatures;
}

/**
* The features in all nodes in the cluster.
* <p>
* NOTE: This should not be used directly.
* Please use {@link org.elasticsearch.features.FeatureService#clusterHasFeature} instead.
*/
public Set<String> allNodeFeatures() {
private Set<String> allNodeFeatures() {
if (allNodeFeatures == null) {
allNodeFeatures = Set.copyOf(calculateAllNodeFeatures(nodeFeatures.values()));
}
return allNodeFeatures;
}

/**
* Returns {@code true} if {@code node} can have assumed features.
* @see org.elasticsearch.env.BuildVersion#canRemoveAssumedFeatures
*/
public static boolean featuresCanBeAssumedForNode(DiscoveryNode node) {
return node.getBuildVersion().canRemoveAssumedFeatures();
[Review thread]
Contributor: Stumbled over "remove" in canRemoveAssumedFeatures; that was confusing to me, and I kept looking for where we remove those features. Maybe turning it around makes it easier to understand: canAssumeRemovedFeatures.
thecoop (Member, author), Dec 12, 2024: That's not quite right, though. It's not that all features that are missing can be assumed; it's that the features this version of ES has marked as assumed can be taken as met on a node of this version, regardless of whether they're published by the node or not.
Contributor: Yes, you're right 👍 If we can find something better than "remove", it'd be great, but certainly not worth delaying this further. canInferAssumedFeatures, canExpectAssumedFeatures, canSkipAssumedFeatures; or, more targeting the versioning side of this: hasPassedNextCompatibilityBoundary or similar.
}

/**
* Returns {@code true} if one or more nodes in {@code nodes} can have assumed features.
* @see org.elasticsearch.env.BuildVersion#canRemoveAssumedFeatures
*/
public static boolean featuresCanBeAssumedForNodes(DiscoveryNodes nodes) {
return nodes.getAllNodes().stream().anyMatch(n -> n.getBuildVersion().canRemoveAssumedFeatures());
}

/**
* {@code true} if {@code feature} is present on all nodes in the cluster.
* <p>
* NOTE: This should not be used directly.
* Please use {@link org.elasticsearch.features.FeatureService#clusterHasFeature} instead.
*/
@SuppressForbidden(reason = "directly reading cluster features")
public boolean clusterHasFeature(NodeFeature feature) {
return allNodeFeatures().contains(feature.id());
public boolean clusterHasFeature(DiscoveryNodes nodes, NodeFeature feature) {
assert nodes.getNodes().keySet().equals(nodeFeatures.keySet())
: "Cluster features nodes " + nodeFeatures.keySet() + " is different to discovery nodes " + nodes.getNodes().keySet();

// basic case
boolean allNodesHaveFeature = allNodeFeatures().contains(feature.id());
if (allNodesHaveFeature) {
return true;
}

// if the feature is assumed, check the versions more closely
// it's actually ok if the feature is assumed, and all nodes missing the feature can assume it
// TODO: do we need some kind of transient cache of this calculation?
if (feature.assumedAfterNextCompatibilityBoundary()) {
for (var nf : nodeFeatures.entrySet()) {
if (nf.getValue().contains(feature.id()) == false
&& featuresCanBeAssumedForNode(nodes.getNodes().get(nf.getKey())) == false) {
return false;
}
}

// all nodes missing the feature can assume it - so that's alright then
return true;
}

return false;
}

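The new two-phase check in ClusterFeatures#clusterHasFeature can be sketched as a standalone method. This is a simplified stand-in, not the real Elasticsearch API: node versions and feature sets are reduced to plain maps keyed by node id, and the "can assume" test is inlined as a major-version comparison.

```java
import java.util.Map;
import java.util.Set;

class AssumedFeatureCheckSketch {

    static boolean clusterHasFeature(
        String featureId,
        boolean assumedAfterBoundary,   // mirrors NodeFeature#assumedAfterNextCompatibilityBoundary
        int currentMajor,               // major version of the locally running code
        Map<String, Integer> majorByNodeId,
        Map<String, Set<String>> featuresByNodeId
    ) {
        // basic case: every node publishes the feature
        if (featuresByNodeId.values().stream().allMatch(f -> f.contains(featureId))) {
            return true;
        }
        // a non-assumed feature missing anywhere means "not present"
        if (assumedAfterBoundary == false) {
            return false;
        }
        // an assumed feature still counts as present if every node missing it
        // is a next-major node, which may legitimately no longer publish it
        for (var entry : featuresByNodeId.entrySet()) {
            boolean canAssume = majorByNodeId.get(entry.getKey()) == currentMajor + 1;
            if (entry.getValue().contains(featureId) == false && canAssume == false) {
                return false;
            }
        }
        return true;
    }
}
```

Note the ordering matches the diff: the common all-nodes-have-it path stays cheap, and the per-node scan only runs for the (rare) assumed features.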
@@ -29,6 +29,7 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
@@ -39,6 +40,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -137,8 +139,8 @@ public ClusterState execute(BatchExecutionContext<JoinTask> batchExecutionContex

DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());
Map<String, CompatibilityVersions> compatibilityVersionsMap = new HashMap<>(newState.compatibilityVersions());
Map<String, Set<String>> nodeFeatures = new HashMap<>(newState.nodeFeatures());
Set<String> allNodesFeatures = ClusterFeatures.calculateAllNodeFeatures(nodeFeatures.values());
Map<String, Set<String>> nodeFeatures = new HashMap<>(newState.nodeFeatures()); // as present in cluster state
Set<String> effectiveClusterFeatures = calculateEffectiveClusterFeatures(newState.nodes(), nodeFeatures);

assert nodesBuilder.isLocalNodeElectedMaster();

@@ -174,14 +176,17 @@ }
}
blockForbiddenVersions(compatibilityVersions.transportVersion());
ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
enforceNodeFeatureBarrier(node.getId(), allNodesFeatures, features);
Set<String> newNodeEffectiveFeatures = enforceNodeFeatureBarrier(node, effectiveClusterFeatures, features);
// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
// we have to reject nodes that don't support all indices we have in this cluster
ensureIndexCompatibility(node.getMinIndexVersion(), node.getMaxIndexVersion(), initialState.getMetadata());

nodesBuilder.add(node);
compatibilityVersionsMap.put(node.getId(), compatibilityVersions);
// store the actual node features here, not including assumed features, as this is persisted in cluster state
nodeFeatures.put(node.getId(), features);
allNodesFeatures.retainAll(features);
effectiveClusterFeatures.retainAll(newNodeEffectiveFeatures);

nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
@@ -355,6 +360,35 @@ private static void blockForbiddenVersions(TransportVersion joiningTransportVers
}
}

/**
* Calculate the cluster's effective features. This includes all features that are assumed on any nodes in the cluster,
* that are also present across the whole cluster as a result.
*/
private Set<String> calculateEffectiveClusterFeatures(DiscoveryNodes nodes, Map<String, Set<String>> nodeFeatures) {
if (featureService.featuresCanBeAssumedForNodes(nodes)) {
Set<String> assumedFeatures = featureService.getNodeFeatures()
.values()
.stream()
.filter(NodeFeature::assumedAfterNextCompatibilityBoundary)
.map(NodeFeature::id)
.collect(Collectors.toSet());

// add all assumed features to the feature set of all nodes of the next major version
nodeFeatures = new HashMap<>(nodeFeatures);
for (var node : nodes.getNodes().entrySet()) {
if (featureService.featuresCanBeAssumedForNode(node.getValue())) {
assert nodeFeatures.containsKey(node.getKey()) : "Node " + node.getKey() + " does not have any features";
nodeFeatures.computeIfPresent(node.getKey(), (k, v) -> {
var newFeatures = new HashSet<>(v);
return newFeatures.addAll(assumedFeatures) ? newFeatures : v;
});
}
}
}

return ClusterFeatures.calculateAllNodeFeatures(nodeFeatures.values());
}

/**
* Ensures that all indices are compatible with the given index version. This will ensure that all indices in the given metadata
* will not be created with a newer version of elasticsearch as well as that all indices are newer or equal to the minimum index
@@ -461,13 +495,44 @@ public static void ensureVersionBarrier(Version joiningNodeVersion, Version minC
}
}

private void enforceNodeFeatureBarrier(String nodeId, Set<String> existingNodesFeatures, Set<String> newNodeFeatures) {
/**
* Enforces the feature join barrier - a joining node should have all features already present in all existing nodes in the cluster
*
* @return The set of features that this node has (including assumed features)
*/
private Set<String> enforceNodeFeatureBarrier(DiscoveryNode node, Set<String> effectiveClusterFeatures, Set<String> newNodeFeatures) {
// prevent join if it does not have one or more features that all other nodes have
Set<String> missingFeatures = new HashSet<>(existingNodesFeatures);
Set<String> missingFeatures = new HashSet<>(effectiveClusterFeatures);
missingFeatures.removeAll(newNodeFeatures);

if (missingFeatures.isEmpty() == false) {
throw new IllegalStateException("Node " + nodeId + " is missing required features " + missingFeatures);
if (missingFeatures.isEmpty()) {
// nothing missing - all ok
return newNodeFeatures;
}

if (featureService.featuresCanBeAssumedForNode(node)) {
// it might still be ok for this node to join if this node can have assumed features,
// and all the missing features are assumed
// we can get the NodeFeature object direct from this node's registered features
// as all existing nodes in the cluster have the features present in effectiveClusterFeatures, including this one
newNodeFeatures = new HashSet<>(newNodeFeatures);
for (Iterator<String> it = missingFeatures.iterator(); it.hasNext();) {
String feature = it.next();
NodeFeature nf = featureService.getNodeFeatures().get(feature);
if (nf.assumedAfterNextCompatibilityBoundary()) {
// it's ok for this feature to be missing from this node
it.remove();
// and it should be assumed to still be in the cluster
newNodeFeatures.add(feature);
}
// even if we don't remove it, still continue, so the exception message below is accurate
}
}

if (missingFeatures.isEmpty()) {
return newNodeFeatures;
} else {
throw new IllegalStateException("Node " + node.getId() + " is missing required features " + missingFeatures);
}
}

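The relaxed join barrier in enforceNodeFeatureBarrier reduces to one set calculation. The sketch below is a simplified stand-in, not the real NodeJoinExecutor API: the caller supplies the ids of features the running code marks as assumed, and a flag saying whether the joining node is allowed to assume them.

```java
import java.util.HashSet;
import java.util.Set;

class FeatureJoinBarrierSketch {

    /**
     * The features the joining node still lacks once assumed features are
     * discounted; in the real code the join is rejected if this is non-empty.
     */
    static Set<String> missingFeatures(
        Set<String> effectiveClusterFeatures,
        Set<String> newNodeFeatures,
        Set<String> assumedFeatureIds,
        boolean nodeCanAssumeFeatures
    ) {
        // prevent join if the node lacks one or more features all other nodes have
        Set<String> missing = new HashSet<>(effectiveClusterFeatures);
        missing.removeAll(newNodeFeatures);
        if (nodeCanAssumeFeatures) {
            // a next-major node may omit assumed features and still join
            missing.removeAll(assumedFeatureIds);
        }
        return missing;
    }
}
```

Discounted features are added back to the node's effective feature set in the real code, so the cluster-wide intersection does not shrink just because a next-major node stopped publishing an assumed feature.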
6 changes: 6 additions & 0 deletions server/src/main/java/org/elasticsearch/env/BuildVersion.java
@@ -37,6 +37,12 @@
*/
public abstract class BuildVersion implements ToXContentFragment, Writeable {

/**
* Checks if this version can operate properly in a cluster without features
* that are assumed in the currently running Elasticsearch.
*/
public abstract boolean canRemoveAssumedFeatures();

/**
* Check whether this version is on or after a minimum threshold.
*/
@@ -47,6 +47,17 @@ final class DefaultBuildVersion extends BuildVersion {
this(in.readVInt());
}

@Override
public boolean canRemoveAssumedFeatures() {
/*
* We can remove assumed features if the node version is the next major version.
* This is because the next major version can only form a cluster with the
* latest minor version of the previous major, so any features introduced before that point
* (that are marked as assumed in the running code version) are automatically met by that version.
*/
return version.major == Version.CURRENT.major + 1;
}

@Override
public boolean onOrAfterMinimumCompatible() {
return Version.CURRENT.minimumCompatibilityVersion().onOrBefore(version);
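The version gate itself is a one-line comparison. With the version objects reduced to bare major numbers (a hypothetical shape, not the real BuildVersion API), it can be sketched as:

```java
class AssumedFeatureVersionGateSketch {

    // Rolling upgrades only span one major: a next-major node can only form a
    // cluster with the latest minor of the current major, so any feature the
    // current code marks as assumed is guaranteed met on that node even if it
    // is no longer published there.
    static boolean canRemoveAssumedFeatures(int nodeMajor, int currentMajor) {
        return nodeMajor == currentMajor + 1;
    }
}
```

Note the gate is deliberately exact (`== currentMajor + 1`, not `>`): clusters can never mix more than two adjacent majors, so no wider check is needed.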
@@ -9,7 +9,10 @@

package org.elasticsearch.features;

import org.elasticsearch.cluster.ClusterFeatures;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
@@ -38,9 +41,7 @@ public class FeatureService {
* as the local node's supported feature set
*/
public FeatureService(List<? extends FeatureSpecification> specs) {

var featureData = FeatureData.createFromSpecifications(specs);
nodeFeatures = featureData.getNodeFeatures();
this.nodeFeatures = FeatureData.createFromSpecifications(specs).getNodeFeatures();

logger.info("Registered local node features {}", nodeFeatures.keySet().stream().sorted().toList());
}
@@ -53,11 +54,25 @@ public Map<String, NodeFeature> getNodeFeatures() {
return nodeFeatures;
}

/**
* Returns {@code true} if {@code node} can have assumed features.
*/
public boolean featuresCanBeAssumedForNode(DiscoveryNode node) {
return ClusterFeatures.featuresCanBeAssumedForNode(node);
}

/**
* Returns {@code true} if one or more nodes in {@code nodes} can have assumed features.
*/
public boolean featuresCanBeAssumedForNodes(DiscoveryNodes nodes) {
return ClusterFeatures.featuresCanBeAssumedForNodes(nodes);
}

/**
* Returns {@code true} if all nodes in {@code state} support feature {@code feature}.
*/
@SuppressForbidden(reason = "We need basic feature information from cluster state")
public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
return state.clusterFeatures().clusterHasFeature(feature);
return state.clusterFeatures().clusterHasFeature(state.nodes(), feature);
}
}
@@ -15,10 +15,17 @@
* A feature published by a node.
*
* @param id The feature id. Must be unique in the node.
* @param assumedAfterNextCompatibilityBoundary
{@code true} if this feature is removed at the next compatibility boundary (i.e. the next major version),
* and so should be assumed to be true for all nodes after that boundary.
*/
public record NodeFeature(String id) {
public record NodeFeature(String id, boolean assumedAfterNextCompatibilityBoundary) {
[Review thread]
thecoop (Member, author): Naming might need a bit of work.
Contributor: I struggle coming up with something better... assumePresenceAfterNextCompatibilityBoundary, but that's even harder to read... assumedOnNextMajor, maybe?
Contributor: Though "next major" is not correct in the context of serverless...
thecoop (Member, author): Yeah, exactly, it was assumedOnNextMajor, but it's always true for serverless, so...
public NodeFeature {
Objects.requireNonNull(id);
}

public NodeFeature(String id) {
this(id, false);
}
}
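Existing call sites keep working because the old single-argument constructor delegates with `false`. A simplified re-declaration of the record (not the real org.elasticsearch.features.NodeFeature, and with a hypothetical feature name) shows both ways of declaring a feature:

```java
import java.util.Objects;

record NodeFeatureSketch(String id, boolean assumedAfterNextCompatibilityBoundary) {

    NodeFeatureSketch {
        Objects.requireNonNull(id);
    }

    // existing declarations are unchanged: not assumed by default
    NodeFeatureSketch(String id) {
        this(id, false);
    }
}
```

A feature declared as `new NodeFeatureSketch("some_feature", true)` (name hypothetical) is one whose presence checks will be dropped at the next compatibility boundary, so next-major nodes are assumed to have it even without publishing it.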
@@ -294,8 +294,8 @@ protected boolean areFileSettingsApplied(ClusterState clusterState) {
}

@SuppressForbidden(reason = "need to check file settings support on exact cluster state")
private static boolean supportsFileSettings(ClusterState clusterState) {
return clusterState.clusterFeatures().clusterHasFeature(FileSettingsFeatures.FILE_SETTINGS_SUPPORTED);
private boolean supportsFileSettings(ClusterState clusterState) {
return clusterState.clusterFeatures().clusterHasFeature(clusterState.nodes(), FileSettingsFeatures.FILE_SETTINGS_SUPPORTED);
}

private void setReady(boolean ready) {