Skip to content

HDDS-13067. Container Balancer delete commands should not be sent with an expiration time in the past #8491

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
Expand Down Expand Up @@ -69,8 +68,7 @@ public final class MoveManager implements
private final ContainerManager containerManager;
private final Clock clock;

private final Map<ContainerID,
Pair<CompletableFuture<MoveResult>, MoveDataNodePair>> pendingMoves =
private final Map<ContainerID, MoveOperation> pendingMoves =
new ConcurrentHashMap<>();

public MoveManager(final ReplicationManager replicationManager,
Expand All @@ -83,8 +81,7 @@ public MoveManager(final ReplicationManager replicationManager,
/**
* get all the pending move operations.
*/
public Map<ContainerID,
Pair<CompletableFuture<MoveResult>, MoveDataNodePair>> getPendingMove() {
public Map<ContainerID, MoveOperation> getPendingMove() {
return pendingMoves;
}

Expand All @@ -98,10 +95,9 @@ void resetState() {
* @param cid Container id to which the move option is finished
*/
private void completeMove(final ContainerID cid, final MoveResult mr) {
Pair<CompletableFuture<MoveResult>, MoveDataNodePair> move =
pendingMoves.remove(cid);
MoveOperation move = pendingMoves.remove(cid);
if (move != null) {
CompletableFuture<MoveResult> future = move.getLeft();
CompletableFuture<MoveResult> future = move.getResult();
if (future != null && mr != null) {
// when we know the future is null, and we want to complete
// the move , then we set mr to null.
Expand All @@ -125,9 +121,8 @@ private void completeMove(final ContainerID cid, final MoveResult mr) {
private void startMove(
final ContainerInfo containerInfo, final DatanodeDetails src,
final DatanodeDetails tgt, final CompletableFuture<MoveResult> ret) {
Pair<CompletableFuture<MoveResult>, MoveDataNodePair> move =
pendingMoves.putIfAbsent(containerInfo.containerID(),
Pair.of(ret, new MoveDataNodePair(src, tgt)));
MoveOperation move = pendingMoves.putIfAbsent(containerInfo.containerID(),
new MoveOperation(ret, new MoveDataNodePair(src, tgt)));
if (move == null) {
// A move for this container did not exist, so send a replicate command
try {
Expand Down Expand Up @@ -264,10 +259,9 @@ CompletableFuture<MoveResult> move(
*/
private void notifyContainerOpCompleted(ContainerReplicaOp containerReplicaOp,
ContainerID containerID) {
Pair<CompletableFuture<MoveResult>, MoveDataNodePair> pair =
pendingMoves.get(containerID);
MoveOperation pair = pendingMoves.get(containerID);
if (pair != null) {
MoveDataNodePair mdnp = pair.getRight();
MoveDataNodePair mdnp = pair.getMoveDataNodePair();
PendingOpType opType = containerReplicaOp.getOpType();
DatanodeDetails dn = containerReplicaOp.getTarget();
if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) {
Expand All @@ -278,7 +272,7 @@ private void notifyContainerOpCompleted(ContainerReplicaOp containerReplicaOp,
LOG.warn("Failed to handle successful Add for container {} being " +
"moved from source {} to target {}.", containerID,
mdnp.getSrc(), mdnp.getTgt(), e);
pair.getLeft().complete(MoveResult.FAIL_UNEXPECTED_ERROR);
pair.getResult().complete(MoveResult.FAIL_UNEXPECTED_ERROR);
}
} else if (
opType.equals(PendingOpType.DELETE) && mdnp.getSrc().equals(dn)) {
Expand All @@ -295,10 +289,9 @@ private void notifyContainerOpCompleted(ContainerReplicaOp containerReplicaOp,
*/
private void notifyContainerOpExpired(ContainerReplicaOp containerReplicaOp,
ContainerID containerID) {
Pair<CompletableFuture<MoveResult>, MoveDataNodePair> pair =
pendingMoves.get(containerID);
MoveOperation pair = pendingMoves.get(containerID);
if (pair != null) {
MoveDataNodePair mdnp = pair.getRight();
MoveDataNodePair mdnp = pair.getMoveDataNodePair();
PendingOpType opType = containerReplicaOp.getOpType();
DatanodeDetails dn = containerReplicaOp.getTarget();
if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) {
Expand All @@ -314,12 +307,11 @@ private void handleSuccessfulAdd(final ContainerID cid)
throws ContainerNotFoundException,
ContainerReplicaNotFoundException, NodeNotFoundException,
NotLeaderException {
Pair<CompletableFuture<MoveResult>, MoveDataNodePair> pair =
pendingMoves.get(cid);
if (pair == null) {
MoveOperation moveOp = pendingMoves.get(cid);
if (moveOp == null) {
return;
}
MoveDataNodePair movePair = pair.getRight();
MoveDataNodePair movePair = moveOp.getMoveDataNodePair();
final DatanodeDetails src = movePair.getSrc();
final DatanodeDetails tgt = movePair.getTgt();
LOG.debug("Handling successful addition of Container {} from" +
Expand Down Expand Up @@ -356,7 +348,7 @@ private void handleSuccessfulAdd(final ContainerID cid)

if (healthResult.getHealthState() ==
ContainerHealthResult.HealthState.HEALTHY) {
sendDeleteCommand(containerInfo, src);
sendDeleteCommand(containerInfo, src, moveOp.getReplicateScheduledTime());
} else {
LOG.info("Cannot remove source replica as the container health would " +
"be {}", healthResult.getHealthState());
Expand Down Expand Up @@ -403,6 +395,7 @@ private void sendReplicateCommand(
long now = clock.millis();
replicationManager.sendLowPriorityReplicateContainerCommand(containerInfo,
replicaIndex, src, tgt, now + replicationTimeout);
pendingMoves.get(containerInfo.containerID()).setReplicateScheduledTime(now);
}

/**
Expand All @@ -411,17 +404,18 @@ private void sendReplicateCommand(
*
* @param containerInfo Container to be deleted
* @param datanode The datanode on which the replica should be deleted
* @param scheduledTime The time at which the replicate command for the container was scheduled
*/
private void sendDeleteCommand(
final ContainerInfo containerInfo, final DatanodeDetails datanode)
final ContainerInfo containerInfo, final DatanodeDetails datanode,
long scheduledTime)
throws ContainerReplicaNotFoundException, ContainerNotFoundException,
NotLeaderException {
int replicaIndex = getContainerReplicaIndex(
containerInfo.containerID(), datanode);
long deleteTimeout = moveTimeout - replicationTimeout;
long now = clock.millis();
replicationManager.sendDeleteCommand(
containerInfo, replicaIndex, datanode, true, now + deleteTimeout);
containerInfo, replicaIndex, datanode, true, scheduledTime + deleteTimeout);
}

private int getContainerReplicaIndex(
Expand Down Expand Up @@ -454,6 +448,45 @@ void setReplicationTimeout(long replicationTimeout) {
this.replicationTimeout = replicationTimeout;
}

/**
* All details about a move operation.
*/
private static class MoveOperation {
private CompletableFuture<MoveResult> result;
private MoveDataNodePair moveDataNodePair;
private long replicateScheduledTime;

MoveOperation(CompletableFuture<MoveResult> result, MoveDataNodePair srcTgt) {
this.result = result;
this.moveDataNodePair = srcTgt;
}

public CompletableFuture<MoveResult> getResult() {
return result;
}

public MoveDataNodePair getMoveDataNodePair() {
return moveDataNodePair;
}

public long getReplicateScheduledTime() {
return replicateScheduledTime;
}

public void setResult(
CompletableFuture<MoveResult> result) {
this.result = result;
}

public void setMoveDataNodePair(MoveDataNodePair srcTgt) {
this.moveDataNodePair = srcTgt;
}

public void setReplicateScheduledTime(long replicatescheduledTime) {
this.replicateScheduledTime = replicatescheduledTime;
}
}

/**
* Various move return results.
*/
Expand Down