Smarter shard packing into node requests #129242

Open
@idegtiarenko

Description

Today, the way selectNodeRequests packs shards into node requests is mostly dictated by the order of nodes in nodesIt.
We do not explicitly order the nodes behind that iterator when constructing the data structure.

In general, packing shards into node requests is a bin-packing problem, which is NP-hard.
However, we can improve the existing approach without having to iterate over all possible allocations.

Assuming the following shard allocation:

shard1=[node1,node2]
shard2=[node2,node1]
shard3=[node3,node1]

We could introduce a node selection strategy (such as spreading shards across as many nodes as possible, or across as few as possible)
and create the following requests, depending on configuration: node1->{shard1},node2->{shard2},node3->{shard3} or node1->{shard1,shard2,shard3}. This could use something like select(nodeIds, strategy, pendingRequests), where we pick the next node based on order (if no strategy is supplied) or based on its presence or absence in pendingRequests.
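
The selection step described above could be sketched roughly as follows. This is a minimal illustration, not the existing implementation: the class NodeSelectionSketch, the Strategy enum (ORDER, SPREAD, PACK) and the assign helper are hypothetical names, node and shard ids are plain strings rather than DiscoveryNode and ShardId, and the permit handling (concurrentRequests, nodePermits) is left out entirely.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; none of these names exist in the current code.
class NodeSelectionSketch {

    // ORDER stands in for "no strategy supplied".
    enum Strategy { ORDER, SPREAD, PACK }

    /**
     * Picks a node for the next shard from its candidate nodes.
     * SPREAD prefers the first candidate with no pending request,
     * PACK prefers the first candidate that already has one,
     * and both fall back to the first candidate, as ORDER always does.
     */
    static String select(List<String> nodeIds, Strategy strategy, Map<String, List<String>> pendingRequests) {
        if (nodeIds.isEmpty()) {
            throw new IllegalArgumentException("no candidate nodes");
        }
        if (strategy == Strategy.SPREAD || strategy == Strategy.PACK) {
            boolean wantPending = strategy == Strategy.PACK;
            for (String nodeId : nodeIds) {
                if (pendingRequests.containsKey(nodeId) == wantPending) {
                    return nodeId;
                }
            }
        }
        return nodeIds.get(0);
    }

    /** Greedily assigns each shard (in iteration order) to the node chosen by select. */
    static Map<String, List<String>> assign(Map<String, List<String>> shardToNodes, Strategy strategy) {
        Map<String, List<String>> pendingRequests = new LinkedHashMap<>();
        for (var entry : shardToNodes.entrySet()) {
            String nodeId = select(entry.getValue(), strategy, pendingRequests);
            pendingRequests.computeIfAbsent(nodeId, n -> new ArrayList<>()).add(entry.getKey());
        }
        return pendingRequests;
    }

    public static void main(String[] args) {
        // The shard allocation from the example above.
        Map<String, List<String>> allocation = new LinkedHashMap<>();
        allocation.put("shard1", List.of("node1", "node2"));
        allocation.put("shard2", List.of("node2", "node1"));
        allocation.put("shard3", List.of("node3", "node1"));

        // prints SPREAD: {node1=[shard1], node2=[shard2], node3=[shard3]}
        System.out.println("SPREAD: " + assign(allocation, Strategy.SPREAD));
        // prints PACK:   {node1=[shard1, shard2, shard3]}
        System.out.println("PACK:   " + assign(allocation, Strategy.PACK));
    }
}
```

This is a single greedy pass, so the result still depends on the order in which shards are visited; it is a heuristic and does not guarantee the minimum or maximum number of node requests.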

The spread strategy could speed up complex queries, while the opposite might be good for CCS, as it minimizes the number of requests to remote clusters.

See the current implementation:

    private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
        assert sendingLock.isHeldByCurrentThread();
        final Map<DiscoveryNode, List<ShardId>> nodeToShardIds = new LinkedHashMap<>();
        final Iterator<ShardId> shardsIt = pendingShardIds.iterator();
        while (shardsIt.hasNext()) {
            ShardId shardId = shardsIt.next();
            ShardFailure failure = shardFailures.get(shardId);
            if (failure != null && failure.fatal) {
                shardsIt.remove();
                continue;
            }
            TargetShard shard = targetShards.getShard(shardId);
            Iterator<DiscoveryNode> nodesIt = shard.remainingNodes.iterator();
            while (nodesIt.hasNext()) {
                DiscoveryNode node = nodesIt.next();
                List<ShardId> pendingRequest = nodeToShardIds.get(node);
                if (pendingRequest != null) {
                    pendingRequest.add(shard.shardId);
                    nodesIt.remove();
                    shardsIt.remove();
                    break;
                }
                if (concurrentRequests == null || concurrentRequests.tryAcquire()) {
                    if (nodePermits.computeIfAbsent(node, n -> new Semaphore(1)).tryAcquire()) {
                        pendingRequest = new ArrayList<>();
                        pendingRequest.add(shard.shardId);
                        nodeToShardIds.put(node, pendingRequest);
                        nodesIt.remove();
                        shardsIt.remove();
                        break;
                    } else if (concurrentRequests != null) {
                        concurrentRequests.release();
                    }
                }
            }
        }
        final List<NodeRequest> nodeRequests = new ArrayList<>(nodeToShardIds.size());
        for (var entry : nodeToShardIds.entrySet()) {
            var node = entry.getKey();
            var shardIds = entry.getValue();
            Map<Index, AliasFilter> aliasFilters = new HashMap<>();
            for (ShardId shardId : shardIds) {
                var aliasFilter = targetShards.getShard(shardId).aliasFilter;
                if (aliasFilter != null) {
                    aliasFilters.put(shardId.getIndex(), aliasFilter);
                }
            }
            nodeRequests.add(new NodeRequest(node, shardIds, aliasFilters));
        }
        return nodeRequests;
    }
