Skip to content

Commit efa3f60

Browse files
nicktindall authored and
georgewallace committed
Only publish desired balance gauges on master (elastic#115383)
Closes ES-9834
1 parent 6eb7f5e commit efa3f60

File tree

7 files changed

+326
-64
lines changed

7 files changed

+326
-64
lines changed

docs/changelog/115383.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 115383
2+
summary: Only publish desired balance gauges on master
3+
area: Allocation
4+
type: enhancement
5+
issues: []
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.allocator;
11+
12+
import org.elasticsearch.common.util.CollectionUtils;
13+
import org.elasticsearch.plugins.Plugin;
14+
import org.elasticsearch.plugins.PluginsService;
15+
import org.elasticsearch.telemetry.TestTelemetryPlugin;
16+
import org.elasticsearch.test.ESIntegTestCase;
17+
import org.hamcrest.Matcher;
18+
19+
import java.util.Collection;
20+
21+
import static org.hamcrest.Matchers.empty;
22+
import static org.hamcrest.Matchers.not;
23+
24+
public class DesiredBalanceReconcilerMetricsIT extends ESIntegTestCase {
25+
26+
@Override
27+
protected Collection<Class<? extends Plugin>> nodePlugins() {
28+
return CollectionUtils.appendToCopy(super.nodePlugins(), TestTelemetryPlugin.class);
29+
}
30+
31+
public void testDesiredBalanceGaugeMetricsAreOnlyPublishedByCurrentMaster() throws Exception {
32+
internalCluster().ensureAtLeastNumDataNodes(2);
33+
prepareCreate("test").setSettings(indexSettings(2, 1)).get();
34+
ensureGreen();
35+
36+
assertOnlyMasterIsPublishingMetrics();
37+
38+
// fail over and check again
39+
int numFailOvers = randomIntBetween(1, 3);
40+
for (int i = 0; i < numFailOvers; i++) {
41+
internalCluster().restartNode(internalCluster().getMasterName());
42+
ensureGreen();
43+
44+
assertOnlyMasterIsPublishingMetrics();
45+
}
46+
}
47+
48+
private static void assertOnlyMasterIsPublishingMetrics() {
49+
String masterNodeName = internalCluster().getMasterName();
50+
String[] nodeNames = internalCluster().getNodeNames();
51+
for (String nodeName : nodeNames) {
52+
assertMetricsAreBeingPublished(nodeName, nodeName.equals(masterNodeName));
53+
}
54+
}
55+
56+
private static void assertMetricsAreBeingPublished(String nodeName, boolean shouldBePublishing) {
57+
final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, nodeName)
58+
.filterPlugins(TestTelemetryPlugin.class)
59+
.findFirst()
60+
.orElseThrow();
61+
testTelemetryPlugin.resetMeter();
62+
testTelemetryPlugin.collect();
63+
Matcher<Collection<?>> matcher = shouldBePublishing ? not(empty()) : empty();
64+
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.UNASSIGNED_SHARDS_METRIC_NAME), matcher);
65+
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.TOTAL_SHARDS_METRIC_NAME), matcher);
66+
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.UNDESIRED_ALLOCATION_COUNT_METRIC_NAME), matcher);
67+
assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.UNDESIRED_ALLOCATION_RATIO_METRIC_NAME), matcher);
68+
}
69+
}
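server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java

Lines changed: 118 additions & 0 deletions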
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.allocator;
11+
12+
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
13+
import org.elasticsearch.telemetry.metric.LongWithAttributes;
14+
import org.elasticsearch.telemetry.metric.MeterRegistry;
15+
16+
import java.util.List;
17+
18+
public class DesiredBalanceMetrics {
19+
20+
public static final DesiredBalanceMetrics NOOP = new DesiredBalanceMetrics(MeterRegistry.NOOP);
21+
public static final String UNASSIGNED_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.unassigned.current";
22+
public static final String TOTAL_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.current";
23+
public static final String UNDESIRED_ALLOCATION_COUNT_METRIC_NAME = "es.allocator.desired_balance.allocations.undesired.current";
24+
public static final String UNDESIRED_ALLOCATION_RATIO_METRIC_NAME = "es.allocator.desired_balance.allocations.undesired.ratio";
25+
26+
private volatile boolean nodeIsMaster = false;
27+
28+
/**
29+
* Number of unassigned shards during last reconciliation
30+
*/
31+
private volatile long unassignedShards;
32+
/**
33+
* Total number of assigned shards during last reconciliation
34+
*/
35+
private volatile long totalAllocations;
36+
/**
37+
* Number of assigned shards during last reconciliation that are not allocated on desired node and need to be moved
38+
*/
39+
private volatile long undesiredAllocations;
40+
41+
public void updateMetrics(long unassignedShards, long totalAllocations, long undesiredAllocations) {
42+
this.unassignedShards = unassignedShards;
43+
this.totalAllocations = totalAllocations;
44+
this.undesiredAllocations = undesiredAllocations;
45+
}
46+
47+
public DesiredBalanceMetrics(MeterRegistry meterRegistry) {
48+
meterRegistry.registerLongsGauge(
49+
UNASSIGNED_SHARDS_METRIC_NAME,
50+
"Current number of unassigned shards",
51+
"{shard}",
52+
this::getUnassignedShardsMetrics
53+
);
54+
meterRegistry.registerLongsGauge(TOTAL_SHARDS_METRIC_NAME, "Total number of shards", "{shard}", this::getTotalAllocationsMetrics);
55+
meterRegistry.registerLongsGauge(
56+
UNDESIRED_ALLOCATION_COUNT_METRIC_NAME,
57+
"Total number of shards allocated on undesired nodes excluding shutting down nodes",
58+
"{shard}",
59+
this::getUndesiredAllocationsMetrics
60+
);
61+
meterRegistry.registerDoublesGauge(
62+
UNDESIRED_ALLOCATION_RATIO_METRIC_NAME,
63+
"Ratio of undesired allocations to shard count excluding shutting down nodes",
64+
"1",
65+
this::getUndesiredAllocationsRatioMetrics
66+
);
67+
}
68+
69+
public void setNodeIsMaster(boolean nodeIsMaster) {
70+
this.nodeIsMaster = nodeIsMaster;
71+
}
72+
73+
public long unassignedShards() {
74+
return unassignedShards;
75+
}
76+
77+
public long totalAllocations() {
78+
return totalAllocations;
79+
}
80+
81+
public long undesiredAllocations() {
82+
return undesiredAllocations;
83+
}
84+
85+
private List<LongWithAttributes> getUnassignedShardsMetrics() {
86+
return getIfPublishing(unassignedShards);
87+
}
88+
89+
private List<LongWithAttributes> getTotalAllocationsMetrics() {
90+
return getIfPublishing(totalAllocations);
91+
}
92+
93+
private List<LongWithAttributes> getUndesiredAllocationsMetrics() {
94+
return getIfPublishing(undesiredAllocations);
95+
}
96+
97+
private List<LongWithAttributes> getIfPublishing(long value) {
98+
if (nodeIsMaster) {
99+
return List.of(new LongWithAttributes(value));
100+
}
101+
return List.of();
102+
}
103+
104+
private List<DoubleWithAttributes> getUndesiredAllocationsRatioMetrics() {
105+
if (nodeIsMaster) {
106+
var total = totalAllocations;
107+
var undesired = undesiredAllocations;
108+
return List.of(new DoubleWithAttributes(total != 0 ? (double) undesired / total : 0.0));
109+
}
110+
return List.of();
111+
}
112+
113+
public void zeroAllMetrics() {
114+
unassignedShards = 0;
115+
totalAllocations = 0;
116+
undesiredAllocations = 0;
117+
}
118+
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java

Lines changed: 4 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,6 @@
3030
import org.elasticsearch.gateway.PriorityComparator;
3131
import org.elasticsearch.index.IndexVersions;
3232
import org.elasticsearch.index.shard.ShardId;
33-
import org.elasticsearch.telemetry.metric.DoubleGauge;
34-
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
35-
import org.elasticsearch.telemetry.metric.LongGaugeMetric;
36-
import org.elasticsearch.telemetry.metric.MeterRegistry;
3733
import org.elasticsearch.threadpool.ThreadPool;
3834

3935
import java.util.Comparator;
@@ -73,23 +69,10 @@ public class DesiredBalanceReconciler {
7369
private double undesiredAllocationsLogThreshold;
7470
private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering();
7571
private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering();
72+
private final DesiredBalanceMetrics desiredBalanceMetrics;
7673

77-
// stats
78-
/**
79-
* Number of unassigned shards during last reconciliation
80-
*/
81-
protected final LongGaugeMetric unassignedShards;
82-
/**
83-
* Total number of assigned shards during last reconciliation
84-
*/
85-
protected final LongGaugeMetric totalAllocations;
86-
/**
87-
* Number of assigned shards during last reconciliation that are not allocated on desired node and need to be moved
88-
*/
89-
protected final LongGaugeMetric undesiredAllocations;
90-
private final DoubleGauge undesiredAllocationsRatio;
91-
92-
public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool, MeterRegistry meterRegistry) {
74+
public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool, DesiredBalanceMetrics desiredBalanceMetrics) {
75+
this.desiredBalanceMetrics = desiredBalanceMetrics;
9376
this.undesiredAllocationLogInterval = new FrequencyCappedAction(
9477
threadPool.relativeTimeInMillisSupplier(),
9578
TimeValue.timeValueMinutes(5)
@@ -99,35 +82,6 @@ public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool thre
9982
UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
10083
value -> this.undesiredAllocationsLogThreshold = value
10184
);
102-
103-
unassignedShards = LongGaugeMetric.create(
104-
meterRegistry,
105-
"es.allocator.desired_balance.shards.unassigned.current",
106-
"Current number of unassigned shards",
107-
"{shard}"
108-
);
109-
totalAllocations = LongGaugeMetric.create(
110-
meterRegistry,
111-
"es.allocator.desired_balance.shards.current",
112-
"Total number of shards",
113-
"{shard}"
114-
);
115-
undesiredAllocations = LongGaugeMetric.create(
116-
meterRegistry,
117-
"es.allocator.desired_balance.allocations.undesired.current",
118-
"Total number of shards allocated on undesired nodes excluding shutting down nodes",
119-
"{shard}"
120-
);
121-
undesiredAllocationsRatio = meterRegistry.registerDoubleGauge(
122-
"es.allocator.desired_balance.allocations.undesired.ratio",
123-
"Ratio of undesired allocations to shard count excluding shutting down nodes",
124-
"1",
125-
() -> {
126-
var total = totalAllocations.get();
127-
var undesired = undesiredAllocations.get();
128-
return new DoubleWithAttributes(total != 0 ? (double) undesired / total : 0.0);
129-
}
130-
);
13185
}
13286

13387
public void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) {
@@ -578,9 +532,7 @@ private void balance() {
578532
}
579533
}
580534

581-
DesiredBalanceReconciler.this.unassignedShards.set(unassignedShards);
582-
DesiredBalanceReconciler.this.undesiredAllocations.set(undesiredAllocationsExcludingShuttingDownNodes);
583-
DesiredBalanceReconciler.this.totalAllocations.set(totalAllocations);
535+
desiredBalanceMetrics.updateMetrics(unassignedShards, totalAllocations, undesiredAllocationsExcludingShuttingDownNodes);
584536

585537
maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size());
586538
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {
6464
private volatile DesiredBalance currentDesiredBalance = DesiredBalance.INITIAL;
6565
private volatile boolean resetCurrentDesiredBalance = false;
6666
private final Set<String> processedNodeShutdowns = new HashSet<>();
67+
private final DesiredBalanceMetrics desiredBalanceMetrics;
6768

6869
// stats
6970
protected final CounterMetric computationsSubmitted = new CounterMetric();
@@ -104,14 +105,15 @@ public DesiredBalanceShardsAllocator(
104105
DesiredBalanceReconcilerAction reconciler,
105106
TelemetryProvider telemetryProvider
106107
) {
108+
this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry());
107109
this.delegateAllocator = delegateAllocator;
108110
this.threadPool = threadPool;
109111
this.reconciler = reconciler;
110112
this.desiredBalanceComputer = desiredBalanceComputer;
111113
this.desiredBalanceReconciler = new DesiredBalanceReconciler(
112114
clusterService.getClusterSettings(),
113115
threadPool,
114-
telemetryProvider.getMeterRegistry()
116+
desiredBalanceMetrics
115117
);
116118
this.desiredBalanceComputation = new ContinuousComputation<>(threadPool.generic()) {
117119

@@ -168,6 +170,10 @@ public String toString() {
168170
if (event.localNodeMaster() == false) {
169171
onNoLongerMaster();
170172
}
173+
// Only update on change, to minimise volatile writes
174+
if (event.localNodeMaster() != event.previousState().nodes().isLocalNodeElectedMaster()) {
175+
desiredBalanceMetrics.setNodeIsMaster(event.localNodeMaster());
176+
}
171177
});
172178
}
173179

@@ -306,9 +312,9 @@ public DesiredBalanceStats getStats() {
306312
computedShardMovements.sum(),
307313
cumulativeComputationTime.count(),
308314
cumulativeReconciliationTime.count(),
309-
desiredBalanceReconciler.unassignedShards.get(),
310-
desiredBalanceReconciler.totalAllocations.get(),
311-
desiredBalanceReconciler.undesiredAllocations.get()
315+
desiredBalanceMetrics.unassignedShards(),
316+
desiredBalanceMetrics.totalAllocations(),
317+
desiredBalanceMetrics.undesiredAllocations()
312318
);
313319
}
314320

@@ -318,10 +324,7 @@ private void onNoLongerMaster() {
318324
queue.completeAllAsNotMaster();
319325
pendingDesiredBalanceMoves.clear();
320326
desiredBalanceReconciler.clear();
321-
322-
desiredBalanceReconciler.unassignedShards.set(0);
323-
desiredBalanceReconciler.totalAllocations.set(0);
324-
desiredBalanceReconciler.undesiredAllocations.set(0);
327+
desiredBalanceMetrics.zeroAllMetrics();
325328
}
326329
}
327330

0 commit comments

Comments
 (0)